## **Dataset Description**
## Dataset Name : 1000000 Sales Records
## Columns:
- Region : (ex: Africa , Middle East , Asia , Central America , etc).
- Country.
- Item Type : Type of product such as Fruits , Clothes , Meat ,etc.
- Sales Channel : Whether the transaction is done online or offline.
- Order Priority : The priority of the order (L:Low , M:Medium , H:High , C:Critical).
- Order Date : Date of ordering the product.
- Order ID : Unique identifier for each order.
- Ship Date : Date of product arrival.
- Units Sold : Number of items sold in this transaction.
- Unit Price : Price of each single product.
- Unit Cost : Cost of each single product.
- Total Revenue.
- Total Cost.
- Total Profit (Label for regression) : Net Profit.

In [None]:
!pip install pyspark



In [None]:
#Importing needed libraries

import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder , StringIndexer , VectorAssembler
from pyspark.ml.regression import LinearRegression , RandomForestRegressor , GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from pyspark.sql import Window

## **Load Data**

In [None]:
spark=SparkSession.builder.appName("Regression").getOrCreate()

In [None]:
df=spark.read.csv("/content/drive/MyDrive/1000000 Sales Records.csv",header=True)

In [None]:
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|    503890.08| 165258.24|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

## **Displaying the types of Dataframe columns**

In [None]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: string (nullable = true)
 |-- Unit Price: string (nullable = true)
 |-- Unit Cost: string (nullable = true)
 |-- Total Revenue: string (nullable = true)
 |-- Total Cost: string (nullable = true)
 |-- Total Profit: string (nullable = true)



## **Converting dataframe numerical columns to float data type instead of string**

In [None]:
numerical_columns=['Units Sold','Unit Price','Unit Cost','Total Revenue','Total Cost','Total Profit']

for column in numerical_columns:
  df=df.withColumn(column,df[column].cast("float").alias(column))


In [None]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: float (nullable = true)
 |-- Unit Price: float (nullable = true)
 |-- Unit Cost: float (nullable = true)
 |-- Total Revenue: float (nullable = true)
 |-- Total Cost: float (nullable = true)
 |-- Total Profit: float (nullable = true)



In [None]:
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|    1593.0|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|    4611.0|    109.28|    35.84|     503890.1| 165258.23|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

## **Checking for Nulls in each column of the dataframe**

In [None]:
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|Region|Country|Item Type|Sales Channel|Order Priority|Order Date|Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|     0|      0|        0|            0|             0|         0|       0|        0|         0|         0|        0|            0|         0|           0|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+



## **Checking if there are duplicate rows in the dataframe**

In [None]:
print("Number of duplicates found is: ",df.count()-df.distinct().count())

Number of duplicates found is:  50009


## **Removing duplicate records in the dataframe**

In [None]:
df=df.dropDuplicates()

In [None]:
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|          Malawi|      Household|      Offline|             H| 4/24/2012|358857953| 5/14/2012|    2898.0|    668.27|   502.54|    1936646.5| 1456360.9|   480285.53|
|                Asia|           Nepal|  Personal Care|       Online|             H| 5/20/2011|543531262|  6/3/2011|    1834.0|     81.73|    56.67|    149892.81| 103932.78|    45960.04|
|Middle East and N...|         Algeria|         Fruits|      Offl

## **Converting Order Date and Ship Date to MM/dd/yyyy Format**

In [None]:
# Convert dates of format (M/d/yyyy , MM/d/yyyy , M/dd/yyyy) into a single format (MM/dd/yyyy) for further preprocessing
def convert_date_to_correct_format(date_col,col_name):
  date_col_updated=[]
  for elem in date_col.collect():
    date_components=elem[col_name].split('/')
    reformatted_date=""
    if(len(date_components[0])==1):
      reformatted_date+="0"
    reformatted_date+=date_components[0]
    reformatted_date+="/"
    if(len(date_components[1])==1):
      reformatted_date+="0"
    reformatted_date+=date_components[1]
    reformatted_date+="/"
    reformatted_date+=date_components[2]
    date_col_updated.append(reformatted_date)
  return date_col_updated

order_date_column =df.select(col("Order Date"))
ship_date_column=df.select(col("Ship Date"))

reformatted_order_date=convert_date_to_correct_format(order_date_column,"Order Date")
reformatted_ship_date=convert_date_to_correct_format(ship_date_column,"Ship Date")



