<a href="https://colab.research.google.com/github/ngoan22mse23088/BigData/blob/master/Analyzing_Sales_Data_with_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project 1

## Set up

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

In [None]:
!pip install -q kaggle
from google.colab import files
files.upload()

## 1.Data Preparation:

### Load the sales dataset into Spark RDDs or DataFrames.

In [None]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json

!kaggle datasets download -d kyanyoga/sample-sales-data

!unzip -p "sample-sales-data.zip"

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [None]:
sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

In [None]:
import pandas as pd
print('Pandas version: {}'. format(pd.__version__))

In [None]:
data_raw = spark.read.csv('/content/sales_data_sample.csv', inferSchema=True, header=True)

# preview the data
# data type
print('-'*10, 'data types', '-'*10)
pd.DataFrame(data_raw.dtypes)

In [None]:
raw_df = data_raw.toPandas()
df = raw_df.copy()
df.head()

In [None]:
# data summary
print('-'*10, 'data summary', '-'*10)
data_raw.describe().toPandas()

In [None]:
# view a small subset of the data
print('-'*10, 'randomely sample 1% data to view', '-'*10)
data_raw.randomSplit([0.01, 0.99])[0].toPandas()

In [None]:
df.shape

In [None]:
df.head()

### Cleanse the data by handling missing values, outliers, or any inconsistencies.

Checking for null values

In [None]:
total_null = df.isnull().sum().sort_values(ascending = False)
percent_null = (df.isnull().sum()/df.isnull().count()).sort_values(ascending = False)
missing_data = pd.concat([total_null,percent_null],axis = 1,keys=['total_null','persent_null'])
missing_data

In [None]:
# @title total_null

from matplotlib import pyplot as plt
missing_data['total_null'].plot(kind='hist', bins=20, title='total_null')
plt.gca().spines[['top', 'right',]].set_visible(False)

Dropping columns

In [None]:
#Removing the variables which dont add significant value to the analysis or majority null value.
to_drop = ['PHONE','ADDRESSLINE1','ADDRESSLINE2','STATE','POSTALCODE','TERRITORY']
df = df.drop(to_drop, axis=1)

Checking for inconsistent data types

In [None]:
df.dtypes

In [None]:
df['ORDERDATE'] = pd.to_datetime(df['ORDERDATE'])

Summary stats of Quantitative variables

In [None]:
quant_vars = ['QUANTITYORDERED','PRICEEACH','SALES','MSRP']
df[quant_vars].describe()

In [None]:
df.sort_values(by = ['ORDERDATE'], inplace = True)
df.set_index('ORDERDATE', inplace = True)

In [None]:
df.head()

## EDA - Exploratory Data Analysis and Visualization

In [None]:
import matplotlib.pyplot as plt

df.plot(kind='box', subplots=True, sharex=False, sharey=False, figsize=(10,10), layout=(3,4))
plt.show()

In [None]:
top_customer = df.groupby(['CUSTOMERNAME']).sum().sort_values('SALES', ascending = False).head(20)
top_customer = top_customer[['SALES']].round(3)
top_customer.reset_index(inplace = True)

