<a href="https://colab.research.google.com/github/Analyzing_Sales_Data_with_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

SUBJECT: BIG DATA </br>
CLASS : MSE12#HCM <br>
MSHV : 22MSE23094 </br>
MSE STUDENT's NAME : HA QUYET THANG </br>
INSTRUCTOR: DR. MAI HOANG BAO AN

# Final Term Exam 1 - Analyzing Sales Data with Apache Spark

## Problem Statement
In this project, you will use Apache Spark to analyze a large dataset containing
sales data posted here. The dataset includes information such as Sample Sales Data, Order Info, Sales, Customer, Shipping, etc...

Data get from kaggle and và upload to persional kaggle :
[Dataset](https://www.kaggle.com/datasets/thanghq/sales-data-sample)

+ Your goal is to perform various analyses on this dataset to derive valuable insights for the company.

## Setup environment

In [None]:
# Update package information
!apt-get update

# Install OpenJDK 8 (Java Development Kit) headless version
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Apache Spark 3.1.1 binary with Hadoop 3.2
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# Extract the downloaded Spark archive
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

# Install findspark library to help Python locate Spark
!pip install -q findspark

# Set environment variables for Java and Spark home directories
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Initialize findspark to enable locating Spark in Python
import findspark
findspark.init()

In [None]:
# Install the Kaggle library
!pip install -q kaggle

# # Import necessary modules for file upload
# from google.colab import files

# # Upload the Kaggle API key file
# files.upload()

# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
# Cấu hình secret key để xác thực truy cập đến Kaggle trên môi trường
os.environ['KAGGLE_USERNAME'] = 'thanghq'
os.environ['KAGGLE_KEY'] = '5a9cdf308c14b0866956569d327d2e82'

if not os.path.exists('./kaggle/sales_data_sample.csv'):
  !kaggle datasets download -d thanghq/sales-data-sample -p ./ # Download dữ liệu từ Kaggle
  !unzip  ./*.zip -d ./kaggle
else:
    print('File already exists, no need to download or unzip.')

print(os.listdir('./kaggle'))

## 1. Data Preparation

### Load the sales dataset into Spark RDDs or DataFrames.

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [None]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

In [None]:
import pandas as pd
print('Pandas version: {}'. format(pd.__version__))

In [None]:
data_raw = spark.read.csv('/content/kaggle/sales_data_sample.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_raw.dtypes)

In [None]:
# Convert Spark DataFrame to Pandas DataFrame
raw_df = data_raw.toPandas()

# Create a copy of the Pandas DataFrame
df = raw_df.copy()

df_org = df

# Display the first few rows of the DataFrame
df.head()

In [None]:
# data summary
print('-'*10, 'data summary', '-'*10)
data_raw.describe().toPandas()

In [None]:
# view a small subset of the data
print('-'*10, 'randomely sample 1% data to view', '-'*10)
data_raw.randomSplit([0.01, 0.99])[0].toPandas()

In [None]:
df.shape

In [None]:
df.head()

### Cleanse the data by handling missing values, outliers, or any inconsistencies.

#### Checking for null values

In [None]:
# Display information about the DataFrame
df.info()

In [None]:
# Check NUll Values
isnull=pd.DataFrame(df.isnull().sum())
isnull.style.background_gradient(cmap='Blues')

In [None]:
# Calculate the total number of missing values for each column
total_null = df.isnull().sum().sort_values(ascending=False)

# Calculate the percentage of missing values for each column
percent_null = (df.isnull().sum() / df.isnull().count()).sort_values(ascending=False)

# Combine the total_null and percent_null into a single DataFrame
missing_data = pd.concat([total_null, percent_null], axis=1, keys=['total_null', 'percent_null'])

# Display the DataFrame showing total_null and percent_null for each column
missing_data

In [None]:
# Removing the variables which dont add significant value to the analysis or majority null value.
to_drop = ['PHONE','ADDRESSLINE1','ADDRESSLINE2','STATE','POSTALCODE','TERRITORY']
df = df.drop(to_drop, axis=1)

In [None]:
df.head()

#### Checking for inconsistent data types

In [None]:
# Display the data types of each column in the DataFrame
print(df.dtypes)

In [None]:
df['ORDERDATE'] = pd.to_datetime(df['ORDERDATE'])

#### Summary stats of Quantitative variables

In [None]:
quant_vars = ['QUANTITYORDERED','PRICEEACH','SALES','MSRP']
df[quant_vars].describe()

In [None]:
df.sort_values(by = ['ORDERDATE'], inplace = True)
df.set_index('ORDERDATE', inplace = True)

In [None]:
df.head()

#### Outlier detection
https://www.kaggle.com/code/shohanurrahaman/intro-to-data-science-data-cleaning-02

Outlier detection is very important step in data cleaning and exploring. Outliers can be detected both visually and mathematically. Some plots are very helpful in visualizing outliers, such as box plots and scatter plots. However, it is sometimes tricky to decide whether or not to remove the outliers. We should remove outliers when we are certain that these outliers were results of some errors.

##### Filtering outlier

In [None]:
print('original shape of dataset :',df.shape)

cols = ['SALES', 'MSRP','QUANTITYORDERED']
new_df = df[cols]

#calculation
Q1 = new_df.quantile(0.25)
Q3 = new_df.quantile(0.75)
IQR = Q3-Q1
maximum = Q3+1.5*IQR
minimum = Q1-1.5*IQR
print(minimum)

#filter outlier
cond = (new_df <= maximum) & (new_df >= minimum)
'''
we specify that the condition should be true for all three columns by using the all function with axis=1 argument.
This gives us a list of True/False against each row.
If a row has all three True values, then it gives a True value to that row
'''
cond = cond.all(axis=1)
df = df[cond]
print('filtered dataset shape : ',df.shape)

#plot again to check that if has any outlier
df.plot(kind='box', subplots=True, sharex=False, sharey=False, figsize=(10,10), layout=(3,4))
plt.show()

##### Scatter Plot

In [None]:
new_df = df[['SALES','QUANTITYORDERED','MSRP']]
pd.plotting.scatter_matrix(new_df, figsize = (10,10))

##### Z-Score

In [None]:
print('shape of original data :',df.shape)

mean = new_df['QUANTITYORDERED'].mean()
std_dev = new_df['QUANTITYORDERED'].std()

# find z scores
z_scores = (new_df['QUANTITYORDERED'] - mean) / std_dev
z_scores = np.abs(z_scores)

print(z_scores.min())

#filter data
z_df = new_df[z_scores<3]
print('shape of filtered data : ',z_df.shape)

#plot data
z_df['QUANTITYORDERED'].plot(kind='box')
plt.show()

## 2. EDA - Exploratory Data Analysis and Visualization

### Perform descriptive statistics to understand the basic characteristics of the dataset and visualize.

In [None]:
df.describe().transpose()

In [None]:
ys=df.groupby(['YEAR_ID'])['SALES'].sum().reset_index()
ys.head()

In [None]:
# Group the DataFrame by 'PRODUCTLINE' and 'QTR_ID', count occurrences, and reset index
qtrly = df.groupby(['PRODUCTLINE']).QTR_ID.value_counts().reset_index(name='COUNTS')
qtrly.head()

# Create a figure with a specific size
plt.figure(figsize=(10,8))

# Define colors for the strip plot
colors={'edgecolor':'black','linewidth':1}

# Create a strip plot using Seaborn
sns.stripplot(x='PRODUCTLINE', y='COUNTS', data=qtrly, hue='QTR_ID', palette='bright', size=14, **colors)

# Set the background style of the plot to 'whitegrid'
sns.set_style('whitegrid')

# Set the title of the plot
plt.title("PRODUCTS SOLD ACCORDING TO QUARTER")

# Set labels for the x-axis and y-axis
plt.xlabel('PRODUCTS')
plt.ylabel('NUMBER OF ITEMS SOLD')

# Display the plot
plt.show()

### Explore the distribution of sales across different products, customers, and time periods and visualize.


This plot visualizes the relationship between the product line and sales.

In [None]:
# Create a figure with a specific size for the correlation matrix heatmap
plt.figure(figsize=(10, 10))

# Calculate the correlation matrix for the DataFrame
corr_matrix = df.corr()

# Create a heatmap with annotations using Seaborn
sns.heatmap(corr_matrix, annot=True)

# Set the title of the heatmap
plt.title("Correlation Matrix")

# Display the plot
plt.show()

In [None]:
# Create a bar plot to visualize sales across different product lines
plt.figure(figsize=(15, 5))

# Set the layout of the plot
plt.tight_layout()

# Use Seaborn to create a bar plot
sns.barplot(x='PRODUCTLINE', y='SALES', data=df)

# Set the title of the plot
plt.title("Sales Across Different Product Lines")

# Show the plot
plt.show()

The x-axis represents the unique values of 'STATUS', and the y-axis represents the count of occurrences

In [None]:
# Create a count plot to visualize the distribution of 'STATUS' values
plt.figure(figsize=(12, 6))

# Use Seaborn to create a count plot
sns.countplot(df['STATUS'])

# Set the title of the plot
plt.title("Distribution of Status Values")

# Show the plot
plt.show()

In [None]:
# Create a line plot to visualize the sales trend over time
plt.figure(figsize=(20, 8))

# Use Seaborn to create a line plot
sns.lineplot(x='ORDERDATE', y='SALES', data=df)

# Set the title of the plot
plt.title("Sales Trend Over Time")

# Set labels for the x and y axes
plt.xlabel('Order Date')
plt.ylabel('Sales')

# Show the plot
plt.show()

Then visualized which products give the highest revenue

In [None]:
# Group by 'PRODUCTLINE', sum the 'SALES', and sort in descending order
top_product = df.groupby(['PRODUCTLINE']).sum().sort_values('SALES', ascending=False)

# Select only the 'SALES' column
top_product = top_product[['SALES']]

# Reset the index to make 'PRODUCTLINE' a regular column
top_product.reset_index(inplace=True)

# Calculate the total revenue from all products
total_revenue_product = top_product['SALES'].sum()

# Format the total revenue as a string with '$' symbol
total_revenue_product = f'${int(total_revenue_product):,}'

In [None]:
# Set default figure size and font parameters
plt.rcParams['figure.figsize'] = (13, 7)
plt.rcParams['font.size'] = 12.0
plt.rcParams['font.weight'] = 6

# Define a function for autopct formatting
def autopct_format(values):
    def my_format(pct):
        total = sum(values)
        val = int(round(pct * total / 100.0))
        return ' ${v:d}'.format(v=val)
    return my_format

# Define colors, explode, and create subplots
colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#55B4B0', '#E15D44', '#009B77']
explode = (0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05)
fig1, ax1 = plt.subplots()

# Create a pie chart with custom formatting
pie1 = ax1.pie(top_product['SALES'], colors=colors, labels=top_product['PRODUCTLINE'],
               autopct=autopct_format(top_product['SALES']), startangle=90, explode=explode)

# Rotate fraction text in the pie chart
fraction_text_list = pie1[2]
for text in fraction_text_list:
    text.set_rotation(315)

# Create a white circle in the center to make it a donut chart
center_circle = plt.Circle((0, 0), 0.80, fc='white')
fig = plt.gcf()
fig.gca().add_artist(center_circle)

# Set axis equal, add annotation for total revenue, and display the plot
ax1.axis('equal')
label = ax1.annotate('Total Revenue \n' + str(total_revenue_product), color='red', xy=(0, 0), fontsize=12, ha='center')
plt.tight_layout()
plt.show()

## 3. Feature Engineering
Create additional features from the existing data that might be useful for analysis, such as calculating total sales per customer, average purchase amount per product, or seasonal trends.

### Find out 20 Most Valuable Customers

The Most Valuable Customers are the customer who are the most profitable for a company (have a big sales on them). These customers buy more or higher-value than the other customers.

In [None]:
# Group by 'CUSTOMERNAME', sum the 'SALES', and sort in descending order, then select top 20
top_customer = df.groupby(['CUSTOMERNAME']).sum().sort_values('SALES', ascending=False).head(20)

# Select only the 'SALES' column and round values to 3 decimal places
top_customer = top_customer[['SALES']].round(3)

# Reset the index to make 'CUSTOMERNAME' a regular column
top_customer.reset_index(inplace=True)

In [None]:
# Set up plot parameters
plt.figure(figsize=(15, 5))
plt.title('20 Most Valuable Customers (2003 - 2005)', fontsize=18)

# Create a bar plot
plt.bar(top_customer['CUSTOMERNAME'], top_customer['SALES'], color='#37C6AB', edgecolor='black', linewidth=1)

# Set labels for x-axis and y-axis
plt.xlabel('Customer Name', fontsize=15)
plt.ylabel('Revenue', fontsize=15)

# Customize tick labels and rotation for better readability
plt.xticks(fontsize=12, rotation=90)
plt.yticks(fontsize=12)

# Add revenue values as text on each bar with conditional formatting
for k, v in top_customer['SALES'].items():
    if v > 600000:
        plt.text(k, v - 270000, '$' + str(v), fontsize=12, rotation=90, color='black', ha='center')
    else:
        plt.text(k, v + 50000, '$' + str(v), fontsize=12, rotation=90, color='black', ha='center')

# Show the plot
plt.show()

### Find out 20 Highest Revenue by Country
Here are The Top 20 Country which generated the highest revenue

In [None]:
# Group by 'COUNTRY', sum the 'SALES', and sort in descending order, then select top 20
top_country = df.groupby(['COUNTRY']).sum().sort_values('SALES', ascending=False).head(20)

# Select only the 'SALES' column and round values to 3 decimal places
top_country = top_country[['SALES']].round(3)

# Reset the index to make 'COUNTRY' a regular column
top_country.reset_index(inplace=True)

In [None]:
# Set up plot parameters
plt.figure(figsize=(15, 5))
plt.title('20 Highest Revenue by Country (2003 - 2005)', fontsize=18)

# Create a bar plot
plt.bar(top_country['COUNTRY'], top_country['SALES'], color='#37C6AB', edgecolor='black', linewidth=1)

# Set labels for x-axis and y-axis
plt.xlabel('Country', fontsize=15)
plt.ylabel('Revenue', fontsize=15)

# Customize tick labels and rotation for better readability
plt.xticks(fontsize=12, rotation=90)
plt.yticks(fontsize=12)

# Add revenue values as text on each bar with conditional formatting
for k, v in top_country['SALES'].items():
    if v > 3000000:
        plt.text(k, v - 1200000, '$' + str(v), fontsize=12, rotation=90, color='black', ha='center')
    else:
        plt.text(k, v + 100000, '$' + str(v), fontsize=12, rotation=90, color='black', ha='center')

# Show the plot
plt.show()

### Correlation Test
Plotting correlation matrix to see the overview of how the features are related to one another

In [None]:
# Set up plot parameters
plt.figure(figsize=(10, 10))

# Calculate correlation matrix
corr_matrix = df.corr()

### Observations

There is high co-relation in ORDERNUMBER and YEAR_ID, and between QTR_ID and MONTH_ID
+velly correlated between SALES, QUANTITYORDERED, PRICEEACH and MSRP
YEAR_ID is -velly correlated to QTR_ID and MONTH_ID

## 4. Advanced Analytics

### Time Series Analysis

In [None]:
print('Order Date Description\n')

# Sort the DataFrame by 'ORDERDATE' in ascending order
df.sort_values(by=['ORDERDATE'], inplace=True, ascending=True)

# Create a new DataFrame with only 'SALES' column
new_data = pd.DataFrame(df['SALES'])
new_data.head()

In [None]:
from matplotlib import pyplot as plt

# Plot a histogram of the 'SALES' column with specified parameters
new_data['SALES'].plot(kind='hist', bins=20, title='SALES')

# Hide the spines on the top and right sides of the plot
plt.gca().spines[['top', 'right']].set_visible(False)

# Show the plot
plt.show()

In [None]:
new_data.plot()

A series is said to be stationary when its mean and variance do not change over time. From the above distribution of the sales it is not clear whether the sales distribution is stationary or not. Let us perform some stationarity tests to check whether the time series is stationary or not.

Checking for Stationary

In [None]:
# Resample the 'SALES' column by day and calculate the mean for each day
new_data = pd.DataFrame(new_data['SALES'].resample('D').mean())

# Interpolate missing values using linear interpolation
new_data = new_data.interpolate(method='linear')

Method 1<br/>

To check for stationarity by comparing the change in mean and variance over time, let us split teh data into train, test, and validation

In [None]:
# Split the 'SALES' column into train, test, and validation sets
train, test, validation = np.split(new_data['SALES'].sample(frac=1), [int(0.6 * len(new_data['SALES'])), int(0.8 * len(new_data['SALES']))])

In [None]:
print('Train Dataset')
print(train)
print('Test Dataset')
print(test)
print('Validation Dataset')
print(validation)

From the above values of mean and variance, it can be inferred that their is not much difference in the three values of mean and variance, indicating that the series is stationary. However, to verify our observations, let us perform a standard stationarity test, called Augmented Dicky Fuller test.

Augmented Dicky Fuller Test

- The Augmented Dickey-Fuller test is a type of statistical test alsocalled a unit root test.The base of unit root test is that it helps in determining how strongly a time series is defined by a trend.
- The null hypothesis of the test is that the time series can be represented by a unit root, that it is not stationary. The alternate hypothesis (rejecting the null hypothesis) is that the time series is stationary.
  - Null Hypothesis(H0): Time series is not stationary
  - Alternate Hypothesis (H1): Time series is stationary
- This result is interpreted using the p-value from the test.
  - p-value > 0.05: Fail to reject the null hypothesis (H0), the data has a unit root and is non-stationary.
  - p-value <= 0.05: Reject the null hypothesis (H0), the data does not have a unit root and is stationary.

Method 2 - Augmented Dicky Fuller Test

In [None]:
from statsmodels.tsa.stattools import adfuller

# Extract the 'SALES' column values
data1 = new_data.iloc[:, 0].values

# Perform Augmented Dickey-Fuller test
adf = adfuller(data1)

print(adf)
print('\nADF = ', str(adf[0]))
print('\np-value = ', str(adf[1]))
print('\nCritical Values: ')

# Print critical values and interpret the results
for key, val in adf[4].items():
    print(key, ':', val)
    if adf[0] < val:
        print('Null Hypothesis Rejected. Time Series is Stationary')
    else:
        print('Null Hypothesis Accepted. Time Series is not Stationary')

In [None]:
from pylab import rcParams
import statsmodels.api as sm
import matplotlib.pyplot as plt

# Set the figure size
rcParams['figure.figsize'] = 20, 10

# Seasonal decomposition using the additive model
decomposition = sm.tsa.seasonal_decompose(new_data, model='additive')

# Plot the decomposition components
fig = decomposition.plot()
plt.show()

### Checking for Stationary

In [None]:
import pandas as pd

# Resample the 'SALES' column to daily frequency and calculate the mean for each day
new_data = pd.DataFrame(new_data['SALES'].resample('D').mean())

# Interpolate missing values using linear interpolation
new_data = new_data.interpolate(method='linear')

Method 1
To check for stationarity by comparing the change in mean and variance over time, let us split teh data into train, test, and validation

In [None]:
train, test, validation = np.split(new_data['SALES'].sample(frac = 1), [int(.6*len(new_data['SALES'])), int(.8*len(new_data['SALES']))])

In [None]:
print('Train Dataset')
print(train)
print('Test Dataset')
print(test)
print('Validation Dataset')
print(validation)

From the above values of mean and variance, it can be inferred that their is not much difference in the three values of mean and variance, indicating that the series is stationary. However, to verify our observations, let us perform a standard stationarity test, called Augmented Dicky Fuller test.

Method 2 - Augmented Dicky Fuller Test

In [None]:
from statsmodels.tsa.stattools import adfuller

# Extract the 'SALES' values from the DataFrame
data1 = new_data.iloc[:, 0].values

# Perform the Augmented Dickey-Fuller test
adf = adfuller(data1)

# Print the test results
print(adf)
print('\nADF = ', str(adf[0]))
print('\np-value = ', str(adf[1]))
print('\nCritical Values: ')

# Loop through the critical values
for key, val in adf[4].items():
    print(key, ':', val)
    # Check if the test statistic is less than the critical value
    if adf[0] < val:
        print('Null Hypothesis Rejected. Time Series is Stationary')
    else:
        print('Null Hypothesis Accepted. Time Series is not Stationary')

In [None]:
from pylab import rcParams
import statsmodels.api as sm

# Set the figure size
rcParams['figure.figsize'] = 20, 10

# Perform seasonal decomposition
decomposition = sm.tsa.seasonal_decompose(new_data, model='additive')

# Plot the decomposition components
fig = decomposition.plot()
plt.show()

### Sales Forecasting using ARIMA
Now that we know our time series is data is stationary. Let us begin with model training for forecasting the sales. We have chosen SARIMA model to forecast the sales.

Seasonal Autoregressive Integrated Moving Average, SARIMA or Seasonal ARIMA, is an extension of ARIMA that supports univariate time series data with a seasonal componen

In [None]:
import itertools

# Generate parameter combinations for Seasonal ARIMA
p = d = q = range(0, 2)
pdq = list(itertools.product(p, d, q))
seasonal_pdq_comb = [(i[0], i[1], i[2], 12) for i in list(itertools.product(p, d, q))]

# Display examples of parameter combinations
print('Examples of parameter combinations for Seasonal ARIMA:')
print('SARIMA: {} x {}'.format(pdq[1], seasonal_pdq_comb[1]))
print('SARIMA: {} x {}'.format(pdq[1], seasonal_pdq_comb[2]))
print('SARIMA: {} x {}'.format(pdq[2], seasonal_pdq_comb[3]))
print('SARIMA: {} x {}'.format(pdq[2], seasonal_pdq_comb[4]))

In [None]:
import itertools

# Generate parameter combinations for Seasonal ARIMA
p = d = q = range(0, 2)
pdq = list(itertools.product(p, d, q))
seasonal_pdq_comb = [(i[0], i[1], i[2], 12) for i in list(itertools.product(p, d, q))]

# Iterate over parameter combinations
for parameters in pdq:
    for seasonal_param in seasonal_pdq_comb:
        try:
            # Fit SARIMA model
            mod = sm.tsa.statespace.SARIMAX(new_data,
                                            order=parameters,
                                            seasonal_order=seasonal_param,
                                            enforce_stationarity=False,
                                            enforce_invertibility=False)
            results = mod.fit()

            # Print AIC
            print('SARIMA{}x{}12 - AIC:{}'.format(parameters, seasonal_param, results.aic))
        except Exception as e:
            print(f"Error for SARIMA{parameters}x{seasonal_param}12: {e}")
            continue

In [None]:
mod = sm.tsa.statespace.SARIMAX(new_data,
                                order=(1, 1, 1),
                                seasonal_order=(1, 1, 1, 12),
                                enforce_stationarity=False,
                                enforce_invertibility=False)
results = mod.fit()
print(results.summary().tables[1])

This code fits a Seasonal ARIMA model with specified parameters to the new_data time series and prints a summary of the results, specifically the second table of the summary which includes coefficients and statistical information. Adjust the order and seasonal_order parameters as needed based on your analysis and grid search results.

In [None]:
# Display diagnostic plots for the SARIMA model results
results.plot_diagnostics(figsize=(16, 8))
plt.show()

In [None]:
# Generate and plot one-step ahead forecast with confidence interval
pred = results.get_prediction(start=pd.to_datetime('2003-01-06'), dynamic=False)
pred_val = pred.conf_int()

# Plot observed data and forecast
ax = new_data['2002':].plot(label='observed')
pred.predicted_mean.plot(ax=ax, label='One-step ahead Forecast', alpha=.7, figsize=(14, 7))

# Fill the area between the upper and lower confidence bounds
ax.fill_between(pred_val.index,
                pred_val.iloc[:, 0],
                pred_val.iloc[:, 1], color='k', alpha=.2)

# Set axis labels and display legend
ax.set_xlabel('ORDERDATE')
ax.set_ylabel('SALES')
plt.legend()
plt.show()

In [None]:
# Calculate Mean Squared Error (MSE) and Root Mean Squared Error (RMSE)
y_forecasted = pred.predicted_mean
y_truth = new_data['SALES']

from sklearn.metrics import mean_squared_error
from math import sqrt

mse = mean_squared_error(y_forecasted, y_truth)
rmse = sqrt(mse)

print('The Mean Squared Error of the forecasts is {}'.format(round(rmse, 2)))

Out of sample forecast:

To forecast sales values after some time period of the given data. In our case, we have to forecast sales with time period of 7 days.

### Sales Foecast for Next 7 Days

In [None]:
# Forecast future values for the next 7 steps
forecast = results.forecast(steps=7)
print(forecast.astype('float'))

In [None]:
# Convert the forecast to a DataFrame and save it to a CSV file
forecast = forecast.astype('float')
forecast_df = forecast.to_frame()
forecast_df.reset_index(level=0, inplace=True)
forecast_df.columns = ['Prediction Date', 'Predicted Sales']
prediction = pd.DataFrame(forecast_df).to_csv('prediction.csv', index=False)

In [None]:
# Load the predicted data from the 'prediction.csv' file into a DataFrame
df = pd.read_csv('./prediction.csv')

# Plot the data
df.plot()

## Performance Optimization

Optimize Spark jobs for better performance by tuning parameters such as the number of partitions, memory allocation, and caching strategies.

In [None]:
# Configure Spark properties for better performance
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

try:
    # Initialize Spark configuration
    conf = SparkConf().setMaster("yarn").setAppName("MySparkApp")
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.getOrCreate()

    # Set the number of partitions based on the cluster configuration
    num_partitions = sc._conf.get("spark.sql.shuffle.partitions")

    # Cache frequently used dataframes to improve performance
    df.cache()

    # Use efficient data structures like Parquet for storing and reading data
    df.write.parquet("hdfs:///path/to/data.parquet")
    df = spark.read.parquet("hdfs:///path/to/data.parquet")

    # Use broadcast variables to efficiently distribute large datasets
    broadcast_var = sc.broadcast(some_large_dataset)

    # Optimize joins by using appropriate join strategies
    df1.join(df2, on="key", how="inner")

    # Tune memory allocation and garbage collection settings
    spark.conf.set("spark.memory.fraction", 0.6)
    spark.conf.set("spark.memory.storageFraction", 0.5)
    spark.conf.set("spark.shuffle.memoryFraction", 0.3)
    spark.conf.set("spark.cleaner.referenceTracking.cleaners", "org.apache.spark.storage.MemoryStoreCleaner")

except Exception as e:
    print(f"An error occurred: {e}")
    # Handle the error or log it as needed
# finally:
    # Close SparkContext to release resources
    # if 'sc' in locals() and sc is not None:
    #     sc.stop()
# Use efficient algorithms and libraries for specific tasks
# (e.g., MLlib for machine learning, GraphFrames for graph processing)


Explore techniques like broadcast variables or accumulator variables to improve efficiency.

In [None]:
# Use Spark's accumulator variables to efficiently aggregate data across multiple nodes
acc = sc.accumulator(0)
def f(x):
    global acc
    acc += x
sc.parallelize(range(100)).foreach(f)
print(acc.value)


## Reporting and Visualization

### Monthly and Weekly Revenue Trend

In [None]:
# Display the first few rows of the DataFrame
df_org.head()

In [None]:
# Revenue by month
import seaborn as sns
import matplotlib.pyplot as plt

order=['Jan','Feb', 'Mar','Apr','May','Jun','Jul','Aug', 'Sep', 'Oct', 'Nov', 'Dec']
monthly_revenue=df_org.groupby(['MONTH_ID', 'YEAR_ID'])[['SALES']].sum().reset_index()
ax=sns.barplot(data=monthly_revenue, x='MONTH_ID', y='SALES', hue='YEAR_ID', palette='pastel')
plt.title('monthly_revenue')
ax.set_xlabel('month')
ax.set_xticklabels(order)
plt.show()

In [None]:
# Revenue by week
# Create new column weekday/month/year+quarter
df_org['ORDERDATE']=pd.to_datetime(df_org['ORDERDATE'])
df_org['ORDERDATE'].info()
df_org['WEEK_DAY']=df_org['ORDERDATE'].dt.strftime('%a')

order=['Mon','Tue','Wed', 'Thu','Fri','Sat', 'Sun']
weekly_revenue=df_org.groupby(['WEEK_DAY', 'YEAR_ID'])[['SALES']].sum().reset_index()
ax=sns.barplot(data=weekly_revenue, x='WEEK_DAY', y='SALES', hue='YEAR_ID', palette='pastel')
plt.title('weekly_revenue')
ax.set_xlabel('WEEK_DAY')
ax.set_xticklabels(order)
weekly_revenue

Observations:</br>
1/ There are only 5 months data in 2005, so it's not intact, we should consider while looking at yearly revenue </br>
2/ From monthly perspective, the second half year (Jul-Dec) has a speedy growth in sales then the first half, it might be a purchasing season for this industry </br>
3/ From a weekly perspective, Thursday has the lowest buy rate thorughout a week, while closer to weekend the stronger purchasing power is than weekday

### Unit Price Distribution

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Scatter plot with seaborn
sns.scatterplot(data=df_org, x='PRICEEACH', y='MSRP', hue='DEALSIZE')

# Add a vertical line at x=26
sns.lineplot(x=(26, 26), y=(100, 100), linestyle='--', color='r')

# Set plot title and axis labels
plt.title("Unit Price x MSRP Distribution")
plt.xlabel('Unit Price')
plt.ylabel("MSRP")

# Show the plot
plt.show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Scatter plot with seaborn
sns.scatterplot(data=df_org, x='PRICEEACH', y='QUANTITYORDERED', hue='YEAR_ID')

# Set plot title and axis labels
plt.title("Unit Price x Q'ty Yearly Distribution")
plt.xlabel('Unit Price')
plt.ylabel("Q'ty")

# Show the plot
plt.show()

Observations:</br>
1/ Minimum unit price is 26 and maximum is 100 throughout 2003-2005 </br>
2/ It's interetsing to see from "Unit Price x Q'ty Yearly Distribution" that 2003 and 2004 order q'ty are pretty stable around 20-50 </br>
3/ While 2005 some order q'ty jump out of 20-50pcs, a little bit less and more, which we cannot see in the last 2 years </br>
4/ From plot"Unit Price Among Different Products", Vintage car usually sells at lower price ($30-$50), plane and ships usually sells at higher price($60-$90), and Train $45-70,</br>
5/ Motorcycle and classic cars have relative same proportion at each price range

### Conclusions
1/ 2005 Sales data only have 5 months, it need to be considered while checking yearly revenue. </br>
2/ Top 10 countries supply over 80-95% revenue </br>
3/ The most popular product line is classic cars, and the biggest market is USA </br>
4/ In 2003&2004 most order q'ty are around 20-50pcs, in 2005 we can see some orders q'ty are more than that section </br>
5/ Deal size distribution : Medium > Small > Large </br>
6/ Vintage car usually sells at lower price ($30-$50), Train $45-70, Plane and ships usually sells at higher price($60-$90); Motorcycle and classic cars have relative same proportion at each price range 7.Second-half year (Jul-Dec) has a speedy growth in sales then the first half, it might be a purchasing season for this industry </br>
7/ From a weekly perspective, Thursday has the lowest buy rate thorughout a week, while closer to weekend stronger the purchasing power is than weekday </br>
8/ There a few msrp(manufactured suggest resell price) lower than unit price, usually it stands for distrbutor is doing a money-losing business, it deserves further investigation </br>
9/ Among orders that are not shipped, one 2004 order is on hold </br>
10/ There are some data missing and their missing rate below:
</br> addressline2 missing: 89.33%/ state missing: 52.66%/ postalcode missing: 2.69%/ territory missing: 38.06% </br>