In [None]:
# adding the order and ship dates to the original dataframe after reformatting them to MM/dd/yyyy
c1 = spark.createDataFrame([(l,) for l in reformatted_order_date], ['Reformatted Order Date'])
c2 = spark.createDataFrame([(l,) for l in reformatted_ship_date], ['Reformatted Ship Date'])
c1= c1.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
c2 =c2.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
dates_df = c1.join(c2, c1.row_idx == c2.row_idx).drop(c1.row_idx)
df=df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
df=df.join(dates_df,df.row_idx == dates_df.row_idx).drop('row_idx')
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|Reformatted Order Date|Reformatted Ship Date|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+
|  Sub-Saharan Africa|          Malawi|      Household|      Offline|             H| 4/24/2012|358857953| 5/14/2012|    2898.0|    668.27|   502.54|    1936646.5| 1456360.9|   480285.53|            04/24/2012|           05/14/2012|
|                Asia|           Nepal|  Personal Care|       Online|   

## **Adding a new column to the dataframe called shipping days which is the difference between the Shipping date and Order date**

In [None]:
df=df.select("*",
              datediff(to_date(col("Reformatted Ship Date"),"MM/dd/yyyy"),
                       to_date(col("Reformatted Order Date"),"MM/dd/yyyy")).alias("Shipping days"))
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+-------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|Reformatted Order Date|Reformatted Ship Date|Shipping days|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+-------------+
|  Sub-Saharan Africa|          Malawi|      Household|      Offline|             H| 4/24/2012|358857953| 5/14/2012|    2898.0|    668.27|   502.54|    1936646.5| 1456360.9|   480285.53|            04/24/2012|           05/14/2012|           20|
|               

## **Feature engineering the categorical columns using one hot encoding technique**

In [None]:
## Showing the number of unique values for each categorical column 
text_columns=["Region","Country","Item Type","Sales Channel","Order Priority"]
for col in text_columns:
  print(col+" has "+str(df.select(col).distinct().count())+" Distinct Values")

Region has 7 Distinct Values
Country has 185 Distinct Values
Item Type has 12 Distinct Values
Sales Channel has 2 Distinct Values
Order Priority has 4 Distinct Values


In [None]:
text_columns=["Region","Country","Item Type","Sales Channel","Order Priority"]
indexers=[StringIndexer(inputCol=c,outputCol="{}_indexed".format(c)) for c in text_columns]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                        outputCol="{}_encoded".format(indexer.getOutputCol()))
                        for indexer in indexers]

all_columns=[]
filtered_numerical_columns=['Unit Price','Unit Cost','Shipping days']
all_columns.extend(filtered_numerical_columns)
encoder_columns=[encoder.getOutputCol() for encoder in encoders]
all_columns.extend(encoder_columns)

print(all_columns)
assembler = VectorAssembler(
    inputCols=all_columns,
    outputCol="features"
)
pipeline = Pipeline(stages=indexers + encoders + [assembler])
final_features=pipeline.fit(df).transform(df)
final_features.show()

['Unit Price', 'Unit Cost', 'Shipping days', 'Region_indexed_encoded', 'Country_indexed_encoded', 'Item Type_indexed_encoded', 'Sales Channel_indexed_encoded', 'Order Priority_indexed_encoded']
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+-------------+--------------+---------------+-----------------+---------------------+----------------------+----------------------+-----------------------+-------------------------+-----------------------------+------------------------------+--------------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|Reformatted Order Date|Reformatted Ship Date|Shipping days|Region_indexed|Country_indexed|Item Type_indexed|Sales Channel_ind

## **Dropping some columns after feature engineering and data cleaning and keeping only the columns that will be used during modeling**

In [None]:
columns_to_drop=['Order ID','Total Revenue','Total Cost','Order Date','Ship Date','Reformatted Order Date','Reformatted Ship Date']
final_features=final_features.drop(*(columns_to_drop))

In [None]:
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+-------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|Reformatted Order Date|Reformatted Ship Date|Shipping days|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+----------------------+---------------------+-------------+
|  Sub-Saharan Africa|          Malawi|      Household|      Offline|             H| 4/24/2012|358857953| 5/14/2012|    2898.0|    668.27|   502.54|    1936646.5| 1456360.9|   480285.53|            04/24/2012|           05/14/2012|           20|
|               

In [None]:
final_features.select("features","Total Profit").show()

