# Machine Learning

In [1]:
# global imports
import numpy as np
# pyspark
import findspark
findspark.init()
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as W
# machine learning models
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.classification import NaiveBayes, LinearSVC, LogisticRegression
# evaluator
from pyspark.ml.evaluation import Evaluator
# cross validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel


spark = SparkSession.builder.getOrCreate()

# Import Data

In [2]:
# import data
%store -r dfs

In [3]:
# import train data
train_start = spark.createDataFrame(dfs[0])
train_start.show(2)

+--------------------+-----+-------------------+
|           count_vec|label|            weights|
+--------------------+-----+-------------------+
|(495,[1,2,43,279]...|    0|0.08970309538850285|
|(495,[0,2,7,8,10,...|    0|0.08970309538850285|
+--------------------+-----+-------------------+
only showing top 2 rows



In [4]:
# import test data
test = spark.createDataFrame(dfs[1])
test.show(2)

+--------------------+-----+
|           count_vec|label|
+--------------------+-----+
|         (495,[],[])|    0|
|(495,[1,5,6,7,10,...|    0|
+--------------------+-----+
only showing top 2 rows



In [5]:
# import vocabulary
vocab = dfs[3]

## Create Folds in Train

`fold` column specifies which fold each row belongs to for stratified cross validation.

In [6]:
# shuffle the dataset
shuffled = train_start.orderBy(W.rand(340))
# create folds for positive and negative classes
positive = train_start.filter(shuffled['label']==1)\
                      .withColumn('id', W.row_number().over(Window.orderBy(W.lit('A'))))\
                      .withColumn('fold', W.col('id')%5).drop('id')
negative = train_start.filter(shuffled['label']==0)\
                      .withColumn('id', W.row_number().over(Window.orderBy(W.lit('A'))))\
                      .withColumn('fold', W.col('id')%5).drop('id')
# combine data frames
train = positive.unionAll(negative)
# output data frame
train.show(2)

+--------------------+-----+------------------+----+
|           count_vec|label|           weights|fold|
+--------------------+-----+------------------+----+
|(495,[6,56,59,78,...|    1|0.9102969046114971|   1|
|(495,[1,3,4,5,6,8...|    1|0.9102969046114971|   2|
+--------------------+-----+------------------+----+
only showing top 2 rows



# Create Evaluators

In [7]:
# class for fMeasure
class fMeasure(Evaluator):
    def __init__(self, predictionCol="prediction", labelCol="label"):
        self.predictionCol = predictionCol
        self.labelCol = labelCol
    def _evaluate(self, dataset):
        tp = dataset.where(dataset[self.labelCol]==1).filter(
            dataset[self.labelCol]==dataset[self.predictionCol]).count()
        fp = dataset.where(dataset[self.labelCol]==0).filter(
            dataset[self.labelCol]!=dataset[self.predictionCol]).count()
        fn = dataset.where(dataset[self.labelCol]==1).filter(
            dataset[self.labelCol]!=dataset[self.predictionCol]).count()
        precision = 0
        recall = 0
        if (tp+fp) != 0:
            precision = tp/(tp+fp)
        if (tp+fn) != 0:
            recall = tp/(tp+fn)
        if (precision+recall) == 0:
            return 0
        else:
            return (2*precision*recall)/(precision+recall)
    def isLargerBetter(self):
        return True

In [8]:
# create f1 score evaluator
f1_score = fMeasure()

In [9]:
# class for accuracy
class accuracy(Evaluator):
    def __init__(self, predictionCol="prediction", labelCol="label"):
        self.predictionCol = predictionCol
        self.labelCol = labelCol
    def _evaluate(self, dataset):
        return dataset.filter(dataset[self.labelCol]==dataset[self.predictionCol]).count()/dataset.count()
    def isLargerBetter(self):
        return True

In [10]:
# create accuracy evaluator
accuracy = accuracy()

# Create Stratified Cross Validator

Very basic stratified cross validator that can be used with 5 folds.

In [11]:
# this is designed for 5 fold cross validation
# I tried other iterations that are more flexible but have significantly longer run times
class stratifiedCV(CrossValidator):
    def __init__(self,estimator=None,estimatorParamMaps=None,evauator=None):
        super(stratifiedCV, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)
    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metrics = [0.0]*numModels
        for i in range(5):
            train = dataset.filter(W.col('fold')!=i)
            validation = dataset.filter(W.col('fold')==i)
            models = est.fit(train,epm)
            for j in range(numModels):
                model = models[j]
                metric = eva.evaluate(model.transform(validation,epm[j]))
                metrics[j] += metric/5
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset,epm[bestIndex])
        return self._copyValues(CrossValidatorModel(bestModel,metrics))

# Gradient Boosted Tree

***Cross Validation***

Tune `stepSize` and `maxIter` parameters. 

In [12]:
 # create gbt model
gbt_cv = GBTClassifier(featuresCol='count_vec', labelCol='label').setSeed(987) # parameter grid
gbt_pars = ParamGridBuilder().addGrid(gbt_cv.stepSize,[0.1,0.5,0.9])\
                             .addGrid(gbt_cv.maxIter,[50,100,200])\
                             .build()
# create cross validator
cv_gbt = stratifiedCV().setEstimator(gbt_cv)\
                       .setEvaluator(f1_score)\
                       .setEstimatorParamMaps(gbt_pars)

In [13]:
# run cross validation
cv_gbt_model = cv_gbt.fit(train)
# get best model
best_gbt = cv_gbt_model.bestModel
# output best parameters
print('Best step size: ',best_gbt._java_obj.getStepSize())
print('Best number of trees: ',best_gbt._java_obj.getMaxIter())

Best step size:  0.9
Best number of trees:  200


***Final Model***

In [14]:
# fit the model
gbt = GBTClassifier(featuresCol='count_vec',labelCol='label',
                   stepSize=0.9,maxIter=200).setSeed(987)
# train the model
gbt_model = gbt.fit(train)
# obtain predictions
gbt_train_pred = gbt_model.transform(train)
gbt_test_pred = gbt_model.transform(test)

In [15]:
# confusion matrix for training data
gbt_train_confuse = gbt_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
gbt_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,7,135
1,0,1441,0


In [16]:
# confusion matrix for test data
gbt_test_confuse = gbt_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
gbt_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,50,14
1,0,632,17


In [17]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(gbt_train_pred))
print('F1 score for test set: ',f1_score.evaluate(gbt_test_pred))

F1 score for training set:  0.9747292418772564
F1 score for test set:  0.29473684210526313


In [18]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(gbt_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(gbt_test_pred))

Accuracy for training set:  0.9955780164245104
Accuracy for test set:  0.906030855539972


# Naive Bayes

***Cross Validation***

Tune `smoothing` parameter.

In [19]:
# create naive bayes model
nb_cv = NaiveBayes(featuresCol='count_vec',labelCol='label',weightCol='weights')
# parameter grid
nb_pars = ParamGridBuilder().addGrid(nb_cv.smoothing,[0.5,1,2,5]).build()
# create cross validator
cv_nb = stratifiedCV().setEstimator(nb_cv)\
                      .setEvaluator(f1_score)\
                      .setEstimatorParamMaps(nb_pars)

In [20]:
# run cross validation
cv_nb_model = cv_nb.fit(train)
# get best model
best_nb = cv_nb_model.bestModel
# output best parameter
print('Best smoothing value: ',best_nb._java_obj.getSmoothing())

Best smoothing value:  1.0


In [21]:
# create naive bayes model
nb_cv = NaiveBayes(featuresCol='count_vec',labelCol='label',weightCol='weights')
# parameter grid
nb_pars = ParamGridBuilder().addGrid(nb_cv.smoothing,[0.6,0.8,1,1.5]).build()
# create cross validator
cv_nb = stratifiedCV().setEstimator(nb_cv)\
                      .setEvaluator(f1_score)\
                      .setEstimatorParamMaps(nb_pars)

In [22]:
# run cross validation
cv_nb_model = cv_nb.fit(train)
# get best model
best_nb = cv_nb_model.bestModel
# output best parameter
print('Best smoothing value: ',best_nb._java_obj.getSmoothing())

Best smoothing value:  0.6


***Final Model***

In [23]:
# fit the model
nb = NaiveBayes(featuresCol='count_vec',labelCol='label',weightCol='weights',smoothing=0.6)
# train the model
nb_model = nb.fit(train)
# obtain predictions
nb_train_pred = nb_model.transform(train)
nb_test_pred = nb_model.transform(test)

In [24]:
# confusion matrix for training data
nb_train_confuse = nb_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
nb_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,25,117
1,0,1272,169


In [25]:
# confusion matrix for training data
nb_test_confuse = nb_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
nb_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,17,47
1,0,579,70


In [26]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(nb_train_pred))
print('F1 score for test set: ',f1_score.evaluate(nb_test_pred))

F1 score for training set:  0.5467289719626168
F1 score for test set:  0.5193370165745856


In [27]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(nb_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(nb_test_pred))

Accuracy for training set:  0.8774478837650032
Accuracy for test set:  0.8779803646563815


# Logistic Regression

## Ordinary Least Squares

***Final Model***

In [28]:
# create model
ols = LogisticRegression(featuresCol='count_vec',labelCol='label',
                         weightCol='weights',elasticNetParam=0,regParam=0)
# train the model
ols_model = ols.fit(train)
# obtain predictions
ols_train_pred = ols_model.transform(train)
ols_test_pred = ols_model.transform(test)

In [29]:
# confusion matrix for training data
ols_train_confuse = ols_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
ols_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,0,142
1,0,1420,21


In [30]:
# confusion matrix for test data
ols_test_confuse = ols_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
ols_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,33,31
1,0,582,67


In [31]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(ols_train_pred))
print('F1 score for test set: ',f1_score.evaluate(ols_test_pred))

F1 score for training set:  0.9311475409836066
F1 score for test set:  0.3827160493827161


In [32]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(ols_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(ols_test_pred))

Accuracy for training set:  0.9867340492735313
Accuracy for test set:  0.8597475455820477


## Lasso

***Cross Validation***

Tune `regParam` parameter.

In [33]:
# create lasso model
lasso_cv = LogisticRegression(featuresCol='count_vec',labelCol='label',
                              weightCol='weights',elasticNetParam=1)
# parameter grid
lasso_pars = ParamGridBuilder().addGrid(lasso_cv.regParam,[0.01,0.1,1,10]).build()
# create cross validator
cv_lasso = stratifiedCV().setEstimator(lasso_cv)\
                         .setEvaluator(f1_score)\
                         .setEstimatorParamMaps(lasso_pars)

In [34]:
# run cross validation
cv_lasso_model = cv_lasso.fit(train)
# get best model
best_lasso = cv_lasso_model.bestModel
# output best parameter
print('Best lambda: ',best_lasso._java_obj.getRegParam())

Best lambda:  0.01


In [35]:
# create lasso model
lasso_cv = LogisticRegression(featuresCol='count_vec',labelCol='label',
                              weightCol='weights',elasticNetParam=1)
# parameter grid
lasso_pars = ParamGridBuilder().addGrid(lasso_cv.regParam,[0.001,0.005,0.01,0.05]).build()
# create cross validator
cv_lasso = stratifiedCV().setEstimator(lasso_cv)\
                         .setEvaluator(f1_score)\
                         .setEstimatorParamMaps(lasso_pars)

In [36]:
# run cross validation
cv_lasso_model = cv_lasso.fit(train)
# get best model
best_lasso = cv_lasso_model.bestModel
# output best parameter
print('Best lambda: ',best_lasso._java_obj.getRegParam())

Best lambda:  0.01


***Final Model***

In [37]:
# create model
lasso = LogisticRegression(featuresCol='count_vec',labelCol='label',
                         weightCol='weights',elasticNetParam=1,regParam=0.01)
# train the model
lasso_model = lasso.fit(train)
# obtain predictions
lasso_train_pred = lasso_model.transform(train)
lasso_test_pred = lasso_model.transform(test)

In [38]:
# confusion matrix for training data
lasso_train_confuse = lasso_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
lasso_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,2,140
1,0,1201,240


In [39]:
# confusion matrix for test data
lasso_test_confuse = lasso_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
lasso_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,17,47
1,0,516,133


In [40]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(lasso_train_pred))
print('F1 score for test set: ',f1_score.evaluate(lasso_test_pred))

F1 score for training set:  0.5363984674329502
F1 score for test set:  0.3852459016393443


In [41]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(lasso_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(lasso_test_pred))

Accuracy for training set:  0.8471257106759318
Accuracy for test set:  0.7896213183730715


## Ridge

***Cross Validation***

Tune `regParam` parameter.

In [42]:
# create ridge model
ridge_cv = LogisticRegression(featuresCol='count_vec',labelCol='label',
                              weightCol='weights',elasticNetParam=0)
# parameter grid
ridge_pars = ParamGridBuilder().addGrid(ridge_cv.regParam,[0.01,0.1,1,10]).build()
# create cross validator
cv_ridge = stratifiedCV().setEstimator(ridge_cv)\
                         .setEvaluator(f1_score)\
                         .setEstimatorParamMaps(ridge_pars)

In [43]:
# run cross validation
cv_ridge_model = cv_ridge.fit(train)
# get best model
best_ridge = cv_ridge_model.bestModel
# output best parameter
print('Best lambda: ',best_ridge._java_obj.getRegParam())

Best lambda:  1.0


In [44]:
# create ridge model
ridge_cv = LogisticRegression(featuresCol='count_vec',labelCol='label',
                              weightCol='weights',elasticNetParam=0)
# parameter grid
ridge_pars = ParamGridBuilder().addGrid(ridge_cv.regParam,[0.5,1,3,5,7,9]).build()
# create cross validator
cv_ridge = stratifiedCV().setEstimator(ridge_cv)\
                         .setEvaluator(f1_score)\
                         .setEstimatorParamMaps(ridge_pars)

In [45]:
# run cross validation
cv_ridge_model = cv_ridge.fit(train)
# get best model
best_ridge = cv_ridge_model.bestModel
# output best parameter
print('Best lambda: ',best_ridge._java_obj.getRegParam())

Best lambda:  1.0


***Final Model***

In [46]:
# create model
ridge = LogisticRegression(featuresCol='count_vec',labelCol='label',
                         weightCol='weights',elasticNetParam=0,regParam=1)
# train the model
ridge_model = ridge.fit(train)
# obtain predictions
ridge_train_pred = ridge_model.transform(train)
ridge_test_pred = ridge_model.transform(test)

In [47]:
# confusion matrix for training data
ridge_train_confuse = ridge_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
ridge_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,28,114
1,0,1351,90


In [48]:
# confusion matrix for test data
ridge_test_confuse = ridge_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
ridge_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,27,37
1,0,617,32


In [49]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(ridge_train_pred))
print('F1 score for test set: ',f1_score.evaluate(ridge_test_pred))

F1 score for training set:  0.6589595375722543
F1 score for test set:  0.556390977443609


In [50]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(ridge_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(ridge_test_pred))

Accuracy for training set:  0.9254579911560329
Accuracy for test set:  0.9172510518934082


## Elastic Net

***Cross Validation***

Tune `regParam` and `elasticNetParam` parameters.

In [51]:
# create elastic net model
en_cv = LogisticRegression(featuresCol='count_vec',labelCol='label',weightCol='weights')
# parameter grid
en_pars = ParamGridBuilder().addGrid(en_cv.regParam,[0.01,0.1,1,10])\
                            .addGrid(en_cv.elasticNetParam,[0.1,0.3,0.5,0.7,0.9]).build()
# create cross validator
cv_en = stratifiedCV().setEstimator(en_cv)\
                      .setEvaluator(f1_score)\
                      .setEstimatorParamMaps(en_pars)

In [52]:
# run cross validation
cv_en_model = cv_en.fit(train)
# get best model
best_en = cv_en_model.bestModel
# output best parameter
print('Best lambda: ',best_en._java_obj.getRegParam())
print('Best elastic net parameter: ',best_en._java_obj.getElasticNetParam())

Best lambda:  0.1
Best elastic net parameter:  0.1


***Final Model***

In [53]:
# create model
en = LogisticRegression(featuresCol='count_vec',labelCol='label',
                        weightCol='weights',elasticNetParam=0.1,regParam=0.1)
# train the model
en_model = en.fit(train)
# obtain predictions
en_train_pred = en_model.transform(train)
en_test_pred = en_model.transform(test)

In [54]:
# confusion matrix for training data
en_train_confuse = en_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
en_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,14,128
1,0,1309,132


In [55]:
# confusion matrix for test data
en_test_confuse = en_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
en_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,26,38
1,0,588,61


In [56]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(en_train_pred))
print('F1 score for test set: ',f1_score.evaluate(en_test_pred))

F1 score for training set:  0.6368159203980099
F1 score for test set:  0.4662576687116564


In [57]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(en_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(en_test_pred))

Accuracy for training set:  0.9077700568540745
Accuracy for test set:  0.8779803646563815


# Linear Support Vector Machine

***Cross Validation***

Tune `regParam` and `maxIter` parameters.

In [58]:
# create linear support vector machine model
lsv_cv = LinearSVC(featuresCol='count_vec',labelCol='label',weightCol='weights')
# parameter grid
lsv_pars = ParamGridBuilder().addGrid(lsv_cv.regParam,[0,0.01,0.1,1,10])\
                             .addGrid(lsv_cv.maxIter,[50,100,200,300]).build()
# create cross validator
cv_lsv = stratifiedCV().setEstimator(lsv_cv)\
                       .setEvaluator(f1_score)\
                       .setEstimatorParamMaps(lsv_pars)

In [59]:
# run cross validation
cv_lsv_model = cv_lsv.fit(train)
# get best model
best_lsv = cv_lsv_model.bestModel
# output best parameter
print('Best reg parameter: ',best_lsv._java_obj.getRegParam())
print('Best : max iterations',best_lsv._java_obj.getMaxIter())

Best reg parameter:  0.01
Best : max iterations 200


***Final Model***

In [60]:
# create model
lsv = LinearSVC(featuresCol='count_vec',labelCol='label',
                weightCol='weights',regParam=0.01,maxIter=200)
# train the model
lsv_model = lsv.fit(train)
# obtain predictions
lsv_train_pred = lsv_model.transform(train)
lsv_test_pred = lsv_model.transform(test)

In [61]:
# confusion matrix for training data
lsv_train_confuse = lsv_train_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
lsv_train_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,6,136
1,0,1374,67


In [62]:
# confusion matrix for test data
lsv_test_confuse = lsv_test_pred.select('label','prediction')
# 'label' is row and 'prediction' is column
lsv_test_confuse.stat.crosstab('label','prediction').toPandas()

Unnamed: 0,label_prediction,0.0,1.0
0,1,34,30
1,0,607,42


In [63]:
# output the f1 score
print('F1 score for training set: ',f1_score.evaluate(lsv_train_pred))
print('F1 score for test set: ',f1_score.evaluate(lsv_test_pred))

F1 score for training set:  0.7884057971014492
F1 score for test set:  0.4411764705882353


In [64]:
# output the accuracy
print('Accuracy for training set: ',accuracy.evaluate(lsv_train_pred))
print('Accuracy for test set: ',accuracy.evaluate(lsv_test_pred))

Accuracy for training set:  0.9538850284270373
Accuracy for test set:  0.8934081346423562


# Save the Best Model

The best model was ridge regression because it had the largest f1 score on the test set. I will use this model as my final model for predicting sentiment of Amazon Alexa reviews.

In [65]:
# save data frames
ml_results = [ridge_test_pred.toPandas(),ridge_model.summary.pr.toPandas(),ridge_model.coefficients.toArray()]
%store ml_results

Stored 'ml_results' (list)
