# AUEB M.Sc. in Data Science (part-time)

- Semester: Summer 2020

- Course: Big Data Systems and Techniques

- Instructor: Prof. D. Arkoumanis

- Author: Spiros Politis (p3351814)

# Task 3 - ML (15% data processing, 25% algorithm training)

Research the *Spark* documentation and select two classification algorithms. The goal is to train classifiers that predict the category of each product using any information in the other columns (i.e. *name*, *description*, *brand*), excluding *category_name*, *category_code* and *category_id*.

Think about which parameters you should tune. For K, select whatever value you think is best for the problem.

Write a *Spark* program that reads the Task 2 *Parquet* file into a *DataFrame* and processes the data so that it is suitable as input to the algorithms.

Apply verbose cross-validation to train the algorithms and tune the parameters. Select the parameters that yield the best performance.

Save the best model to *HDFS*.

Run the training program in *Spark* in distributed mode.

As deliverables, provide the execution commands, the above program, dumps of its execution with verbose cross-validation, and the model file.

## Imports and custom code

In [1]:
import pyspark
import pyspark.ml
import pyspark.ml.classification
import pyspark.ml.feature
import pyspark.ml.evaluation
import sys

In [2]:
sys.path.append("../src/")

from Preprocessing.Transformers import LowercaseTransformer, RemoveHTMLTransformer, RemoveNonAlphanumericTransformer

In [8]:
def classification_report(model, df):
    import pyspark.mllib.evaluation

    # Make predictions. Note: the tuples are (label, prediction), whereas
    # MulticlassMetrics documents (prediction, label) pairs, so the per-label
    # precision and recall reported below are effectively swapped.
    predictionAndTarget = model.transform(df).select("category_id_indexed", "prediction")
    
    # Create both evaluators. BinaryClassificationMetrics is designed for
    # binary (score, label) pairs, so the AUC on this multiclass problem is
    # only indicative.
    metrics_binary = pyspark.mllib.evaluation.BinaryClassificationMetrics(predictionAndTarget.rdd.map(tuple))
    metrics_multi = pyspark.mllib.evaluation.MulticlassMetrics(predictionAndTarget.rdd.map(tuple))

    # Accuracy is computed over all classes; precision, recall and F1-score
    # are computed for label 1.0 only.
    accuracy = metrics_multi.accuracy
    precision = metrics_multi.precision(1.0)
    recall = metrics_multi.recall(1.0)
    f1_score = metrics_multi.fMeasure(1.0)
    auc = metrics_binary.areaUnderROC
    
    # Print the report as a Markdown table.
    print("| Metric    | Value |")
    print("|-----------|-------|")
    print("| Accuracy  | {:.3f} |".format(accuracy))
    print("| Error     | {:.3f} |".format(1.0 - accuracy))
    print("| Precision | {:.3f} |".format(precision))
    print("| Recall    | {:.3f} |".format(recall))
    print("| F1-score  | {:.3f} |".format(f1_score))
    print("| AUC       | {:.3f} |".format(auc))

    return accuracy, precision, recall, f1_score, auc
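
The order of the pairs matters for the per-label metrics. `MulticlassMetrics` documents its input as (prediction, label) pairs, while the function above passes (label, prediction). The following is a minimal pure-Python sketch on made-up data (the helpers `precision_recall` and `accuracy` are hypothetical and not part of Spark) suggesting that swapping the order leaves accuracy unchanged but exchanges precision and recall:

```python
def precision_recall(pairs, label):
    """Precision and recall for `label`, given (prediction, label) pairs."""
    tp = sum(1 for p, y in pairs if p == label and y == label)
    fp = sum(1 for p, y in pairs if p == label and y != label)
    fn = sum(1 for p, y in pairs if p != label and y == label)
    precision = tp / float(tp + fp) if tp + fp else 0.0
    recall = tp / float(tp + fn) if tp + fn else 0.0
    return precision, recall


def accuracy(pairs):
    """Fraction of pairs whose two elements agree (order-independent)."""
    return sum(1 for a, b in pairs if a == b) / float(len(pairs))


# Hypothetical (prediction, label) pairs, for illustration only.
prediction_and_label = [(1.0, 1.0), (1.0, 0.0), (1.0, 2.0), (0.0, 0.0), (2.0, 1.0)]
# The same data with the elements of each pair swapped.
label_and_prediction = [(y, p) for p, y in prediction_and_label]

correct = precision_recall(prediction_and_label, 1.0)
swapped = precision_recall(label_and_prediction, 1.0)

print("correct order: precision={:.3f} recall={:.3f}".format(*correct))
print("swapped order: precision={:.3f} recall={:.3f}".format(*swapped))
```

The F1-score is symmetric in precision and recall, so it is not affected by the pair order either.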

In [3]:
print("PySpark version: {}".format(sc.version))

PySpark version: 2.4.5


## Read Parquet file

In [4]:
%%time

shoes_df = spark.read.parquet("output/query_shoes.parquet")

CPU times: user 2.87 ms, sys: 806 µs, total: 3.68 ms
Wall time: 3.92 s


## Perform transformations on some columns

In [5]:
# Cast category_id to double
shoes_df = shoes_df.withColumn("category_id", shoes_df["category_id"].cast("double"))

# The ML algorithms require an indexed label column, so category_id is indexed here.
label_indexer = pyspark.ml.feature.StringIndexer(
    inputCol = "category_id", 
    outputCol = "category_id_indexed"
)

label_indexer = label_indexer.fit(shoes_df)

shoes_df = label_indexer.transform(shoes_df)
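
By default, `StringIndexer` orders labels by descending frequency, so the most frequent value receives index 0.0. A minimal pure-Python sketch of that ordering follows; the category IDs are made up, the helper `fit_label_index` is hypothetical, and tie-breaking between equally frequent labels is not modelled:

```python
from collections import Counter


def fit_label_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    ordered = [label for label, _ in counts.most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical category IDs (no ties in frequency).
labels = ["30", "10", "30", "20", "30", "10"]

index = fit_label_index(labels)
indexed = [index[label] for label in labels]

print(index)
print(indexed)
```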

In [6]:
shoes_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- vendor_catalog_url: string (nullable = true)
 |-- buy_url: string (nullable = true)
 |-- manufacturer_name: string (nullable = true)
 |-- sale_price: decimal(38,18) (nullable = true)
 |-- retail_price: decimal(38,18) (nullable = true)
 |-- manufacturer_part_no: string (nullable = true)
 |-- country: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- category_id: double (nullable = true)
 |-- category_id_indexed: double (nullable = false)



## Statistics

### Unique category names and their distribution

In [7]:
%%time

shoes_df.groupBy("category_code").count().show()

+-------------------+-----+
|      category_code|count|
+-------------------+-----+
|     shoes-athletic| 4899|
| mens-lace-up-shoes|12353|
|        girls-shoes|21632|
|mens-shoes-athletic| 7935|
|      evening-shoes|  901|
|         boys-shoes|15400|
|       bridal-shoes|  848|
+-------------------+-----+

CPU times: user 3.71 ms, sys: 1.1 ms, total: 4.81 ms
Wall time: 2.81 s


## Data pre-processing

### Apply preprocessing pipeline

In [8]:
shoes_df.select("descr").show(3, truncate = False)

(output truncated: single-column table `descr`)

In [9]:
%%time

# Defining pipeline steps.
lowercase_transformer = LowercaseTransformer.LowercaseTransformer(
    column = "descr"
)

remove_html_transformer = RemoveHTMLTransformer.RemoveHTMLTransformer(
    column = "descr", 
    sc = sc
)

remove_non_alphanumeric_transformer = RemoveNonAlphanumericTransformer.RemoveNonAlphanumericTransformer(
    column = "descr"
)

# Defining the pipeline.
preprocessing_pipeline = pyspark.ml.Pipeline(
    stages = [
        lowercase_transformer, 
        remove_html_transformer, 
        remove_non_alphanumeric_transformer
    ]
)

shoes_df = preprocessing_pipeline.fit(shoes_df).transform(shoes_df)

# Persisting the DataFrame so that later actions can reuse the pre-processed data.
shoes_df = shoes_df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

CPU times: user 81.5 ms, sys: 13.5 ms, total: 95.1 ms
Wall time: 4.58 s
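
The source of the three custom transformers in `../src/` is not shown here. As an assumption about their combined effect on a single description, the steps can be sketched in plain Python with regular expressions (the function `clean_description` and the sample text are hypothetical, and the actual transformers may behave differently):

```python
import re


def clean_description(text):
    """Lowercase, strip HTML tags and drop non-alphanumeric characters."""
    text = text.lower()
    # Replace anything that looks like an HTML tag with a space.
    text = re.sub(r"<[^>]+>", " ", text)
    # Replace every character that is not a letter, digit or whitespace.
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    # Collapse runs of whitespace left behind by the previous steps.
    return re.sub(r"\s+", " ", text).strip()


# Hypothetical product description.
raw = "<p>Comfortable <b>Leather</b> Shoes!</p>"
cleaned = clean_description(raw)

print(cleaned)
```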


In [10]:
%%time

shoes_df.select("descr").show(3, truncate = False)

(output truncated: single-column table `descr`)

Checking whether persistence is in effect: the response time should be orders of magnitude lower than for the previous call, because the processing graph has already been executed and its result cached.

In [11]:
%%time

shoes_df.select("descr").show(3, truncate = False)

(output truncated: single-column table `descr`)

## Train-test split

$80\%$ - $20\%$ train / test split.

In [15]:
(train_df, test_df) = shoes_df.randomSplit([0.8, 0.2])
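
`randomSplit` assigns rows to the splits at random, so the resulting sizes are only approximately 80/20 and, without a seed, the split changes between runs. `randomSplit` accepts an optional `seed` argument for reproducibility. The idea can be sketched in plain Python (the helper `random_split` is hypothetical and is not Spark's implementation):

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train with probability `train_fraction`, else test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))

# The same seed yields the same split on every run.
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)

print("train: {} rows, test: {} rows".format(len(train_a), len(test_a)))
```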

## Classification

### One-vs-Rest with Logistic Regression pipeline

Source: https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#logistic-regression

In [89]:
%%time

# Defining pipeline steps.

one_vs_rest_log_reg_tokenizer = pyspark.ml.feature.Tokenizer(
    inputCol = "descr", 
    outputCol = "tokens"
)

one_vs_rest_log_reg_hashing_tf = pyspark.ml.feature.HashingTF(
    inputCol = one_vs_rest_log_reg_tokenizer.getOutputCol(), 
    outputCol = "features", 
    numFeatures = 5000
)

logistic_regression = pyspark.ml.classification.LogisticRegression(
    maxIter = 200, 
    regParam = 0.01
)
one_vs_rest_log_reg_logistic_regression = logistic_regression.setFeaturesCol("features")
one_vs_rest_log_reg_logistic_regression = logistic_regression.setLabelCol("category_id_indexed")

one_vs_rest_classifier = pyspark.ml.classification.OneVsRest(
    classifier = one_vs_rest_log_reg_logistic_regression
)
one_vs_rest_classifier = one_vs_rest_classifier.setFeaturesCol("features")
one_vs_rest_classifier = one_vs_rest_classifier.setLabelCol("category_id_indexed")

# Defining the pipeline.
one_vs_rest_log_reg_pipeline = pyspark.ml.Pipeline(
    stages = [
        one_vs_rest_log_reg_tokenizer, 
        one_vs_rest_log_reg_hashing_tf, 
        one_vs_rest_classifier
    ]
)

one_vs_rest_model = one_vs_rest_log_reg_pipeline.fit(train_df)

CPU times: user 523 ms, sys: 122 ms, total: 644 ms
Wall time: 57.6 s
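
`HashingTF` maps each token to one of `numFeatures` buckets through a hash function and counts the occurrences per bucket. Distinct tokens can collide in the same bucket, which is one reason why small `numFeatures` values lose information. A minimal sketch of the idea follows, using `zlib.crc32` as a stand-in hash (an assumption for illustration only; Spark uses MurmurHash3, so the actual indices differ). The helper `hashing_tf` and the sample sentence are hypothetical:

```python
import zlib
from collections import Counter


def hashing_tf(tokens, num_features):
    """Return a sparse term-frequency vector as {bucket index: count}."""
    counts = Counter()
    for token in tokens:
        # Stand-in hash; Spark's HashingTF uses MurmurHash3 instead.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


# Hypothetical tokenized description.
tokens = "black leather shoes with leather sole".split()

vector = hashing_tf(tokens, 5000)
print(vector)

# With a single bucket every token collides.
print(hashing_tf(tokens, 1))
```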


In [90]:
_ = classification_report(one_vs_rest_model, test_df)

| Metric    | Value |
|-----------|-------|
| Accuracy  | 0.794 |
| Error     | 0.206 |
| Precision | 0.758 |
| Recall    | 0.706 |
| F1-score  | 0.731 |
| AUC       | 0.895 |


### Random Forest pipeline

Source: https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#random-forest-classifier

In [91]:
%%time

# Define pipeline steps
random_forest_tokenizer = pyspark.ml.feature.Tokenizer(
    inputCol = "descr", 
    outputCol = "tokens"
)

random_forest_hashing_tf = pyspark.ml.feature.HashingTF(
    inputCol = random_forest_tokenizer.getOutputCol(), 
    outputCol = "features", 
    numFeatures = 5000
)

random_forest_classifier = pyspark.ml.classification.RandomForestClassifier(
    labelCol = "category_id_indexed", 
    featuresCol = "features", 
    numTrees = 200
)

# Define pipeline
random_forest_pipeline = pyspark.ml.Pipeline(
    stages = [
        random_forest_tokenizer, 
        random_forest_hashing_tf, 
        random_forest_classifier
    ]
)

random_forest_model = random_forest_pipeline.fit(train_df)

CPU times: user 187 ms, sys: 62.5 ms, total: 250 ms
Wall time: 1min 19s


In [92]:
_ = classification_report(random_forest_model, test_df)

| Metric    | Value |
|-----------|-------|
| Accuracy  | 0.474 |
| Error     | 0.526 |
| Precision | 0.081 |
| Recall    | 0.954 |
| F1-score  | 0.149 |
| AUC       | 0.716 |


## Verbose cross-validation

In [93]:
import pyspark.ml.tuning

In [94]:
import numpy as np

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.sql.functions import rand

class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                metric = eva.evaluate(model.transform(validation, paramMap))
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))
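
The fold logic in `_fit` attaches a uniform random number to every row and uses the interval `[i * h, (i + 1) * h)` with `h = 1.0 / nFolds` as the validation set of fold `i`; all remaining rows form the training set. A minimal pure-Python sketch of that partitioning on made-up rows (the helper `k_fold_splits` is hypothetical):

```python
import random


def k_fold_splits(rows, n_folds, seed):
    """Yield (train, validation) lists, one pair per fold."""
    rng = random.Random(seed)
    # Tag every row with a uniform random number in [0, 1).
    tagged = [(rng.random(), row) for row in rows]
    h = 1.0 / n_folds
    for i in range(n_folds):
        lower, upper = i * h, (i + 1) * h
        validation = [row for r, row in tagged if lower <= r < upper]
        train = [row for r, row in tagged if not (lower <= r < upper)]
        yield train, validation


rows = list(range(30))
folds = list(k_fold_splits(rows, n_folds=3, seed=7))

for fold_num, (train, validation) in enumerate(folds, start=1):
    print("fold {}: {} train rows, {} validation rows".format(
        fold_num, len(train), len(validation)))
```

Because the folds are defined by random numbers rather than by row counts, their sizes are only approximately equal.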

In [99]:
one_vs_rest_log_reg_param_grid = pyspark.ml.tuning.ParamGridBuilder() \
    .addGrid(
        one_vs_rest_log_reg_hashing_tf.numFeatures, 
        [100, 250, 500, 1000, 2500, 5000]
    ) \
    .addGrid(
        one_vs_rest_log_reg_logistic_regression.regParam, [0.1, 0.01]
    ) \
    .build()

one_vs_rest_log_reg_crossval = CrossValidatorVerbose(
    estimator = one_vs_rest_log_reg_pipeline,
    estimatorParamMaps = one_vs_rest_log_reg_param_grid,
    evaluator = pyspark.ml.evaluation.MulticlassClassificationEvaluator(
        labelCol = "category_id_indexed", 
        predictionCol = "prediction", 
        metricName = "accuracy"
    ),
    numFolds = 3
)
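
The parameter grid is the Cartesian product of the value lists, so the number of model fits grows multiplicatively. A short sketch of the arithmetic for the grid above, using `itertools.product` in place of `ParamGridBuilder` (the variable names are illustrative):

```python
from itertools import product

num_features_values = [100, 250, 500, 1000, 2500, 5000]
reg_param_values = [0.1, 0.01]
num_folds = 3

# Every combination of numFeatures and regParam is one candidate model.
grid = [
    {"numFeatures": n, "regParam": r}
    for n, r in product(num_features_values, reg_param_values)
]

# Each candidate is fitted once per fold, plus one final refit of the
# best parameters on the full training set.
fits_during_cv = len(grid) * num_folds
total_fits = fits_during_cv + 1

print("{} candidates, {} fits in total".format(len(grid), total_fits))
```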

In [100]:
%%time

# Run cross-validation, and choose the best set of parameters.
one_vs_rest_log_reg_crossval = one_vs_rest_log_reg_crossval.fit(train_df)

Comparing models on fold 1
params: {'regParam': 0.1, 'numFeatures': 100}	accuracy: 0.554878	avg: 0.554878
params: {'regParam': 0.01, 'numFeatures': 100}	accuracy: 0.574099	avg: 0.574099
params: {'regParam': 0.1, 'numFeatures': 250}	accuracy: 0.617287	avg: 0.617287
params: {'regParam': 0.01, 'numFeatures': 250}	accuracy: 0.643188	avg: 0.643188
params: {'regParam': 0.1, 'numFeatures': 500}	accuracy: 0.686200	avg: 0.686200
params: {'regParam': 0.01, 'numFeatures': 500}	accuracy: 0.713742	avg: 0.713742
params: {'regParam': 0.1, 'numFeatures': 1000}	accuracy: 0.730735	avg: 0.730735
params: {'regParam': 0.01, 'numFeatures': 1000}	accuracy: 0.756636	avg: 0.756636
params: {'regParam': 0.1, 'numFeatures': 2500}	accuracy: 0.766657	avg: 0.766657
params: {'regParam': 0.01, 'numFeatures': 2500}	accuracy: 0.780018	avg: 0.780018
params: {'regParam': 0.1, 'numFeatures': 5000}	accuracy: 0.781072	avg: 0.781072
params: {'regParam': 0.01, 'numFeatures': 5000}	accuracy: 0.791444	avg: 0.791444
Comparing mod

## Best model

### Classification report

In [112]:
_ = classification_report(one_vs_rest_log_reg_crossval.bestModel, test_df)

| Metric    | Value |
|-----------|-------|
| Accuracy  | 0.794 |
| Error     | 0.206 |
| Precision | 0.758 |
| Recall    | 0.706 |
| F1-score  | 0.731 |
| AUC       | 0.895 |


### Best params

params: {'regParam': 0.01, 'numFeatures': 5000}	accuracy: 0.792899	avg: 0.792172
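
The selection step in `_fit` picks the parameter map with the largest accumulated metric. The sketch below applies the same argmax logic to the fold-1 accuracies from the cross-validation dump. Only fold 1 is available in that dump, so this illustrates the mechanism rather than reproducing the three-fold average; plain Python is used instead of NumPy:

```python
# Fold-1 accuracies copied from the cross-validation output.
fold_1_accuracy = [
    ({"regParam": 0.1, "numFeatures": 100}, 0.554878),
    ({"regParam": 0.01, "numFeatures": 100}, 0.574099),
    ({"regParam": 0.1, "numFeatures": 250}, 0.617287),
    ({"regParam": 0.01, "numFeatures": 250}, 0.643188),
    ({"regParam": 0.1, "numFeatures": 500}, 0.686200),
    ({"regParam": 0.01, "numFeatures": 500}, 0.713742),
    ({"regParam": 0.1, "numFeatures": 1000}, 0.730735),
    ({"regParam": 0.01, "numFeatures": 1000}, 0.756636),
    ({"regParam": 0.1, "numFeatures": 2500}, 0.766657),
    ({"regParam": 0.01, "numFeatures": 2500}, 0.780018),
    ({"regParam": 0.1, "numFeatures": 5000}, 0.781072),
    ({"regParam": 0.01, "numFeatures": 5000}, 0.791444),
]

# Accuracy is a "larger is better" metric, so take the maximum.
best_params, best_accuracy = max(fold_1_accuracy, key=lambda item: item[1])

print("best on fold 1: {} accuracy: {:.6f}".format(best_params, best_accuracy))
```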

## Saving and loading the model (for verification)

In [118]:
one_vs_rest_log_reg_crossval.bestModel.write().overwrite().save("output/one_vs_rest_log_reg.model")

In [119]:
saved_model = pyspark.ml.pipeline.PipelineModel.load("output/one_vs_rest_log_reg.model")

In [120]:
_ = classification_report(saved_model, test_df)

| Metric    | Value |
|-----------|-------|
| Accuracy  | 0.794 |
| Error     | 0.206 |
| Precision | 0.758 |
| Recall    | 0.706 |
| F1-score  | 0.731 |
| AUC       | 0.895 |


---