In [1]:
#USER_FLAG = "--user"
#!pip3 install {USER_FLAG} kfp --upgrade
#!pip3 install {USER_FLAG} google_cloud_pipeline_components --upgrade
#!pip3 install {USER_FLAG} 'apache-beam[gcp]'

In [2]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 2.0.1


In [3]:
! python3 -c "from google.cloud import aiplatform; print('aiplatform SDK version: {}'.format(aiplatform.__version__))"

aiplatform SDK version: 1.25.0


# 0.0 Imports

In [4]:
from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2.dsl import (pipeline,
                        Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp.v2 import compiler


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip

  This is separate from the ipykernel package so we can avoid doing imports until


ImportError: cannot import name 'aiplatform' from 'google_cloud_pipeline_components' (/home/jupyter/.local/lib/python3.7/site-packages/google_cloud_pipeline_components/__init__.py)

In [None]:
# Cloud Storage URI kept in `pipeline_root`.
# NOTE(review): presumably used as the Vertex AI pipeline artifact root; the
# cell that consumes it is not shown in this section — confirm.
pipeline_root = 'gs://bucket_pipeline'

# 1.0 Data Capture

In [None]:
@component(packages_to_install=["google-cloud-bigquery","db-dtypes", "pandas"],
          base_image="python:3.10.6",
          output_component_file="captura_dados.yaml")
def captura_dados():
    """Rebuild the cleaned, date-partitioned e-commerce table in BigQuery.

    Runs a single ``CREATE OR REPLACE TABLE ... AS`` statement that copies rows
    from the raw table into ``TABLE_ID``, dropping rows with a future
    ``invoice_date``, a NULL ``customer_id`` or ``description``, a
    ``unit_price`` below 0.04, the excluded countries / stock codes listed in
    the query, and customer 16446.

    The component takes no inputs and returns nothing; its only effect is the
    BigQuery table it (re)creates.
    """
    import logging

    import pandas as pd
    from google.cloud import bigquery

    # NOTE(review): the other components in this notebook use the project id
    # 'gcpproject' for the same dataset/table — confirm which one is intended.
    PROJECT_ID = "gcp-vertex"
    DATASET_ID = "gcp_bq"
    TABLE_RAW_ID = "dados_ecommerce_raw"
    TABLE_ID = "ecommerce_cds"

    # The return annotation used to be ``Union[str, pd.DataFrame]``; ``Union``
    # is not imported anywhere in this function and the helper always returns
    # a DataFrame, so the annotation now states exactly that.
    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """
        Validate a statement with a dry run, execute it and return its rows.

        Args:
            sql: SQL statement, as a string, to execute in BigQuery.
            project_name: Project the BigQuery client is created for.
        Returns:
            df: DataFrame built from the finished job's result.
        """
        bq_client = bigquery.Client(project=project_name)

        # Dry run first so an invalid statement fails before any real job runs.
        dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=dry_run_config)

        # The dry run passed: submit the real job.
        query_job = bq_client.query(sql, job_config=bigquery.QueryJobConfig())
        job_id = query_job.job_id

        # result() blocks until the job has finished.
        df = query_job.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")

        return df


    query = f"""
                CREATE OR REPLACE TABLE
               `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` (invoice_no STRING,
                stock_code STRING,
                description STRING,
                quantity INT64,
                invoice_date DATE,
                unit_price FLOAT64,
                customer_id FLOAT64,
                country STRING)
            PARTITION BY
              invoice_date AS (
              WITH
                not_nulls AS (
                SELECT
                  *
                FROM
                  `{PROJECT_ID}.{DATASET_ID}.{TABLE_RAW_ID}`
                WHERE
                  invoice_date <= CURRENT_DATE()
                  AND customer_id IS NOT NULL
                  AND description IS NOT NULL),
                filtering_features AS (
                SELECT
                  *
                FROM
                  not_nulls
                WHERE
                  unit_price >= 0.04
                  AND country NOT IN ('European Community',
                    'Unspecified')
                  AND stock_code NOT IN ('POST',
                    'D',
                    'DOT',
                    'M',
                    'S',
                    'AMAZONFEE',
                    'm',
                    'DCGSSBOY',
                    'DCGSSGIRL',
                    'PADS',
                    'B',
                    'CRUK')
                  AND customer_id != 16446)
              SELECT
                *
              FROM
                filtering_features);
    """

    run_bq_query(query, project_name=PROJECT_ID)
    logging.info(f'Tabela criada: {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}')

