# Spark Text Classification

This notebook contains the text classification pipeline for the reviews dataset. It can be submitted as a long-running job by converting it into a python script file:

`jupyter nbconvert --to script pipeline.ipynb`

Then, submit the job using:

`spark-submit pipeline.py`

Some implementation details:

For efficiency, I try to get as much preprocessing as possible done before the hyperparameter search. But, since no information from the test/validation set should leak to train set, idf values and $\chi^2$ feature selection is done on each set separately. Since the feature selection is longest stage in preprocessing, the efficiency gains are minimal.

In [None]:
from datetime import datetime

from pyspark import SparkContext, SQLContext
from pyspark.ml import Pipeline, PipelineModel, Transformer
from pyspark.ml.classification import LinearSVC, OneVsRest, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import ChiSqSelector, CountVectorizer, IDF, Normalizer, OneHotEncoderEstimator, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.param.shared import HasInputCol
from pyspark.ml.tuning import TrainValidationSplit, TrainValidationSplitModel, ParamGridBuilder
from pyspark.ml.util import MLReadable, MLWritable
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

In [None]:
sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)
reviews_file = "hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json"
selected_terms_file = "output_ds.txt"
model_file = f"dic2/model-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
results_file = f"results-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt"
seed = 42  # seed is used for the dataset splits and the decision tree for reproducibility reasons
save_selected_terms = False
classifier = "decision_tree"

Since there seem to be resource contraints on the Jupyter runtime, only 1000 rows of the whole dataset are loaded. Change this by removing the `limit(1000)` call below.

In [None]:
df = sqlc.read.json(reviews_file).select("category", "reviewText").limit(1000)

## Preprocessing

The preprocessing pipeline leverages built-in Transformers and Estimators. Only for case folding a custom Transformer using a User Defined Function is needed. This is also the only stage which changes a column in-place, all other stages simply append to the data frame.

The feature selection is implemented using ChiSqSelector over the term frequencies, in order to maintain comparability with the MapReduce/RDD jobs.

In [None]:
class CaseFolder(Transformer, HasInputCol, MLReadable, MLWritable):
    def __init__(self, inputCol):
        super().__init__()
        self.setInputCol(inputCol)
        self.udf = UserDefinedFunction(lambda x: x.lower(), StringType())
    
    def _transform(self, df):
        new_df = df.withColumn(self.getInputCol(), self.udf(df[self.getInputCol()]))
        return new_df

casefolder = CaseFolder(inputCol="reviewText")
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="rawTerms", pattern="""\s|\d|\.|!|\?|,|;|:|\(|\)|\[|]|\{|}|-|_|"|`|~|#|&|\*|%|\$|\|/""")
stopWordsRemover = StopWordsRemover(inputCol="rawTerms", outputCol="filteredTerms")
categoryIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
tfTransformer = CountVectorizer(inputCol="filteredTerms", outputCol="termFrequencies")
chisq = ChiSqSelector(numTopFeatures=4000, featuresCol="termFrequencies", labelCol="categoryIndex", outputCol="selectedTerms")
preprocessingPipeline = Pipeline(stages=[casefolder, tokenizer, stopWordsRemover, categoryIndexer, tfTransformer])

In [None]:
preproModel = preprocessingPipeline.fit(df)
preproDF = preproModel.transform(df)

The preprocessing pipeline needs to be fit to the whole dataset such that the Estimator stages can be fitted on the dataset. This is the case for the CountVectorizer, which maintains a term->ix mapping.

Since the feature selection using the ChiSqSelector is not part of the overall preprocessing (it is later applied to the dataset splits), to get an overall selection of terms the ChiSqSelector needs to be fitted explicitly below.

In [None]:
if save_selected_terms:
    chisq_model = chisq.fit(preproDF)
    vocabulary = preproModel.stages[4].vocabulary
    selectedTerms = [vocabulary[index] for index in chisq_model.selectedFeatures]
    with open(selected_terms_file, "w") as file:
        file.write(", ".join(selectedTerms))
    selectedTerms[:10]

A train dataset (which is later split into a train and validation set) is used to fit the classifier, while the test dataset is used to evaluate the dataset.

In [None]:
df_train, df_test = preproDF.randomSplit([.8, .2], seed=seed)

The proper pipeline contians stages for dataset split-specific preprocessing and the classifiers. If a binary classifier like LinearSVC is used, a OneVsRest strategy has to be added.

In [None]:
if classifier == "svm":
    idf = IDF(inputCol="selectedTerms", outputCol="tfidf")
    normalizer = Normalizer(p=2, inputCol="selectedTerms", outputCol="features")
    svm = LinearSVC(featuresCol="features", labelCol="categoryIndex", predictionCol="prediction")
    ovr = OneVsRest(classifier=svm, featuresCol="features", labelCol="categoryIndex", predictionCol="prediction")
    pipeline = Pipeline(stages=[chisq, idf, normalizer, ovr])
elif classifier == "decision_tree":
    idf = IDF(inputCol="selectedTerms", outputCol="tfidf")
    normalizer = Normalizer(p=2, inputCol="selectedTerms", outputCol="features")
    decision_tree = DecisionTreeClassifier(featuresCol="features", labelCol="categoryIndex", predictionCol="prediction", seed=seed)
    pipeline = Pipeline(stages=[chisq, idf, normalizer, decision_tree])

The parameter grid allows us to test multiple hyperparameter combinations of pipeline stages.

In [None]:
if classifier == "svm":
    paramGrid = ParamGridBuilder()\
        .addGrid(chisq.numTopFeatures, [400, 4000])\
        .addGrid(svm.regParam, [1, .1, 0])\
        .addGrid(svm.maxIter, [100, 10])\
        .addGrid(svm.standardization, [True, False])\
        .build()
elif classifier == "decision_tree":
    paramGrid = ParamGridBuilder()\
        .addGrid(chisq.numTopFeatures, [400, 4000])\
        .addGrid(decision_tree.maxDepth, [2, 4, 9, 15])\
        .addGrid(decision_tree.impurity, ["entropy", "gini"])\
        .addGrid(decision_tree.minInstancesPerNode, [1, 100])\
        .build()

Using TrainValidationSplit, the pipeline is fitted on the train dataset and validated on the validation dataset for each parameter combination.

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="categoryIndex", metricName="f1")
validation = TrainValidationSplit(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=evaluator,
                                  trainRatio=0.8,
                                  seed=seed)

validationModel = validation.fit(df_train)

In [None]:
validationModel.bestModel.save(model_file)

In [None]:
best_model = PipelineModel.load(model_file)

In [None]:
with open(results_file, "w") as file:
    file.write(",".join(map(str, validationModel.validationMetrics)))

The best hyperparameter combination is:

In [None]:
best_model.explainParams()

Now, to further evaluate the best model, the F1 score on the test/holdout set is computed. It would also be possible to fit a new model using the best hyperparameter combination on the whole train dataset (train + validation) and then evaluate this model with the test dataset.

In [None]:
predictions_test = best_model.transform(df_test)

In [19]:
overall_evaluation = evaluator.evaluate(predictions_test)
print(f"F1 score on test dataset: {overall_evaluation}")

F1 score on test dataset: 0.6802603841579735
