In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from faker import Faker
import random
from pyspark.sql import SparkSession



# Creating a SparkConf with memory configurations to handle the big data
spark = SparkSession.builder.appName("Purchases").config("spark.executor.memory", "4g").config("spark.driver.memory", "4g").config("spark.network.timeout", "600s").getOrCreate()

fake = Faker()


# DATA CREATION

# Function to generate random data for Customers dataset
def generate_customers_data(num_customers):
    customers_data = []
    for cust_id in range(1, num_customers + 1):
        customers_data.append(
            (
                cust_id,
                fake.name(),
                random.randint(18, 100),
                random.randint(1, 500),
                round(random.uniform(100, 10000000), 2)
            )
        )
    return customers_data

# Function to generate random data for Purchases dataset
def generate_purchases_data(num_purchases, num_customers):
    purchases_data = []
    for trans_id in range(1, num_purchases + 1):
        cust_id = random.randint(1, num_customers)
        purchases_data.append(
            (
                trans_id,
                cust_id,
                round(random.uniform(10, 2000), 2),
                random.randint(1, 15),
                fake.text(random.randint(20, 50)).replace(',', '')
            )
        )
    return purchases_data

# Number of customers and purchases
num_customers = 50000
num_purchases = 5000000

# Generating data for Customers and Purchases
customers_data = generate_customers_data(num_customers)
purchases_data = generate_purchases_data(num_purchases, num_customers)

# Creating DataFrames for Customers and Purchases
customers_df = spark.createDataFrame(customers_data, ["ID", "Name", "Age", "CountryCode", "Salary"])
purchases_df = spark.createDataFrame(purchases_data, ["TransID", "CustID", "TransTotal", "TransNumItems", "TransDesc"])

# Displaying the first few rows of each dataset
customers_df.show(5)
purchases_df.show(5)

# Writing the datasets to HDFS or locally
customers_df.write.mode("overwrite").parquet("customers.parquet")
purchases_df.write.mode("overwrite").parquet("purchases.parquet")

# Registering DataFrames as SQL tables
customers_df.createOrReplaceTempView("Customers")
purchases_df.createOrReplaceTempView("Purchases")


+---+---------------+---+-----------+----------+
| ID|           Name|Age|CountryCode|    Salary|
+---+---------------+---+-----------+----------+
|  1| Melissa Nelson| 98|        256|2192129.07|
|  2|   Dillon Lewis| 69|        358|5609320.09|
|  3|Christine Wolfe| 95|        332|7751332.78|
|  4|    Lisa Greene| 42|        172|4150480.34|
|  5|  Diana Stewart| 83|        146|6896931.95|
+---+---------------+---+-----------+----------+
only showing top 5 rows

+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      1| 38761|   1620.68|           15|Sound others inve...|
|      2| 49772|     331.5|            2|Health customer t...|
|      3| 13797|    796.66|            4| Direction who wide.|
|      4| 39414|    623.21|           15|Just memory to se...|
|      5| 26491|    1693.1|            9|Management situat...|
+-------+------+----------+---

In [2]:
# Task 2.0 - Loading data into storage.

# Storing the Dataset
customers_df.write.mode("overwrite").csv("Customers", header=True)
purchases_df.write.mode("overwrite").csv("Purchases", header=True)

# Storing it in csvs for clear view as well
customers_dff = customers_df.toPandas()
purchases_dff = purchases_df.toPandas()

customers_dff.to_csv('Customers.csv',index=False)
purchases_dff.to_csv('Purchases.csv',index=False)

In [3]:
# Task 2.1 - Filter out Purchases with a total purchase amount above $600

query_t1 = """
    SELECT *
    FROM Purchases
    WHERE TransTotal <= 600
"""
t1_df = spark.sql(query_t1)
t1_df.createOrReplaceTempView("T1")

In [4]:
# Displaying T1 results

t1_df.show()

