## Big Data Project - Model2: Random Forest
### This file contain code and result of Random Forest trained on review of books.jsonl file with 2 executor on 50 percent of actual dataset

In [1]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, when

### --Creating and building spark session with 2 maxExecutors with 50 partition

In [2]:
from pyspark.sql import SparkSession

# Creating a SparkSession
spark = (
  SparkSession.builder
    .appName("AmazonReviewsUsingBERT")
    .master("yarn")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled",   "true")
    .config("spark.dynamicAllocation.minExecutors","1")
    .config("spark.dynamicAllocation.maxExecutors","2")
    .config("spark.sql.shuffle.partitions",       "50")
    .getOrCreate()
)

25/05/05 15:00:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### --Data Loading and preprocessing
#### I have turned my data as public so you should be able to run the below code, if there is problem while running then please let me know.

In [3]:
# Loadig the JSONL file into a Spark DataFrame
df = spark.read.json("gs://bigdataprojectdata/notebooks/jupyter/Books.jsonl")


                                                                                

In [4]:
#HANDLING MISSING VALUES
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+----+------------+------+-----------+------+----+---------+-----+-------+-----------------+
|asin|helpful_vote|images|parent_asin|rating|text|timestamp|title|user_id|verified_purchase|
+----+------------+------+-----------+------+----+---------+-----+-------+-----------------+
|   0|           0|     0|          0|     0|   0|        0|    0|      0|                0|
+----+------------+------+-----------+------+----+---------+-----+-------+-----------------+



                                                                                

In [5]:
from pyspark.sql.functions import when
df = df.withColumn(
    "sentiment", 
    when(df["rating"] >= 4, "positive")
    .when(df["rating"] <= 3, "negative")
)

In [6]:
df = df.dropDuplicates(["title", "user_id"])

In [7]:
df = df.select("text","rating", "sentiment")

In [8]:
#checking if reviews "text" is a proper review i.e. contains more than 3 words. Dropping all the rows which has text containg less that 4 words
df = (df
                         .withColumn("word_count", F.size(F.split(F.col("text"),r"\s+")))
                         .filter(F.col("word_count")>=4)
                         .drop("word_count")
    )
df

DataFrame[text: string, rating: double, sentiment: string]

In [9]:
#checkign the counts of sentiment rows if they are balanced or not
counts_sentiments = (
    df
        .groupBy("sentiment")
        .agg(F.count("*").alias("review_count"))
        .orderBy("sentiment")
)
counts_sentiments.show()



+---------+------------+
|sentiment|review_count|
+---------+------------+
| negative|     4144153|
| positive|    21092378|
+---------+------------+



                                                                                

In [10]:
fractions = {"positive": 0.4, "negative": 1.0}

df = df.sampleBy("sentiment", fractions, seed=42)



In [11]:
# Sampling only 50% of data 
df = df.sample(withReplacement=False, fraction=0.50, seed=42)

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import RandomForestClassifier

In [13]:
# Text preprocessing
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [14]:

# Label encoding 
indexer = StringIndexer(inputCol="sentiment", outputCol="label")

In [15]:
# Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)

In [16]:
# Combine all into a pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, indexer, rf])

In [17]:
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [18]:
import time
start = time.time()
model = pipeline.fit(train_df)
predictions = model.transform(test_df)
end = time.time()
print(f"Training Time: {end - start:.2f} seconds")


25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_16 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_8 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_12 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_0 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_20 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_28 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_4 !
25/05/05 15:39:24 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_24 !
25/05/05 15:39:25 WARN YarnAllocator: Container from a bad node: container_1746457157008_0001_01_000002 on host: cluster-c2aa-w-1.northamerica-south1-b.c.academic-timing-458516-v2.internal. Exit status: 137. Diagnostics: [2025-05-05 15:39:25.002]Conta

25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_49 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_6 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_41 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_26 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_34 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_18 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_10 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_22 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_14 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_30 !
25/05/05 15:45:25 WARN BlockManagerMasterEndpoint: No more re

25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_5 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_32 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_16 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_44 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_36 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_7 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_2 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_33 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_12 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_89_20 !
25/05/05 16:00:37 WARN BlockManagerMasterEndpoint: No more repl

Training Time: 3859.86 seconds


In [19]:
predictions.select("text", "sentiment", "prediction").show(10)

[Stage 48:>                                                         (0 + 1) / 1]

+--------------------+---------+----------+
|                text|sentiment|prediction|
+--------------------+---------+----------+
|" '...There will ...| negative|       0.0|
|""" Lies<br />by ...| negative|       0.0|
|"...In the year 2...| positive|       0.0|
|"...you cannot se...| positive|       0.0|
|"A Dragons Tale" ...| positive|       0.0|
|"A Navy Maverick ...| negative|       0.0|
|"A man lay on the...| positive|       0.0|
|"All that Glitter...| positive|       0.0|
|"Athena" -- starr...| negative|       0.0|
|"BIRDS OF BRITAIN...| positive|       0.0|
+--------------------+---------+----------+
only showing top 10 rows



                                                                                

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

25/05/05 16:15:40 WARN YarnAllocator: Container from a bad node: container_1746457157008_0001_01_000009 on host: cluster-c2aa-w-0.northamerica-south1-b.c.academic-timing-458516-v2.internal. Exit status: 143. Diagnostics: [2025-05-05 16:15:40.161]Container killed on request. Exit code is 143
[2025-05-05 16:15:40.163]Container exited with a non-zero exit code 143. 
[2025-05-05 16:15:40.163]Killed by external signal
.
25/05/05 16:15:40 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 8 for reason Container from a bad node: container_1746457157008_0001_01_000009 on host: cluster-c2aa-w-0.northamerica-south1-b.c.academic-timing-458516-v2.internal. Exit status: 143. Diagnostics: [2025-05-05 16:15:40.161]Container killed on request. Exit code is 143
[2025-05-05 16:15:40.163]Container exited with a non-zero exit code 143. 
[2025-05-05 16:15:40.163]Killed by external signal
.
25/05/05 16:15:40 ERROR YarnScheduler: Lost executor 8 on cluster-c2aa-w-0.northame

Test Accuracy: 0.6702


                                                                                

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bce = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                    labelCol="label",
                                    metricName="areaUnderROC")
roc_auc = bce.evaluate(predictions)

                                                                                

In [22]:
f1 = evaluator.setMetricName("f1").evaluate(predictions)


                                                                                

In [23]:
print(f"Test ROC AUC = {roc_auc:.4f}")
print(f"Test Accuracy = {accuracy:.4f}")
print(f"Test F1 Score = {f1:.4f}")

Test ROC AUC = 0.7638
Test Accuracy = 0.6702
Test F1 Score = 0.5383


In [24]:
from pyspark import SparkContext

sc = spark.sparkContext
executors = sc._jsc.sc().getExecutorMemoryStatus().keySet()
print(f"Active Executors: {executors}")

Active Executors: Set(cluster-c2aa-m.northamerica-south1-b.c.academic-timing-458516-v2.internal:33325, cluster-c2aa-w-1.northamerica-south1-b.c.academic-timing-458516-v2.internal:46435, cluster-c2aa-w-0.northamerica-south1-b.c.academic-timing-458516-v2.internal:42817, cluster-c2aa-w-0.northamerica-south1-b.c.academic-timing-458516-v2.internal:42183, cluster-c2aa-w-1.northamerica-south1-b.c.academic-timing-458516-v2.internal:46815)