+--------------------+------------+
|            features|Total Profit|
+--------------------+------------+
|(208,[0,1,2,3,71,...|   480285.53|
|(208,[0,1,2,5,92,...|    45960.04|
|(208,[0,1,2,6,16,...|      371.14|
|(208,[0,1,2,3,54,...|    738773.6|
|(208,[0,1,2,3,170...|    292101.6|
|(208,[0,1,2,4,192...|    420166.8|
|(208,[0,1,2,3,76,...|   146514.95|
|(208,[0,1,2,3,23,...|   501443.66|
|(208,[0,1,2,8,106...|    229270.0|
|(208,[0,1,2,6,62,...|    513999.2|
|(208,[0,1,2,4,113...|     11579.4|
|(208,[0,1,2,7,155...|     9215.11|
|(208,[0,1,2,8,163...|   189854.56|
|(208,[0,1,2,5,31]...|    373919.0|
|(208,[0,1,2,7,86,...|   725211.75|
|(208,[0,1,2,4,65,...|    496260.0|
|(208,[0,1,2,7,141...|   880761.75|
|(208,[0,1,2,3,20,...|    146665.8|
|(208,[0,1,2,98,19...|   424908.84|
|(208,[0,1,2,4,24,...|    587621.8|
+--------------------+------------+
only showing top 20 rows



## **Splitting the data into 80% training and 20% testing**

In [None]:
train_df , test_df= final_features.randomSplit([0.8,0.2])

In [None]:
test_df.show()

+------+----------+---------+-------------+--------------+----------+----------+---------+------------+-------------+--------------+---------------+-----------------+---------------------+----------------------+----------------------+-----------------------+-------------------------+-----------------------------+------------------------------+--------------------+
|Region|   Country|Item Type|Sales Channel|Order Priority|Units Sold|Unit Price|Unit Cost|Total Profit|Shipping days|Region_indexed|Country_indexed|Item Type_indexed|Sales Channel_indexed|Order Priority_indexed|Region_indexed_encoded|Country_indexed_encoded|Item Type_indexed_encoded|Sales Channel_indexed_encoded|Order Priority_indexed_encoded|            features|
+------+----------+---------+-------------+--------------+----------+----------+---------+------------+-------------+--------------+---------------+-----------------+---------------------+----------------------+----------------------+-----------------------+--------

## **Linear Regression model**

In [None]:
model=LinearRegression(featuresCol='features',labelCol='Total Profit')
Linear_reg_model=model.fit(train_df)

In [None]:
Linear_reg_model_summary=Linear_reg_model.summary
#print("RMSE on training data: ",Linear_reg_model_summary.rootMeanSquaredError)
print("R2 score on training data: ",Linear_reg_model_summary.r2)

R2 score on training data:  0.4828501394296415


In [None]:
preds=Linear_reg_model.transform(test_df)
evaluator=RegressionEvaluator(labelCol="Total Profit",predictionCol="prediction",metricName="r2")
print("R2 score on test data is: "+str(evaluator.evaluate(preds)))

R2 score on test data is: 0.4803826735352318


## **Random Forest Regression**

In [None]:
RF=RandomForestRegressor(featuresCol="features",labelCol="Total Profit")
RF_Model_reg=RF.fit(train_df)

In [None]:
RF_preds_train=RF_Model_reg.transform(train_df)
evaluator=RegressionEvaluator(labelCol="Total Profit",predictionCol="prediction",metricName="r2")
print("R2 score on train data is: "+str(evaluator.evaluate(RF_preds_train)))

R2 score on train data is: 0.4798028302236652


In [None]:
RF_preds_test=RF_Model_reg.transform(test_df)
evaluator=RegressionEvaluator(labelCol="Total Profit",predictionCol="prediction",metricName="r2")
print("R2 score on test data is: "+str(evaluator.evaluate(RF_preds_test)))

R2 score on test data is: 0.47722794815997105


## **Gradient-boosted tree regression**

In [None]:
GBT=GBTRegressor(featuresCol="features",labelCol="Total Profit")
GBT_model=GBT.fit(train_df)

In [None]:
GBT_preds_train=GBT_model.transform(train_df)
evaluator=RegressionEvaluator(labelCol="Total Profit",predictionCol="prediction",metricName="r2")
print("R2 score on train data is: "+str(evaluator.evaluate(GBT_preds_train)))

R2 score on train data is: 0.48337291955989126


In [None]:
GBT_preds_test=GBT_model.transform(test_df)
evaluator=RegressionEvaluator(labelCol="Total Profit",predictionCol="prediction",metricName="r2")
print("R2 score on train data is: "+str(evaluator.evaluate(GBT_preds_test)))

R2 score on train data is: 0.480178240809525
