In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, expr, lit, udf, col, min, max, sum
from pyspark.sql.types import StringType, IntegerType, FloatType
import random
import string

# Initialize Spark Session
spark = SparkSession.builder.appName("PurchaseTransactions").getOrCreate()

# Create Customers Dataset
customers = spark.range(1, 50001).toDF("ID")
customers = customers.withColumn("Name", expr("concat_ws('', array_repeat(rand(), 20))").substr(lit(1), (expr("rand() * 10") % 11 + 10).cast("int")))
customers = customers.withColumn("Age", (rand() * 82 + 18).cast("int"))
customers = customers.withColumn("CountryCode", (rand() * 499 + 1).cast("int"))
customers = customers.withColumn("Salary", (rand() * 9999900 + 100).cast("float"))

# Create Purchases Dataset
purchases = spark.range(1, 5000001).toDF("TransID")
purchases = purchases.withColumn("CustID", (rand() * 49999 + 1).cast("int"))
purchases = purchases.withColumn("TransTotal", (rand() * 1990 + 10).cast("float"))
purchases = purchases.withColumn("TransNumItems", (rand() * 14 + 1).cast("int"))
purchases = purchases.withColumn("TransDesc", expr("concat_ws('', array_repeat(rand(), 50))").substr(lit(1), (expr("rand() * 30") % 31 + 20).cast("int")))

# Show sample data
customers.show(5)
purchases.show(5)

# Save to files (optional)
customers.write.csv("customers.csv")
purchases.write.csv("purchases.csv")


+---+-------------------+---+-----------+---------+
| ID|               Name|Age|CountryCode|   Salary|
+---+-------------------+---+-----------+---------+
|  1|  0.810610733121170| 98|        181|2992884.5|
|  2|0.3609906872060240.| 84|        347|2490460.2|
|  3|      0.46330955914| 61|        449|2101898.5|
|  4|         0.48410904| 23|        323|9231105.0|
|  5|   0.56548575208842| 23|        331|1198504.5|
+---+-------------------+---+-----------+---------+
only showing top 5 rows

+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      1| 16854|  884.3234|            8|0.756921344018771...|
|      2| 43328| 1692.6108|           13|0.760703220276879...|
|      3| 36470|  328.4825|            5|0.471236749336160...|
|      4| 48945| 16.544546|            5|0.320114708811361...|
|      5| 11551|  55.39714|            3|0.929742905291161...|
+--

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/ishaj/WPI/Big Data Management/customers.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [3]:
# Task 2.1 - Filter Purchases
T1 = purchases.filter(purchases.TransTotal <= 600)
T1.show()


+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      3| 36470|  328.4825|            5|0.471236749336160...|
|      4| 48945| 16.544546|            5|0.320114708811361...|
|      5| 11551|  55.39714|            3|0.929742905291161...|
|      7| 29325|  591.4004|            4|0.568340909091979...|
|      9| 40967|  209.5052|            7|0.506588920699211...|
|     11|  4941| 349.11145|           13|0.842579176715833...|
|     14| 46089| 254.39821|           13|0.872243206593437...|
|     19| 43400| 461.20923|            3|0.008683598591497...|
|     22|  3711| 206.09564|            7|0.402029179112343...|
|     24| 30939| 537.83167|            5|0.277229516689049...|
|     25| 32359|  33.30696|            7|0.069395949050830...|
|     27| 30516|  396.9999|            6|0.378191701140191...|
|     33| 10945| 416.82626|           13|0.115175810979

In [4]:
# Task 2.2 - Group by Number of Items
T1_grouped = T1.groupBy("TransNumItems").agg(
    expr("percentile_approx(TransTotal, 0.5)").alias("Median"),
    min("TransTotal").alias("Min"),
    max("TransTotal").alias("Max")
)
T1_grouped.show()

+-------------+---------+---------+---------+
|TransNumItems|   Median|      Min|      Max|
+-------------+---------+---------+---------+
|           12| 305.8202|10.000234|599.99963|
|            1| 304.2102|10.002489| 599.9961|
|           13| 305.7753|10.004191|599.98914|
|            6|305.31128|10.005886|599.99365|
|            3|304.55826|10.000887|599.99304|
|            5|305.44186|10.005917| 599.9907|
|            9| 303.5532|10.000506|599.99963|
|            4|304.22714|10.007945| 599.9992|
|            8|302.93942|10.006037|599.98895|
|            7| 303.4536|10.013502| 599.9962|
|           10|304.43527|10.009123| 599.9984|
|           11|305.02206|10.001261| 599.9969|
|           14|  306.535|10.006639| 599.9959|
|            2|302.96368|10.000357|  599.988|
+-------------+---------+---------+---------+



