In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, col, isnull, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder \
    .appName("Spark ML Assignment") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [3]:
train_df = spark.read.csv("hdfs://m01.itversity.com:9000/user/ana017398/train.csv", header=True, inferSchema=True)
test_df = spark.read.csv("hdfs://m01.itversity.com:9000/user/ana017398/test.csv", header=True, inferSchema=True)

In [4]:
avg_purchase = train_df.select(avg("Purchase")).collect()[0][0]
print(f"Average Purchase Amount: {avg_purchase}")

Average Purchase Amount: 9263.968712959126


In [5]:
train_df = train_df.na.drop()

In [6]:
def count_nulls(df):
    return df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()
count_nulls(train_df)

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|                 0|                 0|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [7]:
for col_name in train_df.columns:
    print(f"{col_name}: {train_df.select(col_name).distinct().count()} unique values")

User_ID: 5870 unique values
Product_ID: 528 unique values
Gender: 2 unique values
Age: 7 unique values
Occupation: 21 unique values
City_Category: 3 unique values
Stay_In_Current_City_Years: 5 unique values
Marital_Status: 2 unique values
Product_Category_1: 12 unique values
Product_Category_2: 14 unique values
Product_Category_3: 15 unique values
Purchase: 13876 unique values


In [8]:
categorical_cols = ["Gender", "Age", "City_Category", "Stay_In_Current_City_Years", "Marital_Status"]
for col_name in categorical_cols:
    train_df.groupBy(col_name).count().show()

+------+------+
|Gender| count|
+------+------+
|     F| 37594|
|     M|129227|
+------+------+

+-----+-----+
|  Age|count|
+-----+-----+
|18-25|31316|
|26-35|66942|
| 0-17| 4873|
|46-50|13374|
|51-55|11166|
|36-45|33285|
|  55+| 5865|
+-----+-----+

+-------------+-----+
|City_Category|count|
+-------------+-----+
|            B|69243|
|            C|56059|
|            A|41519|
+-------------+-----+

+--------------------------+-----+
|Stay_In_Current_City_Years|count|
+--------------------------+-----+
|                         3|29268|
|                         0|22389|
|                        4+|25362|
|                         1|58287|
|                         2|31515|
+--------------------------+-----+

+--------------+-----+
|Marital_Status|count|
+--------------+-----+
|             1|67202|
|             0|99619|
+--------------+-----+



In [9]:
for col_name in categorical_cols:
    train_df.groupBy(col_name).agg(avg("Purchase").alias("Avg_Purchase")).show()

+------+------------------+
|Gender|      Avg_Purchase|
+------+------------------+
|     F|11084.723785710486|
|     M|11824.922756080387|
+------+------------------+

+-----+------------------+
|  Age|      Avg_Purchase|
+-----+------------------+
|18-25|11580.858538766126|
|26-35|11612.248065489528|
| 0-17| 11172.35871126616|
|46-50|11663.978017048004|
|51-55|12035.504298764105|
|36-45|11729.364398377647|
|  55+| 11861.52463768116|
+-----+------------------+

+-------------+------------------+
|City_Category|      Avg_Purchase|
+-------------+------------------+
|            B|11488.090521785596|
|            C| 12207.51699102731|
|            A|11199.868782966834|
+-------------+------------------+

+--------------------------+------------------+
|Stay_In_Current_City_Years|      Avg_Purchase|
+--------------------------+------------------+
|                         3|11630.236333196664|
|                         0|  11503.9256777882|
|                        4+|11695.298438608943|

In [10]:
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_Index") for c in ["Age", "Gender", "Stay_In_Current_City_Years", "City_Category","Occupation"]]

In [11]:
encoders = [OneHotEncoder(inputCol=f"{c}_Index", outputCol=f"{c}_OHE") for c in ["Gender", "City_Category", "Occupation"]]

In [12]:
feature_cols = ["Gender_OHE", "City_Category_OHE", "Occupation_OHE", "Product_Category_1", "Product_Category_2", "Product_Category_3"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [13]:
lr = LinearRegression(featuresCol="features", labelCol="Purchase")

In [18]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

In [19]:
pipeline = Pipeline(stages=indexers + encoders + [assembler, crossval])

In [20]:
final_model = pipeline.fit(train_df)

In [21]:
test_predictions = final_model.transform(train_df)
test_predictions.select("User_ID", "Product_ID", "Purchase").show()

+-------+----------+--------+
|User_ID|Product_ID|Purchase|
+-------+----------+--------+
|1000001| P00248942|   15200|
|1000004| P00184942|   19215|
|1000005| P00145042|   15665|
|1000006| P00231342|    5378|
|1000006|  P0096642|   13055|
|1000007| P00036842|   11788|
|1000008| P00249542|   19614|
|1000008| P00303442|   11927|
|1000009| P00078742|    5391|
|1000010| P00085942|   16352|
|1000010| P00182642|   12909|
|1000010| P00155442|   15212|
|1000010| P00221342|   15705|
|1000010| P00111142|   18963|
|1000010|  P0094542|   16406|
|1000010| P00148642|   12642|
|1000010| P00113242|   11562|
|1000011| P00110842|   19327|
|1000013| P00182342|   12107|
|1000015| P00042142|   11458|
+-------+----------+--------+
only showing top 20 rows



In [26]:
for col_name in test_df.columns:
    print(f"{col_name}: {test_df.select(col_name).distinct().count()} unique values")

User_ID: 5891 unique values
Product_ID: 3491 unique values
Gender: 2 unique values
Age: 7 unique values
Occupation: 21 unique values
City_Category: 3 unique values
Stay_In_Current_City_Years: 5 unique values
Marital_Status: 2 unique values
Product_Category_1: 18 unique values
Product_Category_2: 18 unique values
Product_Category_3: 16 unique values


In [19]:
categorical_cols = ["Gender", "Age", "City_Category", "Stay_In_Current_City_Years", "Marital_Status"]
for col in categorical_cols:
    test_df.groupBy(col).count().show()

+------+------+
|Gender| count|
+------+------+
|     F| 57827|
|     M|175772|
+------+------+

+-----+-----+
|  Age|count|
+-----+-----+
|18-25|42293|
|26-35|93428|
| 0-17| 6232|
|46-50|19577|
|51-55|16283|
|36-45|46711|
|  55+| 9075|
+-----+-----+

+-------------+-----+
|City_Category|count|
+-------------+-----+
|            B|98566|
|            C|72509|
|            A|62524|
+-------------+-----+

+--------------------------+-----+
|Stay_In_Current_City_Years|count|
+--------------------------+-----+
|                         3|40143|
|                         0|31318|
|                        4+|35945|
|                         1|82604|
|                         2|43589|
+--------------------------+-----+

+--------------+------+
|Marital_Status| count|
+--------------+------+
|             1| 95792|
|             0|137807|
+--------------+------+



In [22]:
rmse = evaluator.evaluate(test_predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(test_predictions, {evaluator.metricName: "mae"})
print(f"RMSE: {rmse}, R²: {r2}, MAE: {mae}")

RMSE: 4629.462365578306, R²: 0.1702539708445051, MAE: 3716.32267958458
