# **PySpark + Colab Setup**

##  Install PySpark and related dependencies

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 32 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 49.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=34cfd11f6639ad6f3d98213d0dcd94b2081442295e924773e39fc3871029d45c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


##  Import useful PySpark packages

In [2]:
import pyspark
import requests
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

##  Create Spark context

In [3]:
# Create the session
conf = SparkConf().set("spark.ui.port", "4050").set('spark.executor.memory', '4G').set('spark.driver.memory', '45G').set('spark.driver.maxResultSize', '10G')

# Create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

##  Check everything is ok

In [4]:
spark

In [5]:
sc._conf.getAll()

[('spark.driver.memory', '45G'),
 ('spark.driver.port', '45005'),
 ('spark.driver.host', '48f99df0b889'),
 ('spark.executor.id', 'driver'),
 ('spark.sql.warehouse.dir', 'file:/content/spark-warehouse'),
 ('spark.driver.maxResultSize', '10G'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.ui.port', '4050'),
 ('spark.app.startTime', '1654631720437'),
 ('spark.app.id', 'local-1654631722023'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.memory', '4G'),
 ('spark.ui.showConsoleProgress', 'true')]

# **Data acquisition from kaggle**


**Important!** In order to fetch the dataset from kaggle **the user needs to provide the API with his personal kaggle.json file** provided by by Kaggle. After the execution of the cell below there will appear a button to upload the file, after which the cell will finish it's execution.

In [None]:
!pip install kaggle
from google.colab import files
uploaded = files.upload()
for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
# Then move kaggle.json into the folder where the API expects to find it.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d kritanjalijain/amazon-reviews 

In [8]:
!unzip amazon-reviews.zip

Archive:  amazon-reviews.zip
  inflating: amazon_review_polarity_csv.tgz  
  inflating: test.csv                
  inflating: train.csv               


In [9]:
train_df = spark.read.load('train.csv', format="csv", sep=",", inferSchema="true")
test_df = spark.read.load('test.csv', format="csv", sep=",", inferSchema="true")

# We limit the train and test set in order for all the models to fit into 25gb of ram.
train_df = train_df.limit(200000)
test_df = test_df.limit(40000)      

# **Data preprocessing**


## Change column names



In [10]:
# Spark DataFrame
columns = ['label', 'title', 'text']
train_df = train_df.toDF(*columns)
test_df = test_df.toDF(*columns)

## Delete "title" column



In [11]:
train_df = train_df.drop("title")
test_df = test_df.drop("title")

## Delete any null values

In [12]:
train_df = train_df.dropna()
test_df = test_df.dropna()

## Make label of binary form - good reviews are 0, bad reviews are 1

In [13]:
train_df = train_df.replace(2, 0)
test_df = test_df.replace(2, 0)

## **Text cleaning pipeline**

As **preliminary** steps of any NLP task, at least the following pipeline must be executed first:

- Text cleaning:
 - Case normalization (<code>lower</code>) -> convert all text to lower case;
 - Filter out _leading_ and _trailing_ whitespaces (<code>trim</code>);
 - Filter out punctuation symbols (<code>regexp_replace</code>);
 - Filter out any internal extra whitespace resulting from the step above (<code>regexp_replace</code> + <code>trim</code>).
- Tokenization (<code>Tokenizer</code>): splitting raw text into a list of individual _tokens_ (i.e., words), typically using whitespace as delimiter 
- Stopwords removal (<code>StopWordsRemover</code>): removing so-called _stopwords_, namely words that do not contribute to the deeper meaning of the document like "the", "a", "me", etc.
- Stemming (<code>SnowballStemmer</code>): reducing each word to its root or base. For example "fishing", "fished", "fisher" all reduce to the stem "fish".

In [14]:
def clean_text(df, column_name="text"):
    
    from pyspark.sql.functions import udf, col, lower, trim, regexp_replace
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer

    print("***** Text Preprocessing Pipeline *****\n")
    print("# 1. Text Cleaning\n")
    
    print("1.a Case normalization:")
    lower_case_news_df = df.select("label", lower(col(column_name)).alias(column_name))
    lower_case_news_df.show(10)
   
    print("1.b Trimming:")
    trimmed_news_df = lower_case_news_df.select("label", trim(col(column_name)).alias(column_name))
    trimmed_news_df.show(10)
    
    print("1.c Filter out punctuation:")
    no_punct_news_df = trimmed_news_df.select("label", (regexp_replace(col(column_name), "[^a-zA-Z\\s]", "")).alias(column_name))
    no_punct_news_df.show(10)
    
    print("1.d Filter out extra whitespaces:")
    cleaned_news_df = no_punct_news_df.select("label", trim(regexp_replace(col(column_name), " +", " ")).alias(column_name))
    cleaned_news_df.show(10)

    print("# 2. Tokenization:")
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    tokens_df = tokenizer.transform(cleaned_news_df)
    tokens_df.show(10)

    print("# 3. Stopwords removal:")
    stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="terms")
    terms_df = stopwords_remover.transform(tokens_df)
    terms_df.show(10)

    print("# 4. Stemming:")
    stemmer = SnowballStemmer(language="english")
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms"))
    terms_stemmed_df.show(10)
    
    return terms_stemmed_df

In [None]:
clean_train_df = clean_text(train_df)
clean_train_df.cache()

clean_test_df = clean_text(test_df)
clean_test_df.cache()

#**Feature Engineering**

Machine learning techniques cannot work directly on text data; in fact, words must be first converted into some numerical representation which machine learning algorithms can make use of. This process is often known as _embedding_ or _vectorization_.

In terms of vectorization, it is important to remember that it isn't merely turning a single word into a single number. While words can be transformed into numbers, an entire document can be translated into a vector. Moreover, vectors derived from text data are usually high-dimensional. This is because each dimension of the feature space will correspond to a word, and the language in the documents may have thousands of words.

## TF-IDF

In information retrieval, **tf-idf** - short for term frequency-inverse document frequency - is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus.

The tf-idf value increases proportionally to the number of times a word appears in the document and is offset by the frequency of the word in the corpus, which helps to adjust for the fact that some words appear more frequently in general.

In [16]:
def extract_tfidf_features(df, column_name="terms_stemmed"):

    from pyspark.ml.feature import HashingTF, CountVectorizer, IDF
    from pyspark.ml import Pipeline

    hashingTF = HashingTF(inputCol=column_name, outputCol="tf_features", numFeatures=10000)
    idf = IDF(inputCol="tf_features", outputCol="features")

    pipeline = Pipeline(stages=[hashingTF, idf])
    features = pipeline.fit(df)
    
    tf_idf_features_df = features.transform(df)

    return tf_idf_features_df

In [17]:
tf_idf_train_df = extract_tfidf_features(clean_train_df)
tf_idf_train_df.cache()

tf_idf_test_df = extract_tfidf_features(clean_test_df)
tf_idf_test_df.cache()

DataFrame[label: int, text: string, tokens: array<string>, terms: array<string>, terms_stemmed: array<string>, tf_features: vector, features: vector]

### Clean-up unused variables

In [18]:
# Delete objects which are not needed, and invoking python's garbage collector
import gc
del(train_df)
del(test_df)
del(clean_train_df)
del(clean_test_df)
print("Garbage collector: collected %d objects" % (gc.collect()))

Garbage collector: collected 698 objects


### Check and remove any possible zero-length vector

In [19]:
@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()

### Check if there is any zero-lenght vector



In [20]:
print("Total n. of zero-length vectors in train df: {:d}".
      format(tf_idf_train_df.where(num_nonzeros("features") == 0).count()), 
      "and in test df: {:d}".
      format(tf_idf_test_df.where(num_nonzeros("features") == 0).count()))

Total n. of zero-length vectors in train df: 12 and in test df: 1


### Remove zero-lenght vector(s)


In [21]:
tf_idf_train_df = tf_idf_train_df.where(num_nonzeros("features") > 0)
tf_idf_test_df = tf_idf_test_df.where(num_nonzeros("features") > 0)

### Double-check there is no more zero-length vector

In [22]:
print("Total n. of zero-length vectors (after removal): {:d}".
      format(tf_idf_train_df.where(num_nonzeros("features") == 0).count()), 
      "and in test df: {:d}".
      format(tf_idf_test_df.where(num_nonzeros("features") == 0).count()))

Total n. of zero-length vectors (after removal): 0 and in test df: 0


In [23]:
print("Garbage collector: collected %d objects" % (gc.collect()))

Garbage collector: collected 384 objects


### Select specific columns

In [24]:
tf_idf_train_df = tf_idf_train_df.select(["features", "label"])

# **Model training with different hyperparameter configurations and $k$-fold cross validation**

In the following sections, we try we implement a pipeline making use of different hyperparameters and also of $k$-fold cross validation to get a better estimate of the generalization performance of our 3 models.
1. Naive Bayes model
2. Logistic Regression model
3. Random Forest model


## **1. Naive Bayes Classifier**
Firstly, we will train a naive bayes classifier, using the training set above. 

We will use the `NaiveBayes` object provided by the [PySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier) within the package `pyspark.ml.classification`.

In [25]:
def naive_bayes_pipeline(  train, 
                           with_std=True,
                           with_mean=True,
                           k_fold=5):

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline
 
    stages = []
    nb = NaiveBayes(featuresCol="features", labelCol="label") 
    stages += [nb]
    pipeline = Pipeline(stages=stages)

    # With 6 values for alpha (the smoothing parameter) which helps tackle the problem of zero probability
    param_grid = ParamGridBuilder()\
    .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])\
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True 
                               )
    cv_model = cross_val.fit(train)

    return cv_model

