
#### Run the cell below to install the required packages for Copilot


In [None]:

#Run this cell to install the required packages for Copilot
%load_ext dscopilot_installer
%activate_dscopilot


# Contoso Hypermarket Orders sales forecast ML notebook

### Set up MLflow experiment tracking

In [None]:
# Set up MLflow for experiment tracking
import mlflow

IS_SAMPLE = False  # if TRUE, use only rows of data for training, otherwise use all data
SAMPLE_ROWS = 5000  # if IS_SAMPLE is True, use only this number of rows for training
EXPERIMENT_NAME = "orders-sales-forecast"  # MLflow experiment name

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

### Load orders data from KQL database to prepare for ML modeling

> [!IMPORTANT]
> Make sure you have enough data generated using data emulator.

In [None]:
# Read from Kusto
# kustoQuery = "['orders'] |  mv-expand li = parse_json(line_items) | project order_date, store_id, order_id, product_id = toint(li.product_id), quantity = toint(li.quantity), price = toreal(li.price), item_total = toreal(li.item_total), order_total"
ordersQuery = "['orders'] |  mv-expand li = parse_json(line_items) | project order_date, store_id, order_id, product_id = toint(li.product_id), quantity = toint(li.quantity), price = toreal(li.price), item_total = toreal(li.item_total), order_total"
inventoryQuery = "['inventory'] | project date_time, store_id, product_id, in_stock, retail_price"
productsQuery = "['products'] | project product_id, name, category, photo_path, price_range, stock"
# The query URI for reading the data e.g. https://<>.kusto.data.microsoft.com.
kustoUri = "https://trd-g8jnmstet4k7q79z9v.z1.kusto.fabric.microsoft.com"
# The database with data to be read.
database = "contosohypermarket"
# The access credentials.
accessToken = mssparkutils.credentials.getToken(kustoUri)
ordersDf  = spark.read\
    .format("com.microsoft.kusto.spark.synapse.datasource")\
    .option("accessToken", accessToken)\
    .option("kustoCluster", kustoUri)\
    .option("kustoDatabase", database)\
    .option("kustoQuery", ordersQuery).load()
inventoryDf  = spark.read\
    .format("com.microsoft.kusto.spark.synapse.datasource")\
    .option("accessToken", accessToken)\
    .option("kustoCluster", kustoUri)\
    .option("kustoDatabase", database)\
    .option("kustoQuery", inventoryQuery).load()
productsDf  = spark.read\
    .format("com.microsoft.kusto.spark.synapse.datasource")\
    .option("accessToken", accessToken)\
    .option("kustoCluster", kustoUri)\
    .option("kustoDatabase", database)\
    .option("kustoQuery", productsQuery).load()

In [None]:
### Verifying token availability
import requests

# Define a simple test query
test_query = "['inventory'] | take 1"

# Define the request headers with the access token
headers = {
    "Authorization": f"Bearer {accessToken}",
    "Content-Type": "application/json"
}

# Define the request payload
payload = {
    "db": database,
    "csl": test_query
}

# Make the request to the Kusto cluster
response = requests.post(f"{kustoUri}/v1/rest/query", headers=headers, json=payload)

# Check if the request was successful
if response.status_code == 200:
    print("Access token is valid and has the necessary permissions.")
else:
    print(f"Failed to validate access token. Status code: {response.status_code}, Response: {response.text}")

## Step 2: Perform Exploratory Data Analysis

### Import libraries

Before any analysis, you need to import the required libraries.

In [None]:
# Importing required libraries
import warnings
import itertools
import numpy as np
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
plt.style.use('fivethirtyeight')
import pandas as pd
import statsmodels.api as sm
import matplotlib
matplotlib.rcParams['axes.labelsize'] = 14
matplotlib.rcParams['xtick.labelsize'] = 12
matplotlib.rcParams['ytick.labelsize'] = 12
matplotlib.rcParams['text.color'] = 'k'
from sklearn.metrics import mean_squared_error,mean_absolute_percentage_error


### Display raw data

In [None]:
# Display data in dataframes.
ordersDf.show()
productsDf.show()
inventoryDf.show()

In [None]:
# Importing functions
from pyspark.sql import functions as F

# Filter the orders DataFrame for entries from Chicago
chicago_sales_df = ordersDf.filter(ordersDf.store_id == 'CHI')

# Join the orders DataFrame with the products DataFrame to include product_id and name
sales_chicago_df = (
    chicago_sales_df
    .join(productsDf, on='product_id', how='inner')  # Join on product_id
    .select(
        'order_date',
        'product_id',
        'name',  # Include product name
        'price',
        'quantity',
        (chicago_sales_df.price * chicago_sales_df.quantity).alias('sales')  # Calculate sales
    )
)

# Show the new DataFrame with the additional columns
sales_chicago_df.show()

In [None]:
# Show distinct product names in the dataframe
sales_chicago_df.select("name").distinct().show()

