# Lab3
In this lab, we use PySpark ML (pyspark.ml) and PySpark SQL (pyspark.sql) to implement several classifiers for a document classification task.

## Import Modules

You may need to install numpy to execute this code correctly.

In [2]:
pip install numpy

Collecting numpy
  Using cached numpy-1.19.1-cp36-cp36m-win_amd64.whl (12.9 MB)
Installing collected packages: numpy
Successfully installed numpy-1.19.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

from pyspark.sql import DataFrame
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Create a Spark Session

In [4]:
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

## Load and View Data

In [5]:
train_data = spark.read.load("Lab3train.csv", format="csv", sep="\t", inferSchema="true", header="true")
test_data = spark.read.load("Lab3test.csv", format="csv", sep="\t", inferSchema="true", header="true")
train_data.show(10)
print('---------\nSchema of train_data:')
train_data.printSchema()

+--------+--------------------+
|category|            descript|
+--------+--------------------+
|    MISC|I've been there t...|
|    REST|Stay away from th...|
|    REST|Wow over 100 beer...|
|    MISC|Having been a lon...|
|    MISC|This is a consist...|
|    REST|I ate here a week...|
|    MISC|First of all Dal ...|
|    REST|Great food at REA...|
|    REST|While there are p...|
|    MISC|My first encounte...|
+--------+--------------------+
only showing top 10 rows

---------
Schema of train_data:
root
 |-- category: string (nullable = true)
 |-- descript: string (nullable = true)



## Data Preparation and Feature Generation
We will evaluate several models and choose the best one, so the classifier itself is left out of the pipeline.

We tokenize each document into a sequence of tokens and use the token frequencies as features. We also transform the label (i.e., category) into a numeric index.

We only keep those columns that will be used by the classifier.
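As a rough illustration of the preparation steps described above, here is a plain-Python sketch that does not use Spark. The toy documents and the alphabetical tie-breaking are assumptions made for this example, and Python's `split()` is only an approximation of how the Spark tokenizer splits on whitespace.

```python
from collections import Counter

# Toy corpus; the texts and labels are made up for illustration.
docs = [
    ("REST", "Great food great service"),
    ("MISC", "Parking was hard to find"),
    ("REST", "The food was cold"),
]

# Tokenize: lowercase, then split on whitespace.
tokens = [text.lower().split() for _, text in docs]

# Vocabulary ordered by corpus-wide term frequency, most frequent first
# (ties broken alphabetically here, which is an assumption of this sketch).
term_counts = Counter(tok for doc in tokens for tok in doc)
vocab = [t for t, _ in sorted(term_counts.items(), key=lambda kv: (-kv[1], kv[0]))]
index = {term: i for i, term in enumerate(vocab)}

# Sparse bag-of-words features: {vocabulary index: count} per document.
features = [dict(Counter(index[tok] for tok in doc)) for doc in tokens]

# Index labels by descending frequency, so the most frequent label gets 0.0.
label_counts = Counter(label for label, _ in docs)
ordered = sorted(label_counts.items(), key=lambda kv: (-kv[1], kv[0]))
label_index = {label: float(i) for i, (label, _) in enumerate(ordered)}
labels = [label_index[label] for label, _ in docs]

print(features[0], labels)
```

Each document becomes a sparse mapping from vocabulary index to count, which mirrors the sparse vectors shown in the `features` column later in this lab.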

In [6]:
# tokenizer: lowercases the text and splits it on whitespace
word_tokenizer = Tokenizer(inputCol="descript", outputCol="words")

# bag of words count
count_vectors = CountVectorizer(inputCol="words", outputCol="features")

# label indexer
label_maker = StringIndexer(inputCol = "category", outputCol = "label")

In [7]:
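class Selector(Transformer):
    """Pipeline stage that keeps only the columns needed by the classifiers."""
    def __init__(self, outputCols=('features', 'label')):
        super().__init__()  # initialize the Transformer base class (uid, params)
        self.outputCols = list(outputCols)

    def _transform(self, df: DataFrame) -> DataFrame:
        return df.select(*self.outputCols)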

selector = Selector(outputCols = ['features', 'label'])

In [8]:
# build the pipeline
pipeline = Pipeline(stages=[word_tokenizer, count_vectors, label_maker, selector])

In [9]:
# Fit the pipeline using train_data.
fitted_pipeline = pipeline.fit(train_data)


# Transform the train_data using fitted pipeline
training_set = fitted_pipeline.transform(train_data)
training_set.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(5421,[1,18,31,39...|  1.0|
|(5421,[0,1,15,20,...|  0.0|
|(5421,[3,109,556,...|  0.0|
|(5421,[1,2,3,5,6,...|  1.0|
|(5421,[2,3,4,8,11...|  1.0|
|(5421,[1,2,5,25,4...|  0.0|
|(5421,[7,40,142,1...|  1.0|
|(5421,[8,13,19,25...|  0.0|
|(5421,[2,3,7,8,21...|  0.0|
|(5421,[2,16,22,49...|  1.0|
+--------------------+-----+
only showing top 10 rows



To perform 5-fold cross validation, we assign each row a random group id from 0 to 4.

In [10]:
training_set = training_set.withColumn('group', (rand()*5).cast(IntegerType()))
training_set.show(10)

+--------------------+-----+-----+
|            features|label|group|
+--------------------+-----+-----+
|(5421,[1,18,31,39...|  1.0|    4|
|(5421,[0,1,15,20,...|  0.0|    3|
|(5421,[3,109,556,...|  0.0|    1|
|(5421,[1,2,3,5,6,...|  1.0|    0|
|(5421,[2,3,4,8,11...|  1.0|    2|
|(5421,[1,2,5,25,4...|  0.0|    4|
|(5421,[7,40,142,1...|  1.0|    4|
|(5421,[8,13,19,25...|  0.0|    0|
|(5421,[2,3,7,8,21...|  0.0|    4|
|(5421,[2,16,22,49...|  1.0|    2|
+--------------------+-----+-----+
only showing top 10 rows
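The group assignment above can be sketched in plain Python as follows. The row count and the fixed seed are assumptions chosen only to make the sketch repeatable; they do not come from the lab data.

```python
import random
from collections import Counter

random.seed(0)  # fixed seed, chosen only so this sketch is repeatable
k = 5
n_rows = 1000   # hypothetical row count

# Each row draws u in [0, 1); int(u * k) truncates it to a fold id in 0..k-1.
groups = [int(random.random() * k) for _ in range(n_rows)]

# Fold sizes come out roughly, but not exactly, equal.
fold_sizes = Counter(groups)
print(sorted(fold_sizes.items()))
```

Because the ids are drawn independently, the folds are only approximately balanced, which is usually acceptable for a data set of this kind.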



## Model Construction
We construct and evaluate three different models.

In [11]:
# Logistic Regression (elasticNetParam=0 gives a pure L2 penalty)
bow_lr = LogisticRegression(featuresCol='features', labelCol='label', predictionCol='lr_prediction',
                            maxIter=20, regParam=1., elasticNetParam=0)

#bow_lr_model = bow_lr.fit(train_dataset)
#bow_lr_predictions = bow_lr_model.transform(dev_dataset)

In [12]:
# Naive Bayes
bow_nb = NaiveBayes(featuresCol='features', labelCol='label', predictionCol='nb_prediction')
#bow_nb_model = bow_nb.fit(train_dataset)
#bow_nb_predictions = bow_nb_model.transform(dev_dataset)

In [13]:
# SVM
bow_svm = LinearSVC(featuresCol='features', labelCol='label', predictionCol='svm_prediction')
#bow_svm_model = bow_svm.fit(train_dataset)
#bow_svm_predictions = bow_svm_model.transform(dev_dataset)

## Cross Validation 

In [14]:
# Evaluator (F1 score); predictionCol is overridden for each model below
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",metricName='f1')
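To our understanding, the `f1` metric of this evaluator is a support-weighted average of the per-class F1 scores. The following plain-Python sketch shows that computation on made-up labels; `weighted_f1` is a hypothetical helper written for this example, not part of PySpark.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Support-weighted average of per-class F1 scores."""
    total = len(labels)
    support = Counter(labels)
    score = 0.0
    for cls, n_cls in support.items():
        pairs = list(zip(labels, predictions))
        tp = sum(1 for y, p in pairs if y == cls and p == cls)
        fp = sum(1 for y, p in pairs if y != cls and p == cls)
        fn = n_cls - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        # Weight each class by its share of the true labels.
        score += f1 * n_cls / total
    return score

# Made-up labels and predictions for illustration.
y_true = [0.0, 0.0, 0.0, 1.0, 1.0]
y_pred = [0.0, 0.0, 1.0, 1.0, 0.0]
print(weighted_f1(y_true, y_pred))
```

Weighting by class support means a frequent class contributes more to the score than a rare one.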

In [15]:
lr_f1 = []
nb_f1 = []
svm_f1 = []
for i in range(5):
    condition = training_set['group'] == i
    c_train = training_set.filter(~condition).cache()
    c_test = training_set.filter(condition).cache()
    
    lr_model = bow_lr.fit(c_train)
    lr_pred = lr_model.transform(c_test)
    lr_f1.append(evaluator.evaluate(lr_pred, {evaluator.predictionCol:'lr_prediction'}))
    
    nb_model = bow_nb.fit(c_train)
    nb_pred = nb_model.transform(c_test)
    nb_f1.append(evaluator.evaluate(nb_pred, {evaluator.predictionCol:'nb_prediction'}))
    
    svm_model = bow_svm.fit(c_train)
    svm_pred = svm_model.transform(c_test)
    svm_f1.append(evaluator.evaluate(svm_pred, {evaluator.predictionCol:'svm_prediction'}))

    # release the cached folds before the next iteration
    c_train.unpersist()
    c_test.unpersist()
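The hold-one-group-out logic of the loop above can be sketched without Spark. The rows and the majority-class "model" below are stand-ins invented for this example, and accuracy is used instead of F1 to keep the sketch short.

```python
from collections import Counter

def majority_fit(train_rows):
    """Toy 'model': always predict the most common training label."""
    most_common = Counter(label for _, label, _ in train_rows).most_common(1)[0][0]
    return lambda row: most_common

# Hypothetical rows of (feature, label, group); group plays the role of the fold id.
rows = [(i, float(i % 3 == 0), i % 5) for i in range(30)]

fold_scores = []
for fold in range(5):
    held_out = [r for r in rows if r[2] == fold]  # evaluate on this group
    train = [r for r in rows if r[2] != fold]     # fit on the other four
    model = majority_fit(train)
    correct = sum(1 for r in held_out if model(r) == r[1])
    fold_scores.append(correct / len(held_out))

# The reported score is the mean over the five folds.
mean_score = sum(fold_scores) / len(fold_scores)
print(mean_score)
```

Every row is held out exactly once, so the averaged score uses all of the training data for evaluation without ever scoring a model on rows it was fitted on.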

In [16]:
print('Performance of LR: {}'.format(sum(lr_f1)/len(lr_f1)))
print('Performance of NB: {}'.format(sum(nb_f1)/len(nb_f1)))
print('Performance of SVM: {}'.format(sum(svm_f1)/len(svm_f1)))

Performance of LR: 0.6886858544469151
Performance of NB: 0.7885129416306912
Performance of SVM: 0.793896137970462


In [17]:
lr_pred.show()

+--------------------+-----+-----+--------------------+--------------------+-------------+
|            features|label|group|       rawPrediction|         probability|lr_prediction|
+--------------------+-----+-----+--------------------+--------------------+-------------+
|(5421,[1,18,31,39...|  1.0|    4|[-8.1708583257705...|[0.49979572855322...|          1.0|
|(5421,[1,2,5,25,4...|  0.0|    4|[0.64151991353993...|[0.65509695827801...|          0.0|
|(5421,[7,40,142,1...|  1.0|    4|[0.47345461071266...|[0.61620108980942...|          0.0|
|(5421,[2,3,7,8,21...|  0.0|    4|[0.38928879537708...|[0.59611147954531...|          0.0|
|(5421,[0,7,47,49,...|  0.0|    4|[0.72311531302956...|[0.67329266099368...|          0.0|
|(5421,[0,3,4,14,7...|  0.0|    4|[0.82187128127872...|[0.69463341674291...|          0.0|
|(5421,[2,4,7,221,...|  1.0|    4|[0.47132983871007...|[0.61569846304354...|          0.0|
|(5421,[3,5,6,9,11...|  1.0|    4|[-0.2712467851487...|[0.43260103673976...|          1.0|

SVM achieves the highest average F1 score of the three models (0.794), narrowly ahead of Naive Bayes (0.789) and well ahead of Logistic Regression (0.689). Hence we use SVM to train the classifier on the whole training_set and evaluate it on the test data.
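The selection step can be written out as a small sketch. The scores are the cross-validation averages printed above, rounded to four decimals; the dictionary and variable names are chosen for this example only.

```python
# Mean cross-validation F1 scores as printed above, rounded to four decimals.
cv_f1 = {"LR": 0.6887, "NB": 0.7885, "SVM": 0.7939}

# Pick the model with the highest mean score.
best_model = max(cv_f1, key=cv_f1.get)

# The gap to the runner-up is small, so the ranking may vary between runs
# with different random fold assignments.
margin_over_nb = cv_f1["SVM"] - cv_f1["NB"]
print(best_model, round(margin_over_nb, 4))
```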

## Train and test the classifier

In [18]:
# Apply the fitted pipeline to the test set
test_set = fitted_pipeline.transform(test_data)
test_set.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(5421,[0,4,33,236...|  0.0|
|(5421,[0,3,5,7,9,...|  0.0|
|(5421,[1,3,4,13,5...|  0.0|
|(5421,[0,4,5,19,3...|  0.0|
|(5421,[0,1,4,9,10...|  0.0|
|(5421,[0,5,10,25,...|  1.0|
|(5421,[30,72,114,...|  0.0|
|(5421,[28,47,224,...|  0.0|
|(5421,[0,20,78,15...|  0.0|
|(5421,[0,1,4,5,13...|  0.0|
+--------------------+-----+
only showing top 10 rows



In [19]:
svm_model = bow_svm.fit(training_set)
svm_pred = svm_model.transform(test_set)
print('Performance on test data: {}'.format(evaluator.evaluate(svm_pred, {evaluator.predictionCol:'svm_prediction'})))

Performance on test data: 0.8228157745681111


In [20]:
# stop the Spark session
spark.stop()