# Incremental Load

The initial load is done with `Transform_into_data_model.ipynb`; all subsequent incremental loads are done with this notebook.

We assume that each new batch of incoming data is loaded into `new_inventory_df` and `new_orders_df`.

In [113]:
import pandas as pd

# Placeholder for the incoming batch. In production, new_inventory_df and
# new_orders_df hold the freshly arrived records; for implementation purposes
# the existing cleaned data is simply re-read from disk here.
# NOTE(review): this data was presumably already loaded by the initial load, so
# running the Orders append cell (WRITE_APPEND) with it re-appends those rows — verify.
INVENTORY_CSV = '../processed_data/cleaned_data/cleaned_inventory.csv'
ORDERS_CSV = '../processed_data/cleaned_data/cleaned_orders.csv'

new_inventory_df = pd.read_csv(INVENTORY_CSV)
new_orders_df = pd.read_csv(ORDERS_CSV)



## Script for Appending to Existing Orders Data

In [None]:
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd

import os

# Path to the service account key file. Prefer the standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable so the key location is not
# hard-coded to one developer's machine; the original path is only a fallback.
# TODO: remove the hard-coded fallback once every environment sets the variable.
key_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "C:/Users/epranei/Downloads/calm-cove-423918-t0-ce8d5f6922f1.json",
)

# Authenticate with Google Cloud
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Dataset and table names
project_id = 'calm-cove-423918-t0'
dataset_id = 'Dema'
orders_table_id = f'{project_id}.{dataset_id}.Orders'
inventory_table_id = f'{project_id}.{dataset_id}.Inventory'

# Convert dateTime column to datetime type; the schema below declares it as a
# REQUIRED TIMESTAMP.
new_orders_df['dateTime'] = pd.to_datetime(new_orders_df['dateTime'])

# Turn every missing campaign (NaN / pd.NA) into None. Casting to object first
# guarantees None survives whatever the column's dtype is: a float64 column
# (e.g. a batch whose campaign values are all empty in the CSV) cannot hold None.
campaign = new_orders_df['campaign']
new_orders_df['campaign'] = campaign.astype(object).where(campaign.notna(), None)

# Define schema for Orders table
orders_schema = [
    bigquery.SchemaField("orderId", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("productId", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("currency", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "INT64", mode="NULLABLE"),
    bigquery.SchemaField("shippingCost", "FLOAT64", mode="NULLABLE"),
    bigquery.SchemaField("amount", "FLOAT64", mode="NULLABLE"),
    bigquery.SchemaField("channel", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("channelGroup", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("campaign", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("dateTime", "TIMESTAMP", mode="REQUIRED")
]

# Append the batch to the Orders table. WRITE_APPEND is not idempotent:
# re-running this cell with the same batch appends the rows again.
job_config = bigquery.LoadJobConfig(schema=orders_schema, write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
job = client.load_table_from_dataframe(new_orders_df, orders_table_id, job_config=job_config)
job.result()  # Wait for the job to complete.
print("Incremental load for Orders table completed successfully.")


## Incremental load for the Inventory dimension table (SCD Type 2)

To load the Inventory dimension table incrementally we take the following steps:

1. Outer-join the incoming source dataset (`new_inventory_df`) with the existing target dataset in the data warehouse on `productId`.
     
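2. Determine which rows need an Update, Insert or Delete operation by comparing MD5 hashes of the slowly changing columns (`name`, `category`, `subCategory`) on the outer-joined rows:

- `both` with equal hashes means NO CHANGE
- `left_only` (present only in the target) means DELETE: the target row is expired (`endDate` set, `isCurrent` = False)
- `right_only` (present only in the source) means INSERT
- `both` with unequal hashes means UPDATE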

3. Push the changes to the data warehouse (Google BigQuery in our case), making sure the Inventory surrogate key (`inventory_skey`) is assigned appropriately.


In [104]:
import pandas as pd
import hashlib
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import datetime

import os

# Path to the service account key file. Prefer the standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable so the key location is not
# hard-coded to one developer's machine; the original path is only a fallback.
# TODO: remove the hard-coded fallback once every environment sets the variable.
key_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "C:/Users/epranei/Downloads/calm-cove-423918-t0-ce8d5f6922f1.json",
)

# Authenticate with Google Cloud
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Dataset and table names
project_id = 'calm-cove-423918-t0'
dataset_id = 'Dema'
inventory_table_id = f'{project_id}.{dataset_id}.Inventory'

# Format of the startDate / endDate strings written by the SCD Type 2 logic
DATE_FORMAT = "%Y-%m-%d"
# Attributes whose changes create a new version of a product (SCD Type 2)
slowly_changing_cols = ["name", "category", "subCategory"]

# Function to strip whitespaces
def strip_whitespaces(df, cols):
    for col in cols:
        df[col] = df[col].str.strip()
    return df

# Function to calculate hash using hashlib
def calculate_hash(df, cols):
    return df[cols].astype(str).apply(lambda row: hashlib.md5(''.join(row).encode()).hexdigest(), axis=1)


def remove_suffixes(df):
    df.columns = df.columns.str.replace('_target', '').str.replace('_source', '')
    return df

# Load the target inventory data from BigQuery
target_df = client.query(f"SELECT * FROM `{inventory_table_id}`").to_dataframe()

# Rename inventory_skey column to include suffix
target_df = target_df.rename(columns={'inventory_skey': 'inventory_skey_target'})

# Strip whitespaces from columns in target_df
target_df = strip_whitespaces(target_df, slowly_changing_cols)

# Add SCD Type 2 columns to new inventory DataFrame
new_inventory_df['startDate'] = datetime.now().strftime(DATE_FORMAT)
new_inventory_df['endDate'] = None
new_inventory_df['isCurrent'] = True

# Strip whitespaces from columns in new_inventory_df
new_inventory_df = strip_whitespaces(new_inventory_df, slowly_changing_cols)

# Calculate hash for slowly changing columns
target_df['hash_target'] = calculate_hash(target_df, slowly_changing_cols)
new_inventory_df['hash_source'] = calculate_hash(new_inventory_df, slowly_changing_cols)

# Determine action: NOCHANGE, DELETE, INSERT, UPDATE
merged_df = pd.merge(target_df, new_inventory_df, on='productId', suffixes=('_target', '_source'), how='outer', indicator=True)
merged_df['Action'] = 'NOCHANGE'
merged_df.loc[merged_df['_merge'] == 'left_only', 'Action'] = 'DELETE'
merged_df.loc[merged_df['_merge'] == 'right_only', 'Action'] = 'INSERT'
merged_df.loc[(merged_df['_merge'] == 'both') & (merged_df['hash_target'] != merged_df['hash_source']), 'Action'] = 'UPDATE'






# merged_df = merged_df.drop(columns=['hash'])

# # # Define the columns to keep from the merged DataFrame
target_columns = ['productId', 'name_target', 'category_target', 'subCategory_target', 'quantity_target', 'startDate_target', 'endDate_target', 'isCurrent_target', 'inventory_skey_target']
source_columns = ['productId', 'name_source', 'category_source', 'subCategory_source', 'quantity_source', 'startDate_source', 'endDate_source', 'isCurrent_source','inventory_skey_target']

# Process unchanged records
unchanged_records = merged_df[merged_df['Action'] == 'NOCHANGE'][target_columns]

# Rename the column by removing suffix source 



# # # Process insert records
max_sk = target_df['inventory_skey_target'].max() if not target_df.empty else 0
insert_records = merged_df[merged_df['Action'] == 'INSERT'][source_columns]
insert_records['inventory_skey_target'] = range(max_sk + 1, max_sk + 1 + len(insert_records))
insert_records.columns = target_columns

# # # Process update records
update_records_old = merged_df[merged_df['Action'] == 'UPDATE'][target_columns]
update_records_new = merged_df[merged_df['Action'] == 'UPDATE'][source_columns]


# Depreciate old record
update_records_old['endDate_target'] = datetime.now().strftime(DATE_FORMAT)
update_records_old['isCurrent_target'] = False
update_records_old.columns = target_columns

# Update to new record
update_records_new['startDate_source'] = datetime.now().strftime(DATE_FORMAT)
update_records_new['endDate_source'] = None
update_records_new['isCurrent_source'] = True

update_records_new.columns = source_columns

# # # Process delete records
delete_records = merged_df[merged_df['Action'] == 'DELETE'][target_columns]
delete_records['endDate_target'] = datetime.now().strftime(DATE_FORMAT)
delete_records['isCurrent_target'] = False
delete_records.columns = target_columns

# Remove all suffix '_source' or '_target' 
unchanged_records = remove_suffixes(unchanged_records)
insert_records = remove_suffixes(insert_records)
update_records_old = remove_suffixes(update_records_old)
update_records_new = remove_suffixes(update_records_new)
delete_records = remove_suffixes(delete_records)



# # # Combine all records
resultant_df = pd.concat([unchanged_records, insert_records, update_records_old, update_records_new, delete_records])

resultant_df.to_csv("resultant_df.csv", index=False)

# Load the resultant data back to BigQuery
job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
job = client.load_table_from_dataframe(resultant_df, inventory_table_id, job_config=job_config)
job.result()  # Wait for the job to complete.

print(f"Incremental load with SCD Type 2 logic for inventory data completed successfully. Data saved to {inventory_table_id}")




In [112]:
# Testing

# new_inventory_df = pd.DataFrame({
#     'productId': ['prod1', 'prod2', 'prod3'],
#     'name': ['Product 1 updated', 'Product 2', 'Product 3'],
#     'category': ['Category 1 updated', 'Category 2', 'Category 3'],
#     'subCategory': ['Subcategory 1', 'Subcategory 2', 'Subcategory 3'],
#     'quantity': [150, 200, 300]
# })