+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      2| 49772|     331.5|            2|Health customer t...|
|      6|  8556|    277.57|            4|Growth already vi...|
|     10| 27212|    288.92|            5|Common at write c...|
|     19| 42739|    352.36|           11|Compare ready civ...|
|     31|  3363|    526.67|            7|   Sea pattern huge.|
|     32| 26679|    170.93|            6|Cell career rathe...|
|     36| 32405|    122.82|            9|  Food surface fire.|
|     37| 36329|    268.54|            5|    Similar despite.|
|     53| 48545|    596.66|            4|Everything coach ...|
|     57|  3916|     440.4|            7|Wall financial ri...|
|     59|  7551|    290.34|            3|Arrive term shoul...|
|     61| 36052|     231.2|            2|Fund learn very n...|
|     62|  1283|     102.6|           14|Community incl

In [5]:
# Storing the output in a CSV File

t1q_df = t1_df.toPandas()
t1q_df.to_csv('T1_Output.csv',index=False)

In [6]:
t1q_df.count() # Checking the number of records in this case

TransID          1481866
CustID           1481866
TransTotal       1481866
TransNumItems    1481866
TransDesc        1481866
dtype: int64

In [7]:
# Task 2.2 - Grouping the Purchases in T1 by the Number of Items and calculating its statistics

query_t2 = """
    SELECT TransNumItems,
           percentile_approx(TransTotal, 0.5) AS Median,
           min(TransTotal) AS Min,
           max(TransTotal) AS Max
    FROM T1
    GROUP BY TransNumItems
"""
t2_df = spark.sql(query_t2)
t2_df.createOrReplaceTempView("T2")

In [8]:
# Displaying T2 results

t2_df.show()

+-------------+------+-----+------+
|TransNumItems|Median|  Min|   Max|
+-------------+------+-----+------+
|            7|303.97|10.01| 600.0|
|            6|304.58| 10.0| 600.0|
|            9| 305.1|10.01| 600.0|
|            5|305.31| 10.0| 600.0|
|            1|305.17| 10.0| 600.0|
|           10|305.49| 10.0| 600.0|
|            3|304.56| 10.0| 600.0|
|           12|305.74|10.01| 600.0|
|            8|304.18|10.01| 600.0|
|           11|303.72| 10.0| 600.0|
|            2|305.51| 10.0|599.99|
|            4|305.63|10.01| 600.0|
|           13|302.87|10.01| 600.0|
|           14|305.44| 10.0|599.99|
|           15|303.07|10.01| 600.0|
+-------------+------+-----+------+



In [9]:
# Storing the output in a CSV File

t2q_df = t2_df.toPandas()
t2q_df.to_csv('T2_Output.csv',index=False)

In [10]:
t2q_df.count() # Checking the number of records in this case

TransNumItems    15
Median           15
Min              15
Max              15
dtype: int64

In [11]:
# Task 2.3: Grouping Purchases in T1 by customer ID for young customers (18-25 years old)

query_t3 = """
    SELECT t1.CustID,
           MAX(Customers.Age) AS Age,
           COUNT(t1.TransNumItems) AS TotalNumItems,
           SUM(t1.TransTotal) AS TotalAmount
    FROM T1 t1
    JOIN Customers ON t1.CustID = Customers.ID
    WHERE Customers.Age BETWEEN 18 AND 25
    GROUP BY t1.CustID
"""
t3_df = spark.sql(query_t3)
t3_df.createOrReplaceTempView("T3")

In [12]:
# Displaying T3 results

t3_df.show()

