It is regression machine learning model approach to predict Big Mart sales, optimizing inventory management, and enhancing decision-making for increased profitability and customer satisfaction.

In [1]:
import warnings
warnings.filterwarnings('ignore')

# Installing Spark

In [2]:
!pip3 -q install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mywork').getOrCreate()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


#Loading Data

In [3]:
!wget -O Data_Set.csv -q https://raw.githubusercontent.com/nilanjansonai/BDSN_END-TERM/main/Big_Mart.csv

In [4]:
data=spark.read.csv('Data_Set.csv',inferSchema=True,header=True)

# Data Understanding

In [5]:
data.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



In [6]:
data.count(), len(data.columns)

(8523, 12)

In [7]:
data.dtypes

[('Item_Identifier', 'string'),
 ('Item_Weight', 'double'),
 ('Item_Fat_Content', 'string'),
 ('Item_Visibility', 'double'),
 ('Item_Type', 'string'),
 ('Item_MRP', 'double'),
 ('Outlet_Identifier', 'string'),
 ('Outlet_Establishment_Year', 'int'),
 ('Outlet_Size', 'string'),
 ('Outlet_Location_Type', 'string'),
 ('Outlet_Type', 'string'),
 ('Item_Outlet_Sales', 'double')]

In [8]:
data.describe().show()

+-------+---------------+------------------+----------------+-------------------+-------------+-----------------+-----------------+-------------------------+-----------+--------------------+-----------------+------------------+
|summary|Item_Identifier|       Item_Weight|Item_Fat_Content|    Item_Visibility|    Item_Type|         Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type| Item_Outlet_Sales|
+-------+---------------+------------------+----------------+-------------------+-------------+-----------------+-----------------+-------------------------+-----------+--------------------+-----------------+------------------+
|  count|           8523|              7060|            8523|               8523|         8523|             8523|             8523|                     8523|       6113|                8523|             8523|              8523|
|   mean|           NULL|12.857645184136183|            NULL|0.06613202877895127|       

In [113]:
data.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [10]:
for col in data.columns:
  print('Number of unique values : ',data.select(col).distinct().count())
  print(data.select(col).distinct().show())

Number of unique values :  1559
+---------------+
|Item_Identifier|
+---------------+
|          FDB11|
|          FDO11|
|          DRA24|
|          FDU24|
|          FDW60|
|          FDW52|
|          FDN15|
|          FDQ20|
|          FDU10|
|          FDY43|
|          NCL54|
|          FDP39|
|          NCN30|
|          NCS30|
|          NCW05|
|          NCF42|
|          FDX27|
|          FDD20|
|          FDW32|
|          NCW18|
+---------------+
only showing top 20 rows

None
Number of unique values :  416
+-----------+
|Item_Weight|
+-----------+
|       9.13|
|       15.5|
|       8.51|
|      10.65|
|      16.75|
|       5.86|
|       6.96|
|       5.78|
|      6.765|
|      6.985|
|      7.365|
|       15.7|
|      8.355|
|       5.48|
|      8.235|
|       8.43|
|       8.92|
|       4.88|
|       8.31|
|      20.35|
+-----------+
only showing top 20 rows

None
Number of unique values :  5
+----------------+
|Item_Fat_Content|
+----------------+
|         low fat|
| 

In [11]:
for col in data.columns:
  print(col, "has", data.filter(data[col].isNull()).count(), "Null values.")

Item_Identifier has 0 Null values.
Item_Weight has 1463 Null values.
Item_Fat_Content has 0 Null values.
Item_Visibility has 0 Null values.
Item_Type has 0 Null values.
Item_MRP has 0 Null values.
Outlet_Identifier has 0 Null values.
Outlet_Establishment_Year has 0 Null values.
Outlet_Size has 2410 Null values.
Outlet_Location_Type has 0 Null values.
Outlet_Type has 0 Null values.
Item_Outlet_Sales has 0 Null values.


2410 null values in Outlet_Size and 1463 null values in Item_Weight.

# Data Preprocessing

In [12]:
Item_Weight_mean = data.agg({'Item_Weight': 'mean'}).collect()
Item_Weight_mean

[Row(avg(Item_Weight)=12.857645184136183)]

In [13]:
from pyspark.sql import functions as F