In [5]:
# Task 2.3 - Group by Young Customers
young_customers = customers.filter((col("Age") >= 18) & (col("Age") <= 25))
T3 = T1.join(young_customers, T1.CustID == young_customers.ID).groupBy("CustID").agg(
    sum("TransNumItems").alias("TotalItems"),
    sum("TransTotal").alias("TotalSpent"),
    max("Age").alias("Age")
)
T3.show()

+------+----------+------------------+---+
|CustID|TotalItems|        TotalSpent|Age|
+------+----------+------------------+---+
| 38723|       244| 11064.88249206543| 21|
| 22097|       234| 7484.367618560791| 24|
| 46943|       179| 7121.798114776611| 22|
| 24171|       134|5648.5487632751465| 20|
| 16574|       258| 8254.820720672607| 25|
| 18911|       308|13307.521453857422| 18|
| 12799|       304|11755.795509338379| 20|
| 18866|       241|11807.569692611694| 21|
| 18024|       179|  9100.54397201538| 20|
| 32445|       178| 7702.879020690918| 18|
| 22223|       189| 8797.927822113037| 21|
| 16861|       288|11549.448265075684| 23|
| 33375|       162| 6706.709671020508| 20|
| 24354|       184| 5787.491415977478| 20|
| 23571|       263|12044.043930053711| 25|
|  6336|       258|10909.638397216797| 24|
| 43852|       164| 7155.913915634155| 22|
| 38422|       229|  9393.53201675415| 19|
|  1580|       208| 9894.179357528687| 20|
| 13840|       217| 8281.632217407227| 21|
+------+---

In [6]:
# Task 2.4 - Customer Pairs
T4 = T3.alias("t3a").join(T3.alias("t3b"), col("t3a.CustID") < col("t3b.CustID"))
T4 = T4.filter(
    (col("t3a.Age") < col("t3b.Age")) &
    (col("t3a.TotalSpent") > col("t3b.TotalSpent")) &
    (col("t3a.TotalItems") < col("t3b.TotalItems"))
).select(
    col("t3a.CustID").alias("C1_ID"),
    col("t3b.CustID").alias("C2_ID"),
    col("t3a.Age").alias("Age1"),
    col("t3b.Age").alias("Age2"),
    col("t3a.TotalSpent").alias("TotalAmount1"),
    col("t3b.TotalSpent").alias("TotalAmount2"),
    col("t3a.TotalItems").alias("TotalItemCount1"),
    col("t3b.TotalItems").alias("TotalItemCount2")
)

T4.show()
# End Spark Session
#spark.stop()



+-----+-----+----+----+-----------------+------------------+---------------+---------------+
|C1_ID|C2_ID|Age1|Age2|     TotalAmount1|      TotalAmount2|TotalItemCount1|TotalItemCount2|
+-----+-----+----+----+-----------------+------------------+---------------+---------------+
|38723|40900|  21|  22|11064.88249206543|10697.220384597778|            244|            257|
|38723|46076|  21|  22|11064.88249206543|10032.667434692383|            244|            245|
|38723|40253|  21|  23|11064.88249206543| 9099.629432678223|            244|            251|
|38723|44428|  21|  24|11064.88249206543|10587.767572402954|            244|            279|
|38723|42178|  21|  25|11064.88249206543|11014.164167404175|            244|            300|
|38723|42024|  21|  22|11064.88249206543| 8704.888699531555|            244|            277|
|38723|45068|  21|  23|11064.88249206543|10769.933923721313|            244|            252|
|38723|49862|  21|  23|11064.88249206543| 8625.678304672241|          

In [7]:
# Task 2.5 - Dataset generation
Dataset = purchases.join(customers, purchases.CustID == customers.ID, "inner").select(
    customers.ID.alias("CustomerID"),
    purchases.TransID,
    customers.Age,
    customers.Salary,
    purchases.TransNumItems,
    purchases.TransTotal
)

