In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel

In [2]:
# Build (or reuse) the Spark session used by every cell below.
#
# With master("local") the executor threads run inside the driver JVM, so the
# usable heap is governed by spark.driver.memory. The spark.executor.* settings
# are kept for runs against a real cluster but have no effect in local mode.
# The driver heap is raised from 2g to 6g (the amount previously requested for
# the executor) because Random Forest training failed with
# "java.lang.OutOfMemoryError: Java heap space".
# NOTE(review): the driver memory is only applied when the JVM is started, so
# restart the kernel after changing it; confirm the host has this much RAM.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Classification")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "6g")
    .config("spark.executor.memoryOverhead", "4g")
    .getOrCreate()
)

24/12/10 21:22:24 WARN Utils: Your hostname, hungnd-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/12/10 21:22:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/10 21:22:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the cleaned Parquet dataset from HDFS, then print a row preview and the schema.
cleaned_data_path = "hdfs://localhost:9000/final/cleaned_data/"
df = spark.read.parquet(cleaned_data_path)

df.show()
df.printSchema()

                                                                                

+------+---------+---------------+--------------------+-----------+--------------------+--------------------+--------------------+
|ItemID|Sentiment|SentimentSource|       SentimentText|text_length|               words|      filtered_words|    lemmatized_words|
+------+---------+---------------+--------------------+-----------+--------------------+--------------------+--------------------+
|    62|        1|   Sentiment140|i always get what...|         27|[i, always, get, ...| [always, get, want]| [always, get, want]|
|   194|        1|   Sentiment140|tell  i said happ...|         44|[tell, , i, said,...|[tell, , said, ha...|[tell, , said, ha...|
|   436|        0|   Sentiment140|i hope everyone i...|        136|[i, hope, everyon...|[hope, everyone, ...|[hope, everyone, ...|
|   474|        0|   Sentiment140|all my friends ar...|        101|[all, my, friends...|[friends, gone, h...|[friend, gone, ha...|
|   619|        0|   Sentiment140|bcds closed i gue...|         36|[bcds, closed, i

In [4]:
# Redistribute the rows over 8 partitions before feature extraction.
df = df.repartition(8)

# Turn the Sentiment column into the numeric "label" column the classifiers expect.
# stringOrderType="alphabetAsc" pins the mapping by value ("0" -> 0.0, "1" -> 1.0).
# The default ("frequencyDesc") gives index 0.0 to the most frequent class, so
# label 1.0 would not reliably mean Sentiment == 1.
# NOTE(review): assumes Sentiment only contains 0 and 1, as in the preview above.
indexer = StringIndexer(inputCol="Sentiment", outputCol="label", stringOrderType="alphabetAsc")
data = indexer.fit(df).transform(df)

# Feature stages reused by each model pipeline:
# lemmatized tokens -> term counts -> IDF-weighted vector -> "features".
vectorizer = CountVectorizer(inputCol="lemmatized_words", outputCol="raw_features")

idf = IDF(inputCol="raw_features", outputCol="idf")

assembler = VectorAssembler(inputCols=["idf"], outputCol="features")

# Split train/test dataset (80/20, fixed seed)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)


                                                                                

In [None]:
# Logistic Regression
# Chain the shared feature stages with the classifier and fit on the training split.
lr = LogisticRegression(maxIter=20, labelCol="label", featuresCol="features")
lr_stages = [vectorizer, idf, assembler, lr]
lr_pipeline = Pipeline(stages=lr_stages)
lr_model = lr_pipeline.fit(train_data)

# Score the test split with the fitted pipeline.
lr_predictions = lr_model.transform(test_data)

# Accuracy = share of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy:.2f}")

24/12/10 18:56:30 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/12/10 18:56:47 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 18:57:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/10 18:57:04 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 18:58:49 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:00:21 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:01:55 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:03:29 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:05:07 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:06:43 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:08:19 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
24/12/10 19:09:54 WARN DAGSchedul

Logistic Regression Accuracy: 0.76


                                                                                

In [None]:
# Save the fitted Logistic Regression pipeline to HDFS.
# write().overwrite() replaces an existing model directory; a plain save()
# raises when the path already exists, which breaks re-running this cell.
lr_model.write().overwrite().save("hdfs://localhost:9000/models/lr_sentiment_model")

24/12/10 19:35:40 WARN TaskSetManager: Stage 42 contains a task of very large size (5076 KiB). The maximum recommended task size is 1000 KiB.
24/12/10 19:35:41 WARN TaskSetManager: Stage 46 contains a task of very large size (4189 KiB). The maximum recommended task size is 1000 KiB.
24/12/10 19:35:42 WARN TaskSetManager: Stage 50 contains a task of very large size (2099 KiB). The maximum recommended task size is 1000 KiB.


In [None]:
# Load Logistic Regression model
lr_model_path = "hdfs://localhost:9000/models/lr_sentiment_model"
loaded_lr_model = PipelineModel.load(lr_model_path)

# Score the test split with the reloaded pipeline and show ten untruncated rows.
lr_predictions = loaded_lr_model.transform(test_data)
preview_columns = ["SentimentText", "label", "prediction"]
lr_predictions.select(*preview_columns).show(10, truncate=False)

# Accuracy on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy:.2f}")

# Area under the ROC curve, computed from the "probability" column.
roc_evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="probability",
    labelCol="label",
)
roc_auc = roc_evaluator.evaluate(lr_predictions)
print(f"Logistic Regression ROC AUC: {roc_auc:.2f}")

24/12/10 21:08:00 WARN DAGScheduler: Broadcasting large task binary with size 8.9 MiB
                                                                                

+-------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|SentimentText                                                                                                                  |label|prediction|
+-------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|hmmmm i wonder how she my number                                                                                               |0.0  |0.0       |
|and the entertainment is over someone complained properly    experimental you say he should experiment with a melody           |1.0  |0.0       |
|i think i may be too friendlylol o well                                                                                        |1.0  |1.0       |
|no pavel tonight lttigersfan gt                                                                                      

24/12/10 21:08:11 WARN DAGScheduler: Broadcasting large task binary with size 8.9 MiB
                                                                                

Logistic Regression Accuracy: 0.82


24/12/10 21:08:27 WARN DAGScheduler: Broadcasting large task binary with size 8.9 MiB

Logistic Regression ROC AUC: 0.90


                                                                                

In [6]:
# Naive Bayes
# Multinomial Naive Bayes on the shared feature stages, with smoothing=1.0.
nb = NaiveBayes(featuresCol="features", labelCol="label", smoothing=1.0, modelType="multinomial")

pipeline = Pipeline(stages=[vectorizer, idf, assembler, nb])

nb_model = pipeline.fit(train_data)

nb_predictions = nb_model.transform(test_data)

# Build the evaluators in this cell instead of relying on objects left in the
# kernel by another model's cell, so the cell also works when those cells were
# skipped or run in a different order.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print(f"Naive Bayes Accuracy: {nb_accuracy:.4f}")

roc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
roc_auc = roc_evaluator.evaluate(nb_predictions)
print(f"Naive Bayes ROC AUC: {roc_auc:.2f}")


24/12/10 21:09:14 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/12/10 21:09:25 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/12/10 21:09:45 WARN DAGScheduler: Broadcasting large task binary with size 20.3 MiB
24/12/10 21:10:00 WARN DAGScheduler: Broadcasting large task binary with size 20.3 MiB
24/12/10 21:10:11 WARN DAGScheduler: Broadcasting large task binary with size 24.3 MiB
24/12/10 21:10:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

Naive Bayes Accuracy: 0.7487


24/12/10 21:10:28 WARN DAGScheduler: Broadcasting large task binary with size 24.3 MiB
                                                                                

Naive Bayes ROC AUC: 0.80


                                                                                

In [8]:
# Save the fitted Naive Bayes pipeline to HDFS.
# write().overwrite() replaces an existing model directory; a plain save()
# raises when the path already exists, which breaks re-running this cell.
nb_model.write().overwrite().save("hdfs://localhost:9000/models/nb_sentiment_model")

24/12/10 20:56:31 WARN TaskSetManager: Stage 75 contains a task of very large size (5076 KiB). The maximum recommended task size is 1000 KiB.
24/12/10 20:56:32 WARN TaskSetManager: Stage 79 contains a task of very large size (4189 KiB). The maximum recommended task size is 1000 KiB.
24/12/10 20:56:33 WARN TaskSetManager: Stage 84 contains a task of very large size (4189 KiB). The maximum recommended task size is 1000 KiB.


In [5]:
# Random Forest Classifier
# 40 trees of depth <= 10 on the shared feature stages.
# NOTE(review): the recorded run of this cell ended in
# "java.lang.OutOfMemoryError: Java heap space" during fit; the JVM heap
# probably needs to be larger (or the feature space smaller) — confirm.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=40, maxDepth=10)

rf_pipeline = Pipeline(stages=[vectorizer, idf, assembler, rf])

rf_model = rf_pipeline.fit(train_data)

# Score the test split and report accuracy.
rf_predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy:.4f}")

# Area under the ROC curve for the Random Forest predictions.
# The message and variable previously referred to the Naive Bayes result
# ("Naive Bayes ROC AUC", roc_auc), so rf_roc_auc was computed but never shown.
roc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
rf_roc_auc = roc_evaluator.evaluate(rf_predictions)
print(f"Random Forest ROC AUC: {rf_roc_auc:.4f}")

24/12/10 21:23:19 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/12/10 21:23:29 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
24/12/10 21:23:52 WARN DAGScheduler: Broadcasting large task binary with size 20.3 MiB
24/12/10 21:23:53 WARN DAGScheduler: Broadcasting large task binary with size 20.3 MiB
24/12/10 21:24:07 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB
24/12/10 21:25:23 WARN DAGScheduler: Broadcasting large task binary with size 1033.7 KiB
24/12/10 21:25:25 WARN DAGScheduler: Broadcasting large task binary with size 23.8 MiB
24/12/10 21:25:26 WARN MemoryStore: Not enough space to cache rdd_77_0 in memory! (computed 630.4 MiB so far)
24/12/10 21:25:26 WARN BlockManager: Persisting block rdd_77_0 to disk instead.
24/12/10 21:31:35 ERROR Executor: Exception in task 0.0 in stage 29.0 (TID 85)8]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.ba

ConnectionRefusedError: [Errno 111] Connection refused