+------+---+-------------+------------------+
|CustID|Age|TotalNumItems|       TotalAmount|
+------+---+-------------+------------------+
| 27651| 19|           26| 6890.619999999999|
| 45410| 20|           28|           8440.57|
| 17048| 25|           29|           8968.87|
| 39256| 23|           38|12813.800000000003|
| 39713| 23|           36|           10156.1|
| 25649| 19|           29|10122.279999999999|
| 23492| 23|           28| 8017.809999999999|
| 19141| 24|           23|6299.0199999999995|
| 19158| 20|           32|          11906.54|
| 13638| 21|           27|           6538.03|
| 39104| 22|           30| 9884.439999999999|
| 40634| 25|           24| 6560.539999999999|
| 32912| 20|           30|          10139.25|
| 18147| 22|           31| 8702.710000000001|
| 39473| 22|           35|          10069.37|
| 45298| 22|           31|10837.039999999997|
| 15375| 24|           25|           5237.67|
| 32667| 24|           33|           9026.79|
| 49048| 20|           26| 7291.82

In [13]:
# Storing the output in a CSV File

t3q_df = t3_df.toPandas()
t3q_df.to_csv('T3_Output.csv',index=False)

In [14]:
t3q_df.count() # Checking the number of records in this case

CustID           4813
Age              4813
TotalNumItems    4813
TotalAmount      4813
dtype: int64

In [15]:
# Task 2.4: Returning customer pairs meeting specified conditions

query_t4 = """
    SELECT t3a.CustID AS C1_ID,
           t3b.CustID AS C2_ID,
           t3a.Age AS Age1,
           t3b.Age AS Age2,
           t3a.TotalAmount AS TotalAmount1,
           t3b.TotalAmount AS TotalAmount2,
           t3a.TotalNumItems AS TotalItemCount1,
           t3b.TotalNumItems AS TotalItemCount2
    FROM T3 t3a
    JOIN T3 t3b ON t3a.CustID < t3b.CustID
               AND t3a.Age < t3b.Age
               AND t3a.TotalAmount > t3b.TotalAmount
               AND t3a.TotalNumItems < t3b.TotalNumItems
"""
t4_df = spark.sql(query_t4)

In [16]:
# Displaying T4 results

t4_df.show()

+-----+-----+----+----+------------------+-----------------+---------------+---------------+
|C1_ID|C2_ID|Age1|Age2|      TotalAmount1|     TotalAmount2|TotalItemCount1|TotalItemCount2|
+-----+-----+----+----+------------------+-----------------+---------------+---------------+
|27651|45145|  19|  21| 6890.619999999999|6584.800000000001|             26|             28|
|27651|30617|  19|  23| 6890.619999999999|          6757.33|             26|             27|
|27651|45599|  19|  21| 6890.619999999999|6279.000000000001|             26|             27|
|45410|45621|  20|  24|           8440.57|8297.279999999999|             28|             29|
|45410|46638|  20|  24|           8440.57|7610.399999999998|             28|             30|
|25649|39104|  19|  22|10122.279999999999|9884.439999999999|             29|             30|
|25649|39473|  19|  22|10122.279999999999|         10069.37|             29|             35|
|25649|32667|  19|  24|10122.279999999999|          9026.79|          

In [17]:
# Storing the output in a CSV File

t4q_df = t4_df.toPandas()
t4q_df.to_csv('T4_Output.csv',index=False)

In [18]:
t4q_df.count() # Checking the number of records in this case

C1_ID              343226
C2_ID              343226
Age1               343226
Age2               343226
TotalAmount1       343226
TotalAmount2       343226
TotalItemCount1    343226
TotalItemCount2    343226
dtype: int64

In [19]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


# Task 2.5 - Generating Dataset composed of specified attributes
combined_dataset_df = purchases_df.join(customers_df, purchases_df.CustID == customers_df.ID) \
    .select("CustID", "TransID", "Age", "Salary", "TransNumItems", "TransTotal")


# Storing the Dataset
combined_dataset_df.write.mode("overwrite").csv("Combined_Dataset", header=True)


# Storing it in CSV for clear view as well
combined_ds = combined_dataset_df.toPandas()
combined_ds.to_csv('Combined_Dataset.csv',index=False)

In [21]:
# Task 2.6 - Randomly split Dataset into Trainset (80%) and Testset (20%)
train_df, test_df = combined_dataset_df.randomSplit([0.8, 0.2], seed=42)