In [26]:
cv_model = naive_bayes_pipeline(tf_idf_train_df)

In [27]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

Avg. ROC AUC computed across k-fold cross validation for model setting #1: 0.469
Avg. ROC AUC computed across k-fold cross validation for model setting #2: 0.469
Avg. ROC AUC computed across k-fold cross validation for model setting #3: 0.469
Avg. ROC AUC computed across k-fold cross validation for model setting #4: 0.469
Avg. ROC AUC computed across k-fold cross validation for model setting #5: 0.469
Avg. ROC AUC computed across k-fold cross validation for model setting #6: 0.469


In [28]:
print("Best model according to k-fold cross validation, smoothing=[{:f}]".
      format(cv_model.bestModel.stages[-1].getSmoothing())
      )
print(cv_model.bestModel.stages[-1])


Best model according to k-fold cross validation, smoothing=[0.000000]
NaiveBayesModel: uid=NaiveBayes_9576143cb991, modelType=multinomial, numClasses=2, numFeatures=10000


### Using the best model from $k$-fold cross validation to make predictions

In [29]:
test_predictions = cv_model.transform(tf_idf_test_df)

In [30]:
test_predictions.select("features", "prediction", "label").show(5)

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|(10000,[281,306,3...|       0.0|    0|
|(10000,[157,158,2...|       0.0|    0|
|(10000,[157,355,4...|       1.0|    1|
|(10000,[490,1604,...|       1.0|    0|
|(10000,[15,157,15...|       0.0|    0|
+--------------------+----------+-----+
only showing top 5 rows



### Evaluate model performance on the Test Set

In [31]:
def evaluate_model(predictions, metric="areaUnderROC"):
    
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    evaluator = BinaryClassificationEvaluator(metricName=metric)

    return evaluator.evaluate(predictions)

In [32]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

***** Test Set *****
Area Under ROC Curve (ROC AUC): 0.469
Area Under Precision-Recall Curve: 0.464
***** Test Set *****


### Observations

Since we know that we have two classes to classify, a random predictor would have a theoretical maximum of $50\%$ accuracy. For us to say that a model performed well enough, it should have a measurable advantage over the random predictor. This is not the case, as with the Naive Bayes Classifier, the model doen't seem to generalise well. 

## **2. Logistic Regression**

We then train a logistic regression model, using the training set above. To do so, we use the `LogisticRegression` object provided by the [PySpark API](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression) within the package `pyspark.ml.classification`.


More specifically, we will tune the two hyperparameters: 
- $\lambda$ = `regParam` which is the regulation parameter
- $\alpha$ = `elasticNetParam` that is the tradeoff parameter for regularization penalties

Specifically:
  - `regParam = 0` and `elasticNetParam = 0` means there is no regularization;
  - `regParam > 0` and `elasticNetParam = 0` means there is only L2-regularization; 
  - `regParam > 0` and `elasticNetParam = 1` means there is only L1-regularization;
  - `regParam > 0` and `0 < elasticNetParam < 1` means there is both L1- and L2-regularization (Elastic Net);

In [33]:
def logistic_regression_pipeline(train, 
                                 with_std=True,
                                 with_mean=True,
                                 k_fold=5):

    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline

    stages = []
    log_reg = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100)
    stages += [log_reg]
    pipeline = Pipeline(stages=stages)

    # With 3 values for log_reg.regParam ($\lambda$) and 3 values for log_reg.elasticNetParam ($\alpha$),
    # this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from.
    param_grid = ParamGridBuilder()\
    .addGrid(log_reg.regParam, [0.0, 0.05, 0.1]) \
    .addGrid(log_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True 
                               )
    cv_model = cross_val.fit(train)

    return cv_model

In [34]:
cv_model = logistic_regression_pipeline(tf_idf_train_df)

In [35]:
# This function summarizes all the models trained during k-fold cross validation
def summarize_all_models(cv_models):
    for k, models in enumerate(cv_models):
        print("*************** Fold #{:d} ***************\n".format(k+1))
        for i, m in enumerate(models):
            print("--- Model #{:d} out of {:d} ---".format(i+1, len(models)))
            print("\tParameters: lambda=[{:.3f}]; alpha=[{:.3f}] ".format(m.stages[-1]._java_obj.getRegParam(), m.stages[-1]._java_obj.getElasticNetParam()))
            print("\tModel summary: {}\n".format(m.stages[-1]))
        print("***************************************\n")

In [36]:
summarize_all_models(cv_model.subModels)

*************** Fold #1 ***************

--- Model #1 out of 9 ---
	Parameters: lambda=[0.000]; alpha=[0.000] 
	Model summary: LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000

--- Model #2 out of 9 ---
	Parameters: lambda=[0.000]; alpha=[0.500] 
	Model summary: LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000

--- Model #3 out of 9 ---
	Parameters: lambda=[0.000]; alpha=[1.000] 
	Model summary: LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000

--- Model #4 out of 9 ---
	Parameters: lambda=[0.050]; alpha=[0.000] 
	Model summary: LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000

--- Model #5 out of 9 ---
	Parameters: lambda=[0.050]; alpha=[0.500] 
	Model summary: LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000

--- Model #6 out of 9 ---
	Parameters: lambda=[0.050];

In [37]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

Avg. ROC AUC computed across k-fold cross validation for model setting #1: 0.894
Avg. ROC AUC computed across k-fold cross validation for model setting #2: 0.894
Avg. ROC AUC computed across k-fold cross validation for model setting #3: 0.894
Avg. ROC AUC computed across k-fold cross validation for model setting #4: 0.901
Avg. ROC AUC computed across k-fold cross validation for model setting #5: 0.809
Avg. ROC AUC computed across k-fold cross validation for model setting #6: 0.728
Avg. ROC AUC computed across k-fold cross validation for model setting #7: 0.903
Avg. ROC AUC computed across k-fold cross validation for model setting #8: 0.728
Avg. ROC AUC computed across k-fold cross validation for model setting #9: 0.500


In [38]:
print("Best model according to k-fold cross validation: lambda=[{:.3f}]; alfa=[{:.3f}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getRegParam(), 
             cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam(),
             )
      )
print(cv_model.bestModel.stages[-1])

Best model according to k-fold cross validation: lambda=[0.100]; alfa=[0.000]
LogisticRegressionModel: uid=LogisticRegression_8b776098f592, numClasses=2, numFeatures=10000


### Summarize model performance on the Training Set

In [39]:
training_result = cv_model.bestModel.stages[-1].summary
print("***** Training Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(training_result.areaUnderROC))
print("Accuracy :",training_result.accuracy)
print("Recall:",training_result.recallByLabel)
print("Precision:",training_result.precisionByLabel)
print("***** Training Set *****")

***** Training Set *****
Area Under ROC Curve (ROC AUC): 0.924
Accuracy : 0.8461657699461967
Recall: [0.8674067557014206, 0.8244240050997177]
Precision: [0.8348969533197587, 0.8586468542522921]
***** Training Set *****


### Using the best model from $k$-fold cross validation to make predictions

In [40]:
test_predictions = cv_model.transform(tf_idf_test_df)

In [41]:
test_predictions.select("features", "prediction", "label").show(5)

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|(10000,[281,306,3...|       0.0|    0|
|(10000,[157,158,2...|       0.0|    0|
|(10000,[157,355,4...|       1.0|    1|
|(10000,[490,1604,...|       1.0|    0|
|(10000,[15,157,15...|       0.0|    0|
+--------------------+----------+-----+
only showing top 5 rows



### Evaluate model performance on the Test Set

In [42]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

***** Test Set *****
Area Under ROC Curve (ROC AUC): 0.901
Area Under Precision-Recall Curve: 0.898
***** Test Set *****


### Observations

This time we can see that this model in about the same training time performs much better than the Naive Bayes Classifier without the space complexity that is required by NB. For a non deep learning method, the results achieved across both metrics are very good.

## **3. Random Forests**
We now train a random forest, using the training set above.

We will use the `RandomForestClassifier` object provided by the [PySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier) within the package `pyspark.ml.classification`.

In [43]:
def random_forest_pipeline(train, 
                           with_std=True,
                           with_mean=True,
                           k_fold=5):

    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml import Pipeline
 
    stages = []
    rf = RandomForestClassifier(featuresCol="features", labelCol="label")
    stages += [rf]
    pipeline = Pipeline(stages=stages)

    # With 2 values for rf.maxDepth and 2 values for rf.numTrees
    # this grid will have 2 x 2 = 4 parameter settings for CrossValidator to choose from.8,100
    param_grid = ParamGridBuilder()\
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.numTrees, [10, 50]) \
    .build()
    
    cross_val = CrossValidator(estimator=pipeline, 
                               estimatorParamMaps=param_grid,
                               evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"), # default = "areaUnderROC", alternatively "areaUnderPR"
                               numFolds=k_fold,
                               collectSubModels=True 
                               )
    cv_model = cross_val.fit(train)
    return cv_model

In [44]:
cv_model = random_forest_pipeline(tf_idf_train_df)

In [45]:
for i, avg_roc_auc in enumerate(cv_model.avgMetrics):
    print("Avg. ROC AUC computed across k-fold cross validation for model setting #{:d}: {:.3f}".format(i+1, avg_roc_auc))

Avg. ROC AUC computed across k-fold cross validation for model setting #1: 0.671
Avg. ROC AUC computed across k-fold cross validation for model setting #2: 0.804
Avg. ROC AUC computed across k-fold cross validation for model setting #3: 0.738
Avg. ROC AUC computed across k-fold cross validation for model setting #4: 0.825


In [46]:
print("Best model according to k-fold cross validation: maxDept=[{:d}]".
      format(cv_model.bestModel.stages[-1]._java_obj.getNumTrees(), 
             )
      )
print(cv_model.bestModel.stages[-1])

Best model according to k-fold cross validation: maxDept=[50]
RandomForestClassificationModel: uid=RandomForestClassifier_f5c1d455d444, numTrees=50, numClasses=2, numFeatures=10000


### Summarize model performance on the Training Set

In [51]:
training_result = cv_model.bestModel.stages[-1].summary
print("***** Training Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(training_result.areaUnderROC))
print("Recall:",training_result.recallByLabel)
print("Precision:",training_result.precisionByLabel)
print("Accuracy:", training_result.accuracy)
print("***** Training Set *****")

***** Training Set *****
Area Under ROC Curve (ROC AUC): 0.821
Recall: [0.8650540238634229, 0.5818332675631647]
Precision: [0.679225365777933, 0.808145826599019]
Accuracy: 0.7250935056103366
***** Training Set *****


### Using the best model from $k$-fold cross validation to make predictions

In [48]:
test_predictions = cv_model.transform(tf_idf_test_df)

In [49]:
test_predictions.select("features", "prediction", "label").show(5)

+--------------------+----------+-----+
|            features|prediction|label|
+--------------------+----------+-----+
|(10000,[281,306,3...|       0.0|    0|
|(10000,[157,158,2...|       0.0|    0|
|(10000,[157,355,4...|       1.0|    1|
|(10000,[490,1604,...|       1.0|    0|
|(10000,[15,157,15...|       1.0|    0|
+--------------------+----------+-----+
only showing top 5 rows



### Evaluate model performance on the Test Set

In [50]:
print("***** Test Set *****")
print("Area Under ROC Curve (ROC AUC): {:.3f}".format(evaluate_model(test_predictions)))
print("Area Under Precision-Recall Curve: {:.3f}".format(evaluate_model(test_predictions, metric="areaUnderPR")))
print("***** Test Set *****")

***** Test Set *****
Area Under ROC Curve (ROC AUC): 0.817
Area Under Precision-Recall Curve: 0.814
***** Test Set *****


### Observations
This time although the model performs better than the NB classifier, yielding measurable results, compared to the model which used Logistic Regression for Classification, it achieves worse metric results while it takes more time and more memory.

# **Final Remarks**
As we observed, out of the 3 models, the model which used the Logistic Regression Classifier was able to outperform the other two. While not trading off many computational resources, it was able to achieve great results for a non DL method.