# Setup
Before starting with the tasks of the assignment, initialize Spark.


In [None]:
import sys
from pathlib import Path

from pyspark.sql.types import FloatType

# Resolve the project root: when the working directory is the "src" folder,
# the root is its parent; otherwise the working directory itself is the root.
p = Path().resolve()
if p.name == "src":
    BASE_PATH = p.parent
else:
    BASE_PATH = p

# Put the project's "src" directory on the module search path so that the
# packages living there can be imported.
src_dir = BASE_PATH.joinpath("src")
sys.path.append(str(src_dir))

import operator
import json
import numpy as np
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark import RDD, SparkContext
from pyspark.ml.feature import (
    IDF,
    ChiSqSelector,
    ChiSqSelectorModel,
    CountVectorizer,
    RegexTokenizer,
    HashingTF,
    StopWordsRemover,
    StringIndexer,
    VectorIndexer,
    PCA,
)

from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from exercise2.model.review import Review
from exercise2.split_text import split_text
from exercise2.task1.util import calculate_chi_squares, merge_dicts, printable_category, calculate_chi_square_per_token

import matplotlib.pyplot as plt

# Toggle between a local Spark session and a session on the cluster.
LOCAL = True

spark: SparkSession
if LOCAL:
    # Local run: bind the driver to localhost.
    spark = (
        SparkSession.builder
        .appName("local")
        .config("spark.driver.host", "localhost")
        .config("spark.driver.bindAddress", "localhost")
        .getOrCreate()
    )
else:
    # Cluster run: request a fixed number of executor instances.
    spark = (
        SparkSession.builder
        .appName("cluster")
        .config("spark.executor.instances", 435)
        .getOrCreate()
    )

sc: SparkContext = spark.sparkContext

if not LOCAL:
    # Ship the zipped exercise2 sources to the executors.
    sc.addPyFile(str(BASE_PATH / "src" / "exercise2.zip"))

# Task 1: RDD
Redo the first assignment, this time utilizing RDDs. 

Start by loading the reviews dataset:

In [None]:
# Pick the input file depending on where the notebook runs.
if not LOCAL:
    # Alternative cluster input noted by the author:
    # "hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json"
    reviews_location = "hdfs:///user/dic24_shared/amazon-reviews/full/reviewscombined.json"
else:
    # Alternative local inputs noted by the author: the "_first1000.json"
    # variant and "reviews_devset.json".
    reviews_location = BASE_PATH / "resource" / "reviews_devset_rand100.json"

# Every line of the input file is one JSON document; parse each into a dict.
raw_lines = sc.textFile(str(reviews_location))
reviews: RDD = raw_lines.map(json.loads)
reviews_cnt = reviews.count()

print(f"Number of Reviews in this dataset: {reviews_cnt}")
reviews.first()

Once the reviews are available, count the documents per category.

In [None]:
# Emit one (category, 1) pair per review, sum the ones per category and
# collect the totals to the driver as a dict.
category_counts = (
    reviews
    .map(lambda review: (review["category"], 1))
    .reduceByKey(operator.add)
    .collectAsMap()
)

Split the review text into separate tokens, filter them using the stopwords (loaded from disk),
and reduce the produced counts into a map of maps: the top level is keyed by token, and each token
maps to a map containing the review counts for each category.

In [None]:
# Load the stopword list (one entry per line) into a set for O(1) lookups.
with open(BASE_PATH / "src" / "stopwords.txt", "r") as file:
    stopwords = {line.strip() for line in file}

# <term>: {<cat>: cnt, <cat>: cnt, ...}
category_counts_per_token = (
    reviews
    # set(...) makes every token count at most once per review.
    .flatMap(lambda r: [(token, r["category"]) for token in set(split_text(r["reviewText"]))])
    # The pairs are (token, category): the stopword test has to look at the
    # token (index 0). Testing index 1 compared the *category* against the
    # stopword list, so no stopword was ever removed.
    .filter(lambda pair: pair[0] not in stopwords)
    .mapValues(lambda category: {category: 1})
    .reduceByKey(merge_dicts)
)

Now that we have the counts per token and category, we calculate the chi square values and sort
the values to filter for the top 75 tokens per category.

In [None]:
# Compute the chi-square values per token, group them by key and keep the 75
# highest-scoring entries of every group, ordered from highest to lowest.
# The values are sorted on their second element (index 1).
# NOTE: the previous slice `[:-75:-1]` stopped one element early and therefore
# returned only 74 entries per group.
top75_tokens_per_category: RDD = (
    category_counts_per_token
    .flatMap(lambda cur_category_counts: calculate_chi_square_per_token(cur_category_counts, category_counts, reviews_cnt))
    .groupByKey()
    .mapValues(lambda scored: sorted(scored, key=lambda entry: entry[1], reverse=True)[:75])
    .sortByKey()
)

