In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StringIndexer, Word2VecModel, IndexToString, Normalizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Spark session setup. The session runs in local mode ("local[*]" uses one
# worker thread per logical core). Uncomment the extra keys in `session_conf`
# to experiment with resource settings that mimic a small multi-instance
# cluster.
# NOTE(review): in local mode Spark generally ignores spark.executor.* keys,
# so confirm they take effect before drawing performance conclusions.
session_conf = {
    "spark.driver.memory": "20g",
    # "spark.driver.cores": "2",
    # "spark.executor.cores": "2",
    # "spark.executor.memory": "2g",
    # "spark.driver.maxResultSize": "3g",
    # "spark.executor.instances": "2",
}

session_builder = SparkSession.builder.master("local[*]").appName("Spark-Word2Vec")
for conf_key, conf_value in session_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# 1-Load the training data and the word2vec model

In [None]:
# Load the pretrained Word2Vec model and the labelled training keywords.
# The two reads do not depend on each other.
word2vec_model = Word2VecModel.load("data/word2vec_model")
train_data = spark.read.parquet("data/keywords.parquet")

# Preview a few rows (each cell truncated to 50 characters)
train_data.show(n=5, truncate=50)

## 1.1-Tokenize and vectorize the training data

In [None]:
# Featurize the training data: tokenize the "word" text, embed the tokens with
# the pretrained Word2Vec model, and encode the string labels as numbers.
#
# This cell reassigns `train_data` in place, and Spark ML transformers such as
# Tokenizer reject an output column that already exists. Re-running the cell
# would therefore fail. Dropping the columns this cell produces makes it safe
# to re-run; on a fresh kernel the drop is a no-op because none of them exist.
# "word2vec_features" is the Word2Vec model's output column (the classifier
# below reads it as featuresCol).
train_data = train_data.drop("filtered_tokens", "word2vec_features", "label_index")

# Tokenize the text for word2vec vectorization
train_data = Tokenizer(inputCol="word", outputCol="filtered_tokens").transform(train_data)

# Use word2vec to transform the tokens into vectors
train_data = word2vec_model.transform(train_data)

# Use String indexer to convert the string labels into numerical labels.
# The fitted model's `labels` define the index <-> label mapping used later.
string_indexer = StringIndexer(inputCol="label", outputCol="label_index").fit(train_data)
train_data = string_indexer.transform(train_data)

train_data.show(5, truncate=50)

# 2-Train Logistic Regression Model

In [None]:
# Fit a logistic regression classifier on the Word2Vec sentence vectors.
# elasticNetParam=0.8 mixes the penalty mostly toward L1 (0 = pure L2,
# 1 = pure L1); regParam=0.3 sets the overall regularization strength.
lr_model = LogisticRegression(
    labelCol="label_index",
    featuresCol="word2vec_features",
    standardization=True,
    regParam=0.3,
    elasticNetParam=0.8,
    maxIter=500,
).fit(train_data)

# NOTE: these come from the *training* summary, i.e. they are measured on the
# same rows the model was fitted on, not on held-out data.
trainingSummary = lr_model.summary
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall

print(
    f"Accuracy: {accuracy}\nFPR: {falsePositiveRate}\nTPR: {truePositiveRate}\n"
    f"F-measure: {fMeasure}\nPrecision: {precision}\nRecall: {recall}"
)

# 3-Test the model on unseen data by classifying comments
## 3.1-Read and filter the test data

In [None]:
# Read the JSON-lines comment dump and clean it before classification:
# drop deleted/removed bodies, keep only the subreddits of interest, strip
# line breaks, URLs and punctuation, and lowercase the subreddit name.
# NOTE(review): lowercasing presumably matches the spelling of the training
# labels so predictions can be compared with `subreddit` later; confirm.
comments = (
    spark.read.json("data/RC_2010-07")
    .select("body", "subreddit")
    .where("body != '[deleted]' AND body != '[removed]'")
    # Filter by subreddits of interest
    .where(F.col("subreddit").isin("Music", "gaming", "politics", "programming", "science"))
    # Replace newline and carriage return characters with a space
    .withColumn("body", F.regexp_replace(F.col("body"), "[\\r\\n]+", " "))
    # Remove URLs (matches strings starting with http or https)
    .withColumn("body", F.regexp_replace(F.col("body"), "https?://\\S+", ""))
    # Remove characters that are not letters, digits, whitespace, or apostrophes
    .withColumn("body", F.regexp_replace(F.col("body"), "[^a-zA-Z0-9\\s']", ""))
    # lowercase the subreddit column
    .withColumn("subreddit", F.lower(F.col("subreddit")))
)

comments.show(5, truncate=50)

# Get basic statistics
print(f"Number of records: {comments.count()}")
print(f"Number of columns: {len(comments.columns)}")
# show() prints the table itself and returns None. Wrapping it in print()
# only adds a stray "None" line to the output.
comments.groupBy("subreddit").count().show()

## 3.2-Tokenize and Vectorize for classification

In [None]:
# Featurize the cleaned comment bodies the same way as the training data, so
# the classifier receives vectors from the same Word2Vec space.
#
# This cell reassigns `comments` in place, and Spark ML transformers such as
# Tokenizer reject an output column that already exists. Drop this cell's own
# outputs first so it can be re-run; on a fresh kernel the drop is a no-op.
comments = comments.drop("filtered_tokens", "word2vec_features")

# Tokenize the text for word2vec vectorization
comments = Tokenizer(inputCol="body", outputCol="filtered_tokens").transform(comments)

# Use word2vec to transform the tokens into vectors
comments = word2vec_model.transform(comments)

comments.show(5, truncate=50)

## 3.3-Classify the new data with the LR model

In [None]:
# Score every vectorized comment with the trained classifier. The result keeps
# the input columns and adds the model's output columns, including the numeric
# "prediction" index.
predictions = lr_model.transform(dataset=comments)

predictions.show(n=5, truncate=50)

In [None]:
# Map numeric predictions back to the original string labels. Use the label
# order learned by the training StringIndexer (index i -> labels[i]), because
# that is the index space the classifier predicts in.
labels = string_indexer.labels

# IndexToString fails if its output column already exists. Drop a "label"
# column left over from a previous run of this cell so re-running is safe.
# On a fresh kernel the drop is a no-op.
predictions = IndexToString(
    inputCol="prediction",
    outputCol="label",
    labels=labels,
).transform(predictions.drop("label"))

predictions.show(5, truncate=50)

In [None]:
# Evaluate predictions against each comment's true subreddit.
#
# The true subreddit must be encoded in the SAME index space the model
# predicts in: the label order of the training StringIndexer
# (labels[i] <-> i.0). Fitting fresh StringIndexers separately on "subreddit"
# and on the predicted "label" orders each column by its own value
# frequencies. Equal indices then need not mean equal labels, and the
# resulting accuracy is meaningless.
label_to_index = F.create_map(
    *[
        literal
        for index, name in enumerate(string_indexer.labels)
        for literal in (F.lit(name), F.lit(float(index)))
    ]
)

# Subreddits that never appeared as a training label get -1.0. That value can
# never equal a prediction, so those rows count as misses. withColumn replaces
# an existing column, so this cell is safe to re-run.
predictions = predictions.withColumn(
    "subreddit_index",
    F.coalesce(label_to_index[F.col("subreddit")], F.lit(-1.0)),
)

predictions.show(5, truncate=50)

# Evaluate using Spark's internal mechanism (accuracy metric), comparing the
# aligned true index with the model's own numeric prediction
evaluator = MulticlassClassificationEvaluator(
    labelCol="subreddit_index", predictionCol="prediction", metricName="accuracy"
)
accuracy_indexed = evaluator.evaluate(predictions)
print(f"Accuracy (indexed comparison): {accuracy_indexed}")