In [14]:
item_weight_mean = data.agg(F.mean('Item_Weight').alias('Item_Weight_mean')).collect()[0]['Item_Weight_mean']

df = data.withColumn('Item_Weight', F.when(F.col('Item_Weight').isNull(), item_weight_mean).otherwise(F.col('Item_Weight')))

In [15]:
from pyspark.sql.functions import when

In [16]:
df = df.withColumn("Item_Fat_Content", when(df["Item_Fat_Content"] == 'Regular', 'reg').otherwise(df["Item_Fat_Content"]))

In [17]:
df = df.withColumn("Item_Fat_Content", when(df["Item_Fat_Content"].isin(['low fat','Low Fat']), 'LF').otherwise(df["Item_Fat_Content"]))

In [18]:
df = df.withColumn("Item_Type", df["Item_Identifier"].substr(1, 2))

In [19]:
columns_to_drop = ['Outlet_Size', 'Item_Identifier']
df = df.drop(*columns_to_drop)

Item_identifier ID has first two charachters defining the item type, these are FD, DR, NC means 'Food', 'Drinks', 'Non-Consumables'. Converting Item_Type into these 3 categories Food, Drinks,Non-Consumable. And repeated values in 'Item_Fat_Content' are replaced. Outlet_size and Item_Identifier are dropped as Outlet_size consists 28% null values and Item_Identifier is considered insignificant. And null values of Item_Weight has been replaced by its mean.

In [20]:
for col in df.columns:
  print('Number of unique values : ',df.select(col).distinct().count())
  print(df.select(col).distinct().show())

Number of unique values :  416
+------------------+
|       Item_Weight|
+------------------+
|12.857645184136183|
|              9.13|
|              15.5|
|              8.51|
|             10.65|
|             16.75|
|              5.86|
|              6.96|
|              5.78|
|             6.765|
|             6.985|
|             7.365|
|              15.7|
|             8.355|
|              5.48|
|             8.235|
|              8.43|
|              8.92|
|              4.88|
|              8.31|
+------------------+
only showing top 20 rows

None
Number of unique values :  2
+----------------+
|Item_Fat_Content|
+----------------+
|              LF|
|             reg|
+----------------+

None
Number of unique values :  7880
+---------------+
|Item_Visibility|
+---------------+
|    0.028696932|
|    0.016993225|
|    0.026681262|
|    0.079794329|
|    0.023492524|
|    0.062700289|
|    0.030563449|
|    0.123961415|
|    0.070444232|
|    0.013834247|
|    0.027217468|
|

In [21]:
for col in df.columns:
  print(col, "has", df.filter(df[col].isNull()).count(), "Null values.")

Item_Weight has 0 Null values.
Item_Fat_Content has 0 Null values.
Item_Visibility has 0 Null values.
Item_Type has 0 Null values.
Item_MRP has 0 Null values.
Outlet_Identifier has 0 Null values.
Outlet_Establishment_Year has 0 Null values.
Outlet_Location_Type has 0 Null values.
Outlet_Type has 0 Null values.
Item_Outlet_Sales has 0 Null values.


In [56]:
df.show(5)

+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+
|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+
|        9.3|              LF|    0.016047301|       FD|249.8092|           OUT049|                     1999|              Tier 1|Supermarket Type1|         3735.138|
|       5.92|             reg|    0.019278216|       DR| 48.2692|           OUT018|                     2009|              Tier 3|Supermarket Type2|         443.4228|
|       17.5|              LF|    0.016760075|       FD| 141.618|           OUT049|                     1999|              Tier 1|Supermarket Type1|          2097.27

# String Indexer and One Hot Encoder

In [22]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder

In [96]:
#Platform_indexer = StringIndexer(inputCol="Item_Fat_Content", outputCol="Item_Fat_Content_Num")
#dfWork = Platform_indexer.fit(df).transform(df)

In [97]:
#Platform_encoder = OneHotEncoder(inputCol="Item_Fat_Content_Num", outputCol="Item_Fat_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [98]:
#Platform_indexer = StringIndexer(inputCol="Item_Type", outputCol="Item_Type_Num")
#dfWork = Platform_indexer.fit(dfWork).transform(dfWork)

In [99]:
#Platform_encoder = OneHotEncoder(inputCol="Item_Type_Num", outputCol="Item_Type_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [100]:
#Platform_indexer = StringIndexer(inputCol="Outlet_Identifier", outputCol="Outlet_Identifier_Num")
#dfWork = Platform_indexer.fit(dfWork).transform(dfWork)

