In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# 1. Initialize Spark Session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# 2. Load Data
data_path = "D:/Study/NLP/Lab/week4/sentiments.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)
initial_row_count = df.count()

# Drop rows that cannot be used before deriving the label:
# - a null "sentiment" would produce a null label;
# - a null "text" would make the Tokenizer stage fail at fit/transform time.
df = df.dropna(subset=["sentiment", "text"])

# Convert -1/1 labels to 0/1: (x + 1) / 2 maps -1 -> 0.0 and 1 -> 1.0.
df = df.withColumn("label", (col("sentiment").cast("integer") + 1) / 2)

# A sentiment value that does not cast to an integer yields a null label,
# which LogisticRegression cannot train on, so drop those rows as well.
# Cache the cleaned data: it is counted below and then split/reused for training.
df = df.dropna(subset=["label"]).cache()

# Count once and reuse the result; every count() is a full Spark job.
final_row_count = df.count()
print(f"Loaded {initial_row_count} rows initially, dropped {initial_row_count - final_row_count} null rows, final count: {final_row_count}")

# Split the data into training and test sets (80/20, fixed seed for reproducibility)
trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)

# 3. Build Preprocessing Pipeline
# Each stage consumes the column written by the stage before it:
#   text -> words -> filtered_words -> raw_features -> features
NUM_HASH_FEATURES = 10000

# Lower-case each document and split it on whitespace into a token array.
tokenizer = Tokenizer(outputCol="words", inputCol="text")

# Remove the default stop-word list from each token array.
stopwordsRemover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="filtered_words",
)

# Hash the remaining tokens into a fixed-length term-frequency vector.
hashingTF = HashingTF(
    inputCol=stopwordsRemover.getOutputCol(),
    outputCol="raw_features",
    numFeatures=NUM_HASH_FEATURES,
)

# Re-weight the term frequencies by inverse document frequency (TF-IDF).
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")

# 4. Train the Model
# Binary classifier over the TF-IDF vectors, lightly L2-regularized.
lr = LogisticRegression(
    featuresCol=idf.getOutputCol(),
    labelCol="label",
    maxIter=10,
    regParam=0.001,
)

# Chain all stages; fitting the pipeline fits IDF and the classifier on the training split.
stages = [tokenizer, stopwordsRemover, hashingTF, idf, lr]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(trainingData)

# 5. Evaluate the Model
# transform() is lazy: without caching, every evaluate() call below would
# recompute the whole pipeline over the test set. Cache it once and reuse.
predictions = model.transform(testData).cache()

# (Spark metricName, printed name). Note: Spark's "f1" metric for
# MulticlassClassificationEvaluator is the label-weighted F-measure.
metrics_to_report = [
    ("accuracy", "Accuracy"),
    ("f1", "F1 Score"),
    ("weightedPrecision", "Weighted Precision"),
    ("weightedRecall", "Weighted Recall"),
]

try:
    scores = {}
    for metric_name, display_name in metrics_to_report:
        evaluator = MulticlassClassificationEvaluator(metricName=metric_name, labelCol="label", predictionCol="prediction")
        scores[metric_name] = evaluator.evaluate(predictions)
        print(f"{display_name}: {scores[metric_name]:.4f}")

    # Keep the individual metric variables available to later cells.
    accuracy = scores["accuracy"]
    f1 = scores["f1"]
    precision = scores["weightedPrecision"]
    recall = scores["weightedRecall"]
finally:
    # Release the Spark session (and the cached data) even if an evaluation fails.
    spark.stop()

Loaded 5792 rows initially, dropped 1 null rows, final count: 5791
Accuracy: 0.7295
F1 Score: 0.7266
Weighted Precision: 0.7255
Weighted Recall: 0.7295