# 2.0 Data Preparation

In [None]:
@component(packages_to_install=["pandas", "google-cloud-bigquery", "db-dtypes", "pandas-gbq"],
    base_image="python:3.10.6",
    output_component_file="data_preparation_ecommerce.yaml")
def data_preparation():
    """Load the cleaned e-commerce table, fix column types and write three temp tables.

    Reads ``TABLE_ID`` from BigQuery, converts ``customer_id`` to int and
    ``invoice_date`` to datetime, then writes (replacing any previous content):

    * ``TABLE_FILTERED_TEMP_ID``  - every row, restricted to the modelling columns;
    * ``TABLE_PURCHASES_TEMP_ID`` - rows with ``quantity >= 0``, all columns;
    * ``TABLE_RETURNS_TEMP_ID``   - rows with ``quantity < 0``, only
      ``customer_id`` and ``quantity``.

    The billing project is taken from the ``CLOUD_ML_PROJECT_ID`` environment
    variable. The component takes no inputs and returns nothing.
    """
    import os
    import logging
    from typing import Tuple

    import pandas as pd
    import pandas_gbq

    logging.info('Iniciando o componente')

    PROJECT_ID = 'gcpproject'
    DATASET_ID = 'gcp_bq'
    TABLE_ID = 'ecommerce_cds'
    TABLE_FILTERED_TEMP_ID = 'temp_data_filtered'
    TABLE_PURCHASES_TEMP_ID = 'temp_data_purchases'
    TABLE_RETURNS_TEMP_ID = 'temp_data_returns'
    PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]

    def keep_features(dataframe: pd.DataFrame, keep_columns: list) -> pd.DataFrame:
        """
        Return a DataFrame holding only the columns listed in keep_columns.

        Args:
            dataframe (pd.DataFrame): The DataFrame to select from.
            keep_columns (list): Names of the columns to keep.

        Returns:
            pd.DataFrame: The selection ``dataframe[keep_columns]``.
        """
        return dataframe[keep_columns]

    def column_to_int(dataframe: pd.DataFrame, column_name: str) -> bool:
        """
        Convert the given column to the integer type, in place.

        Args:
            dataframe (pd.DataFrame): The DataFrame to modify.
            column_name (str): Name of the column to convert.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ValueError or TypeError (the column is then left unchanged).
        """
        try:
            dataframe[column_name] = dataframe[column_name].astype(int)
        except (ValueError, TypeError):
            # Missing values or values that cannot be represented as int.
            return False
        return True

    def column_to_date(dataframe: pd.DataFrame, column_name: str, date_format: str = None) -> bool:
        """
        Convert the given column to datetime, in place.

        Args:
            dataframe (pd.DataFrame): The DataFrame to modify.
            column_name (str): Name of the column to convert.
            date_format (str, optional): Explicit format handed to
                ``pd.to_datetime``; when omitted pandas infers the format.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ValueError or TypeError (the column is then left unchanged).
        """
        try:
            if date_format:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name], format=date_format)
            else:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name])
        except (ValueError, TypeError):
            # Values that cannot be parsed as dates.
            return False
        return True

    def change_column_type(dataframe_raw: pd.DataFrame):
        """
        Cast ``customer_id`` to int and ``invoice_date`` to datetime, in place.

        A failed conversion leaves the column as it was; it is now reported
        with a warning instead of being silently ignored.

        Args:
            dataframe_raw: A pandas DataFrame.

        Returns:
            None.
        """
        if not column_to_int(dataframe_raw, 'customer_id'):
            logging.warning("Nao foi possivel converter a coluna 'customer_id' para inteiro")
        if not column_to_date(dataframe_raw, 'invoice_date'):
            logging.warning("Nao foi possivel converter a coluna 'invoice_date' para data")

    def filtering_features(dataframe_raw: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Split the input into the main, purchases and returns frames.

        Args:
            dataframe_raw: A pandas DataFrame containing raw sales data.

        Returns:
            ``(df_filtered, df_purchases, df_returns)`` where returns are the
            rows with negative quantity (customer_id and quantity only),
            purchases are the rows with quantity >= 0, and the main frame is
            every row restricted to the modelling columns.
        """
        is_return = dataframe_raw['quantity'] < 0
        df_returns = dataframe_raw.loc[is_return, ['customer_id', 'quantity']]
        df_purchases = dataframe_raw.loc[dataframe_raw['quantity'] >= 0, :]

        df_filtered = keep_features(dataframe_raw, ['invoice_no', 'stock_code', 'quantity',
                                                    'invoice_date', 'unit_price',
                                                    'customer_id', 'country'])
        return df_filtered, df_purchases, df_returns

    def run_data_preparation(dataframe_raw: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Convert column types, then split the frame with ``filtering_features``.

        Args:
            dataframe_raw (pd.DataFrame): A pandas DataFrame containing raw sales data.

        Returns:
            A tuple of three pandas DataFrames: df_filtered, df_purchases, and df_returns.
        """
        change_column_type(dataframe_raw)
        return filtering_features(dataframe_raw)

    query_sql = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
                    WHERE invoice_date <= CURRENT_DATE """

    # pandas' own read_gbq wrapper is deprecated in favour of pandas_gbq.read_gbq,
    # which this component already installs and imports.
    data = pandas_gbq.read_gbq(query_sql, project_id=PROJECT_NUMBER)
    logging.info(f'Tabela carregada: `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`')

    df_filtered, df_purchases, df_returns = run_data_preparation(data)

    pandas_gbq.to_gbq(df_filtered, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_FILTERED_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')
    pandas_gbq.to_gbq(df_purchases, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_PURCHASES_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')
    pandas_gbq.to_gbq(df_returns, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_RETURNS_TEMP_ID}', project_id=PROJECT_NUMBER, if_exists='replace')

    logging.info(f'Tabelas criadas no BigQuery: {TABLE_FILTERED_TEMP_ID} e {TABLE_PURCHASES_TEMP_ID} e {TABLE_RETURNS_TEMP_ID}')

# 3.0 Feature Engineering

In [None]:
@component(packages_to_install=["pandas", "google-cloud-bigquery", "db-dtypes", "pandas-gbq", "google-cloud"],
    base_image="python:3.10.6",
    output_component_file="feature_engineering_ecommerce.yaml")
def feature_engineering():
    """Build per-customer features from the data-preparation temp tables.

    Reads the filtered / purchases / returns temp tables from BigQuery and
    computes, per ``customer_id``: ``gross_revenue``, ``recency_days``,
    ``qty_products``, ``frequency`` and ``qty_returns``.

    * If the feature table does not exist yet, it is created from all
      customers and extended with ``values`` (a generated UUID) and
      ``timestamp`` columns.
    * Otherwise only customers appearing in the raw table with today's
      ``invoice_date`` are processed: their rows are appended and ``values`` /
      ``timestamp`` are refreshed for those customers.

    Every column reference uses the snake_case names (``customer_id``,
    ``invoice_date``, ``unit_price``, ...) that the upstream components in this
    notebook create; the previous mix of CamelCase and snake_case names could
    not match a single table schema.

    The billing project is taken from the ``CLOUD_ML_PROJECT_ID`` environment
    variable. The component takes no inputs and returns nothing.
    """
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    import pandas as pd
    import pandas_gbq
    import os
    import logging
    from functools import reduce

    PROJECT_ID = 'gcpproject'
    DATASET_ID = 'gcp_bq'
    TABLE_ID = 'dados_engenharia_features'
    TABLE_RAW_ID = 'dados_ecommerce_raw'
    TABLE_FILTERED_TEMP_ID = 'temp_data_filtered'
    TABLE_PURCHASES_TEMP_ID = 'temp_data_purchases'
    TABLE_RETURNS_TEMP_ID = 'temp_data_returns'
    PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]

    logging.info('Iniciando o componente')

    def column_to_date(dataframe: pd.DataFrame, column_name: str, date_format: str = None) -> bool:
        """
        Convert the given column to datetime, in place.

        Args:
            dataframe (pd.DataFrame): The DataFrame to modify.
            column_name (str): Name of the column to convert.
            date_format (str, optional): Explicit format handed to
                ``pd.to_datetime``; when omitted pandas infers the format.

        Returns:
            bool: True when the conversion succeeded, False when it raised
            ValueError or TypeError (the column is then left unchanged).
        """
        try:
            if date_format:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name], format=date_format)
            else:
                dataframe[column_name] = pd.to_datetime(dataframe[column_name])
        except (ValueError, TypeError):
            # Values that cannot be parsed as dates.
            return False
        return True

    def keep_features(dataframe: pd.DataFrame, keep_columns: list) -> pd.DataFrame:
        """
        Return a DataFrame holding only the columns listed in keep_columns.

        Args:
            dataframe (pd.DataFrame): The DataFrame to select from.
            keep_columns (list): Names of the columns to keep.

        Returns:
            pd.DataFrame: The selection ``dataframe[keep_columns]``.
        """
        return dataframe[keep_columns]

    def calculate_gross_revenue(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """
        Sum ``quantity * unit_price`` per customer.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with the columns
                'customer_id', 'quantity' and 'unit_price'.

        Returns:
            pd.DataFrame: Columns 'customer_id' and 'gross_revenue'.

        Raises:
            ValueError: If any of the required columns is missing.
        """
        required_columns = {'customer_id', 'quantity', 'unit_price'}
        missing_columns = required_columns - set(dataframe_purchases.columns)
        if missing_columns:
            raise ValueError(f"O DataFrame de entrada está faltando as seguintes colunas: {missing_columns}")

        # assign() returns a new frame, so the caller's (possibly sliced)
        # DataFrame is not mutated.
        with_revenue = dataframe_purchases.assign(
            gross_revenue=dataframe_purchases['quantity'] * dataframe_purchases['unit_price'])
        return with_revenue.groupby('customer_id').agg({'gross_revenue': 'sum'}).reset_index()

    def create_recency(dataframe_purchases: pd.DataFrame, dataframe_filtered: pd.DataFrame) -> pd.DataFrame:
        """
        Compute the days since each customer's last purchase.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'customer_id' and 'invoice_date'.
            dataframe_filtered (pd.DataFrame): Frame whose latest 'invoice_date'
                is the reference date for the recency.

        Returns:
            pd.DataFrame: Columns 'customer_id' and 'recency_days'.
        """
        # Latest purchase date per customer.
        df_recency = dataframe_purchases.loc[:, ['customer_id', 'invoice_date']].groupby('customer_id').max().reset_index()

        # Days between the latest date in the filtered frame and that purchase.
        df_recency.loc[:, 'recency_days'] = (dataframe_filtered['invoice_date'].max() - df_recency['invoice_date']).dt.days

        return df_recency[['customer_id', 'recency_days']]

    def create_quantity_purchased(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """
        Count the purchase rows (non-null 'stock_code') per customer.

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'customer_id' and 'stock_code'.

        Returns:
            pd.DataFrame: Columns 'customer_id' and 'qty_products'.
        """
        qty_purchased = dataframe_purchases.loc[:, ['customer_id', 'stock_code']].groupby('customer_id').count()
        return qty_purchased.reset_index().rename(columns={'stock_code': 'qty_products'})

    def create_freq_purchases(dataframe_purchases: pd.DataFrame) -> pd.DataFrame:
        """
        Compute each customer's purchase frequency.

        Frequency is the number of distinct (invoice_no, invoice_date) pairs
        divided by the number of days between the first and last purchase,
        inclusive (so the divisor is always at least 1).

        Args:
            dataframe_purchases (pd.DataFrame): Purchases with 'customer_id',
                'invoice_no' and 'invoice_date'.

        Returns:
            pd.DataFrame: Columns 'customer_id' and 'frequency'.
        """
        df_aux = (dataframe_purchases[['customer_id', 'invoice_no', 'invoice_date']]
                  .drop_duplicates()
                  .groupby('customer_id')
                  .agg(max_=('invoice_date', 'max'),
                       min_=('invoice_date', 'min'),
                       buy_=('invoice_no', 'count'))
                  .reset_index())

        # Inclusive day span of the customer's purchase history.
        df_aux['days_'] = (df_aux['max_'] - df_aux['min_']).dt.days + 1
        df_aux['frequency'] = df_aux['buy_'] / df_aux['days_']

        return df_aux[['customer_id', 'frequency']]

    def create_qty_returns(dataframe_returns: pd.DataFrame) -> pd.DataFrame:
        """
        Compute the total quantity of returned products for each customer.

        Args:
            dataframe_returns: Returns with 'customer_id' and 'quantity'
                (quantities are negative for returns).

        Returns:
            A pandas DataFrame with 'customer_id' and 'qty_returns'
            (the summed quantity with its sign flipped).
        """
        df_returns = (dataframe_returns[['customer_id', 'quantity']]
                      .groupby('customer_id')
                      .sum()
                      .reset_index()
                      .rename(columns={'quantity': 'qty_returns'}))
        df_returns['qty_returns'] = df_returns['qty_returns'] * -1

        return df_returns

    def run_feature_engineering(dataframe_filtered: pd.DataFrame, dataframe_purchases: pd.DataFrame, dataframe_returns: pd.DataFrame) -> pd.DataFrame:
        """
        Build the per-customer feature frame from the three input frames.

        Args:
            dataframe_filtered: Filtered customer order data.
            dataframe_purchases: Customer purchase data.
            dataframe_returns: Customer return data (may be empty).

        Returns:
            A pandas DataFrame with one row per customer and the columns
            'customer_id', 'gross_revenue', 'recency_days', 'qty_products',
            'frequency' and 'qty_returns'. Customers without returns get
            ``qty_returns = 0``; rows still containing NaN are dropped.

        Raises:
            ValueError: If the filtered or purchases frame is empty, or a
                required column is missing from any input.
        """
        if dataframe_filtered.empty:
            raise ValueError("Input DataFrame 'dataframe_filtered' is empty")
        if dataframe_purchases.empty:
            raise ValueError("Input DataFrame 'dataframe_purchases' is empty")

        required_columns = ['customer_id', 'invoice_no', 'invoice_date', 'stock_code', 'quantity', 'unit_price']
        for df, name in zip([dataframe_filtered, dataframe_purchases], ['dataframe_filtered', 'dataframe_purchases']):
            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing columns {missing_columns} in input DataFrame '{name}'")
        if 'customer_id' not in dataframe_returns.columns:
            raise ValueError("Column 'customer_id' not found in input DataFrame 'dataframe_returns'")
        if 'quantity' not in dataframe_returns.columns:
            raise ValueError("Column 'quantity' not found in input DataFrame 'dataframe_returns'")

        # One frame per feature, all keyed by customer_id.
        df_fengi = keep_features(dataframe_filtered, ['customer_id']).drop_duplicates(ignore_index=True)
        gross_revenue = calculate_gross_revenue(dataframe_purchases)
        df_recency = create_recency(dataframe_purchases, dataframe_filtered)
        df_qty_products = create_quantity_purchased(dataframe_purchases)
        df_freq = create_freq_purchases(dataframe_purchases)
        returns = create_qty_returns(dataframe_returns)

        # Left-join everything onto the list of customers.
        dfs = [df_fengi, gross_revenue, df_recency, df_qty_products, df_freq, returns]
        df_fengi = reduce(lambda left, right: pd.merge(left, right, on='customer_id', how='left'), dfs)

        # Customers that never returned anything.
        df_fengi['qty_returns'] = df_fengi['qty_returns'].fillna(0)

        features = ['customer_id', 'gross_revenue', 'recency_days', 'qty_products', 'frequency', 'qty_returns']
        return keep_features(df_fengi, features).dropna()

    def table_exists(dataset_table_id: str) -> bool:
        """Return True when BigQuery can fetch the table, False on NotFound."""
        client = bigquery.Client()

        try:
            client.get_table(dataset_table_id)  # Make an API request.
            return True
        except NotFound:
            return False

    def run_bq_query(sql: str, project_name: str) -> pd.DataFrame:
        """
        Validate a statement with a dry run, execute it and return its rows.

        Args:
            sql: SQL statement, as a string, to execute in BigQuery.
            project_name: Project the BigQuery client is created for.
        Returns:
            df: DataFrame built from the finished job's result.
        """
        bq_client = bigquery.Client(project=project_name)

        # Dry run first so an invalid statement fails before any real job runs.
        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        bq_client.query(sql, job_config=job_config)

        # The dry run passed: submit the real job.
        job_config = bigquery.QueryJobConfig()
        client_result = bq_client.query(sql, job_config=job_config)

        job_id = client_result.job_id
        # result() blocks until the job has finished.
        df = client_result.result().to_arrow().to_pandas()
        print(f"Finished job_id: {job_id}")
        return df

    logging.info('Carregando as tabelas da preparacao de dados')
    # CAST(... AS DATE) keeps the filter valid whichever temporal type the
    # column was stored with when the temp table was written.
    query_filtered = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_FILTERED_TEMP_ID}`
                    WHERE CAST(invoice_date AS DATE) <= CURRENT_DATE() """
    df_filtered = pandas_gbq.read_gbq(query_filtered, project_id=PROJECT_NUMBER)

    query_purchases = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_PURCHASES_TEMP_ID}`
                    WHERE CAST(invoice_date AS DATE) <= CURRENT_DATE() """
    df_purchases = pandas_gbq.read_gbq(query_purchases, project_id=PROJECT_NUMBER)

    query_returns = f"""SELECT *
                    FROM  `{PROJECT_ID}.{DATASET_ID}.{TABLE_RETURNS_TEMP_ID}`"""
    df_returns = pandas_gbq.read_gbq(query_returns, project_id=PROJECT_NUMBER)

    logging.info('Transformando a coluna InvoiceDate para o tipo DATE')
    column_to_date(df_filtered, 'invoice_date')
    column_to_date(df_purchases, 'invoice_date')

    logging.info(f'Iniciando a verificacao de existencia da tabela: {DATASET_ID}.{TABLE_ID}')
    if table_exists(f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"):
        logging.info('Tabela existente, inicia insercao de novos dados')
        sql_new_customers = f"""SELECT
                                      DISTINCT customer_id
                                    FROM
                                      `{PROJECT_ID}.{DATASET_ID}.{TABLE_RAW_ID}`
                                    WHERE
                                      invoice_date = CURRENT_DATE()"""
        # Drop NULL ids and normalise to int so the ids can be compared with the
        # temp tables and rendered as plain integer literals below.
        new_customers = (pandas_gbq.read_gbq(sql_new_customers, project_id=PROJECT_NUMBER)['customer_id']
                         .dropna()
                         .astype(int)
                         .tolist())

        if not new_customers:
            # Nothing to append; an empty IN () list would also be invalid SQL.
            logging.info('Nenhum cliente novo encontrado, nenhuma linha inserida')
            return

        df_fengi = run_feature_engineering(df_filtered.loc[df_filtered['customer_id'].isin(new_customers)],
                                           df_purchases.loc[df_purchases['customer_id'].isin(new_customers)],
                                           df_returns.loc[df_returns['customer_id'].isin(new_customers)])

        # Append the new rows, then fill values/timestamp for those customers.
        pandas_gbq.to_gbq(df_fengi, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', project_id=PROJECT_NUMBER, if_exists='append')
        # Built by hand because tuple() formatting renders a one-element list
        # as "(x,)", which is not valid SQL. Values are ints, so no quoting.
        customers_sql = ", ".join(str(customer) for customer in new_customers)
        sql_update_new_customer = f"""
                                        UPDATE `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
                                        SET values = generate_uuid(),
                                        timestamp = current_timestamp()
                                        WHERE customer_id IN ({customers_sql})"""
        logging.info(sql_update_new_customer)
        run_bq_query(sql_update_new_customer, project_name=PROJECT_ID)
    else:
        # First run: create the table from all customers, then rewrite it with
        # the generated values/timestamp columns.
        logging.info('Tabela nao existente, cria a tabela e inicia insercao dos dados')
        df_fengi = run_feature_engineering(df_filtered, df_purchases, df_returns)
        pandas_gbq.to_gbq(df_fengi, f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', project_id=PROJECT_NUMBER, if_exists='fail')
        query = f"""CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` as (
                    SELECT
                        *,
                        generate_uuid() as values,
                        current_timestamp() as timestamp,
                    FROM 
                        `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`);"""
        run_bq_query(query, project_name=PROJECT_ID)

# 4.0 Feature Store 

In [None]:
@component(packages_to_install=["google-cloud-aiplatform", "pyarrow"],
    base_image="python:3.10.6",
    output_component_file="feature_store.yaml")
def create_feature_store():
    """Get or create the Vertex AI feature store and ingest the feature table.

    Steps, each safe to repeat on a later run:

    1. look up the featurestore ``FEATURESTORE_ID``; create it when the lookup
       raises ``NotFound``;
    2. look up the entity type ``VALUES_ENTITY_ID``; create it when the lookup
       raises ``NotFound``;
    3. create whichever of the configured features the entity type does not
       list yet;
    4. ingest the configured features from the BigQuery feature table, using
       the ``values`` column as entity id and ``timestamp`` as feature time.

    Only ``NotFound`` triggers a creation; any other error (permissions,
    quota, network, ...) now propagates instead of being swallowed by a bare
    ``except``.

    The project is taken from the ``CLOUD_ML_PROJECT_ID`` environment
    variable. The component takes no inputs and returns nothing.
    """
    import os
    import logging

    from google.cloud import aiplatform
    from google.cloud.exceptions import NotFound
    #https://medium.com/google-cloud/how-do-you-use-feature-store-in-the-mlops-process-on-vertex-ai-802ddca2cac4
    #https://www.youtube.com/watch?v=jXD8Sfx4hvQ

    logging.info('Iniciando o componente')

    PROJECT_ID = 'gcpproject'
    DATASET_ID = 'gcp_bq'
    TABLE_ID = 'dados_engenharia_features'
    FEATURESTORE_ID="ecommerce_feature_store"
    VALUES_ENTITY_ID = "values"
    VALUES_BQ_SOURCE_URI = f"bq://{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    FEATURE_TIME = 'timestamp'
    REGION = "us-central1"

    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    aiplatform.init(project = project_number, location= REGION)

    try:
        # Reuse the featurestore when it already exists.
        ecommerce_feature_store = aiplatform.Featurestore(f"{FEATURESTORE_ID}")
        logging.info(f"""A feature store {FEATURESTORE_ID} ja existe.""")
    except NotFound:
        logging.info(f"""Criando a feature store: {FEATURESTORE_ID}.""")
        ecommerce_feature_store = aiplatform.Featurestore.create(
            featurestore_id=f"{FEATURESTORE_ID}",
            online_store_fixed_node_count=1,
            sync=True,
        )

    try:
        # Reuse the entity type when it already exists.
        values_entity_type = ecommerce_feature_store.get_entity_type(entity_type_id=VALUES_ENTITY_ID)
    except NotFound:
        values_entity_type = ecommerce_feature_store.create_entity_type(
            entity_type_id=VALUES_ENTITY_ID, description="Values Entity", sync=True
        )

    # NOTE(review): qty_returns is declared INT64 while the other features are
    # DOUBLE — confirm this matches the column types of the BigQuery source.
    values_feature_configs = {
                                "gross_revenue": {
                                    "value_type": "DOUBLE",
                                    "description": "Gross Revenue",
                                    "labels": {"status": "passed"},
                                },
                                "recency_days": {
                                    "value_type": "DOUBLE",
                                    "description": "Recency Days",
                                    "labels": {"status": "passed"},
                                },
                                "qty_products": {
                                    "value_type": "DOUBLE",
                                    "description": "Quantity products",
                                    "labels": {"status": "passed"},
                                },
                                "frequency": {
                                    "value_type": "DOUBLE",
                                    "description": "Frequency",
                                    "labels": {"status": "passed"},
                                },
                                "qty_returns": {
                                    "value_type": "INT64",
                                    "description": "Quantity returns",
                                    "labels": {"status": "passed"},
                            }}

    # Only create the features that are not on the entity type yet, so that a
    # re-run against an existing entity type does not try to recreate them.
    existing_feature_ids = {feature.name for feature in values_entity_type.list_features()}
    missing_feature_configs = {
        feature_id: config
        for feature_id, config in values_feature_configs.items()
        if feature_id not in existing_feature_ids
    }
    if missing_feature_configs:
        values_entity_type.batch_create_features(
            feature_configs=missing_feature_configs, sync=True
        )

    # Ingest exactly the features this component defines.
    VALUES_FEATURES_IDS = list(values_feature_configs)

    logging.info(f"""Ingerindo os dados na feature store: {FEATURESTORE_ID}.""")
    values_entity_type.ingest_from_bq(
                                        feature_ids=VALUES_FEATURES_IDS,
                                        feature_time=FEATURE_TIME,
                                        bq_source_uri=VALUES_BQ_SOURCE_URI,
                                        entity_id_field=VALUES_ENTITY_ID,
                                        disable_online_serving=True,
                                        worker_count=2,
                                        sync=True,
                                    )
    # enable api: https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview?project=343941956592%22
    #https://aiinpractice.com/gcp-mlops-vertex-ai-feature-store/
    #https://medium.com/hacking-talent/vertexais-feature-store-for-dummies-3d798b45ece4