In [101]:
#Platform_encoder = OneHotEncoder(inputCol="Outlet_Identifier_Num", outputCol="Outlet_Identifier_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [102]:
#Platform_indexer = StringIndexer(inputCol="Outlet_Location_Type", outputCol="Outlet_Location_Num")
#dfWork = Platform_indexer.fit(dfWork).transform(dfWork)

In [103]:
#Platform_encoder = OneHotEncoder(inputCol="Outlet_Location_Num", outputCol="Outlet_Location_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [104]:
#Platform_indexer = StringIndexer(inputCol="Outlet_Type", outputCol="Outlet_Type_Num")
#dfWork = Platform_indexer.fit(dfWork).transform(dfWork)

In [105]:
#Platform_encoder = OneHotEncoder(inputCol="Outlet_Type_Num", outputCol="Outlet_Type_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [106]:
#Platform_indexer = StringIndexer(inputCol="Outlet_Establishment_Year", outputCol="Outlet_Establishment_Year_Num")
#dfWork = Platform_indexer.fit(dfWork).transform(dfWork)

In [107]:
#Platform_encoder = OneHotEncoder(inputCol="Outlet_Establishment_Year_Num", outputCol="Outlet_Establishment_Year_Vector")
#Platform_encoder.setDropLast(False)
#dfWork = Platform_encoder.fit(dfWork).transform(dfWork)

In [50]:
#dfWork.show(5)

# Pipeline for string indexer, one hot encoder and assembler

In [115]:
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

In [116]:
columns_to_index = [
    "Item_Fat_Content",
    "Item_Type",
    "Outlet_Identifier",
    "Outlet_Location_Type",
    "Outlet_Type",
    "Outlet_Establishment_Year"
]

In [117]:
indexer_stages = [StringIndexer(inputCol=col, outputCol=col+"_Num") for col in columns_to_index]

In [70]:
#pipeline1 = Pipeline(stages=indexer_stages)

In [71]:
#dfwork = pipeline1.fit(df).transform(df)

In [72]:
#dfwork.show(5)

In [118]:
columns_to_encode = [
    "Item_Fat_Content_Num",
    "Item_Type_Num",
    "Outlet_Identifier_Num",
    "Outlet_Location_Type_Num",
    "Outlet_Type_Num",
    "Outlet_Establishment_Year_Num"
]

In [119]:
encoder_stages = [OneHotEncoder(inputCol=col, outputCol=col+"_Vector", dropLast=False) for col in columns_to_encode]

In [73]:
#pipeline2 = Pipeline(stages=encoder_stages)

In [74]:
#dfwork = pipeline2.fit(dfwork).transform(dfwork)

In [75]:
#dfwork.show(5)

In [120]:
input_cols_for_assembler=['Item_Weight',
'Item_Visibility',
'Item_MRP',
'Item_Fat_Content_Num_Vector',
'Item_Type_Num_Vector',
'Outlet_Identifier_Num_Vector',
'Outlet_Location_Type_Num_Vector',
'Outlet_Type_Num_Vector',
'Outlet_Establishment_Year_Num_Vector']


In [121]:
F_assembler = VectorAssembler(inputCols=input_cols_for_assembler, outputCol='features')

In [78]:
#F_assembler.transform(dfwork).show(5)

In [125]:
basePipe = Pipeline(stages=indexer_stages + encoder_stages + [F_assembler])
basePipe.fit(df).transform(df).show(3)  # Dataframe after indexing, one hot encoding and assembling

+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+--------------------+-------------+---------------------+------------------------+---------------+-----------------------------+---------------------------+--------------------+----------------------------+-------------------------------+----------------------+------------------------------------+--------------------+
|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Item_Fat_Content_Num|Item_Type_Num|Outlet_Identifier_Num|Outlet_Location_Type_Num|Outlet_Type_Num|Outlet_Establishment_Year_Num|Item_Fat_Content_Num_Vector|Item_Type_Num_Vector|Outlet_Identifier_Num_Vector|Outlet_Location_Type_Num_Vector|Outlet_Type_Num_Vector|Outlet_Establishment_Year_Num_Vector|            features|
+-----------+---------------