Prepare the job result by concatenating all tokens to a list of all top tokens 
and convert the lists to strings for printing.

In [None]:
# Render one printable line per key of the top-75 RDD and join the lines.
category_lines = top75_tokens_per_category.map(
    lambda entry: printable_category(entry[0], entry[1])
).collect()
top75_tokens_str = "\n".join(category_lines)

# Flatten the first element of every kept tuple into one sorted,
# duplicate-free list and join it with single spaces.
top_tokens: list[str] = (
    top75_tokens_per_category
    .flatMap(lambda entry: [pair[0] for pair in entry[1]])
    .distinct()
    .sortBy(lambda token: token)
    .collect()
)
top_tokens_str = " ".join(top_tokens)

# First line: all top tokens; following lines: the per-key listings.
result = f"{top_tokens_str}\n{top75_tokens_str}"
result

Write the result to a file.

In [None]:
# `result` is one string, so it is written with a single write() call;
# writelines() expects an iterable of lines and would iterate over the string
# character by character.
with open(BASE_PATH / "output_rdd.txt", "w") as file:
    file.write(result)

# Task 2: DataFrames: Spark ML & Pipelines


In [None]:
# Load the same review file again, this time as a DataFrame.
reviews_path = str(reviews_location)
reviews_df = spark.read.json(reviews_path)
reviews_df.head()

In [None]:
# Split the review text at whitespace, digits and punctuation; tokens shorter
# than two characters are dropped.
tokenizer = RegexTokenizer(
    minTokenLength=2,
    # Raw (triple-quoted) literal: the regex backslashes are no longer parsed
    # as invalid Python escape sequences, and the resulting pattern string is
    # character-for-character the same as before.
    pattern=r"""[\s\d\(\)\[\]{}\.!\?,;:\+=\-_"'`~#@&\*%€\$§\\/]""",
    inputCol="reviewText",
    outputCol="tokens",
)
stopwords_remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), outputCol="stopWordsFiltered"
)
# Map the string category to a numeric label column.
indexer = StringIndexer(
    inputCol="category",
    outputCol="indexedCategory"
)
# Term counts followed by IDF weighting.
vectorizer = CountVectorizer(inputCol=stopwords_remover.getOutputCol(), outputCol="TF")
idf = IDF(inputCol=vectorizer.getOutputCol(), outputCol="TFIDF")
# Keep the 2000 top features according to the chi-square selector.
chiSqSelector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol=idf.getOutputCol(),
    labelCol=indexer.getOutputCol(),
    outputCol="chiSq",
)
# Wire the PCA input to the selector's output column, like the other stages,
# instead of repeating the column name as a literal.
pca = PCA(k=10, inputCol=chiSqSelector.getOutputCol(), outputCol="pca_result")

In [None]:
# Chain the previously configured stages in processing order.
pipeline_stages = [
    tokenizer,
    stopwords_remover,
    indexer,
    vectorizer,
    idf,
    chiSqSelector,
    pca,
]
pipeline = Pipeline(stages=pipeline_stages)

# Fit the whole pipeline on the reviews and apply it to the same data.
model = pipeline.fit(reviews_df)
tokenized = model.transform(reviews_df)
tokenized.head(n=10)

Extract the selected tokens by looking up the selected features of the ChiSqSelectorModel.
Those features have been mapped by CountVectorizer to integers, hence the selected tokens are retrieved
by looking up the index in the vocabulary of the VectorizerModel.

In [None]:
# Stage indices follow the order of the pipeline definition:
# index 3 is the fitted CountVectorizer, index 5 the fitted ChiSqSelector.
vocabulary: list[str] = model.stages[3].vocabulary
chiSqSelectorModel: ChiSqSelectorModel = model.stages[5]

# Translate the selected feature indices back into their tokens.
selected_tokens = sorted(vocabulary[i] for i in chiSqSelectorModel.selectedFeatures)
selected_tokens_str = " ".join(selected_tokens)

# The joined tokens form one string, so write() is the right call;
# writelines() would iterate over it character by character.
with open(BASE_PATH / "output_ds.txt", "w") as file:
    file.write(selected_tokens_str)
selected_tokens_str

# Task 3: Create SVM to predict the category of a review

First we split the data in a train and a test set.

In [None]:
# Relative weights of the two splits (train : test); the fixed seed keeps the
# split reproducible.
split_weights = [8.0, 2.0]
train_df, test_df = tokenized.randomSplit(split_weights, seed=1)