# Showing the generated dataset
Dataset.show(5)

+----------+-------+---+---------+-------------+----------+
|CustomerID|TransID|Age|   Salary|TransNumItems|TransTotal|
+----------+-------+---+---------+-------------+----------+
|     16854|      1| 93|1565120.6|            8|  884.3234|
|     43328|      2| 30|8116702.0|           13| 1692.6108|
|     36470|      3| 31|9804968.0|            5|  328.4825|
|     48945|      4| 52|5680860.5|            5| 16.544546|
|     11551|      5| 62|6833448.5|            3|  55.39714|
+----------+-------+---+---------+-------------+----------+
only showing top 5 rows



In [8]:
#Saving Dataset as a csv file
Dataset.write.csv("Dataset.csv")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/ishaj/WPI/Big Data Management/Dataset.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [9]:
# Task 2.6- Split the dataset into Trainset and Testset
trainset, testset = Dataset.randomSplit([0.8, 0.2], seed=22)

# Show the sizes of the Trainset and Testset
print("Trainset size:", trainset.count())
print("Testset size:", testset.count())


Trainset size: 3999475
Testset size: 1000525


In [10]:
# Task 2.7 and 2.8- Using ML to predict Transaction total with Age", "Salary" and "TransNumItems" as the features, and evaluating the models on the test set

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml import Pipeline


# Prepare the features using VectorAssembler
feature_cols = ["Age", "Salary", "TransNumItems"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Linear Regression
lr = LinearRegression(labelCol="TransTotal", featuresCol="features")

# Decision Tree Regression
dt = DecisionTreeRegressor(labelCol="TransTotal", featuresCol="features")

# Random Forest Regression
rf = RandomForestRegressor(labelCol="TransTotal", featuresCol="features", numTrees=10)

# Create pipelines
lr_pipeline = Pipeline(stages=[assembler, lr])
dt_pipeline = Pipeline(stages=[assembler, dt])
rf_pipeline = Pipeline(stages=[assembler, rf])

# Train the models using the data from trainset
lr_model = lr_pipeline.fit(trainset)
dt_model = dt_pipeline.fit(trainset)
rf_model = rf_pipeline.fit(trainset)

# Make predictions on the test set using the models created
lr_predictions = lr_model.transform(testset)
dt_predictions = dt_model.transform(testset)
rf_predictions = rf_model.transform(testset)

# Evaluate the models using MAE, R2 and RMSE
evaluator_mae = RegressionEvaluator(labelCol="TransTotal", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="TransTotal", metricName="r2")
evaluator_rmse = RegressionEvaluator(labelCol="TransTotal", metricName="rmse")


lr_mae = evaluator_mae.evaluate(lr_predictions)
dt_mae = evaluator_mae.evaluate(dt_predictions)
rf_mae = evaluator_mae.evaluate(rf_predictions)

lr_r2 = evaluator_r2.evaluate(lr_predictions)
dt_r2 = evaluator_r2.evaluate(dt_predictions)
rf_r2 = evaluator_r2.evaluate(rf_predictions)

lr_rmse = evaluator_rmse.evaluate(lr_predictions)
dt_rmse = evaluator_rmse.evaluate(dt_predictions)
rf_rmse = evaluator_rmse.evaluate(rf_predictions)

# Displaying results
print("Linear Regression MAE:", lr_mae)
print("Decision Tree MAE:", dt_mae)
print("Random Forest MAE:", rf_mae)

print("Linear Regression R2:", lr_r2)
print("Decision Tree R2:", dt_r2)
print("Random Forest R2:", rf_r2)

print("Linear Regression RMSE:", lr_rmse)
print("Decision Tree RMSE:", dt_rmse)
print("Random Forest RMSE:", rf_rmse)


Linear Regression MAE: 497.32398530185117
Decision Tree MAE: 497.33576271191635
Random Forest MAE: 497.3238065146036
Linear Regression R2: -3.1955897634716735e-06
Decision Tree R2: -5.553486257525719e-05
Random Forest R2: -8.697428186099643e-06
Linear Regression RMSE: 574.3071428005508
Decision Tree RMSE: 574.3221719649848
Random Forest RMSE: 574.3087226658814