In [124]:
df.show(3) # Dataframe after Preprocessing

+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+
|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+
|        9.3|              LF|    0.016047301|       FD|249.8092|           OUT049|                     1999|              Tier 1|Supermarket Type1|         3735.138|
|       5.92|             reg|    0.019278216|       DR| 48.2692|           OUT018|                     2009|              Tier 3|Supermarket Type2|         443.4228|
|       17.5|              LF|    0.016760075|       FD| 141.618|           OUT049|                     1999|              Tier 1|Supermarket Type1|          2097.27

# Splitting of traing and validation dataset

In [128]:
train_df,val_df = df.randomSplit([0.7,0.3])
print(train_df.count())
print(val_df.count())

6013
2510


# Multiple regression using pipeline

In [86]:
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression(labelCol='Item_Outlet_Sales', featuresCol='features', maxIter=5)
lr_pipeline = Pipeline(stages=[basePipe, lr])
lr_model = lr_pipeline.fit(train_df)
lr_predictions = lr_model.transform(val_df)
lr_predictions.show(5)


In [87]:
from pyspark.ml.evaluation import RegressionEvaluator

In [126]:
evaluator = RegressionEvaluator(labelCol="Item_Outlet_Sales", predictionCol="prediction", metricName="r2")
lr_r2 = evaluator.evaluate(lr_predictions)
print(f"R-squared (explained variance) on validation data: {lr_r2}")

R-squared (explained variance) on validation data: 0.5607712139656048


# Random Forest regression using pipeline

In [89]:
from pyspark.ml.regression import RandomForestRegressor

In [127]:
rf = RandomForestRegressor(labelCol='Item_Outlet_Sales', featuresCol='features', numTrees=100)
rf_pipeline = Pipeline(stages=[basePipe, rf])
rf_model = rf_pipeline.fit(train_df)
rf_predictions = rf_model.transform(val_df)
rf_predictions.show(5)

+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+--------------------+-----------------+-----------------+--------------------+-------------+---------------------+------------------------+---------------+-----------------------------+---------------------------+--------------------+----------------------------+-------------------------------+----------------------+------------------------------------+--------------------+------------------+
|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Item_Fat_Content_Num|Item_Type_Num|Outlet_Identifier_Num|Outlet_Location_Type_Num|Outlet_Type_Num|Outlet_Establishment_Year_Num|Item_Fat_Content_Num_Vector|Item_Type_Num_Vector|Outlet_Identifier_Num_Vector|Outlet_Location_Type_Num_Vector|Outlet_Type_Num_Vector|Outlet_Establishment_Year_Num_Vector|            features|        pr

In [99]:
evaluator = RegressionEvaluator(labelCol="Item_Outlet_Sales", predictionCol="prediction", metricName="r2")
rf_r2 = evaluator.evaluate(rf_predictions)
print(f"R-squared (explained variance) on validation data: {rf_r2}")

R-squared (explained variance) on validation data: 0.5667404021037244


# Gradient boosting regression using pipeline

In [93]:
from pyspark.ml.regression import GBTRegressor

In [None]:
gbt = GBTRegressor(labelCol='Item_Outlet_Sales', featuresCol='features', maxIter=10, maxDepth=5, seed=42)
gbt_pipeline = Pipeline(stages=[basePipe, gbt])
gbt_model = gbt_pipeline.fit(train_df)
gbt_predictions = gbt_model.transform(val_df)
gbt_predictions.show(5)

In [100]:
evaluator = RegressionEvaluator(labelCol="Item_Outlet_Sales", predictionCol="prediction", metricName="r2")
gb_r2 = evaluator.evaluate(gbt_predictions)
print(f"R-squared (explained variance) on validation data: {gb_r2}")

R-squared (explained variance) on validation data: 0.5951745493860277


# Model Accuracy

In [101]:
import pandas as pd

In [109]:
r2_dict = {'linear_regressor':lr_r2,'randomforest_regressor':rf_r2,'gradientboostingtree_regressor':gb_r2}
r2_df = pd.DataFrame(list(r2_dict.items()), columns=['Model', 'R-squared'])

In [110]:
r2_df

Unnamed: 0,Model,R-squared
0,linear_regressor,0.560771
1,randomforest_regressor,0.56674
2,gradientboostingtree_regressor,0.595175
