# Exercise 2: Text Processing and Classification using Spark

In [1]:
import itertools
import re
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from operator import add

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, ChiSqSelector, Normalizer, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import MapType, IntegerType, DoubleType

# Create (or reuse) the notebook's Spark session on YARN with fixed resource settings.
# Note: spark.default.parallelism is taken from the CPU count of the machine running this notebook.
spark: SparkSession = (
    SparkSession.builder
    .master("yarn")
    .config("spark.executor.memory", "7g")
    .config("spark.driver.memory", "7g")
    .config("spark.driver.maxResultSize", "7g")
    .config("spark.executor.instances", 5)
    .config("spark.executor.cores", 4)
    .config("spark.default.parallelism", cpu_count())
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
24/05/26 16:45:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/26 16:46:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/26 16:46:00 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/05/26 16:46:00 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
24/05/26 16:46:00 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
24/05/26 16:46:0

In [2]:
# One stopword per line; kept as a set for fast membership tests during filtering.
with open('../data/stopwords.txt') as f:
    stopwords = {word for word in f.read().splitlines()}

In [3]:
# Number of top-ranked terms kept per category.
K = 75
# Location of the JSON review data; swap the active line to run on the full dataset.
FILE_PATH = 'hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json' # devset
# FILE_PATH = 'hdfs:///user/dic24_shared/amazon-reviews/full/reviewscombined.json' # full dataset

## Part 1) RDDs

Repeat the steps of Assignment 1, i.e. calculation of chi-square values and output of the sorted top terms per category, as well as the joined dictionary, using RDDs and transformations. Write the output to a file `output_rdd.txt`. Compare the generated `output_rdd.txt` with your generated `output.txt` from Assignment 1 and describe your observations briefly in the submission report (see Part 3).

In [4]:
# Read the JSON reviews and take the underlying RDD for the RDD-based computation below.
rdd = spark.read.json(FILE_PATH).rdd

                                                                                

In [5]:
%%time
def preprocessing(row):
    """Emit document-frequency records for one review.

    Yields ``((category, None), 1)`` once per review (used to count the
    documents of a category), followed by ``((category, token), 1)`` for
    every distinct token of the review that is longer than one character
    and not a stopword.

    ``row`` must provide ``'category'`` and ``'reviewText'`` by key.
    """
    label = row['category']

    # Case-fold, then split on every run of characters outside a-z, A-Z, <, >, ^ and |.
    pieces = re.split(r'[^a-zA-Z<>^|]+', row['reviewText'].lower())

    # Distinct tokens only: a term counts once per document.
    unique_terms = {
        piece
        for piece in pieces
        if len(piece) > 1 and (piece not in stopwords)
    }

    # Marker record: one per document of this category.
    yield (label, None), 1

    # One record per distinct term occurring in this document.
    for term in unique_terms:
        yield (label, term), 1


def token_to_key(row):
    """Re-key a ``((category, token), count)`` record as ``(token, (category, count))``."""
    key, count = row
    category, token = key
    return token, (category, count)


def token_sum(row):
    """Attach the token's total count over all categories to each per-category count.

    Takes ``(token, [(category, count), ...])`` and yields
    ``(category, (token, count, total))`` for every category of the token.
    """
    token, values = row
    per_category = dict(values)
    total = sum(per_category.values())

    yield from (
        (category, (token, count, total))
        for category, count in per_category.items()
    )


def chi_squared(row):
    """Compute the chi-square score of every token of one category and keep the top K.

    ``row`` is ``(category, values)`` where ``values`` iterates over
    ``(token, count, n_t)`` triples: ``count`` is the number of documents of
    this category containing the token and ``n_t`` the number of documents
    containing it overall. The triple whose token is ``None`` carries the
    document count of the category and of the whole dataset.

    Returns ``(category, [(score, token), ...])`` sorted by descending score,
    ties broken by token, truncated to ``K`` entries.

    Raises ``KeyError`` if the ``None`` marker triple is missing.
    """
    category, values = row

    # token -> (documents in this category with the token, documents overall with the token)
    counts = {token: (count, n_t) for token, count, n_t in values}

    # The None marker holds the category size and the dataset size.
    n_c, n = counts.pop(None)

    scored = []

    for token, (a, n_t) in counts.items():
        b = n_t - a          # other categories, token present
        c = n_c - a          # this category, token absent
        d = n - a - b - c    # other categories, token absent

        denominator = (a + b) * (a + c) * (b + d) * (c + d)
        # A zero marginal (e.g. the token occurs in every document, or there is only one
        # category) means no association can be measured: score it 0.0 instead of
        # raising ZeroDivisionError.
        score = n * ((a * d - b * c) ** 2) / denominator if denominator else 0.0
        scored.append((score, token))

    return category, sorted(scored, key=lambda x: (-x[0], x[1]))[:K]

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 10.3 µs


In [6]:
%%time
topk = (
    rdd
    .flatMap(preprocessing)   # ((category, token), 1) per distinct token, plus one (category, None) marker per review
    .reduceByKey(add)         # document counts per (category, token)
    .map(token_to_key)        # re-key by token
    .groupByKey()
    .flatMap(token_sum)       # add the token's total count, re-key by category
    .groupByKey()
    .map(chi_squared)         # top-K (score, token) pairs per category
    .sortByKey()
)

                                                                                

CPU times: user 48.3 ms, sys: 11.9 ms, total: 60.1 ms
Wall time: 12.4 s


In [7]:
%%time
# One line per category ("<category> token:score ..."), then one line with all selected tokens, sorted.
with open('../output_rdd.txt', 'w') as f:
    tokens = set()

    for category, values in topk.toLocalIterator():
        # values holds (score, token) pairs
        line_parts = [f'<{category}>']
        for value in values:
            tokens.add(value[1])
            line_parts.append(f'{value[1]}:{value[0]}')
        print(' '.join(line_parts), file=f)

    print(' '.join(sorted(tokens)), file=f)



CPU times: user 14.3 ms, sys: 2.09 ms, total: 16.4 ms
Wall time: 854 ms


## Part 2) Datasets/DataFrames: Spark ML and Pipelines

Convert the review texts to a classic vector space representation with TFIDF-weighted features based on the Spark DataFrame/Dataset API by building a transformation [pipeline](https://spark.apache.org/docs/latest/ml-pipeline.html). The primary goal of this part is the preparation of the pipeline for Part 3 (see below). Note: although parts of this pipeline will be very similar to Assignment 1 or Part 1 above, do not expect to obtain identical results or have access to all intermediate outputs to compare the individual steps.

Use built-in functions for [tokenization](https://spark.apache.org/docs/latest/ml-features.html#tokenizer) to unigrams at whitespaces, tabs, digits, and the delimiter characters ()\[\]{}.!?,;:+=-\_"'\`~#@&\*%€$§\\/, casefolding, [stopword removal](https://spark.apache.org/docs/latest/ml-features.html#stopwordsremover), [TF-IDF calculation](https://spark.apache.org/docs/latest/ml-features.html#tf-idf), and [chi square selection](https://spark.apache.org/docs/latest/ml-features.html#chisqselector) (using 2000 top terms overall). Write the terms selected this way to a file `output_ds.txt` and compare them with the terms selected in Assignment 1. Describe your observations briefly in the submission report (see Part 3).

In [4]:
# Read the JSON reviews as a DataFrame for the DataFrame/ML pipeline.
df = spark.read.json(FILE_PATH)

                                                                                

In [4]:
# Lower-case the review text and tokenise it with the same character class as the RDD
# part; tokens shorter than two characters are dropped.
tokenizer = RegexTokenizer(
    inputCol='reviewText',
    outputCol='rawTokens',
    pattern=r'[^a-zA-Z<>^|]+',
    toLowercase=True,
    minTokenLength=2,
)

In [5]:
# Filter the raw tokens against the stopword list loaded from file.
stopwordRemover = StopWordsRemover(
    inputCol='rawTokens',
    outputCol='tokens',
    stopWords=list(stopwords),
)

In [6]:
# Term counts per document feed the IDF stage; the hashing variant below was left disabled.
# hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures")
countVectorizer = CountVectorizer(inputCol="tokens", outputCol="rawFeatures")
# IDF re-weights the raw counts; its output column 'features' is what the selector reads.
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [7]:
# Encode the string 'category' column as the 'label' column used by the selector and classifier.
indexer = StringIndexer(inputCol='category', outputCol='label')

In [8]:
# Chi-square feature selection against the label, keeping the 2000 top features overall.
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="label",
)

In [10]:
# Stage order: tokens -> stopword filter -> term counts -> TF-IDF -> label index -> chi-square selection.
# (countVectorizer is used for the term-frequency step; the HashingTF alternative stays disabled.)
pipeline = Pipeline(stages=[tokenizer, stopwordRemover, countVectorizer, idf, indexer, selector])

In [15]:
# Fit every pipeline stage on df.
model = pipeline.fit(df)

24/05/25 14:58:58 WARN DAGScheduler: Broadcasting large task binary with size 1059.7 KiB
24/05/25 14:59:09 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/05/25 14:59:10 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/05/25 14:59:18 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
                                                                                

In [16]:
# stages[2] is the fitted CountVectorizer (third pipeline stage); its vocabulary is indexed by feature index.
vocabulary = model.stages[2].vocabulary
# stages[-1] is the fitted ChiSqSelector; selectedFeatures holds the feature indices it kept.
selectedFeatures = model.stages[-1].selectedFeatures

In [17]:
# Category of each review together with the selector's output vector.
# NOTE(review): this rebinds the name `chi_squared`, which the RDD part of this notebook
# defines as a function; re-running the RDD cells afterwards requires re-defining it first.
chi_squared = model.transform(df).select('category', 'selectedFeatures')

In [19]:
# Turn a sparse vector into {position: value}. Pairing indices with values directly
# avoids one vector lookup per non-zero entry.
# assumes the selector output is a SparseVector (has .indices / .values) — TODO confirm
sparse_vector_to_map = F.udf(
    lambda v: dict(zip(v.indices.tolist(), v.values.tolist())),
    returnType=MapType(IntegerType(), DoubleType()),
)
# Position in the selected vector -> original feature index -> vocabulary term.
index_to_token = F.udf(lambda i: vocabulary[selectedFeatures[i]], returnType='string')

# NOTE: 'selectedFeatures' carries the values of the selector's input column (the IDF
# output), i.e. TF-IDF weights of the selected terms, not chi-square statistics. The
# intermediate columns are named accordingly; the final 'category'/'topk' columns are unchanged.
topk = chi_squared.select('category', F.explode(sparse_vector_to_map('selectedFeatures')).alias('token', 'mean_tfidf'))
# Average weight of each term over the reviews of a category in which it is non-zero.
topk = topk.groupBy('category', 'token').agg(F.mean('mean_tfidf').alias('mean_tfidf'))
topk = topk.withColumn('token', index_to_token('token'))
# Rank terms within each category: highest mean weight first, ties broken by term.
topk = topk.withColumn('rank', F.row_number().over(Window.partitionBy('category').orderBy(F.desc('mean_tfidf'), F.asc('token'))))
topk = topk.filter(F.col('rank') <= K)
# An array needs a single element type: cast the score to string explicitly instead of
# relying on implicit string/double coercion.
topk = topk.withColumn('token_score', F.array(F.col('token'), F.col('mean_tfidf').cast('string')))
topk = topk.groupBy('category').agg(F.collect_list('token_score').alias('topk'))
topk = topk.sort('category')

In [20]:
# One line per category ("<category> token:value ..."), then one line with all selected tokens, sorted.
with open('../output_ds.txt', 'w') as f:
    tokens = set()

    for row in topk.toLocalIterator():
        # every entry of row['topk'] is a [token, value] pair
        line_parts = [f'<{row["category"]}>']
        for value in row['topk']:
            tokens.add(value[0])
            line_parts.append(f'{value[0]}:{value[1]}')
        print(' '.join(line_parts), file=f)

    print(' '.join(sorted(tokens)), file=f)

24/05/25 14:59:47 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/05/25 14:59:58 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
24/05/25 14:59:59 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
24/05/25 15:00:00 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
24/05/25 15:00:00 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


## Part 3) Text Classification

In this part, you will train a text classifier from the features extracted in Part 2. The goal is to learn a model that can predict the product category from a review's text.

To this end, extend the pipeline from Part 2 such that a **Support Vector Machine** classifier is trained. Since we are dealing with multi-class problems, make sure to put a strategy in place that allows binary classifiers to be applicable. Apply vector length normalization before feeding the feature vectors into the classifier (use [`Normalizer`](https://spark.apache.org/docs/latest/mllib-feature-extraction.html#normalizer) with L2 norm).

Follow best practices for machine learning experiment design and investigate the effects of parameter settings using the functions provided by Spark:

*   Split the review data into training, validation, and test set.
    
*   Make experiments reproducible.
    
*   Use a grid search for parameter optimization:
    
    *   Compare chi square overall top 2000 filtered features with another, heavier filtering with much less dimensionality (see Spark ML documentation for options).
        
    *   Compare different SVM settings by varying the regularization parameter (choose 3 different values), standardization of training features (2 values), and maximum number of iterations (2 values).
        
*   Use the [`MulticlassClassificationEvaluator`](https://spark.apache.org/docs/3.5.1/ml-tuning.html#model-selection-aka-hyperparameter-tuning) to estimate performance of your trained classifiers on the test set, using F1 measure as criterion.

### Spark Gridsearch Implementation

In [9]:
# Seed for the random data split, so runs are reproducible.
SEED = 42

In [10]:
# Tokenisation and stopword removal only; these stages are run once before the grid search.
tokenization_pipeline = Pipeline(stages=[tokenizer, stopwordRemover])

In [13]:
# Tokenise df once and keep only the columns the classification pipeline consumes.
tokenized = tokenization_pipeline.fit(df).transform(df).select('category', 'tokens')

In [14]:
# Seeded 80/20 split into training and test data.
train, test = tokenized.randomSplit([0.8, 0.2], seed=SEED)

In [15]:
# Persist both splits as Parquet, replacing the output of any previous run.
train.write.mode('overwrite').parquet('train.parquet')
test.write.mode('overwrite').parquet('test.parquet')

                                                                                

In [11]:
# Reload the persisted splits; from here on train/test refer to the Parquet-backed data.
train = spark.read.parquet('train.parquet')
test = spark.read.parquet('test.parquet')

                                                                                

In [12]:
# Scale each selected feature vector to unit length; p=2.0 (the Normalizer default) is the L2 norm.
normalizer = Normalizer(
    p=2.0,
    inputCol='selectedFeatures',
    outputCol='normFeatures',
)

In [13]:
# LinearSVC is wrapped in OneVsRest so it can be applied to the multi-class 'label' column.
classifier = LinearSVC()
ovrClassifier = OneVsRest(classifier=classifier, featuresCol='normFeatures', labelCol='label')

In [14]:
# Classification pipeline on pre-tokenised data:
# label index -> term counts -> TF-IDF -> chi-square selection -> normalisation -> one-vs-rest SVM.
pipeline = Pipeline(stages=[indexer, countVectorizer, idf, selector, normalizer, ovrClassifier])

In [15]:
# 2 feature-set sizes x 3 regularisation values x 2 standardisation settings x 2 iteration limits.
grid = (
    ParamGridBuilder()
    .addGrid(selector.numTopFeatures, [20, 2000])
    .addGrid(classifier.regParam, [0, 0.01, 0.1])
    .addGrid(classifier.standardization, [True, False])
    .addGrid(classifier.maxIter, [10, 100])
    .build()
)

In [16]:
# Grid search with a single train/validation split (80/20 of the data passed to fit()).
# The seed reuses SEED instead of repeating the literal, and the F1 criterion is named
# explicitly rather than relying on the evaluator's default metric.
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=MulticlassClassificationEvaluator(metricName='f1'),
    trainRatio=0.8,
    seed=SEED,
    parallelism=cpu_count(),
)

In [None]:
# Run the grid search on the training split (tvs holds out part of it for validation via trainRatio).
model = tvs.fit(train)

In [None]:
bestModel = model.bestModel
# Pipeline stage order is indexer, countVectorizer, idf, selector, normalizer, OneVsRest,
# so stages[-3] is the fitted selector and stages[-1] the fitted one-vs-rest model.
numTopFeatures = bestModel.stages[-3].getNumTopFeatures()
# The fitted one-vs-rest stage has no getRegParam()/getStandardization()/getMaxIter() of
# its own; the SVM settings live on the LinearSVC held in its `classifier` param.
bestClassifier = bestModel.stages[-1].getClassifier()
regParam = bestClassifier.getRegParam()
standardization = bestClassifier.getStandardization()
maxIter = bestClassifier.getMaxIter()
{'numTopFeatures': numTopFeatures, 'regParam': regParam, 'standardization': standardization, 'maxIter': maxIter}

### Custom Gridsearch Implementation

In [11]:
def persist_read(df, path):
    """Write ``df`` to ``path`` (replacing existing data) and return a DataFrame read back from it."""
    writer = df.write.mode('overwrite')
    writer.save(path)

    reloaded = spark.read.load(path)
    return reloaded

In [12]:
# Seed for the random data split of the custom grid search.
SEED = 42

In [13]:
# Seeded 60/20/20 split of df; each part is written out and read back via persist_read.
# NOTE(review): 'train.parquet' and 'test.parquet' are also written by the Spark grid-search
# section of this notebook, so running both sections overwrites each other's files.
train, validation, test = df.randomSplit([0.6, 0.2, 0.2], seed=SEED)
train = persist_read(train, 'train.parquet')
validation = persist_read(validation, 'validation.parquet')
test = persist_read(test, 'test.parquet')

                                                                                

In [14]:
# Scale each selected feature vector to unit length; p=2.0 (the Normalizer default) is the L2 norm.
normalizer = Normalizer(
    p=2.0,
    inputCol='selectedFeatures',
    outputCol='normFeatures',
)

In [15]:
# Feature pipeline without the classifier:
# tokens -> stopword filter -> term counts -> TF-IDF -> label index -> chi-square selection -> normalisation.
pp_pipeline = Pipeline(stages=[tokenizer, stopwordRemover, countVectorizer, idf, indexer, selector, normalizer])

In [16]:
# Preprocessing grid: heavy filtering (20 features) versus the 2000-feature setting.
pp_grid = (
    ParamGridBuilder()
    .addGrid(selector.numTopFeatures, [20, 2000])
    .build()
)

In [17]:
# For every preprocessing setting: fit the feature pipeline on the training split,
# transform train and validation, and persist the results so they can be reused by
# every classifier setting.
preprocessed = []
columns = ['category', 'label', 'normFeatures']

for i, preprocessing_params in enumerate(pp_grid):
    # The grid entry must be passed to fit(); without it every iteration fits the same
    # pipeline with the selector's constructor setting and the comparison is void.
    pp_model = pp_pipeline.fit(train, preprocessing_params)
    pp_train = pp_model.transform(train)
    pp_validation = pp_model.transform(validation)
    pp_train = persist_read(pp_train.select(columns), f'pp_train_{i}.parquet')
    pp_validation = persist_read(pp_validation.select(columns), f'pp_validation_{i}.parquet')

    preprocessed.append((preprocessing_params, pp_train, pp_validation))

24/05/26 16:02:26 WARN DAGScheduler: Broadcasting large task binary with size 1992.4 KiB
24/05/26 16:02:26 WARN DAGScheduler: Broadcasting large task binary with size 1994.6 KiB
24/05/26 16:02:28 WARN DAGScheduler: Broadcasting large task binary with size 1997.6 KiB
24/05/26 16:02:42 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/26 16:02:44 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/26 16:02:54 WARN DAGScheduler: Broadcasting large task binary with size 1992.4 KiB
24/05/26 16:02:54 WARN DAGScheduler: Broadcasting large task binary with size 1994.6 KiB
24/05/26 16:02:56 WARN DAGScheduler: Broadcasting large task binary with size 1997.5 KiB
24/05/26 16:03:05 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/26 16:03:07 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
                                                                                

In [18]:
# LinearSVC is wrapped in OneVsRest so it can be applied to the multi-class 'label' column.
classifier = LinearSVC()
ovrClassifier = OneVsRest(classifier=classifier, featuresCol='normFeatures', labelCol='label')

In [19]:
# Classifier-only pipeline; it is trained on the already preprocessed data.
model_pipeline = Pipeline(stages=[ovrClassifier])

In [20]:
# SVM grid: 3 regularisation values x 2 standardisation settings x 2 iteration limits.
model_grid = (
    ParamGridBuilder()
    .addGrid(classifier.regParam, [0, 0.01, 0.1])
    .addGrid(classifier.standardization, [True, False])
    .addGrid(classifier.maxIter, [10, 100])
    .build()
)

In [None]:
# F1 is the selection criterion; it is named explicitly instead of relying on the
# evaluator's default metric. (ThreadPool is imported with the other imports at the top.)
evaluator = MulticlassClassificationEvaluator(metricName='f1')
# Every (preprocessed data set, SVM parameter map) combination, materialised so the
# grid can be inspected and reused.
param_grid = list(itertools.product(preprocessed, model_grid))


def train_model(pp, model_params):
    """Train one classifier configuration and score it on the validation data.

    pp           -- (preprocessing params, preprocessed train, preprocessed validation)
    model_params -- parameter map applied to the classifier pipeline for this fit

    Returns (f1, params), where params merges the preprocessing and model parameters.
    """
    pp_params, pp_train, pp_validation = pp
    model = model_pipeline.fit(pp_train, model_params)
    prediction = model.transform(pp_validation)
    f1 = evaluator.evaluate(prediction)
    current_params = pp_params | model_params

    print(f'F1: {f1}, Params: {current_params}')

    return f1, current_params


# Threads only submit Spark jobs concurrently; the work itself runs on the cluster.
with ThreadPool() as pool:
    results = pool.starmap(train_model, param_grid)

f1, best_params = max(results, key=lambda x: x[0])
f1, best_params

In [None]:
# Full pipeline: the preprocessing stages followed by the classifier stage.
final_pipeline = Pipeline(stages=pp_pipeline.getStages() + model_pipeline.getStages())

In [None]:
# Refit the full pipeline with the best parameters on train + validation, then
# evaluate once on the held-out test split.
model = final_pipeline.fit(train.union(validation), best_params)
prediction = model.transform(test)
f1 = evaluator.evaluate(prediction)
f1