In [73]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Build (or reuse) the notebook's Spark session, named "Models".
# All tuning knobs are passed as string-valued Spark configuration entries.
spark = (
    SparkSession.builder
    .appName("Models")
    .config("spark.sql.shuffle.partitions", "16")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Only log messages at WARN level and above.
spark.sparkContext.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/02 07:06:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the raw event log; the first line is a header and column types are inferred.
df = spark.read.options(header=True, inferSchema=True).csv("ecomerce.csv")

                                                                                

In [32]:
# Count the null entries in every column and print them as a single-row table.
null_count_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()



+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     13515609|6113008|    0|      0|           2|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+





In [35]:
# Discard every row whose "brand" value is null.
df = df.dropna(subset=["brand"])

# category_code also contains nulls, but it is not used downstream:
# category_id is used in its place, so those rows are kept.

In [34]:
# Re-check the per-column null counts on the current `df`.
null_count_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()



+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|      9775134|    0|    0|      0|           2|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



                                                                                

In [51]:
# Encode the string column "event_type" as a numeric index column
# ("event_type_index"), which later serves as the classifier's label.
event_type_indexer = (
    StringIndexer()
    .setInputCol("event_type")
    .setOutputCol("event_type_index")
)

In [52]:
# Encode the string column "brand" as a numeric index column ("brand_index").
brand_indexer = (
    StringIndexer()
    .setInputCol("brand")
    .setOutputCol("brand_index")
)

In [53]:
# Chain both indexers into one pipeline, fit it on `df`, and apply it to the
# same frame so `indexed` carries the two extra *_index columns.
indexing_stages = [event_type_indexer, brand_indexer]
pipeline = Pipeline(stages=indexing_stages)

pipeline_model = pipeline.fit(df)
indexed = pipeline_model.transform(df)

                                                                                

In [54]:
# Echo the DataFrame repr to confirm that event_type_index and brand_index
# were appended to the original columns.
indexed

DataFrame[event_time: string, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string, event_type_index: double, brand_index: double]

In [59]:
# Pack the model inputs into a single vector column named "features".
# NOTE(review): category_id is an identifier-like bigint that is fed in here as
# a plain numeric value, and the brand_index column built earlier is not part
# of the feature list - confirm both are intended.
feature_columns = ["category_id", "price"]
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)
data = assembler.transform(indexed)

In [60]:
# Preview the first 20 assembled rows (long cell values are truncated).
data.show(n=20, truncate=True)

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------------+-----------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|event_type_index|brand_index|            features|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------------+-----------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|             0.0|      595.0|[2.10380745959538...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|             0.0|      260.0|[2.05301355232677...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.note

In [61]:
# Randomly partition the assembled rows into ~80% training / ~20% test data.
# The fixed seed keeps the partition repeatable.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=123)

In [62]:
# Train a Random Forest classifier that predicts event_type_index from the
# assembled "features" vector, using 10 trees.
# The seed is fixed (same value as the train/test split) so that the forest's
# random sampling is repeatable when the notebook is re-run.
rf = RandomForestClassifier(
    labelCol="event_type_index",
    featuresCol="features",
    numTrees=10,
    seed=123,
)
model = rf.fit(train_data)

23/09/02 07:37:29 WARN MemoryStore: Not enough space to cache rdd_247_5 in memory! (computed 7.8 MiB so far)
23/09/02 07:37:29 WARN BlockManager: Persisting block rdd_247_5 to disk instead.
23/09/02 07:37:29 WARN MemoryStore: Not enough space to cache rdd_247_4 in memory! (computed 7.8 MiB so far)
23/09/02 07:37:29 WARN BlockManager: Persisting block rdd_247_4 to disk instead.
23/09/02 07:37:29 WARN MemoryStore: Not enough space to cache rdd_247_2 in memory! (computed 3.4 MiB so far)
23/09/02 07:37:29 WARN BlockManager: Persisting block rdd_247_2 to disk instead.
23/09/02 07:37:29 WARN MemoryStore: Not enough space to cache rdd_247_6 in memory! (computed 7.8 MiB so far)
23/09/02 07:37:29 WARN BlockManager: Persisting block rdd_247_6 to disk instead.
23/09/02 07:37:29 WARN MemoryStore: Not enough space to cache rdd_247_3 in memory! (computed 17.6 MiB so far)
23/09/02 07:37:29 WARN BlockManager: Persisting block rdd_247_3 to disk instead.
23/09/02 07:37:29 WARN MemoryStore: Not enough sp

23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_11 in memory! (computed 17.6 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_8 in memory! (computed 17.6 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_10 in memory! (computed 17.6 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_14 in memory! (computed 7.8 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_13 in memory! (computed 11.7 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_12 in memory! (computed 17.6 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_9 in memory! (computed 26.6 MiB so far)
23/09/02 07:38:15 WARN MemoryStore: Not enough space to cache rdd_247_15 in memory! (computed 7.8 MiB so far)
23/09/02 07:38:16 WARN MemoryStore: Not enough space to cache rdd_247_17 in memory! (computed 41.6 MiB so far)
23/09

In [63]:
# Score the held-out rows with the fitted forest; the result keeps the input
# columns and adds the model's output columns, including "prediction".
predictions = model.transform(dataset=test_data)

In [65]:
# Measure accuracy on the test-set predictions, comparing the "prediction"
# column against the "event_type_index" label.
# NOTE(review): the confusion matrix recorded later in this notebook has every
# prediction in the first class, so the recorded accuracy equals the share of
# that class - consider reporting a per-class metric as well.
evaluator = MulticlassClassificationEvaluator(
    labelCol="event_type_index",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)



Accuracy: 0.9561969352836949




In [67]:
# Build RDD-based multiclass metrics from (prediction, label) pairs.
# Column order matters: MulticlassMetrics reads the first field as the
# prediction and the second as the true label.
prediction_and_labels = predictions.select("prediction", "event_type_index").rdd
metrics = MulticlassMetrics(prediction_and_labels)

# Confusion matrix of counts (actual classes in rows, predicted in columns).
confusion_matrix = metrics.confusionMatrix()

                                                                                

In [68]:
# Display the raw confusion matrix object (a pyspark DenseMatrix) as returned
# by metrics.confusionMatrix().
confusion_matrix

DenseMatrix(3, 3, [6950214.0, 181049.0, 137338.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], 0)

In [69]:
# Create a heatmap of the confusion matrix.
# seaborn cannot consume a pyspark DenseMatrix directly, and the counts come
# back as floats, which the integer format code 'd' rejects. Convert to a
# NumPy array of ints first so both the plot and the annotations work.
confusion_counts = confusion_matrix.toArray().astype(int)

plt.figure(figsize=(8, 6))
sns.heatmap(confusion_counts, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

NameError: name 'plt' is not defined