## MACS 30123
### Coding Section for Assignment 3
### Professor Jon Clindaniel, TA Dhruval Bhatt 
### Submitted by Junho Choi

I note that all of the code below should be run on an AWS EMR notebook, as instructed in Lab 6 by the Professor. I have intentionally not run the code in this notebook, as it was written locally; the results are instead discussed in the descriptive part of the submission (i.e., the file `junhoc_hw3.pdf`).

### Part A. Question 1-(a)

#### A-1. Load data and install packages

Firstly, let us load the necessary Amazon Customer Reviews data as `data`. Also, following the Lab 6 notebook, we install the necessary packages (`pandas`, `matplotlib`, and `seaborn`).

In [None]:
## data load-in
data = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')

## installing packages
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("seaborn")
sc.install_pypi_package("pandas")

#### A-2. Creating the necessary features

In this part, I will create four additional variables to be added into the feature set. These are: `verified`, `body_wordcount`, `marketplace_feature`, and `pre2005`. I will explain each of them as we go through the below code.

**Creating verified**

The variable in `data` called `verified_purchase` is a binary variable that takes either `Y` or `N` as its value (without missing data). For it to be used as a part of the feature set, I have to turn it into integer values. I therefore create `verified`, which equals `1` if `verified_purchase` is `Y` (and `0` otherwise), using the below code, and add it to `data`.

In [None]:
data = data.withColumn('verified', (data.verified_purchase == 'Y').cast("integer"))

**Creating body_wordcount**

`body_wordcount` is a variable derived from another variable, `review_body`, which is the body of the review that was written. As its name suggests, `body_wordcount` counts the number of words in the said review. The below code imports `pyspark.sql.functions`, which is needed for the word count, creates the word count, and adds it to `data`.

In [None]:
import pyspark.sql.functions as sparkfn

data = data.withColumn('body_wordcount',
    sparkfn.size(sparkfn.split(sparkfn.col('review_body'), ' ')))
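
As a rough illustration of what this count measures, the sketch below mimics the split-on-a-single-space logic in plain Python. The helper name `naive_wordcount` is hypothetical, and Python's `str.split(' ')` is only a stand-in for Spark's `split` (which takes a regular expression); I have not checked that the two agree on every edge case. The point is that runs of consecutive spaces produce empty tokens, so the count can overstate the number of words.

```python
def naive_wordcount(text):
    """Count tokens obtained by splitting on a single space.

    Hypothetical helper; mirrors size(split(review_body, ' '))
    only in spirit.
    """
    return len(text.split(' '))


print(naive_wordcount("a great read"))    # single spaces only
print(naive_wordcount("a  great read"))   # the double space adds an empty token
```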

**Creating marketplace_feature**

`marketplace_feature` is derived from `marketplace`, where the latter is the string variable to indicate the country in which the product was marketed. From my inspection, there were five countries (`US`, `DE` (Germany), `JP` (Japan), `FR` (France), and `UK`). I used string-indexing and one-hot-encoding to create `marketplace_feature`.

In [None]:
## inspecting the possible marketplaces; none missing
data.groupBy('marketplace').count().show()

In [None]:
## importing one-hot encoding and string-indexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer

## building the encoder and indexer
indexer = StringIndexer(inputCol='marketplace', outputCol='marketplace_numeric')
onehot = OneHotEncoderEstimator(
    inputCols=['marketplace_numeric'], outputCols=['marketplace_feature'])

## applying and transforming the data
data = indexer.fit(data).transform(data)
data = onehot.fit(data).transform(data)
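
To make the two steps more concrete, here is a plain-Python sketch of what I understand them to do, assuming the indexer's default ordering (the most frequent label receives index 0) and the encoder's default of dropping the last category. The toy list and the names `index_of` and `one_hot` are made up for illustration; this is not Spark code.

```python
from collections import Counter

# toy marketplace values (made up for illustration)
marketplaces = ['US', 'US', 'US', 'UK', 'UK', 'DE']

# step 1: index labels by descending frequency (most frequent -> 0)
counts = Counter(marketplaces)
labels = [label for label, _ in counts.most_common()]
index_of = {label: i for i, label in enumerate(labels)}


# step 2: one-hot encode, dropping the last category so that it is
# represented by the all-zero vector
def one_hot(label):
    vec = [0] * (len(labels) - 1)
    i = index_of[label]
    if i < len(vec):
        vec[i] = 1
    return vec


for label in labels:
    print(label, index_of[label], one_hot(label))
```

Under these assumptions, the five marketplaces in the data would yield a four-element vector, with the least frequent marketplace encoded as all zeros.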

**Creating pre2005**

`pre2005` is derived from `year`, where the latter indicates the year in which the product was sold or marketed. From my inspection, there were 21 possible years (from 1995 to 2015, inclusive). `pre2005` equals 1 if the `year` value is between 1995 and 2004 (inclusive), and equals 0 if not.

In [None]:
## creating the pre2005 variable
data = data.withColumn('pre2005', (data.year < 2005).cast("integer"))

#### A-3. Grouping the necessary features, and outcome variable creation

We were told to add four features in addition to the `total_votes` feature which was used in Lab 6; I will now group `total_votes`, `marketplace_feature`, `pre2005`, `verified`, and `body_wordcount` as `feature_full`. In addition, I will have to create the outcome variable (i.e., `good_review` for being equal to or more than 4 stars). The below code will take care of these processes.

In [None]:
## importing assembler
from pyspark.ml.feature import VectorAssembler

## bunching the features together
feature_full = ['pre2005', 'verified', 'marketplace_feature',
                'body_wordcount', 'total_votes']
assemble = VectorAssembler(inputCols=feature_full, outputCol='feature_full')
data = assemble.transform(data)

## creating the good_review variable
data = data.withColumn('good_review', (data.star_rating >= 4).cast("integer"))

#### A-4. Creating feature sets with one feature removed

In 1-(b), we are told to produce a plot or a table to show each feature's potential contribution to the model. In order to do so, I will run logistic regression models with exactly one of the 4 added features removed at a time. To accomplish this, I create four additional feature sets, each omitting one of the 4 features.

In [None]:
## without pre2005
feature_wo1 = ['verified', 'marketplace_feature',
               'body_wordcount', 'total_votes']
assemble_1 = VectorAssembler(inputCols=feature_wo1, outputCol='feature_wo1')
data = assemble_1.transform(data)

## without verified
feature_wo2 = ['pre2005', 'marketplace_feature',
               'body_wordcount', 'total_votes']
assemble_2 = VectorAssembler(inputCols=feature_wo2, outputCol='feature_wo2')
data = assemble_2.transform(data)

## without marketplace_feature
feature_wo3 = ['pre2005', 'verified',
               'body_wordcount', 'total_votes']
assemble_3 = VectorAssembler(inputCols=feature_wo3, outputCol='feature_wo3')
data = assemble_3.transform(data)

## without body_wordcount
feature_wo4 = ['pre2005', 'marketplace_feature', 'verified',
               'total_votes']
assemble_4 = VectorAssembler(inputCols=feature_wo4, outputCol='feature_wo4')
data = assemble_4.transform(data)
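
Since the four blocks above follow one pattern, the feature lists could also be generated in a loop. The sketch below builds only the Python lists; the names `droppable`, `always_kept`, and `leave_one_out` are hypothetical, and each list would still need to be passed to a `VectorAssembler` as above.

```python
# the four added features, in the order used above
droppable = ['pre2005', 'verified', 'marketplace_feature', 'body_wordcount']
always_kept = ['total_votes']

# map an output column name (feature_wo1, ...) to its input columns
leave_one_out = {}
for k, dropped in enumerate(droppable, start=1):
    kept = [f for f in droppable if f != dropped] + always_kept
    leave_one_out['feature_wo%d' % k] = kept

for name, cols in leave_one_out.items():
    print(name, cols)
```

The order of columns within a list may differ from the hand-written versions; this affects only the order of the coefficients, not which features enter the model.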

### Part B. Question 1-(b)

#### B-1. Running the logistic regression and returning the metrics

Because the step is going to be rather repetitive (and potentially long), I will only demonstrate the code for the full model. I note that one can easily run the model with other feature sets (and produce the necessary metrics like AUC) by replacing `feature_full` with a relevant feature column (e.g., `feature_wo1` above). The results and comparisons are further described in `junhoc_hw3.pdf`.

In [None]:
## splitting the train and test data
train, test = data.randomSplit([0.7, 0.3], seed=60615)

## importing the logit
from pyspark.ml.classification import LogisticRegression

## running the model
lr = LogisticRegression(featuresCol='feature_full', labelCol='good_review')
model_full = lr.fit(train)

In [None]:
## Summaries
summ_train_full = model_full.summary
summ_test_full = model_full.evaluate(test)

## resulting AUC
print("AUC, train, full model: " + str(summ_train_full.areaUnderROC))
print("AUC, test, full model: " + str(summ_test_full.areaUnderROC))
print()

## resulting accuracies
print("Accuracy, train, full model: " + str(summ_train_full.accuracy))
print("Accuracy, test, full model: " + str(summ_test_full.accuracy))
print()

## resulting FPR
print("False positive rate by label (Training):")
for i, rate in enumerate(summ_train_full.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))
print()
print("False positive rate by label (Testing):")
for i, rate in enumerate(summ_test_full.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

## resulting TPR
print("\nTrue positive rate by label (Training):")
for i, rate in enumerate(summ_train_full.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))
    
print("\nTrue positive rate by label (Testing):")
for i, rate in enumerate(summ_test_full.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

#### B-2. Potential importance of feature, before running the model

Instead of actually running the model to find the feature contributions or importances, we can also create tables and figures to see whether a feature has the potential to be important. Therefore, I create cross-tabulations of the variables `pre2005`, `verified`, and `marketplace` against the variable `good_review` (the outcome variable). `marketplace_feature` is not used, as it is in a one-hot encoded state and is less interpretable by humans. In addition, I create a figure plotting `good_review` against `body_wordcount` using a subsample (0.01%) of the entire dataset, to understand the relationship between the two.

**For pre2005**

In [None]:
data.crosstab('good_review', 'pre2005').show()

**For verified**

In [None]:
data.crosstab('good_review', 'verified').show()

**For marketplace (marketplace_feature)**

In [None]:
data.crosstab('good_review', 'marketplace').show()
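
For readers less familiar with cross-tabulations, the sketch below reproduces the idea in plain Python on made-up `(good_review, verified)` pairs: it counts each cell and then computes the share of good reviews within each value of the feature. The data and the helper `good_share` are hypothetical.

```python
from collections import Counter

# made-up (good_review, verified) pairs, for illustration only
rows = [(1, 1), (1, 1), (0, 1), (1, 0), (0, 0), (0, 0)]

# cell counts of the cross-tabulation
cells = Counter(rows)


# share of good reviews within each value of the feature
def good_share(feature_value):
    good = cells[(1, feature_value)]
    bad = cells[(0, feature_value)]
    return good / (good + bad)


print(cells[(1, 1)], cells[(0, 1)])   # counts among verified == 1
print(good_share(1), good_share(0))
```

A large gap between the shares across the values of a feature would suggest that the feature may be informative about `good_review`.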

**For body_wordcount**

In [None]:
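import matplotlib.pyplot as plt

plt.close()

## scatter plot on a 0.01% subsample of the data; the figure
## size is passed directly to the pandas plotting call
sampled_df = data.sample(fraction=0.0001).toPandas()
sampled_df.plot.scatter('good_review', 'body_wordcount', figsize=(5, 5))
%matplot plt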

### Part C. Question 2

Since we need to "balance" the training data (in which there are many more `good_review=1` observations than `good_review=0` ones), we can use the `sampleBy` method as mentioned in the documentation. I create a class called `BinaryBalancer` that uses the `sampleBy` method and can also be fed into a `Pipeline` (for later purposes). In writing this class (and some of the transformations I define in Part D), I relied heavily on the following Towards Data Science post (link [here](https://towardsdatascience.com/pyspark-wrap-your-feature-engineering-in-a-pipeline-ee63bdb913)).

**Describing the BinaryBalancer class**

In [None]:
## importing the necessary tool for building the class below
from pyspark.ml.pipeline import Transformer
from pyspark.ml.util import Identifiable

In [None]:
class BinaryBalancer(Transformer):
    '''
    Transformer to return a "balanced" version of
    the dataframe. Note that would work only if
    the target variable to balance has TWO and only TWO
    distinct values, and those values NEED TO BE EITHER
    0 or 1.
    '''
    
    def __init__(self, inputCol='good_review'):
        '''
        Initializer. Specify the input column (i.e., the
        variable you want to balance by) in inputCol.
        '''
        
        ## initializing the parent Transformer, which also assigns
        ## a unique ID (uid) to this instance
        super(BinaryBalancer, self).__init__()
        self.inputCol = inputCol
        
    def copy(self, extra=None):
        '''
        for retaining a copy; defers to the default copy
        of the parent class.
        '''
        
        return super(BinaryBalancer, self).copy(extra)
        
        
    def _transform(self, df):
        '''
        actual balancing happens here; input your
        target dataframe here.
        '''
        
        ## counting and creating a table of counts
        counter = df.count()
        balancing = df.groupBy(
            self.inputCol).count().collect()
        
        ## initializing the smaller count and its case
        case = balancing[0][0]
        smaller_num = balancing[0][1]
        
        ## finding the actual smaller count
        for i, row in enumerate(balancing):
            if i == 0:
                pass
            else:
                if row[1] < smaller_num:
                    smaller_num = row[1]
                    case = row[0]

        ## target values are either 0 or 1
        othercase = int(1 - case)
        
        ## creating a dictionary for the sampleBy method
        balance_dict = {
            case: 1,
            othercase: (smaller_num / (counter-smaller_num))
        }
        
        ## returning the balanced dataset, with the seed for
        ## replicability
        return df.sampleBy(self.inputCol, balance_dict,
                           seed=60615)
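
The arithmetic behind `balance_dict` can be checked in isolation. The sketch below is plain Python on made-up class counts; `balancing_fractions` is a hypothetical helper that keeps the minority class whole and assigns the majority class a fraction equal to the ratio of the two counts.

```python
def balancing_fractions(counts):
    """Return per-class sampling fractions that keep the minority
    class whole and downsample the majority class to a similar size.

    counts: dict mapping each class value (0 or 1) to its row count.
    Hypothetical helper, for illustration only.
    """
    minority = min(counts, key=counts.get)
    majority = 1 - minority
    return {
        minority: 1.0,
        majority: counts[minority] / counts[majority],
    }


# made-up counts: 200 reviews with label 0 and 800 with label 1
print(balancing_fractions({0: 200, 1: 800}))
```

As far as I understand, `sampleBy` samples each row independently with the given fraction, so the resulting class counts should be close to, but not exactly, equal.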

**Creating the transformed (balanced) training data**

In [None]:
## train and test data split
train, test = data.randomSplit([0.7, 0.3], seed=60615)

In [None]:
## initializing the balancer
balancer = BinaryBalancer(inputCol='good_review')

## transforming; stored in train_bal
train_bal = balancer.transform(train)

#### Outputting the results and comparing

In [None]:
## for the original train data
train.groupBy('good_review').count().show()

In [None]:
## for the balanced train data
train_bal.groupBy('good_review').count().show()

### Part D. Question 3-(a)

#### D-1. Pipelines: feature generation

In order to create the pipelines, I need to create a `Transformer` version of the operations I have done in **Part A**. The process for `marketplace_feature` does not need to be altered (as it already uses `StringIndexer` and `OneHotEncoderEstimator`, which can be fed into the `Pipeline`), but the processes for the other three variables need some clean-up. The below code describes the necessary process.

#### Transformer for the three features: pre2005, body_wordcount, and verified

In [None]:
class ThreeFeatureGenerator(Transformer):
    '''
    Transformer class for generating the following var.s:
    - pre2005: =1 if year<2005, and =0 if not using the
        year variable.
    - body_wordcount: number of words in review_body
    - verified =1 if verified_purchase='Y', and =0 if not
    '''
    
    def __init__(self,
                 inputCols=['year', 'review_body', 'verified_purchase'],
                 outputCols=['pre2005', 'body_wordcount', 'verified']):
        '''
        Initializer, for the input columns and output columns.
        '''
        
        ## initializing the parent Transformer, which also assigns
        ## a unique ID (uid) to this instance
        super(ThreeFeatureGenerator, self).__init__()
        self.inputCol_1, self.inputCol_2, self.inputCol_3 = inputCols
        self.outputCol_1, self.outputCol_2, self.outputCol_3 = outputCols
        
    def copy(self, extra=None):
        '''
        for retaining a copy; defers to the default copy
        of the parent class.
        '''
        
        return super(ThreeFeatureGenerator, self).copy(extra)
        
    def _transform(self, df):
        '''
        Creating the 3 variables mentioned above.
        '''
        
        ## creating pre2005
        rtn_df = df.withColumn(self.outputCol_1,
            sparkfn.when((df[self.inputCol_1] < 2005), 1).otherwise(0))
        
        ## creating body_wordcount
        rtn_df = rtn_df.withColumn(self.outputCol_2,
            sparkfn.size(sparkfn.split(sparkfn.col(self.inputCol_2), ' ')))

        ## creating verified
        rtn_df = rtn_df.withColumn(self.outputCol_3,
            sparkfn.when((df[self.inputCol_3] == 'Y'), 1).otherwise(0))
        
        return rtn_df

#### D-2. Creating the pipeline

I assume that the DataFrame to be passed through the Pipeline described below already has the variable `good_review` as a column (as we defined it in Lab 6). The Pipeline will accomplish the following steps:

1. Firstly, the Pipeline will create the necessary four features (i.e., `pre2005`, `verified`, `body_wordcount`, and `marketplace_feature`).
2. Using `VectorAssembler`, assemble the necessary features (including `total_votes` variable).
3. Run logistic regression

I emphasize that the below `pipeline` should be applied to a dataset that has _not_ yet generated the features `pre2005`, `verified`, `body_wordcount`, and `marketplace_feature` but has the outcome variable `good_review`. Also, if any training data is used, it should first be "balanced" (using the `BinaryBalancer` in Part C). I demonstrate this in the following sub-part D-3.

In [None]:
## importing Pipeline
from pyspark.ml import Pipeline

In [None]:
## creating the features verified, pre2005, and body_wordcount
tfg = ThreeFeatureGenerator()

## for creating marketplace_feature
indexer = StringIndexer(inputCol='marketplace', outputCol='marketplace_numeric')
onehot = OneHotEncoderEstimator(
    inputCols=['marketplace_numeric'], outputCols=['marketplace_feature'])

## for vector-assembling
feature_full = ['pre2005', 'verified', 'marketplace_feature',
                'body_wordcount', 'total_votes']
assemble = VectorAssembler(inputCols=feature_full, outputCol='feature_full')

## Logistic Regression step
logit = LogisticRegression(featuresCol='feature_full', labelCol='good_review')

## constructing the Pipeline
pipeline = Pipeline(stages=[tfg, indexer, onehot, assemble, logit])

#### D-3. Demonstration of using the pipeline (without the cross-validation)

I will now demonstrate, from data read-in to finally executing the `pipeline`, how to use the `pipeline`.

In [None]:
## reading in the data and creating the outcome variable
df = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')
df = df.withColumn('good_review', (df.star_rating >= 4).cast("integer"))

## creating the training and testing data, then balancing the training data
train_df, test_df = df.randomSplit([0.7, 0.3], seed=60615)
balancer = BinaryBalancer(inputCol='good_review')
train_bal_df = balancer.transform(train_df)

## using the pipeline to train the model
## then getting the transformed test data
logit_pipeline = pipeline.fit(train_bal_df)
test_results = logit_pipeline.transform(test_df)

### Part E. Question 4

#### E-1. Function for pipeline-like structure and cross-validation

Although the `pipeline` above works well on its own, using it with `CrossValidator` caused errors in my attempts, seemingly because the custom transformer lacked methods that `CrossValidator` relies on (and I was, unfortunately, unable to add the right ones). Instead, I define a function (denoted `PipeAndCV`) that works like a `Pipeline` and runs the `CrossValidator` inside it.

In [None]:
## importing the necessary tools
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import pyspark.ml.evaluation as evals

In [None]:
## pipeline to be used in PipeAndCV
pipeline_after_tfg = Pipeline(
    stages=[indexer, onehot, assemble, logit])

In [None]:
def PipeAndCV(df_use, pipeline_outer):
    '''
    A function for pipelining and cross-validating to help
    find the best model
    
    Inputs:
    - df_use: dataframe to be used for training the model
    - pipeline_outer: pipeline to be used after manual cleaning
    
    Output:
    - crossvalidated: cross-validated model by fitting the
        df_use
    
    '''
    
    ## manual cleansing
    tfg = ThreeFeatureGenerator()
    df_tfged = tfg.transform(df_use)
    
    ## setting up the evaluator
    evaluator = evals.BinaryClassificationEvaluator(
        metricName='areaUnderROC', labelCol='good_review')
    
    ## grid building
    params = ParamGridBuilder()
    params = params.addGrid(
        logit.regParam, np.arange(0, 0.1, 0.01)).addGrid(
            logit.elasticNetParam, [0, 1])
    param_built = params.build()
    
    ## Cross validating
    cv = CrossValidator(
        estimator=pipeline_outer,
        estimatorParamMaps=param_built,
        evaluator=evaluator, numFolds=5)
    
    ## Fitting the CVed model
    models = cv.fit(df_tfged)
    
    return models
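
To get a sense of how much work the grid search involves, the sketch below enumerates the parameter combinations in plain Python. It assumes that `np.arange(0, 0.1, 0.01)` yields the ten values 0.00 through 0.09; the list comprehension is a stand-in for that call, and the variable names are hypothetical.

```python
from itertools import product

# stand-ins for the two grids used in PipeAndCV
reg_params = [round(0.01 * i, 2) for i in range(10)]   # 0.00, 0.01, ..., 0.09
elastic_net_params = [0, 1]
num_folds = 5

# every (regParam, elasticNetParam) pair that would be tried
grid = list(product(reg_params, elastic_net_params))

print(len(grid))               # number of parameter combinations
print(len(grid) * num_folds)   # model fits across the folds
```

As I understand it, the cross-validator also refits the best combination once on the full training data after the folds are evaluated, so the total number of fits is one more than the second figure.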

#### E-2. Retrieving the 'best model'

Here, I will demonstrate running the above function `PipeAndCV`. I note that I use the `train_bal_df` that was defined in sub-part D-3. Then, we can finally retrieve the "best model" in terms of maximized AUC.

In [None]:
## running the PipeAndCV function
models = PipeAndCV(train_bal_df, pipeline_after_tfg)

## bm for best model
bm = models.bestModel

#### E-3. Transforming the test data

I define another function, `TransformTestData`, for transforming the test data. The test data that I will use here is `test_df` that was also defined in sub-part D-3; note that `TransformTestData`, like `PipeAndCV`, has to additionally call upon `ThreeFeatureGenerator` which I had defined earlier.

In [None]:
def TransformTestData(df_use, model):
    '''
    A function for transforming dataframes according to the
    specified model (notably, test datasets after models have
    been fit with the training dataset)
    
    Inputs:
    - df_use: dataframe to be used for transforming according to
        the model specified by the input "model"
    - model: for transforming the df_use after it's been
        cleaned by ThreeFeatureGenerator
    
    Output:
    - transformed: transformed dataframe to be returned
    
    '''
    ## manual cleansing (using tfg)
    tfg = ThreeFeatureGenerator()
    df_tfged = tfg.transform(df_use)
    
    ## transforming according to the model
    transformed = model.transform(df_tfged)
    
    return transformed

We can apply the above function as follows.

In [None]:
## 'transf' for transformed test dataset
transf = TransformTestData(test_df, bm)

#### E-4. Evaluations from fitting the test data

As the question asks us to return the test data evaluations, I will return AUC, accuracy, true positive rates (for each label), and false positive rates (for each label). The results are discussed in `junhoc_hw3.pdf`.

**AUC**

AUC can be rather easily retrieved, using the following code.

In [None]:
## defining the evaluator
evaluator = evals.BinaryClassificationEvaluator(
    metricName='areaUnderROC', labelCol='good_review')

## saving the AUC
auc_val = evaluator.evaluate(transf)
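
As intuition for what this metric measures, AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes this directly on made-up scores in plain Python; it is not how Spark computes the value internally, and `pairwise_auc` is a hypothetical helper.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs in which the
    positive example has the higher score; ties count as one half.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# made-up labels and predicted scores
labels = [1, 1, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.2]
print(pairwise_auc(labels, scores))
```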

#### Accuracy, TPR, and FPR

For this, I will calculate the metrics manually. In order to do so, I have to keep track of whether the prediction was successful (i.e., the actual label and predicted label are equal to one another). I save this result in the column `success_pred`, and conduct the transformation as follows:

In [None]:
## transformation
transf = transf.withColumn('success_pred',
    sparkfn.when((transf['prediction'] == transf['good_review']), 1).otherwise(0))

## can check by running the below code
transf.select(['good_review', 'prediction', 'success_pred']).show(10)

Now I can calculate the test set prediction accuracy as follows.

In [None]:
## getting the sum of successfully predicted
successful = transf.select(sparkfn.sum('success_pred')).collect()[0][0]

## getting the total count
total = transf.count()

## saving the accuracy
accuracy_val = successful / total

In a similar spirit, we can calculate TPR (true positive rate) for label of 1 (i.e., actual `good_review` value is 1).

In [None]:
## filtering by good_review==1
transf_ones = transf.filter("good_review = 1")

## total number of actually being good_review=1
total_ones = transf_ones.count()

## getting the number of true positives (for good_review=1)
true_pos_ones = transf_ones.select(
    sparkfn.sum('success_pred')).collect()[0][0]

## saving the TPR for good_review=1
tpr_ones = true_pos_ones / total_ones

Notice that TPR for label of 1 is equal to $1$ minus FPR for label of 0, since

$$ TPR(1) = \frac{TP(1)}{TP(1) + FN(1)} = \frac{TN(0)}{TN(0) + FP(0)} = 1 - \frac{FP(0)}{TN(0)+FP(0)} = 1 - FPR(0)$$

where $TP$, $FN$, $TN$, $FP$ refer to true positive, false negative, true negative, and false positive. So, we can get the FPR for the label of 0 as follows:

In [None]:
## saving the FPR for good_review=0
fpr_zeros = 1 - tpr_ones

We can then, in the very similar spirit, output the TPR for label of 0 and FPR for label of 1 as follows:

In [None]:
## filtering by good_review==0
transf_zeros = transf.filter("good_review = 0")

## total number of actually being good_review=0
total_zeros = transf_zeros.count()

## getting the number of true positives (for good_review=0)
true_pos_zeros = transf_zeros.select(
    sparkfn.sum('success_pred')).collect()[0][0]

## saving the TPR for good_review=0
tpr_zeros = true_pos_zeros / total_zeros

## FPR for good_review=1
fpr_ones = 1 - tpr_zeros
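
The two identities used above can be verified on a small, made-up confusion table. The sketch below is plain Python; `rates_by_label` is a hypothetical helper that treats the given label as the positive class and counts the four cells directly.

```python
def rates_by_label(pairs, label):
    """Return (TPR, FPR) treating `label` as the positive class.

    pairs: list of (actual, predicted) tuples with values 0 or 1.
    """
    tp = sum(1 for a, p in pairs if a == label and p == label)
    fn = sum(1 for a, p in pairs if a == label and p != label)
    fp = sum(1 for a, p in pairs if a != label and p == label)
    tn = sum(1 for a, p in pairs if a != label and p != label)
    return tp / (tp + fn), fp / (fp + tn)


# made-up (actual, predicted) pairs
pairs = [(1, 1)] * 3 + [(1, 0)] * 1 + [(0, 0)] * 2 + [(0, 1)] * 2

tpr_1, fpr_1 = rates_by_label(pairs, 1)
tpr_0, fpr_0 = rates_by_label(pairs, 0)

# the identities used above
print(tpr_1 == 1 - fpr_0)
print(tpr_0 == 1 - fpr_1)
```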

#### Printing the results

Results can be output using the following series of `print`s.

In [None]:
print("Test dataset evaluation metrics are as follows:")
print("-----------------------------------------------")
print("Accuracy:", round(accuracy_val, 4))
print("AUC:     ", round(auc_val, 4))
print()
print("True positive rate:")
print("For good_review=1:", round(tpr_ones, 4))
print("For good_review=0:", round(tpr_zeros, 4))
print()
print("False positive rate:")
print("For good_review=1:", round(fpr_ones, 4))
print("For good_review=0:", round(fpr_zeros, 4))

#### E-5. Best model (hyper)parameters

In case the best model (hyper)parameters are needed, I use the code below to output them (for `regParam` and `elasticNetParam`). I refer to the following link ([here](https://stackoverflow.com/questions/36697304/how-to-extract-model-hyper-parameters-from-spark-ml-in-pyspark)) of a StackOverflow post to print the below two hyperparameters.

In [None]:
regparam_val = bm.stages[-1]._java_obj.getRegParam()
elnet_val = bm.stages[-1]._java_obj.getElasticNetParam()

print(
    "Best regParam is {}, and best elasticNetParam is {}.".format(
        regparam_val, elnet_val))

### Part F. Question 5

This question only requires running the above `PipeAndCV` multiple times with different numbers of cores specified, so there is nothing further to elaborate.
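
For completeness, one way to record the running time of each run is a small timing wrapper such as the one sketched below. The helper `timed` is hypothetical, and how the number of cores is changed between runs (a cluster configuration matter) is not shown here.

```python
import time


def timed(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, seconds elapsed).

    Hypothetical helper, for illustration only.
    """
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


# stand-in workload so that the sketch runs anywhere; on the cluster
# one would instead call timed(PipeAndCV, train_bal_df, pipeline_after_tfg)
result, seconds = timed(sum, range(1000))
print(result, round(seconds, 4))
```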