The chunk below sets up the cross validation pipeline.
We first set up the Support Vector Machine; to use it for a multi-class classification problem, it is wrapped in One-vs-Rest.
For the cross validation we use the F1 score as the metric to optimize.
The final step of setup is creating the grid according to the task description.
Finally we can create the cross validator.

In [None]:
# Number of folds handed to the CrossValidator (its numFolds argument).
cv_num_folds: int = 2

The cell below does not run since LinearSVC + OneVsRest does not play nice with CrossValidator.

In [None]:
# svc = LinearSVC()
# ovr = OneVsRest(classifier=svc, featuresCol="chiSq", labelCol="indexedCategory")
# f1_score = MulticlassClassificationEvaluator(metricName="f1", labelCol="indexedCategory")

# param_grid = ParamGridBuilder().addGrid(svc.maxIter, [10, 100]).addGrid(svc.regParam, [0, 0.01, 0.1]).addGrid(svc.standardization, [False, True]).build()
# cv = CrossValidator(estimator=ovr, estimatorParamMaps=param_grid, evaluator=f1_score, numFolds=cv_num_folds)

On its own, the OneVsRest classifier can be trained and used to make predictions.

In [None]:
# Linear SVM as the binary base classifier, wrapped in One-vs-Rest so that it
# can be fitted on the indexed category label.
svc = LinearSVC()
ovr = OneVsRest(
    classifier=svc,
    featuresCol="chiSq",
    labelCol="indexedCategory",
)

# Train on the training split only.
class_model = ovr.fit(train_df)

On the other hand the CrossValidator works with other binary classifiers.

In [None]:
from pyspark.ml.classification import LogisticRegression

# Same One-vs-Rest setup as before, with logistic regression as the binary
# base classifier.
lr = LogisticRegression()
ovr = OneVsRest(
    classifier=lr,
    featuresCol="chiSq",
    labelCol="indexedCategory",
)

# The F1 score is the metric the cross validation optimizes.
f1_score = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="indexedCategory",
)

# Parameter grid over the maximum number of iterations.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [0, 1])
    .build()
)
cv = CrossValidator(
    estimator=ovr,
    estimatorParamMaps=param_grid,
    evaluator=f1_score,
    numFolds=cv_num_folds,
    seed=1,
)

# Fit the cross validator and keep the best model it found.
class_model = cv.fit(train_df)
best_model = class_model.bestModel

# Score the best model on the training split.
train_res = best_model.transform(train_df)
res = f1_score.evaluate(train_res)

print(f"The F1 score is {res}")

Consequently, we would have to implement grid search and cross validation manually, which cannot compete with the PySpark library in terms of performance.

No matter which classifier has been fitted to the training data, we can then use it to make predictions on the training and test data.

In [None]:
# Apply the fitted classifier to both splits, training split first.
train_res, test_res = (
    class_model.transform(split_df) for split_df in (train_df, test_df)
)

The performance can be evaluated with some common metrics; many are implemented in the MulticlassClassificationEvaluator previously used for the cross validation.

In [None]:
# Evaluate with the evaluator created right here. The previous code called
# `f1_score`, a name defined only in the cross-validation cell, so this cell
# failed with a NameError whenever that cell had not been run and left
# `f1_scorer` unused.
f1_scorer = MulticlassClassificationEvaluator(metricName="f1", labelCol="indexedCategory")
f1_train = f1_scorer.evaluate(train_res)

The code snippet below is used to generate the confusion matrices for the report.

In [None]:
def print_conf_matrix(result, labs, title):
    """Plot the confusion matrix of a prediction result as an annotated heat map.

    Args:
        result: DataFrame providing the columns ``prediction`` and
            ``indexedCategory``.
        labs: Tick labels for both axes; ``len(labs)`` also determines how
            many rows and columns of the matrix are annotated.
        title: Title shown above the plot.
    """
    # The confusion matrix does not depend on row order, so the rows are
    # passed on without sorting them first.
    preds_and_labels = result.select(['prediction', 'indexedCategory'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    conf_matrix = metrics.confusionMatrix().toArray()

    fig, ax = plt.subplots(figsize=(30, 20))
    ax.imshow(conf_matrix)

    # Label both axes with the class names; rotate the x labels so they stay
    # readable.
    tick_positions = np.arange(len(labs))
    ax.set_xticks(tick_positions, labels=labs)
    ax.set_yticks(tick_positions, labels=labs)
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Write every cell value into the heat map.
    for i in range(len(labs)):
        for j in range(len(labs)):
            ax.text(j, i, conf_matrix[i, j],
                    ha="center", va="center", color="w")

    # Use the caller-supplied title (previously the literal string "title").
    ax.set_title(title)
    fig.tight_layout()
    plt.show()