In [None]:
plt.figure(figsize = (15,5))
plt.title('20 Most Valueable Customer (2003 - 2005)', fontsize = 18)
plt.bar(top_customer['CUSTOMERNAME'], top_customer['SALES'], color = '#37C6AB', edgecolor = 'black', linewidth = 1)
plt.xlabel('Customer Name', fontsize = 15)
plt.ylabel('Revenue', fontsize = 15)
plt.xticks(fontsize = 12, rotation = 90)
plt.yticks(fontsize = 12)
for k, v in top_customer['SALES'].items():
    if v > 600000:
        plt.text(k, v-270000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')
    else:
        plt.text(k, v+ 50000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')

In [None]:
top_country = df.groupby(['COUNTRY']).sum().sort_values('SALES', ascending = False).head(20)
top_country = top_country[['SALES']].round(3)
top_country.reset_index(inplace = True)

In [None]:
plt.figure(figsize = (15,5))
plt.title('20 Highest Revenue by Country (2003 - 2005)', fontsize = 18)
plt.bar(top_country['COUNTRY'], top_country['SALES'], color = '#37C6AB', edgecolor = 'black', linewidth = 1)
plt.xlabel('Country', fontsize = 15)
plt.ylabel('Revenue', fontsize = 15)
plt.xticks(fontsize = 12, rotation = 90)
plt.yticks(fontsize = 12)
for k, v in top_country['SALES'].items():
    if v > 3000000:
        plt.text(k, v-1200000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')
    else:
        plt.text(k, v+100000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')

Find out 20 Highest Revenue by City

Then visualized revenue by city. Here are th Top 20 City which generated the highest revenue

In [None]:
top_city = df.groupby(['CITY']).sum().sort_values('SALES', ascending = False).head(20)
top_city = top_city[['SALES']].round(3)
top_city.reset_index(inplace = True)
plt.figure(figsize = (15,5))
plt.title('20 Highest Revenue by City (2003 - 2005)', fontsize = 18)
plt.bar(top_city['CITY'], top_city['SALES'], color = '#37C6AB', edgecolor = 'black', linewidth = 1 )
plt.xlabel('City', fontsize = 15)
plt.ylabel('Revenue', fontsize = 15)
plt.xticks(fontsize = 12, rotation = 90)
plt.yticks(fontsize = 12)
for k, v, in top_city['SALES'].items():
    if v > 800000:
        plt.text(k, v-350000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')
    else:
        plt.text(k, v+35000, '$' + str(v), fontsize = 12, rotation = 90, color = 'black', ha = 'center')

In [None]:
top_product = df.groupby(['PRODUCTLINE']).sum().sort_values('SALES', ascending = False)
top_product = top_product[['SALES']]
top_product.reset_index(inplace = True)
total_revenue_product = top_product['SALES'].sum()
total_revenue_product = str(int(total_revenue_product))
total_revenue_product = '$' + total_revenue_product

In [None]:
plt.rcParams['figure.figsize'] = (13,7)
plt.rcParams['font.size'] = 12.0
plt.rcParams['font.weight'] = 6
def autopct_format(values):
    def my_format(pct):
        total = sum(values)
        val = int(round(pct*total/100.0))
        return ' ${v:d}'.format(v = val)
    return my_format
colors = ['#ff9999','#66b3ff','#99ff99','#ffcc99','#55B4B0','#E15D44','#009B77']
explode = (0.05,0.05,0.05,0.05,0.05,0.05,0.05)
fig1, ax1 = plt.subplots()
pie1 = ax1.pie(top_product['SALES'], colors = colors, labels = top_product['PRODUCTLINE'], autopct = autopct_format(top_product['SALES']), startangle = 90, explode = explode)
fraction_text_list = pie1[2]
for text in fraction_text_list:
    text.set_rotation(315)
center_circle = plt.Circle((0,0), 0.80, fc = 'white')
fig = plt.gcf()
fig.gca().add_artist(center_circle)
ax1.axis('equal')
label = ax1.annotate('Total Revenue \n' + str(total_revenue_product), color = 'red', xy = (0,0), fontsize = 12, ha  ='center')
plt.tight_layout()
plt.show()


In [None]:
plt.figure(figsize = (10,10))
corr_matrix = df.corr()
sns.heatmap(corr_matrix, annot = True)

In [None]:
plt.figure(figsize=(15,5))
plt.tight_layout()
sns.barplot(x='PRODUCTLINE',y='SALES',data=df)

In [None]:
plt.figure(figsize=(12,6))
sns.countplot(df['STATUS'])

In [None]:
plt.figure(figsize=(20,8))
sns.lineplot(x='ORDERDATE', y='SALES', data=df)

## Feature Engineering

### Outlier detection https://www.kaggle.com/code/shohanurrahaman/intro-to-data-science-data-cleaning-02

In [None]:
import matplotlib.pyplot as plt

df.plot(kind='box', subplots=True, sharex=False, sharey=False, figsize=(10,10), layout=(3,4))
plt.show()

Filtering outlier

In [None]:
print('original shape of dataset :',df.shape)

cols = ['SALES', 'MSRP','QUANTITYORDERED']
new_df = df[cols]

#calculation
Q1 = new_df.quantile(0.25)
Q3 = new_df.quantile(0.75)
IQR = Q3-Q1
maximum = Q3+1.5*IQR
minimum = Q1-1.5*IQR
#print(minimum)

#filter outlier
cond = (new_df <= maximum) & (new_df >= minimum)
'''
we specify that the condition should be true for all three columns by using the all function with axis=1 argument.
This gives us a list of True/False against each row.
If a row has all three True values, then it gives a True value to that row
'''
cond = cond.all(axis=1)
df = df[cond]
print('filtered dataset shape : ',df.shape)

#plot again to check that if has any outlier
df.plot(kind='box', subplots=True, sharex=False, sharey=False, figsize=(10,10), layout=(3,4))
plt.show()

Scatter Plot

In [None]:
new_df = df[['SALES','QUANTITYORDERED','MSRP']]
pd.plotting.scatter_matrix(new_df, figsize = (10,10))

Z-Score

In [None]:
import numpy as np
print('shape of original data :',df.shape)

mean = new_df['QUANTITYORDERED'].mean()
std_dev = new_df['QUANTITYORDERED'].std()

# find z scores
z_scores = (new_df['QUANTITYORDERED'] - mean) / std_dev
z_scores = np.abs(z_scores)

#print(z_scores.min())

#filter data
z_df = new_df[z_scores<3]
print('shape of filtered data : ',z_df.shape)

#plot data
z_df['QUANTITYORDERED'].plot(kind='box')
plt.show()

## Advanced Analytics

In [None]:
print('Order Date Description\n')

df.sort_values(by = ['ORDERDATE'], inplace = True, ascending = True)

new_data = pd.DataFrame(df['SALES'])
new_data.head()

In [None]:
# @title SALES

from matplotlib import pyplot as plt
new_data['SALES'].plot(kind='hist', bins=20, title='SALES')
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:
new_data.plot()

A series is said to be stationary when its mean and variance do not change over time. From the above distribution of the sales it is not clear whether the sales distribution is stationary or not. Let us perform some stationarity tests to check whether the time series is stationary or not.

Checking for Stationary

In [None]:
new_data = pd.DataFrame(new_data['SALES'].resample('D').mean())
new_data = new_data.interpolate(method = 'linear')

Method 1<br/>

To check for stationarity by comparing the change in mean and variance over time, let us split teh data into train, test, and validation

In [None]:
train, test, validation = np.split(new_data['SALES'].sample(frac = 1), [int(.6*len(new_data['SALES'])), int(.8*len(new_data['SALES']))])

In [None]:
print('Train Dataset')
print(train)
print('Test Dataset')
print(test)
print('Validation Dataset')
print(validation)

From the above values of mean and variance, it can be inferred that their is not much difference in the three values of mean and variance, indicating that the series is stationary. However, to verify our observations, let us perform a standard stationarity test, called Augmented Dicky Fuller test.

Augmented Dicky Fuller Test

- The Augmented Dickey-Fuller test is a type of statistical test alsocalled a unit root test.The base of unit root test is that it helps in determining how strongly a time series is defined by a trend.
- The null hypothesis of the test is that the time series can be represented by a unit root, that it is not stationary. The alternate hypothesis (rejecting the null hypothesis) is that the time series is stationary.
  - Null Hypothesis(H0): Time series is not stationary
  - Alternate Hypothesis (H1): Time series is stationary
- This result is interpreted using the p-value from the test.
  - p-value > 0.05: Fail to reject the null hypothesis (H0), the data has a unit root and is non-stationary.
  - p-value <= 0.05: Reject the null hypothesis (H0), the data does not have a unit root and is stationary.

Method 2 - Augmented Dicky Fuller Test

In [None]:
from statsmodels.tsa.stattools import adfuller
#statsmodel provied addfuller()
data1 = new_data.iloc[:,0].values
adf = adfuller(data1)

print(adf)
print('\nADF = ', str(adf[0]))
print('\np-value = ', str(adf[1]))
print('\nCritical Values: ')

for key, val in adf[4].items():
    print(key,':',val)
    if adf[0] < val:
        print('Null Hypothesis Rejected. Time Series is Stationary')
    else:
        print('Null Hypothesis Accepted. Time Series is not Stationary')

In [None]:
from pylab import rcParams
rcParams['figure.figsize'] = 20, 10

import statsmodels.api as sm
decomposition = sm.tsa.seasonal_decompose(new_data, model = 'additive')

fig = decomposition.plot()
plt.show()

### Sales Forecasting using ARIMA

In [None]:
from sklearn.preprocessing import LabelEncoder

def labelencoder(df):
    for c in df.columns:
        if df[c].dtype=='object':
            df[c] = df[c].fillna('N')
            lbl = LabelEncoder()
            lbl.fit(list(df[c].values))
            df[c] = lbl.transform(df[c].values)
    return df

In [None]:
data1=labelencoder(df)

In [None]:
target=['PRODUCTLINE']
dataY=data1[target]
dataX=data1.drop(target,axis=1)

In [None]:
df_columns = list(dataX.columns)
print(df_columns)

In [None]:
import random

m=len(dataX)
print(m)
M=list(range(m))
random.seed(2021)
random.shuffle(M)

trainX=dataX.iloc[M[0:(m//4)*3]]
trainY=dataY.iloc[M[0:(m//4)*3]]
testX=dataX.iloc[M[(m//4)*3:]]
testY=dataY.iloc[M[(m//4)*3:]]

In [None]:
train_df=trainX
test_df=testX

In [None]:
train_df.columns=df_columns
test_df.columns=df_columns

In [None]:
def create_numeric_feature(input_df):
    use_columns = df_columns
    return input_df[use_columns].copy()

In [None]:
from contextlib import contextmanager
from time import time

class Timer:
    def __init__(self, logger=None, format_str='{:.3f}[s]', prefix=None, suffix=None, sep=' '):

        if prefix: format_str = str(prefix) + sep + format_str
        if suffix: format_str = format_str + sep + str(suffix)
        self.format_str = format_str
        self.logger = logger
        self.start = None
        self.end = None

    @property
    def duration(self):
        if self.end is None:
            return 0
        return self.end - self.start

    def __enter__(self):
        self.start = time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time()
        out_str = self.format_str.format(self.duration)
        if self.logger:
            self.logger.info(out_str)
        else:
            print(out_str)

In [None]:
from tqdm import tqdm

def to_feature(input_df):

    processors = [
        create_numeric_feature,
    ]

    out_df = pd.DataFrame()

    for func in tqdm(processors, total=len(processors)):
        with Timer(prefix='create' + func.__name__ + ' '):
            _df = func(input_df)

        assert len(_df) == len(input_df), func.__name__
        out_df = pd.concat([out_df, _df], axis=1)

    return out_df

In [None]:
train_feat_df = to_feature(train_df)
test_feat_df = to_feature(test_df)

Model

In [None]:
from sklearn.metrics import mean_squared_error
from lightgbm import LGBMRegressor, early_stopping

def fit_lgbm(X, y, cv,
             params: dict=None,
             verbose: int=50):

    if params is None:
        params = {}

    models = []
    oof_pred = np.zeros_like(y, dtype=np.float)

    for i, (idx_train, idx_valid) in enumerate(cv):
        x_train, y_train = X[idx_train], y[idx_train]
        x_valid, y_valid = X[idx_valid], y[idx_valid]

        clf = LGBMRegressor(**params)
        early_stopping_callback = early_stopping(stopping_rounds=100)
        with Timer(prefix='fit fold={} '.format(i)):
            clf.fit(x_train, y_train,
                    eval_set=[(x_valid, y_valid)])

        pred_i = clf.predict(x_valid)
        oof_pred[idx_valid] = pred_i
        models.append(clf)
        print(f'Fold {i} RMSLE: {mean_squared_error(y_valid, pred_i) ** .5:.4f}')
        print()

    score = mean_squared_error(y, oof_pred) ** .5
    print('-' * 50)
    print('FINISHED | Whole RMSLE: {:.4f}'.format(score))
    return oof_pred, models

In [None]:
params = {
    'objective': 'rmse',
    'learning_rate': .1,
    'reg_lambda': 1.,
    'reg_alpha': .1,
    'max_depth': 5,
    'n_estimators': 10000,
    'colsample_bytree': .5,
    'min_child_samples': 10,
    'subsample_freq': 3,
    'subsample': .9,
    'importance_type': 'gain',
    'random_state': 71,
    'num_leaves': 62
}

In [None]:
y = trainY
ydf=pd.DataFrame(y)
ydf

In [None]:
# @title PRODUCTLINE

from matplotlib import pyplot as plt
ydf['PRODUCTLINE'].plot(kind='hist', bins=20, title='PRODUCTLINE')
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:

from sklearn.model_selection import KFold
np.float = float
from lightgbm import early_stopping


for i in range(1):
    fold = KFold(n_splits=5, shuffle=True, random_state=71)
    ydfi=ydf.iloc[:,i]
    y=np.array(ydfi)
    cv = list(fold.split(train_feat_df, y))
    oof, models = fit_lgbm(train_feat_df.values, y, cv, params=params, verbose=500)

    fig,ax = plt.subplots(figsize=(6,6))
    ax.set_title(target[i],fontsize=20)
    ax.set_ylabel('predicted',fontsize=12)
    ax.set_xlabel('actual',fontsize=12)
    ax.scatter(y,oof)

## Visualize Importance

In [None]:
def visualize_importance(models, feat_train_df):

    feature_importance_df = pd.DataFrame()
    for i, model in enumerate(models):
        _df = pd.DataFrame()
        _df['feature_importance'] = model.feature_importances_
        _df['column'] = feat_train_df.columns
        _df['fold'] = i + 1
        feature_importance_df = pd.concat([feature_importance_df, _df],
                                          axis=0, ignore_index=True)

    order = feature_importance_df.groupby('column')\
        .sum()[['feature_importance']]\
        .sort_values('feature_importance', ascending=False).index[:50]

    fig, ax = plt.subplots(figsize=(8, max(6, len(order) * .25)))
    sns.boxenplot(data=feature_importance_df,
                  x='feature_importance',
                  y='column',
                  order=order,
                  ax=ax,
                  palette='viridis',
                  orient='h')

    ax.tick_params(axis='x', rotation=0)
    #ax.set_title('Importance')
    ax.grid()
    fig.tight_layout()

    return fig,ax

#fig, ax = visualize_importance(models, train_feat_df)

In [None]:
for i in range(1):
    fold = KFold(n_splits=5, shuffle=True, random_state=71)
    ydfi=ydf.iloc[:,i]
    y=np.array(ydfi)
    cv = list(fold.split(train_feat_df, y))
    oof, models = fit_lgbm(train_feat_df.values, y, cv, params=params, verbose=500)
    fig, ax = visualize_importance(models, train_feat_df)
    ax.set_title(target[i]+' Imortance',fontsize=20)

In [None]:
preds=[]
for i in range(5):
    preds += [models[i].predict(test_feat_df.values)/5]
predsT=np.array(preds).T
preds2=[]
for item in predsT:
    value=sum(item)
    preds2+=[value]
print(preds2[0:5])

In [None]:
for i in range(1):
    fig, ax = plt.subplots(figsize=(10,5))
    sns.histplot(oof, label='Train Predicted '+target[i], ax=ax, color='C1',bins=30)
    sns.histplot(preds2, label='Test Predicted '+target[i], ax=ax, color='black',bins=30)
    ax.legend()
    ax.grid()

In [None]:
y_true=testY
y_pred=pd.Series(preds2).apply(lambda x: round(x,0))

In [None]:
# @title ORDERNUMBER

from matplotlib import pyplot as plt
df['ORDERNUMBER'].plot(kind='hist', bins=20, title='ORDERNUMBER')
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred, target_names=raw_df['PRODUCTLINE'].unique().tolist(), digits=4))