In [None]:
### Pre-processing data
# Dropping columns that have no impact. Retaining Order Date and Sales.
# cols = ['order_id', 'product_id', 'price', 'order_total']
# products = products.drop(*cols)
# products.show()
# Importing functions
from pyspark.sql import functions as F

# Filter the orders DataFrame for entries from Chicago
chicago_sales_df = ordersDf.filter(ordersDf.store_id == 'CHI')

# Join the orders DataFrame with the products DataFrame to include product_id and name
sales_chicago_df = (
    chicago_sales_df
    .join(productsDf, on='product_id', how='inner')  # Join on product_id
    .select(
        'order_date',
        (chicago_sales_df.price * chicago_sales_df.quantity).alias('sales')  # Calculate sales
    )
)

# Show the new DataFrame with only order_date and sales columns
sales_chicago_df.show()

In [None]:
# products = products.groupBy('product_id')
# Sort the DataFrame by 'order_date'
sorted_sales_chicago_df = sales_chicago_df.orderBy('order_date')

# Show the sorted DataFrame
sorted_sales_chicago_df.show()

### Data Preparation for ML Experiment

In [None]:
from pyspark.sql import functions as F

# Truncate 'order_date' to the first day of the month and group by it, summing the 'sales'
grouped_sales_chicago_df = sorted_sales_chicago_df.withColumn(
    'order_date_trunc', F.date_trunc('month', 'order_date')
).groupBy('order_date_trunc').agg(F.sum('sales').alias('sales_sum'))

In [None]:
# Add 67 months to 'order_date_trunc'
adjusted_sales_chicago_df = grouped_sales_chicago_df.withColumn(
    'adjusted_order_date', F.expr("add_months(order_date_trunc, 67)")
)

In [None]:
# Select relevant columns
final_sales_chicago_df = adjusted_sales_chicago_df.select(
    'adjusted_order_date', 'sales_sum'
)

In [None]:
# Find the maximum 'adjusted_order_date'
max_date = final_sales_chicago_df.agg(F.max('adjusted_order_date')).collect()[0][0]
print("Maximum adjusted order date:", max_date)

In [None]:
# Show the resulting DataFrame
final_sales_chicago_df.show()

##### Demonstrate the impact order date on the sales for all sales in Chicago.

In [None]:
import matplotlib.pyplot as plt

# Step 1: Convert the Spark DataFrame to Pandas
final_sales_chicago_pd_df = final_sales_chicago_df.toPandas()

# Step 2: Plot the impact of 'adjusted_order_date' on 'sales_sum'
plt.figure(figsize=(12, 3))
plt.plot(final_sales_chicago_pd_df['adjusted_order_date'], final_sales_chicago_pd_df['sales_sum'])

# Add labels and title
plt.xlabel('Order Date')
plt.ylabel('Sales')
plt.title('Impact of Order Date on Sales')

# Step 3: Show the plot
plt.show()

In [None]:
### Debugging
# Check if the DataFrame has data
print(final_sales_chicago_pd_df.head())  # This will print the first few rows to check the contents
print(final_sales_chicago_pd_df.info())  # This will show if there are any NaN values or data type issues
# Ensure 'adjusted_order_date' is in datetime format
final_sales_chicago_pd_df['adjusted_order_date'] = pd.to_datetime(final_sales_chicago_pd_df['adjusted_order_date'])

# Sort the DataFrame by date
final_sales_chicago_pd_df = final_sales_chicago_pd_df.sort_values('adjusted_order_date')

# Check the data after sorting
print(final_sales_chicago_pd_df.head())


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Filtering Chicago data
sales_chicago_df = ordersDf.filter(ordersDf.store_id == 'CHI')

# Creating a 'sales' column by multiplying price and quantity
sales_chicago_df = sales_chicago_df.withColumn('sales', sales_chicago_df['price'] * sales_chicago_df['quantity'])

# Selecting relevant columns: order_date, sales
sales_chicago_df = sales_chicago_df.select('order_date', 'sales')

# Converting the DataFrame to Pandas
sales_chicago_pd_df = sales_chicago_df.toPandas()

# Confirming that 'order_date' is in datetime format
sales_chicago_pd_df['order_date'] = pd.to_datetime(sales_chicago_pd_df['order_date'])

# Group by 'order_date' and sum the sales
sales_chicago_pd_df = sales_chicago_pd_df.groupby('order_date')['sales'].sum().reset_index()

# Resample the data to get monthly sales
sales_chicago_pd_df.set_index('order_date', inplace=True)
sales_chicago_monthly = sales_chicago_pd_df['sales'].resample('MS').sum().reset_index()

# Plot sales over time
plt.figure(figsize=(12, 6))
plt.plot(sales_chicago_monthly['order_date'], sales_chicago_monthly['sales'], marker='o')

# Adding labels and title
plt.xlabel('Order Date')
plt.ylabel('Total Sales')
plt.title('Impact of Order Date on Sales in Chicago')

plt.show()