## Installing PySpark Library

In [169]:
# Uncomment to install PySpark into the kernel's environment (only needed once).
# %pip install pyspark

## Importing required libraries

In [170]:
import pyspark

In [171]:
from pyspark.sql import SparkSession

## Creating PySpark Session

In [172]:
# Reuse the session already running in this kernel if there is one;
# otherwise start a new one under this application name.
spark = (
    SparkSession.builder
    .appName("mysparkapplication")
    .getOrCreate()
)
spark

## Loading the dataset

In [173]:
# header=True takes the column names from the first line of the file;
# inferSchema=True lets Spark derive each column's type from the data
# instead of reading every field as a string.
df = spark.read.csv("Mall_Customers.csv", header=True, inferSchema=True)

## Peeking into spark dataframe

In [174]:
# Three views of the same data: the first two rows as Row objects,
# a five-row table, and the inferred column types.
print(df.head(n=2))
df.show(n=5)
df.printSchema()

[Row(CustomerID=1, Genre='Male', Age=19, Annual Income (k$)=15, Spending Score (1-100)=39), Row(CustomerID=2, Genre='Male', Age=21, Annual Income (k$)=15, Spending Score (1-100)=81)]
+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|         1|  Male| 19|                15|                    39|
|         2|  Male| 21|                15|                    81|
|         3|Female| 20|                16|                     6|
|         4|Female| 23|                16|                    77|
|         5|Female| 31|                17|                    40|
+----------+------+---+------------------+----------------------+
only showing top 5 rows

root
 |-- CustomerID: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Annual Income (k$): integer (nullable = true)
 |-- Spending Score (1-100

In [175]:
# Row count, column names and column types, followed by per-column summary statistics.
print(df.count())
print(df.columns)
print(df.dtypes)
df.describe().show()

200
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
[('CustomerID', 'int'), ('Genre', 'string'), ('Age', 'int'), ('Annual Income (k$)', 'int'), ('Spending Score (1-100)', 'int')]
+-------+------------------+------+-----------------+------------------+----------------------+
|summary|        CustomerID| Genre|              Age|Annual Income (k$)|Spending Score (1-100)|
+-------+------------------+------+-----------------+------------------+----------------------+
|  count|               200|   200|              200|               200|                   200|
|   mean|             100.5|  NULL|            38.85|             60.56|                  50.2|
| stddev|57.879184513951124|  NULL|13.96900733155888| 26.26472116527124|    25.823521668370173|
|    min|                 1|Female|               18|                15|                     1|
|    max|               200|  Male|               70|               137|                    99|
+-------+-------------

## Handling data in dataframe

In [176]:
# Project three of the columns and preview the first four rows.
demographics = df.select("Genre", "Age", "Annual Income (k$)")
demographics.show(n=4)

+------+---+------------------+
| Genre|Age|Annual Income (k$)|
+------+---+------------------+
|  Male| 19|                15|
|  Male| 21|                15|
|Female| 20|                16|
|Female| 23|                16|
+------+---+------------------+
only showing top 4 rows



In [177]:
## Adding column

# Derive a rescaled copy of the spending score (original value divided by 10)
# and attach it to a new DataFrame, leaving df itself unchanged.
scaled_score = df["Spending Score (1-100)"] / 10
df2 = df.withColumn("Spending score (1-10)", scaled_score)

In [178]:
## Renaming column

# Give the derived column a friendlier name.
df2 = df2.withColumnRenamed(
    "Spending score (1-10)",
    "New Spending Score",
)

In [179]:
## Dropping column

# Remove the derived column again; drop() returns a new DataFrame, so the
# result has to be assigned back to df2.
df2 = df2.drop("New Spending Score")

In [180]:
# After the add -> rename -> drop round trip, df2 is back to the original five columns.
df2.show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|         1|  Male| 19|                15|                    39|
|         2|  Male| 21|                15|                    81|
|         3|Female| 20|                16|                     6|
|         4|Female| 23|                16|                    77|
|         5|Female| 31|                17|                    40|
|         6|Female| 22|                17|                    76|
|         7|Female| 35|                18|                     6|
|         8|Female| 23|                18|                    94|
|         9|  Male| 64|                19|                     3|
|        10|Female| 30|                19|                    72|
|        11|  Male| 67|                19|                    14|
|        12|Female| 35|                19|                    99|
|        1

In [181]:
# Swap the raw CSV headers (spaces, parentheses, "$") for plain identifiers that
# are easier to reference in SQL-style expressions, and rename "Genre" to "gender".
df = (
    df
    .withColumnRenamed("Annual Income (k$)", "Annual_Income")
    .withColumnRenamed("Spending Score (1-100)", "Spending_Score")
    .withColumnRenamed("Genre", "gender")
)
df.show()

+----------+------+---+-------------+--------------+
|CustomerID|gender|Age|Annual_Income|Spending_Score|
+----------+------+---+-------------+--------------+
|         1|  Male| 19|           15|            39|
|         2|  Male| 21|           15|            81|
|         3|Female| 20|           16|             6|
|         4|Female| 23|           16|            77|
|         5|Female| 31|           17|            40|
|         6|Female| 22|           17|            76|
|         7|Female| 35|           18|             6|
|         8|Female| 23|           18|            94|
|         9|  Male| 64|           19|             3|
|        10|Female| 30|           19|            72|
|        11|  Male| 67|           19|            14|
|        12|Female| 35|           19|            99|
|        13|Female| 58|           20|            15|
|        14|Female| 24|           20|            77|
|        15|  Male| 37|           20|            13|
|        16|  Male| 22|           20|         

## Handling nulls

In [182]:
from pyspark.sql.functions import isnan, when, count, col

# Count the missing values in every column.
#
# A value is treated as missing when it is null; for floating-point columns NaN
# is counted as well. isnan() is only applied to float/double columns because
# NaN cannot occur in integer or string columns.
#
# The matched rows are counted via a literal 1 rather than the column value:
# count() skips nulls, so count(when(cond, column)) would report 0 for the
# very rows this check is looking for.
missing_counts = []
for name, dtype in df.dtypes:
    is_missing = col(name).isNull()
    if dtype in ("float", "double"):
        is_missing = is_missing | isnan(col(name))
    missing_counts.append(count(when(is_missing, 1)).alias(name))

df.select(missing_counts).show()

+----------+------+---+-------------+--------------+
|CustomerID|gender|Age|Annual_Income|Spending_Score|
+----------+------+---+-------------+--------------+
|         0|     0|  0|            0|             0|
+----------+------+---+-------------+--------------+



In [183]:
## Imputing zero (0) where values are null

# A numeric fill value only applies to numeric columns; string columns such as
# gender are left untouched.
df = df.fillna(0)

In [184]:
## imputing mean value of the cols at nulls

# from pyspark.ml.feature import Imputer
# imputer = Imputer (inputCols=['CustomerID', 'Age', 'Annual_Income', 'Spending_Score'],
#                    outputCols= ["{}_imputed".format(c) for c in ['CustomerID', 'Age', 'Annual_Income', 'Spending_Score']]).setStrategy("mean")

# df = imputer.fit(df).transform(df)

## Filtering Methods

In [185]:
# Keep only the customers whose annual income exceeds 120 (k$).
high_income = df.filter("Annual_Income>120")
high_income.show()

+----------+------+---+-------------+--------------+
|CustomerID|gender|Age|Annual_Income|Spending_Score|
+----------+------+---+-------------+--------------+
|       197|Female| 45|          126|            28|
|       198|  Male| 32|          126|            74|
|       199|  Male| 32|          137|            18|
|       200|  Male| 30|          137|            83|
+----------+------+---+-------------+--------------+



## Data Aggregation

In [186]:
# Group once by gender, then report the group size plus the total and the
# average annual income of each group.
by_gender = df.groupBy("gender")
by_gender.count().show()
by_gender.sum("Annual_Income").show()
by_gender.avg("Annual_Income").show()

+------+-----+
|gender|count|
+------+-----+
|Female|  112|
|  Male|   88|
+------+-----+

+------+------------------+
|gender|sum(Annual_Income)|
+------+------------------+
|Female|              6636|
|  Male|              5476|
+------+------------------+

+------+------------------+
|gender|avg(Annual_Income)|
+------+------------------+
|Female|             59.25|
|  Male| 62.22727272727273|
+------+------------------+



## Preprocessing

In [187]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Encode the gender strings as numeric indices so they can be used as a model
# feature (in the output below Female maps to 0.0 and Male to 1.0).
genderLabeler = StringIndexer(inputCol="gender", outputCol="gender_labeled")
gender_index_model = genderLabeler.fit(df)  # learns the label -> index mapping
df = gender_index_model.transform(df)       # appends the gender_labeled column
df.show()

+----------+------+---+-------------+--------------+--------------+
|CustomerID|gender|Age|Annual_Income|Spending_Score|gender_labeled|
+----------+------+---+-------------+--------------+--------------+
|         1|  Male| 19|           15|            39|           1.0|
|         2|  Male| 21|           15|            81|           1.0|
|         3|Female| 20|           16|             6|           0.0|
|         4|Female| 23|           16|            77|           0.0|
|         5|Female| 31|           17|            40|           0.0|
|         6|Female| 22|           17|            76|           0.0|
|         7|Female| 35|           18|             6|           0.0|
|         8|Female| 23|           18|            94|           0.0|
|         9|  Male| 64|           19|             3|           1.0|
|        10|Female| 30|           19|            72|           0.0|
|        11|  Male| 67|           19|            14|           1.0|
|        12|Female| 35|           19|           

In [188]:
# Confirm the column types before assembling features; gender_labeled is a double.
df.dtypes

[('CustomerID', 'int'),
 ('gender', 'string'),
 ('Age', 'int'),
 ('Annual_Income', 'int'),
 ('Spending_Score', 'int'),
 ('gender_labeled', 'double')]

In [189]:
# Pack the predictor columns into the single vector column that Spark ML
# estimators expect. handleInvalid="keep" retains rows with invalid values
# instead of raising an error or dropping them.
feature_columns = ["gender_labeled", "Age", "Annual_Income"]
vectAssem = VectorAssembler(
    inputCols=feature_columns,
    outputCol="pred_variables",
    handleInvalid="keep",
)
df = vectAssem.transform(df)
df.show()

+----------+------+---+-------------+--------------+--------------+---------------+
|CustomerID|gender|Age|Annual_Income|Spending_Score|gender_labeled| pred_variables|
+----------+------+---+-------------+--------------+--------------+---------------+
|         1|  Male| 19|           15|            39|           1.0|[1.0,19.0,15.0]|
|         2|  Male| 21|           15|            81|           1.0|[1.0,21.0,15.0]|
|         3|Female| 20|           16|             6|           0.0|[0.0,20.0,16.0]|
|         4|Female| 23|           16|            77|           0.0|[0.0,23.0,16.0]|
|         5|Female| 31|           17|            40|           0.0|[0.0,31.0,17.0]|
|         6|Female| 22|           17|            76|           0.0|[0.0,22.0,17.0]|
|         7|Female| 35|           18|             6|           0.0|[0.0,35.0,18.0]|
|         8|Female| 23|           18|            94|           0.0|[0.0,23.0,18.0]|
|         9|  Male| 64|           19|             3|           1.0|[1.0,64.0

In [190]:
# The modelling frame needs only the label (Spending_Score) and the feature vector.
df_to_model = df.select("Spending_Score", "pred_variables")
df_to_model.show(n=10)

+--------------+---------------+
|Spending_Score| pred_variables|
+--------------+---------------+
|            39|[1.0,19.0,15.0]|
|            81|[1.0,21.0,15.0]|
|             6|[0.0,20.0,16.0]|
|            77|[0.0,23.0,16.0]|
|            40|[0.0,31.0,17.0]|
|            76|[0.0,22.0,17.0]|
|             6|[0.0,35.0,18.0]|
|            94|[0.0,23.0,18.0]|
|             3|[1.0,64.0,19.0]|
|            72|[0.0,30.0,19.0]|
+--------------+---------------+
only showing top 10 rows



## Predictive Modeling

In [191]:
from pyspark.ml.regression import LinearRegression, LinearRegressionSummary

# 80/20 train/test split with a fixed seed.
train_data, test_data = df_to_model.randomSplit([0.8, 0.2], seed=7)

# Keep the unfitted estimator under its own name so that `lr_model` only ever
# refers to the fitted model used by the following cells.
lr_estimator = LinearRegression(
    featuresCol="pred_variables",
    labelCol="Spending_Score",
)
lr_model = lr_estimator.fit(train_data)

In [192]:
# Fitted parameters; the coefficients follow the order of the assembled
# features: gender_labeled, Age, Annual_Income.
print("intercept: ", lr_model.intercept)
print("coefficients: ", lr_model.coefficients)


intercept:  81.79525361561433
coefficients:  [-3.8707080628006554,-0.6432091345345733,-0.04948032810108157]


## Testing the model

In [193]:
# Score the held-out rows; the returned summary exposes the predictions, the
# residuals and the metrics printed in the evaluation cell.
pred = lr_model.evaluate(test_data)

# Show each table in its own statement: show() returns None, so joining the two
# calls in a tuple made the cell echo a stray "(None, None)" as its result.
pred.predictions.show()
pred.residuals.show()

+--------------+----------------+------------------+
|Spending_Score|  pred_variables|        prediction|
+--------------+----------------+------------------+
|             5| [0.0,46.0,25.0]|50.970625224496914|
|             5| [1.0,19.0,81.0]| 61.69566542046917|
|             6| [0.0,20.0,16.0]| 68.13938567530556|
|             6| [0.0,35.0,18.0]| 58.39228800108479|
|            10| [1.0,19.0,74.0]|62.042027717176744|
|            11| [1.0,59.0,71.0]| 36.46210332009706|
|            13| [0.0,52.0,88.0]| 43.99410974692134|
|            14| [0.0,49.0,33.0]|48.645155196084545|
|            14| [1.0,59.0,93.0]| 35.37353610187326|
|            14| [1.0,67.0,19.0]| 33.88940730507671|
|            16|[0.0,47.0,120.0]|  45.6267849203596|
|            28| [0.0,49.0,39.0]| 48.34827322747805|
|            29| [1.0,52.0,23.0]|43.339623010690985|
|            31| [0.0,40.0,29.0]| 54.63195871930003|
|            34| [0.0,25.0,72.0]| 62.15244162897213|
|            41| [1.0,18.0,59.0]| 63.427441773

(None, None)

## Evaluation of Model

In [194]:
print("Evaluation of Model")

# Goodness-of-fit figures first, then the error metrics; each group is
# introduced by a separator line.
metric_groups = [
    [
        ("R2 Score: ", pred.r2),
        ("degree of freedom: ", pred.degreesOfFreedom),
        ("explainedVariance: ", pred.explainedVariance),
    ],
    [
        ("MAE: ", pred.meanAbsoluteError),
        ("MSE: ", pred.meanSquaredError),
        ("RMSE: ", pred.rootMeanSquaredError),
    ],
]
for group in metric_groups:
    print("----------")
    for label, value in group:
        print(label, value)


Evaluation of Model
----------
R2 Score:  -0.16859795699986435
degree of freedom:  37
explainedVariance:  174.60341209510239
----------
MAE:  21.52354927774421
MSE:  707.9673693164307
RMSE:  26.607656216142576


## Conclusion

### We can see the prediction results of the model:
- *R2 score is about -0.17 (-16.9%)*
- *Explained variance is 174.6*
- *Mean Absolute Error is 21.52*
- *Mean Squared Error is 707.97*

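## **The model predicts the spending pattern of the mall customers poorly: a negative R2 score means it does worse on the test data than always predicting the mean spending score. We might need to tune this model, or give it more informative features, to get reasonable predictions.**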