In [22]:
# Task 2.7 & 2.8 - Training and applying regression models & evaluating them using regression evaluation metrics.

# Preparing feature vector
feature_cols = ["Age", "Salary", "TransNumItems"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [23]:
# Linear Regression Model

# Initialization
lr = LinearRegression(labelCol="TransTotal", featuresCol="features")

# Fitting model on Trainset
lr_model = lr.fit(assembler.transform(train_df))

# Fitting model on Testset
lr_predictions = lr_model.transform(assembler.transform(test_df))

evaluator = RegressionEvaluator(labelCol="TransTotal", predictionCol="prediction")

# Metrics for Linear Regression
lr_rmse = evaluator.evaluate(lr_predictions, {evaluator.metricName: "rmse"})
lr_mae = evaluator.evaluate(lr_predictions, {evaluator.metricName: "mae"})
lr_r2 = evaluator.evaluate(lr_predictions, {evaluator.metricName: "r2"})

# Printing metrics
print("Linear Regression Metrics:")
print(f"RMSE: {lr_rmse}, MAE: {lr_mae}, R^2: {lr_r2}")


Linear Regression Metrics:
RMSE: 574.3698283992732, MAE: 497.4818900678744, R^2: -2.815956533286368e-06


In [24]:
# Decision Tree Model

# Initialization
dt = DecisionTreeRegressor(labelCol="TransTotal", featuresCol="features")

# Fitting model on Trainset
dt_model = dt.fit(assembler.transform(train_df))

# Fitting model on Testset
dt_predictions = dt_model.transform(assembler.transform(test_df))

# Metrics for Decision Tree
dt_rmse = evaluator.evaluate(dt_predictions, {evaluator.metricName: "rmse"})
dt_mae = evaluator.evaluate(dt_predictions, {evaluator.metricName: "mae"})
dt_r2 = evaluator.evaluate(dt_predictions, {evaluator.metricName: "r2"})

# Printing metrics
print("\nDecision Tree Metrics:")
print(f"RMSE: {dt_rmse}, MAE: {dt_mae}, R^2: {dt_r2}")



Decision Tree Metrics:
RMSE: 574.3800349482891, MAE: 497.4878180779392, R^2: -3.835636251503516e-05


In [25]:
# Random Forest Model

# Initialization
rf = RandomForestRegressor(labelCol="TransTotal", featuresCol="features")

# Fitting model on Trainset
rf_model = rf.fit(assembler.transform(train_df))

# Fitting model on Testset
rf_predictions = rf_model.transform(assembler.transform(test_df))

# Metrics for RandomForest
rf_rmse = evaluator.evaluate(rf_predictions, {evaluator.metricName: "rmse"})
rf_mae = evaluator.evaluate(rf_predictions, {evaluator.metricName: "mae"})
rf_r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})

# Printing metrics
print("\nRandomForest Metrics:")
print(f"RMSE: {rf_rmse}, MAE: {rf_mae}, R^2: {rf_r2}")


RandomForest Metrics:
RMSE: 574.3703855447804, MAE: 497.4820522243702, R^2: -4.755986523941047e-06


In [26]:
# GBT Model

# Initialization
gbt = GBTRegressor(labelCol="TransTotal", featuresCol="features")

# Fitting model on Trainset
gbt_model = gbt.fit(assembler.transform(train_df))

# Fitting model on Testset
gbt_predictions = gbt_model.transform(assembler.transform(test_df))

# Metrics for GBT
gbt_rmse = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "rmse"})
gbt_mae = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "mae"})
gbt_r2 = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "r2"})

# Printing metrics
print("\nGBT Metrics:")
print(f"RMSE: {gbt_rmse}, MAE: {gbt_mae}, R^2: {gbt_r2}")


GBT Metrics:
RMSE: 574.3878801488834, MAE: 497.4911068734467, R^2: -6.567470441232182e-05


In [27]:
# Stop the Spark session
spark.stop()