# Doc

## What is this for?

As of 2023Q2, HungerStation (HS) data cannot be joined with our tables because of a BigQuery restriction: HS data is stored in Europe, while the Pricing dataset is stored in the USA, and BigQuery does not allow joining data that is not stored in the same location.

Given this situation, we must first download the data to a local machine and then upload it back into our dataset dh-logistics-product-ops.pricing to have it available for analysis. 

This notebook does exactly that. It fetches the last X days of HS data, counting back from the current date, writes it to a temporary file, and loads that file back into BigQuery, into our table in the Pricing dataset.

## How to use it

- The only parameter to set is __DAYS_BACK__, the number of lookback days to load. For example, if today is 5th April and DAYS_BACK = 14, we fetch data from 22nd March until 5th April (both dates included).

- Once __DAYS_BACK__ is set, click Runtime -> Run all. Accept all permission requests and let it run. At the end of the notebook, log messages indicate the job progress.

In [1]:
# Number of lookback days to load, counted back from the run date (today).
DAYS_BACK = 14

# Code

## Imports

In [2]:
!pip install polars -q
!pip install tqdm -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.4/16.4 MB[0m [31m33.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
import os
import tempfile
from datetime import datetime, timedelta
import pyarrow as pa
import polars as pl
from google.cloud import bigquery

## Queries

In [4]:
# Reads orders from the HungerStation orders fact table, keeping only rows with
# rdf_offer_applied = 1. Placeholders are filled with str.format: {0} is the end
# date and {1} the start date; the BETWEEN filter includes both dates.
STAGING_QUERY = """

DECLARE start_date, end_date DATE;

SET end_date = "{0}";
SET start_date = "{1}";

SELECT 
    order_id as platform_order_code
    , operation_day
    , order_created_at_sa
    , branch_id
    , branch_name_en
    , OD_delivery_fee
    , is_acquisition
    , rdf_offer_applied
    , rdf_offer_restaurant_max_charge
    , rdf_offer_type
    , is_subscribed
    , is_user_subscribed
    , delivery_fee_discount
    , subscribed_discount_amount

FROM `dhub-hungerstation.reporting_prod.orders_fact_non_pii` 
WHERE operation_day BETWEEN start_date AND end_date
AND rdf_offer_applied = 1;
"""



# Upserts the staging table into the production table, matching rows on
# platform_order_code. Placeholders are filled with str.format: {0} is the
# production table name and {1} the staging table name.
# Every column selected by the staging query is refreshed on a match, including
# is_acquisition, so updated rows stay in sync with freshly inserted ones.
MERGE_QUERY = """
MERGE INTO `dh-logistics-product-ops.pricing.{0}` prd
  USING  `dh-logistics-product-ops.pricing.{1}` stg
    ON prd.platform_order_code = stg.platform_order_code
  WHEN MATCHED THEN
    UPDATE SET
        platform_order_code = stg.platform_order_code
        , operation_day = stg.operation_day
        , order_created_at_sa = stg.order_created_at_sa
        , branch_id = stg.branch_id
        , branch_name_en = stg.branch_name_en
        , OD_delivery_fee = stg.OD_delivery_fee
        , is_acquisition = stg.is_acquisition
        , rdf_offer_applied = stg.rdf_offer_applied
        , rdf_offer_restaurant_max_charge = stg.rdf_offer_restaurant_max_charge
        , rdf_offer_type = stg.rdf_offer_type
        , is_subscribed = stg.is_subscribed
        , is_user_subscribed = stg.is_user_subscribed
        , delivery_fee_discount = stg.delivery_fee_discount
        , subscribed_discount_amount = stg.subscribed_discount_amount
  WHEN NOT MATCHED THEN
    INSERT ROW
  ;
"""

## Operators

In [5]:

class HSTransferOperator:
    """Copy HungerStation RDF order data into the pricing dataset.

    The flow is: run a query with the BigQuery client, pull the result into a
    Polars DataFrame, write it to a temporary local Parquet file, load that file
    into a staging table, and finally merge the staging table into the
    production table.
    """

    # Environments for which a BigQuery client can be initialised.
    SUPPORTED_ENVS = ("LOCAL", "COLAB")

    def __init__(
        self,
        env: str,
        project_id: str,
        dest_project_id: str,
        dataset_id: str,
        credentials: "str | None" = None,
    ):
        """Store the configuration and create the BigQuery client.

        Args:
            env (str): Execution environment, either "LOCAL" or "COLAB".
            project_id (str): Project the BigQuery client is created for.
            dest_project_id (str): Project of the destination tables.
            dataset_id (str): Dataset of the destination tables.
            credentials (str | None): Path exported as
                GOOGLE_APPLICATION_CREDENTIALS when env is "LOCAL".

        Raises:
            ValueError: If env is not one of the supported environments.
        """
        self.env = env
        self.project = project_id
        self.dest_project_id = dest_project_id
        self.dataset_id = dataset_id
        self.credentials = credentials
        self.init_bq_client()

    def init_bq_client(self):
        """Authenticate for the configured environment and set ``self.client``.

        Raises:
            ValueError: If ``self.env`` is not supported. Failing here avoids a
                confusing AttributeError on ``self.client`` later on.
        """
        if self.env not in self.SUPPORTED_ENVS:
            raise ValueError(
                f"Unsupported env {self.env!r}; expected one of {self.SUPPORTED_ENVS}"
            )

        if self.env == "LOCAL":
            # Only export the variable when a path was given; otherwise leave the
            # environment untouched and let the client resolve credentials itself.
            if self.credentials is not None:
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.credentials
            self.client = bigquery.Client(project=self.project)
        else:  # "COLAB"
            # Imported lazily because the module only exists inside Colab.
            from google.colab import auth, drive

            auth.authenticate_user()
            print('Authenticated')
            drive.mount('/content/gdrive')
            self.client = bigquery.Client(project=self.project)
            # set the working directory to the user gdrive
            os.chdir("/content/gdrive/MyDrive")

    def _load_job_config(self) -> "bigquery.LoadJobConfig":
        """Build the load-job configuration used for the staging table.

        The job reads Parquet, creates the table if needed and truncates any
        existing content before writing.

        Returns:
            (bigquery.LoadJobConfig)
        """
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.PARQUET
        job_config.create_disposition = "CREATE_IF_NEEDED"
        job_config.write_disposition = "WRITE_TRUNCATE"
        return job_config

    def _get_query_as_arrow_table(self, query: str) -> "pa.Table":
        """Run a query and return its result as an Arrow Table.

        Args:
            query (str): The query to run

        Returns:
            (pa.Table)
        """
        return self.client.query(query).to_arrow(progress_bar_type="tqdm")

    def load_bigquery_into_polars(self, query: str) -> "pl.DataFrame":
        """Run a query and return its result as a Polars DataFrame.

        Args:
            query (str): The query to run

        Returns:
            (pl.DataFrame)
        """
        return pl.from_arrow(self._get_query_as_arrow_table(query))

    def load_polars_to_bigquery(
        self,
        dataframe: "pl.DataFrame",
        job_config: "bigquery.LoadJobConfig",
        table_name: str,
    ):
        """Load a Polars dataframe into a BigQuery table.

        The dataframe is written to a temporary Parquet file, which is uploaded
        to ``{dest_project_id}.{dataset_id}.{table_name}`` and then deleted.

        Args:
            dataframe (pl.DataFrame): The Polars dataframe to load.
            job_config (bigquery.LoadJobConfig): Configuration of the load job.
            table_name (str): The table name for the BigQuery destination.

        Returns:
            None
        """
        table_ref = f"{self.dest_project_id}.{self.dataset_id}.{table_name}"

        # Reserve a file name only; the handle is closed before Polars writes to
        # the path so the file is never opened twice at the same time.
        with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as temp_file:
            file_path = temp_file.name

        try:
            dataframe.write_parquet(file_path)

            # Load the data into BigQuery and wait for the job to finish.
            with open(file_path, "rb") as temp_parquet:
                job = self.client.load_table_from_file(
                    temp_parquet, table_ref, job_config=job_config
                )
                job.result()
        finally:
            # delete=False means nobody else removes the file, so do it here even
            # when the write or the load job fails.
            os.remove(file_path)

        print(f"Loaded {len(dataframe)} rows to BigQuery table {table_name} in {self.dest_project_id}:{self.dataset_id}")

    def create_staging_data(self, query: str, table_name: str, end_date: datetime, days_back: int):
        """Operator to trigger the staging part of the DAG.

        Runs the given query, keeps the result in a Polars dataframe and loads
        it into the staging table of the destination dataset.

        Args:
            query (str): query to load HS data; its {0} placeholder receives the
                end date and {1} the start date
            table_name (str): name of the destination table
            end_date (datetime): run date of the task
            days_back (int): how many days back we want to fetch data
        """
        print("Initiating staging task...")
        query_with_dates = query.format(*self._return_job_dates(end_date, days_back))
        polars_df = self.load_bigquery_into_polars(query_with_dates)
        job_config = self._load_job_config()

        print("Loading from BigQuery into Polars successful")
        self.load_polars_to_bigquery(polars_df, job_config, table_name)

    def merge_into_prod(self, query: str, staging_table: str, prod_table: str):
        """Run a BQ job that merges the staging data into the production table.

        Args:
            query (str): Merge query statement; its {0} placeholder receives the
                production table and {1} the staging table
            staging_table (str): staging pricing HS table
            prod_table (str): production pricing HS table
        """
        print("Initiating merging task...")
        job = self.client.query(query.format(prod_table, staging_table))
        job.result()
        print("Merge has finished")

    def _return_job_dates(self, end_date: datetime, days_back: int) -> list[str]:
        """Return the run period as a list of "YYYY-MM-DD" strings.

        Args:
            end_date (datetime): run date of the task
            days_back (int): how many days back we want to fetch data

        Returns:
            list[str]: [end_date, start_date], in that order, so the list can be
                unpacked into a query whose {0} is the end date and {1} the
                start date
        """
        start_date = end_date - timedelta(days=days_back)
        return [end_date.strftime("%Y-%m-%d"), start_date.strftime("%Y-%m-%d")]

    def run_dag(
        self,
        staging_query: str,
        merge_query: str,
        staging_table_name: str,
        production_table_name: str,
        end_date: datetime,
        days_back: int,
    ):
        """Run whole DAG. Updates Pricing HS RDF data

        Args:
            staging_query (str): Query to create staging table
            merge_query (str): Query to merge staging into prod table
            staging_table_name (str): Staging table name
            production_table_name (str): Production table name
            end_date (datetime): run date of the task
            days_back (int): how many days back we want to fetch data
        """
        self.create_staging_data(
            staging_query,
            staging_table_name,
            end_date,
            days_back,
        )

        self.merge_into_prod(
            merge_query,
            staging_table_name,
            production_table_name,
        )



# DAG

In [6]:
# init operator
# Build the operator for the Colab environment.
hs_transfer_operator = HSTransferOperator(
    env="COLAB",
    project_id="logistics-data-staging-flat",
    dest_project_id="dh-logistics-product-ops",
    dataset_id="pricing",
)

# Stage the last DAYS_BACK days of HS data, then merge them into production.
hs_transfer_operator.run_dag(
    staging_query=STAGING_QUERY,
    merge_query=MERGE_QUERY,
    staging_table_name="hs_sa_rdf_orders_stg",
    production_table_name="hs_sa_rdf_orders",
    end_date=datetime.today(),
    days_back=DAYS_BACK,
)

Authenticated
Mounted at /content/gdrive
Initiating staging task...
Job ID 03c07840-57a7-4e80-91a6-0344b166776e successfully executed: 100%|[32m██████████[0m|
Downloading: 100%|[32m██████████[0m|
Loading from BigQuery into Polars successful
Loaded 1482671 rows to BigQuery table hs_sa_rdf_orders_stg in dh-logistics-product-ops:pricing
Initiating merging task...
Merge has finished
