In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, IDF, StringIndexer, ChiSqSelector, CountVectorizer, CountVectorizerModel, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
import pathlib
from pyspark.ml import Transformer
from pyspark.sql.functions import size
from pyspark.ml.param import Param, Params
import multiprocessing
from datetime import datetime
import logging
import itertools
from tqdm.notebook import tqdm
import pandas as pd
from pyspark.ml import PipelineModel

base_path = pathlib.Path("Exercise_2/sparkly-svm/src")

In [None]:
# Setting up a logger to track some of the stuff
logging.basicConfig(filename=base_path.parent/'data/svm_training.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

In [None]:
def create_spark_session():
    """
    Setting up the Spark Session in cluster mode, managed by yarn. 
    5 Executors with each 4 cores and 4GB of RAM. 
    """
    conf = SparkConf() \
    .setAppName("svm") \
    .setMaster("yarn") \
    .set("spark.executor.memory", "4g") \
    .set("spark.driver.memory", "4g") \
    .set("spark.driver.maxResultSize", "2g") \
    .set("spark.executor.instances", "5") \
    .set("spark.executor.cores", "4") \
    .set("spark.default.parallelism", "20")

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark

In [None]:
DATA_PATH = "hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json"
STOPWORD_PATH = base_path.parent/"data/stopwords.txt"

spark = create_spark_session()

with open(STOPWORD_PATH, 'r') as file:
    stopwords = file.read().splitlines()

df = spark.read.json(str(DATA_PATH)).select("reviewText", "category")
regex = r'[ \t\d()\[\]{}.!?,;:+=\-_"\'~#@&*%€$§\/]+'

logging.info(f"Stopwords loaded from: {STOPWORD_PATH}")
logging.info(f"Data loaded from hdfs: {DATA_PATH}")

The preprocessing pipeline serves to prepare the data for model fitting. This modular approach allows us to decouple data preprocessing from model hyperparameter tuning and fitting, enhancing code clarity and maintainability. To eliminate any overhead generated during preprocessing and to materialize any pending lazy operations, we save the data and restart the Spark session.

**Important:** To ensure reproducibility, it is crucial to cache or persist the DataFrame prior to splitting the data into training, validation, and test sets. Failing to do so can result in non-deterministic behavior from PySpark's `randomSplit()` function.

In [12]:
logging.info("Preprocessing started.")
preprocess_pipeline = Pipeline(stages=[
    RegexTokenizer(inputCol="reviewText", outputCol="rawTerms", pattern=regex),
    StopWordsRemover(inputCol="rawTerms", outputCol="terms", stopWords=stopwords),
    StringIndexer(inputCol="category", outputCol="label"),
    CountVectorizer(inputCol="terms", outputCol="rawFeatures"),
    IDF(inputCol="rawFeatures", outputCol="features"),
])

preprocessor = preprocess_pipeline.fit(df)
df_prep = preprocessor.transform(df)
logging.info("Preprocessing finished.")

df_prep = df_prep.filter(size(df_prep.terms) > 0)
df_prep = df_prep.persist(StorageLevel.MEMORY_AND_DISK)

train, val, test = df_prep.randomSplit([0.6, 0.2, 0.2], seed=1)
train.write.mode('overwrite').parquet("train.parquet")
val.write.mode('overwrite').parquet("val.parquet")
test.write.mode('overwrite').parquet("test.parquet")

preprocessor.write().overwrite().save(str(base_path/"preprocessor"))
logging.info("Preprocessor pipeline model saved.")
logging.info("Preprocessed data saved.")
spark.stop()
spark = create_spark_session()
train = spark.read.parquet("train.parquet")
val = spark.read.parquet("val.parquet")
test = spark.read.parquet("test.parquet")
train.persist(StorageLevel.MEMORY_AND_DISK)
logging.info("Preprocessed data loaded with new SparkSession.")
logging.info(f"Training size: {train.count()}")
logging.info(f"Validation size: {val.count()}")
logging.info(f"Testing size: {test.count()}")

Given the ample amount of data available for training, validation, and testing, we opt not to employ K-Fold CrossValidation. This decision is motivated by the computational intensity of K-Fold CrossValidation, which is unnecessary given our substantial sample size. Validation on a separate set is sufficient for our needs. Consequently, we manually conduct a grid search to explore various hyperparameter settings. In total we obtail 24 different combinations of hyperparameter settings. 

In [13]:
s0 = datetime.now()
hyperparameter_configs = {
    "numTopFeatures": [10, 2000], 
    "regParam": [0.01, 0.1, 1], 
    "maxIter": [10, 100], 
    "p": [1, 2]
}

keys, values = zip(*hyperparameter_configs.items())
hyperparameter_configs_list = [dict(zip(keys, v)) for v in itertools.product(*values)]

In [14]:
current_best_model = 0
training_results = []
evaluator = MulticlassClassificationEvaluator(metricName="f1")
for hyperparam in tqdm(hyperparameter_configs_list, desc='Hyperparameters'): 
    pipeline_training = Pipeline(stages=[
        ChiSqSelector(
            featuresCol="features", 
            outputCol="selectedFeatures", 
            labelCol="label", 
            numTopFeatures=hyperparam['numTopFeatures']),
        # l2/l1-normalize vector length
        Normalizer(
            inputCol="selectedFeatures", 
            outputCol="normalizedFeatures", 
            p = hyperparam['p']),
        # train SVM
        OneVsRest(classifier=LinearSVC(
            featuresCol="normalizedFeatures", 
            labelCol="label", 
            regParam=hyperparam['regParam'], 
            maxIter=hyperparam['maxIter']))])
    
    t0 = datetime.now()
    logging.info(f"Training started with configs {hyperparam}.")
    
    fitted_model = pipeline_training.fit(train)
    t1 = datetime.now()
    logging.info(f"Training finished. Runtime: {t1-t0}")
    
    predictions_train = fitted_model.transform(train)
    t2 = datetime.now()
    logging.info(f"Train prediction runtime: {t2-t1}")

    predictions_val = fitted_model.transform(val)
    t3 = datetime.now()
    logging.info(f"Val prediction runtime: {t3-t2}")
    
    f1_train = evaluator.evaluate(predictions_train)
    t4 = datetime.now()
    logging.info(f"Train evaluation runtime: {t4-t3}, F1: {f1_train}")

    f1_val = evaluator.evaluate(predictions_val)
    t5 = datetime.now()
    logging.info(f"Val evaluation runtime: {t5-t4}, F1: {f1_val}")
    
    if f1_val > current_best_model:
        logging.info(f"New best hyperparameter combinatioon found with validation F1: {f1_val} and configs: {hyperparam}.")
        fitted_model.write().overwrite().save(str(base_path.parent/"data/best_svm_model"))
        logging.info(f"Model saved.")
    
    train_run_results = {
        "numTopFeatures": hyperparam["numTopFeatures"], 
        "regParam": hyperparam["regParam"], 
        "maxIter": hyperparam["maxIter"], 
        "p": hyperparam["p"],
        "f1_train": f1_train, 
        "f1_val": f1_val,
        "runtime_training": int((t1 - t0).total_seconds()),
        "runtime_train_pred": int((t2 - t1).total_seconds()), 
        "runtime_train_val": int((t3 - t2).total_seconds()), 
        "runtime_train_eval": int((t4 - t3).total_seconds()), 
        "runtime_train_eval": int((t5 - t4).total_seconds())}
    
    train_run_results_df = pd.DataFrame([train_run_results])
    training_results.append(train_run_results)
    if len(training_results) == 1:
        train_run_results_df.to_csv(str(base_path.parent/"data/model_training_metrics.csv"), mode='a', index=False, header=True)
    else:
        train_run_results_df.to_csv(str(base_path.parent/"data/model_training_metrics.csv"), mode='a', index=False, header=False)\
    
    print(train_run_results)

Hyperparameters:   0%|          | 0/24 [00:00<?, ?it/s]

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 44390)
Traceback (most recent call last):
  File "/usr/lib64/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 260, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 250, in authenticate_and_accum_updates
    received_token = received_token.decode("utf-8")
UnicodeDeco

{'numTopFeatures': 10, 'regParam': 0.01, 'maxIter': 10, 'p': 1, 'f1_train': 0.8727422906218616, 'f1_val': 0.6018395632466248, 'runtime_training': 96, 'runtime_train_pred': 4, 'runtime_train_val': 3, 'runtime_train_eval': 10}
{'numTopFeatures': 10, 'regParam': 0.01, 'maxIter': 10, 'p': 2, 'f1_train': 0.8727422906218616, 'f1_val': 0.6018395632466248, 'runtime_training': 80, 'runtime_train_pred': 3, 'runtime_train_val': 3, 'runtime_train_eval': 5}


KeyboardInterrupt: 

Performance on Test Set

In [None]:
loaded_model = PipelineModel.load(str(base_path.parent/"data/best_svm_model"))
t0 = datetime.now()
predictions_test = loaded_model.transform(test) 
t1 = datetime.now()
f1_test = evaluator.evaluate(predictions_val)
t2 = datetime.now()
logging.info(f"Runtime prediciton: {t1-t0}, runtime evalutation {t2-t1}, F1 Test: {f1_test}")
spark.stop()