In [None]:
import os
from io import StringIO

import pandas as pd
from azure.storage.blob import BlobServiceClient

# Azure connection details.
# SECURITY: never hardcode the connection string in the notebook — it embeds the
# storage account key, and notebooks get shared/committed. Provide it through
# the environment instead, e.g.
#   export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net"
# Any account key that was ever committed in plain text should be rotated.
connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError(
        "Set the AZURE_STORAGE_CONNECTION_STRING environment variable to the "
        "storage account connection string before running this notebook."
    )
container_name = "clean-data"

In [None]:
# Blob names (inside the container) of the cleaned input tables, keyed by the
# DataFrame variable each one is loaded into.
files = dict(
    order_items="order_items.csv",
    orders="orders.csv",
    order_payments="order_payments.csv",
    products="products.csv",
)

# Account-level client, narrowed to the container that this notebook both
# reads its inputs from and uploads its result to.
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

def load_csv_from_blob(blob_name):
    """Download one blob from ``container_client`` and parse it as CSV.

    Parameters
    ----------
    blob_name : str
        Name of the blob inside the container.

    Returns
    -------
    pandas.DataFrame
        The blob's contents, decoded as UTF-8 and parsed by ``pd.read_csv``.
    """
    raw_bytes = container_client.get_blob_client(blob_name).download_blob().readall()
    csv_text = raw_bytes.decode("utf-8")
    return pd.read_csv(StringIO(csv_text))

# Pull each cleaned input table out of the container, one DataFrame per blob.
order_items = load_csv_from_blob(blob_name=files["order_items"])
orders = load_csv_from_blob(blob_name=files["orders"])
order_payments = load_csv_from_blob(blob_name=files["order_payments"])
products = load_csv_from_blob(blob_name=files["products"])

In [None]:
# Parse both order timestamps into datetimes so they sort chronologically.
orders = orders.assign(
    order_purchase_timestamp=pd.to_datetime(orders["order_purchase_timestamp"]),
    order_delivered_customer_date=pd.to_datetime(orders["order_delivered_customer_date"]),
)

# Order-level columns to attach to each line item, renamed so the merge stays
# clean even if order_items already carries same-named columns (presumably the
# reason for the rename — confirm against the cleaned order_items schema).
orders_subset = orders[
    ["order_id", "customer_id", "order_purchase_timestamp", "order_delivered_customer_date"]
].rename(
    columns={
        "customer_id": "order_customer_id",
        "order_purchase_timestamp": "order_purchase_ts",
        "order_delivered_customer_date": "order_delivered_ts",
    }
)

# Attach order-level attributes (customer id, purchase and delivery timestamps)
# to every line item; the left merge keeps items whose order_id is not in orders.
order_items = order_items.merge(orders_subset, on="order_id", how="left")

# Line-item revenue = item price + freight; only derived when the cleaned input
# does not already provide it.
if "total_price" not in order_items.columns:
    order_items["total_price"] = (
        order_items["price"].astype(float) + order_items["freight_value"].astype(float)
    )

# delivery_time_days is required by the dropna below and by the per-category
# rolling average, but nothing in this notebook creates it — if the cleaned
# input lacks it, dropna(subset=...) raises KeyError. Derive it from the merged
# timestamps in that case (same guard pattern as total_price above).
# NOTE(review): "whole days from purchase to customer delivery" is assumed to
# match the upstream definition — confirm against the cleaning step.
if "delivery_time_days" not in order_items.columns:
    order_items["delivery_time_days"] = (
        order_items["order_delivered_ts"] - order_items["order_purchase_ts"]
    ).dt.days

# Rows without a customer, purchase time or delivery time cannot feed the
# cumulative / rolling metrics computed next, so drop them.
order_items = order_items.dropna(
    subset=["order_customer_id", "order_purchase_ts", "delivery_time_days"]
)

# Running total of total_price for each customer. Sorting first makes the
# cumulative sum walk each customer's rows in purchase-time order.
# NOTE(review): if customer_id is assigned per order upstream, every "customer"
# here has a single order — confirm which id identifies a person.
order_items = order_items.sort_values(["order_customer_id", "order_purchase_ts"])
order_items["cumulative_sales_per_customer"] = order_items.groupby("order_customer_id")[
    "total_price"
].transform("cumsum")

# Look up each item's product category, then discard items whose category is
# unknown (no matching product_id, or a null category in products).
order_items = (
    order_items
    .merge(products[["product_id", "product_category_name"]], on="product_id", how="left")
    .dropna(subset=["product_category_name"])
)

# Rolling mean of delivery_time_days within each product category, taken in
# purchase-time order over the last ROLLING_WINDOW items (fewer for the first
# items of a category, thanks to min_periods=1).
ROLLING_WINDOW = 5

order_items = order_items.sort_values(by=["product_category_name", "order_purchase_ts"])

# Use transform(): it returns a Series indexed like order_items, so each
# average is assigned back to the row it was computed for. The former
# apply(...).reset_index(drop=True) produced a fresh 0..n-1 index, and column
# assignment aligns on index *labels* — here they are gapped (dropna) and
# permuted (sort_values), so averages landed on the wrong rows or became NaN.
order_items["avg_delivery_time_per_category"] = (
    order_items
    .groupby("product_category_name")["delivery_time_days"]
    .transform(lambda s: s.rolling(window=ROLLING_WINDOW, min_periods=1).mean())
)

In [None]:
def save_df_to_blob(df, blob_name):
    """Serialize ``df`` to CSV and upload it through ``container_client``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write; its index is not included in the CSV.
    blob_name : str
        Target blob name; an existing blob with this name is overwritten.
    """
    # With no target path, to_csv() returns the CSV text; upload it as UTF-8 bytes.
    csv_text = df.to_csv(index=False)
    container_client.upload_blob(name=blob_name, data=csv_text.encode("utf-8"), overwrite=True)

# Persist the enriched line items (cumulative sales + rolling delivery averages)
# back to the container.
save_df_to_blob(df=order_items, blob_name="rolling_avg_cumulative_sales.csv")