In [2]:
from pyspark.sql import SparkSession
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.evaluation import MulticlassMetrics
from dotenv import load_dotenv
import os

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# 1. Initialize Spark Session
# Build (or reuse) the session under a descriptive application name.
spark = (
    SparkSession.builder
    .appName("Sentiment Analysis with JSON Data")
    .getOrCreate()
)

# SparkContext that belongs to the session above.
sc = spark.sparkContext

# 2. Load JSON Data
# The dataset location comes from the DATA_PATH environment variable;
# ./env.env is loaded first so the variable can be defined there.
load_dotenv("./env.env")
path = os.getenv("DATA_PATH")
if not path:
    # os.getenv returns None when the variable is missing. Stop here with a
    # message that names the variable instead of handing None (or an empty
    # string) to spark.read.json.
    raise RuntimeError(
        "DATA_PATH is not set; define it in ./env.env or in the environment."
    )
data_df = spark.read.json(path)

# 3. Select Relevant Fields
# Keep only the two columns used downstream and drop rows where either is null.
filtered_df = (
    data_df
    .select("reviewText", "overall")
    .filter("reviewText IS NOT NULL AND overall IS NOT NULL")
)


# 4. Transform DataFrame to RDD and Label Data
def _label_review(row):
    """Return ``(label, reviewText)`` for one row.

    The label is 1.0 (positive) when ``overall`` is 4 or higher and
    0.0 (negative) otherwise.
    """
    if row["overall"] >= 4:
        sentiment = 1.0
    else:
        sentiment = 0.0
    return (sentiment, row["reviewText"])


data_rdd = filtered_df.rdd.map(_label_review)

In [4]:

# 5. Split Data into Training and Testing Sets
# Weights are 80% train / 20% test; the same seed is passed on every run.
split_weights = [0.8, 0.2]
train_rdd, test_rdd = data_rdd.randomSplit(split_weights, seed=42)

# 6. Feature Engineering with HashingTF
# Tokens are hashed into a vector with 10,000 dimensions.
hashing_tf = HashingTF(numFeatures=10_000)

def featurize_data(label, text):
    """Build a ``LabeledPoint`` from a label and a raw review string.

    The text is tokenized with ``str.split()`` (whitespace-separated) and the
    tokens are passed to the module-level ``hashing_tf`` to obtain the
    feature vector.

    Args:
        label: Numeric class label for the review.
        text: Review text to tokenize and hash.

    Returns:
        ``LabeledPoint(label, features)``.
    """
    return LabeledPoint(label, hashing_tf.transform(text.split()))

# Apply feature engineering
# Each RDD element is a (label, reviewText) pair, unpacked straight into
# featurize_data.
train_features = train_rdd.map(lambda pair: featurize_data(*pair))
test_features = test_rdd.map(lambda pair: featurize_data(*pair))

In [5]:
# 7. Train Logistic Regression Model
# NOTE(review): LogisticRegressionWithSGD appears to be marked deprecated in
# the PySpark docs in favour of LogisticRegressionWithLBFGS / the ml API --
# confirm against the installed Spark version before relying on it.
model = LogisticRegressionWithSGD.train(train_features, iterations=100)


# 8. Predictions on the Test Set
def _predict_with_label(point):
    """Return ``(prediction, label)`` as floats for one test ``LabeledPoint``."""
    predicted = float(model.predict(point.features))
    actual = float(point.label)
    return (predicted, actual)


predictions_and_labels = test_features.map(_predict_with_label)



In [7]:
# 9. Evaluate Model Performance
# The metrics object is built from the (prediction, label) pairs.
metrics = MulticlassMetrics(predictions_and_labels)
accuracy = metrics.accuracy
print(f"Test Accuracy: {accuracy:.2f}")

# 10. Display Example Predictions
print("Example Predictions:")
first_ten = predictions_and_labels.take(10)
print(first_ten)

# Stop Spark Session
# After this call the session is gone; earlier cells must be re-run to
# recreate it.
spark.stop()

Test Accuracy: 0.79
Example Predictions:
[(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (0.0, 0.0)]
