Ryan Leeson, Keith Jennings

In [2]:
#! scancel -u ryan.leeson -n sparkcluster 

In [6]:
import os, atexit, sys, findspark, sparkhpc, pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext

findspark.init()

# Slurm partition the Spark cluster job is submitted to (omit to use the default partition).
os.environ['SBATCH_PARTITION']='cpu24'

# Describe the Spark cluster to request through sparkhpc.
sj = sparkhpc.sparkjob.sparkjob(
    ncores = 10,                       # total number of cores
    cores_per_executor = 5,            # cores per executor (10 / 5 = two executors)
    memory_per_core = 10240,           # memory per core in MB
    walltime = "4:0"                   # hh:mm format
)

# Wait for the job to start, then create the SparkContext and the SQLContext used below.
sj.wait_to_start()
sc = sj.start_spark()
scq = SQLContext(sc)

def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job.

    Each resource is stopped independently, so a failure while stopping
    ``sc`` never prevents ``sj`` from being stopped. Failures are reported
    on stdout instead of being silently discarded.

    Args:
        sj: Spark job handle exposing ``stop()``.
        sc: Spark context exposing ``stop()``.
    """
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception as err:
        # Cleanup stays best-effort, but the reason for the failure is shown.
        print(f'Could not stop Spark Context: {err!r}')
    finally:
        # Always attempt to release the job, even if stopping the context was interrupted.
        try:
            print('Trapped Exit cleaning up Spark Job')
            sj.stop()
        except Exception as err:
            print(f'Could not stop Spark Job: {err!r}')
# Run exitHandler(sj, sc) automatically when the interpreter exits.
atexit.register(exitHandler,sj,sc);
# Last expression of the cell: display the SparkContext.
sc

INFO:sparkhpc.sparkjob:Submitted batch job 8796

INFO:sparkhpc.sparkjob:Submitted cluster 2


In [39]:
from textblob import TextBlob

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf, concat
from pyspark.sql import functions as f

from pyspark.ml import Pipeline

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *

from pyspark.sql.functions import split, lower

from pyspark.ml.feature import NGram, FeatureHasher, CountVectorizer, RegexTokenizer

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler


from itertools import chain
import numpy as np
import pandas as pd


from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf, create_map, lit
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [20]:
# Load the appliance reviews, keeping only the rating ('overall') and the review text.
df = scq.read.parquet ('Files/appliance.parquet').select ('overall', 'reviewText')

In [21]:
df.show (3)

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    5.0|I like this as a ...|
|    5.0|           good item|
|    5.0|Fit my new LG dry...|
+-------+--------------------+
only showing top 3 rows



In [22]:
print (f'There are {df.count ()} entries in the dataset.')

There are 2277 entries in the dataset.


In [23]:
# Rating distribution of the raw data: rows per rating and each rating's fraction of all rows
# (the 'percent' column is a 0-1 fraction, not multiplied by 100).
(df.groupBy ('overall')
 .count ()
 .withColumn ('percent', f.col ('count') / f.sum ('count').over (Window.partitionBy ()))  # empty partitionBy: one window over every row
 .orderBy ('overall', ascending = False)
 #.show ()   # uncomment to print the table; without it the cell only displays the lazy DataFrame
)

DataFrame[overall: double, count: bigint, percent: double]

In [24]:
#   Clean the dataset: keep rows with a positive rating, drop exact duplicate
#   (overall, reviewText) rows, then drop rows whose reviewText is null.

df_cleaned = df.where (df.overall > 0).dropDuplicates ().na.drop (subset = ['reviewText'])

print (f'After cleaning there are {df_cleaned.count ()} entries remaining.')

After cleaning there are 145 entries remaining.


In [25]:
# Rating distribution after cleaning: rows per rating plus each rating's fraction of all rows.
df_counts = (df_cleaned.groupBy ('overall')
 .count ()
 .withColumn ('percent', f.col ('count') / f.sum ('count').over (Window.partitionBy ()))
 .orderBy ('overall', ascending = False)
)

# Uncomment to render the table through pandas:
#display (df_counts.toPandas ())

In [26]:
#   Class weights for the weighted multinomial logistic regression

#   The star ratings are heavily unbalanced, so every class gets the weight
#       total_rows / (number_of_classes * rows_in_class)
#   which up-weights the rare ratings and down-weights the frequent ones.

#   Rows per rating, collected on the driver
label_freq = df_cleaned.select("overall").groupBy("overall").count().collect()
unique_label = [row["overall"] for row in label_freq]
bin_count = [row["count"] for row in label_freq]
total_label = sum(bin_count)
unique_label_count = len(label_freq)

balanced_weights = total_label / (unique_label_count * np.array(bin_count))
label_weights = dict(zip(unique_label, balanced_weights))
print(label_weights)

from itertools import chain

#   Flatten {label: weight} into [label, weight, label, weight, ...] literals for create_map
map_literals = [f.lit(item) for item in chain.from_iterable(label_weights.items())]
mapping_expr = f.create_map(map_literals)

#   Look up every row's weight by its rating
df_weighted = df_cleaned.withColumn("weight", mapping_expr.getItem(f.col("overall")))

#source: https://danvatterott.com/blog/2019/11/18/balancing-model-weights-in-pyspark/

{1.0: 5.8, 4.0: 2.4166666666666665, 3.0: 4.142857142857143, 2.0: 4.142857142857143, 5.0: 0.2543859649122807}


In [27]:
df_weighted.show ()

+-------+--------------------+------------------+
|overall|          reviewText|            weight|
+-------+--------------------+------------------+
|    5.0|      Worked great!!|0.2543859649122807|
|    5.0|works great and v...|0.2543859649122807|
|    5.0|Worked great at h...|0.2543859649122807|
|    5.0|works great instr...|0.2543859649122807|
|    5.0|         received ok|0.2543859649122807|
|    5.0|                  A+|0.2543859649122807|
|    5.0|These start devic...|0.2543859649122807|
|    4.0|Works great. Best...|2.4166666666666665|
|    5.0|Worked great!! I ...|0.2543859649122807|
|    2.0|             ok part| 4.142857142857143|
|    3.0|       they ok parts| 4.142857142857143|
|    5.0|This review is fo...|0.2543859649122807|
|    5.0|Good value for el...|0.2543859649122807|
|    5.0|This is the OEM f...|0.2543859649122807|
|    5.0|This works great,...|0.2543859649122807|
|    5.0|These filters wor...|0.2543859649122807|
|    2.0|               cheap| 4.142857142857143|


In [28]:
#   Custom transformers

class PolarityTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Add a column holding the TextBlob sentiment polarity of a string column.

    The polarity expresses how negative or how positive a string is, based
    on the words it uses. The value is produced by
    ``TextBlob (text).sentiment.polarity`` and stored as a double.

    Params:
        inputCol: name of the string column to score.
        outputCol: name of the double column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (PolarityTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the polarity column appended.

        A null input string yields a null polarity instead of failing inside
        the UDF.
        """

        def polarity_of (text):
            if text is None:
                return None
            return float (TextBlob (text).sentiment.polarity)

        # Declare the UDF as double directly rather than returning a string
        # and casting it back to double afterwards.
        polarity_udf = udf (polarity_of, 'double')
        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, polarity_udf (in_col))


class WordSentimentTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Count the positive, negative and neutral words of a token-array column.

    Each word is scored with ``TextBlob (word).sentiment.polarity``: a score
    above zero counts as positive, below zero as negative, and exactly zero
    as neutral. The three counts are returned as a dense vector in the order
    (positive, negative, neutral).

    Params:
        inputCol: name of the array-of-strings column to inspect.
        outputCol: name of the vector column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (WordSentimentTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the word-sentiment count vector appended."""

        def WordsFreq (array):
            posWord = 0
            negWord = 0
            neutWord = 0

            # A null array is treated as an empty list of words.
            for word in array or []:
                # Score each word once and reuse the value for both comparisons.
                score = TextBlob (word).sentiment.polarity

                if score > 0:
                    posWord += 1
                elif score < 0:
                    negWord += 1
                else:
                    neutWord += 1

            return posWord, negWord, neutWord

        word_freq = udf (lambda a: Vectors.dense (WordsFreq (a)), VectorUDT ())

        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, word_freq (in_col))

### Random Forest Classification

We are attempting to train a random forest classifier to predict the star rating of a product based on the review.

In [30]:
#   Tokenize the review text with the regex \W (non-word characters) and lower-case the tokens
tokenizer = RegexTokenizer (inputCol = "reviewText", outputCol = "words", pattern = "\\W", toLowercase = True)

#   Drop stop words from the token list
remover = StopWordsRemover (inputCol = tokenizer.getOutputCol (), outputCol = "filtered")

#   Count positive / negative / neutral words among the remaining tokens
wordsentiment = WordSentimentTransformer (inputCol = remover.getOutputCol (), outputCol = 'word_sentiment')

#   Polarity of the full, untokenized review text
polarity = PolarityTransformer (inputCol = 'reviewText', outputCol = 'polarity')


#   Combine the polarity score and the word counts into a single feature vector
assembler = VectorAssembler (inputCols = ['polarity', 'word_sentiment'], outputCol = 'features')


#   Setting Random Forest Classifier

randforest = RandomForestClassifier (labelCol = 'overall', featuresCol = assembler.getOutputCol (), numTrees = 10, maxMemoryInMB = 1024)

In [31]:
#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

In [None]:
#   Setting Random Forest Classification model
#   NOTE(review): this rebinds `randforest`; unlike the earlier definition it does not set maxMemoryInMB.
randforest = RandomForestClassifier (labelCol = 'overall', featuresCol = assembler.getOutputCol (), numTrees = 10)

In [45]:
#   Pipeline for the random forest classification. The stages run in this order:
#     1. polarity      - compute the polarity of each full review,
#     2. tokenizer     - tokenise the review,
#     3. remover       - remove the stop words from the tokens,
#     4. wordsentiment - classify each remaining word as positive, negative, or neutral
#                        and return the count of each category,
#     5. assembler     - combine the polarity and the word counts into the 'features' vector,
#     6. randforest    - train the classifier on the 'overall' and 'features' columns
#                        of the training dataset.

pipeline = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, randforest])

In [46]:
#   Fitting the model and saving the trained model

rf_model = pipeline.fit (training)

#   overwrite () lets the cell be re-run; a plain save () raises once the path already exists
rf_model.write ().overwrite ().save ('models/rf_model_unbalanced')

In [None]:
#   Model evaluation
#   The random forest model is evaluated on the test set for its prediction accuracy and its F1-score.

#   The evaluators are created here so this cell does not depend on a later cell having run
evaluatorAcc = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = "accuracy")
evaluatorf1 = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = 'f1')

rf_predictions = rf_model.transform (testing)

accuracy = evaluatorAcc.evaluate (rf_predictions)
print (f'Test Error of random forest classification: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (rf_predictions)
print (f'F1-score of random forest classification: {fscore}.')

### Logistic Regression

Because the PySpark RandomForestClassifier is not able to account for the unbalanced nature of the dataset, we will attempt a weighted logistic regression. The weights were calculated above and the values stored in the dataframe in the column 'weight.'

In [43]:
#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

In [None]:
#   Setting Logistic Regression model
#   Multinomial family; each row is weighted by its value in the 'weight' column.
logreg = LogisticRegression (labelCol = 'overall', featuresCol = assembler.getOutputCol (), maxIter = 10, 
                             weightCol = 'weight', family = 'multinomial')

In [32]:
#   Same feature stages as the random forest pipeline; the final stage is the
#   weighted logistic regression `logreg`.

pipeline2 = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, logreg])

In [33]:
#   Fitting the model and saving the trained model

lr_weight_model = pipeline2.fit (training)

#   overwrite () lets the cell be re-run; a plain save () raises once the path already exists
lr_weight_model.write ().overwrite ().save ('models/lr_weighted_model')

In [None]:
#   Model evaluation
#   The weighted logistic regression is evaluated on the test set using accuracy and F1-score

evaluatorAcc = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = "accuracy")
evaluatorf1 = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = 'f1')

lr_predictions = lr_weight_model.transform (testing)

accuracy = evaluatorAcc.evaluate (lr_predictions)
#   The closing period belongs inside the f-string; outside it the cell is a syntax error
print (f'Test Error of logistic regression: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (lr_predictions)
print (f'F1-score of logistic regression: {fscore}.')


### Logistic regression with hyperparameter tuning with cross validation.

If possible we want to train a logistic regression model with hyperparameter tuning using cross validation.

In [44]:
#   NOTE(review): this rebinds `randforest`, which the cross-validation cells in this section
#   do not reference (pipeline3 ends with logreg_cv) - presumably a leftover; confirm before removing.
randforest = RandomForestClassifier (labelCol = 'overall', featuresCol = assembler.getOutputCol (), numTrees = 10, maxMemoryInMB = 1024)

In [35]:
#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

In [37]:
#   Base estimator for the cross-validated search: weighted multinomial logistic regression.
#   maxIter set here is one of the values the parameter grid varies.
logreg2 = LogisticRegression (labelCol = 'overall', featuresCol = assembler.getOutputCol (), maxIter = 10, 
                             weightCol = 'weight', family = 'multinomial')

In [40]:
#   Cross-validated hyperparameter search
#   Grid: 3 maxIter values x 6 regParam values x 5 elasticNetParam values = 90 candidates,
#   each scored with the F1 evaluator over 3 folds.
grid = (
    ParamGridBuilder ()
    .addGrid (logreg2.maxIter, [10, 20, 25])
    .addGrid (logreg2.regParam, [0, 0.01, 0.05, 0.1, 0.5, 1])
    .addGrid (logreg2.elasticNetParam, [0.0, 0.1, 0.5, 0.8, 1])
    .build ()
)

logreg_cv = CrossValidator (
    estimator = logreg2,
    estimatorParamMaps = grid,
    evaluator = evaluatorf1,
    numFolds = 3,
)

In [41]:
#   Same feature stages as the other pipelines; the final stage is the CrossValidator `logreg_cv`.
pipeline3 = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, logreg_cv])

In [None]:
lr_cv_model = pipeline3.fit (training)

#   overwrite () lets the cell be re-run; a plain save () raises once the path already exists
lr_cv_model.write ().overwrite ().save ('models/lr_weighted_model_crossval')

In [None]:
#get performance metrics

#   lr_cv_model is the fitted PipelineModel, which has no bestModel attribute. Its transform ()
#   runs the feature stages and then the fitted CrossValidator stage, which predicts with the
#   best model found during the search.
cv_predictions = lr_cv_model.transform (testing)

accuracy = evaluatorAcc.evaluate (cv_predictions)
print (f'Test Error of logistic regression with cross valiation: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (cv_predictions)
print (f'F1-score of logistic regression with cross validation: {fscore}.')

#### Unfortunately, this code could not be run on the cpu32-bigmem partition.

## Slurm job

#### Script file to submit Slurm job

In [None]:
%%file amazon_model_fitting_spark_unbalanced.script
#!/bin/bash
#SBATCH -J slurm-spark
#SBATCH -t 1440 # runtime to request, in minutes (1440 min = 24 h)
#SBATCH -o slurm-spark-%J.log # log file that receives the job's output
#SBATCH -n 1 # number of tasks to request

# Load the Spark/JupyterHub module and source the conda shell setup.
module load spark/jupyterhub
. /global/software/jupyterhub-spark/anaconda3/etc/profile.d/conda.sh

# Run the model-fitting script; its standard output is written to the results file.
python amazon_model_fitting_spark_unbalanced.py > amazon_model_fitting_spark_unbalanced_results.txt


#### .py file for fitting of unbalanced dataset

This code was intended to run on the TALC cpu32-bigmem partition.

In [None]:
%%file amazon_model_fitting_spark_unbalanced.py
#   Spark 

import os, atexit, sys, findspark, sparkhpc, pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext

findspark.init()

# Slurm partition the Spark cluster job is submitted to (omit to use the default partition).
os.environ['SBATCH_PARTITION']='cpu32-bigmem'

# Describe the Spark cluster to request through sparkhpc.
sj = sparkhpc.sparkjob.sparkjob(
    ncores = 24,                          # total number of cores
    cores_per_executor = 24,              # all 24 cores in a single executor
    memory_per_core = 1100 * 1024 // 24,  # memory per core in MB (1100 * 1024 MB shared by 24 cores)
    walltime = "14:0"                      # hh:mm format
)

# Wait for the job to start, then create the SparkContext and the SQLContext used below.
sj.wait_to_start()
sc = sj.start_spark()
scq = SQLContext(sc)

def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job.

    Each resource is stopped independently, so a failure while stopping
    ``sc`` never prevents ``sj`` from being stopped. Failures are reported
    on stdout instead of being silently discarded.

    Args:
        sj: Spark job handle exposing ``stop()``.
        sc: Spark context exposing ``stop()``.
    """
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception as err:
        # Cleanup stays best-effort, but the reason for the failure is shown.
        print(f'Could not stop Spark Context: {err!r}')
    finally:
        # Always attempt to release the job, even if stopping the context was interrupted.
        try:
            print('Trapped Exit cleaning up Spark Job')
            sj.stop()
        except Exception as err:
            print(f'Could not stop Spark Job: {err!r}')
# Run exitHandler(sj, sc) automatically when the interpreter exits.
atexit.register(exitHandler,sj,sc);


###########################################################################################################################


from textblob import TextBlob

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf, concat
from pyspark.sql import functions as f

from pyspark.ml import Pipeline

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *

from pyspark.sql.functions import split, lower

from pyspark.ml.feature import NGram, FeatureHasher, CountVectorizer, RegexTokenizer

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler

from itertools import chain
import numpy as np
import pandas as pd


from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf, create_map, lit
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import time

###########################################################################################################################

#   Wall-clock start, used for the run-time report at the end of the script
start = time.time ()


#   Bring in amazon_reviews.parquet, keeping only the rating and the review text
df = scq.read.parquet ('Files/amazon_reviews.parquet').select ('overall', 'reviewText')

#   Cleaning the dataset: keep rows with a positive rating, drop exact duplicate rows,
#   then drop rows whose reviewText is null
df_cleaned = df.where (df.overall > 0).dropDuplicates ().na.drop (subset = ['reviewText'])

print (f'After cleaning there are {df_cleaned.count ()} entries remaining.')



#   Class weights for the weighted multinomial logistic regression

#   Every class gets the weight
#       total_rows / (number_of_classes * rows_in_class)
#   which up-weights the rare ratings and down-weights the frequent ones.

#   Rows per rating, collected on the driver
label_freq = df_cleaned.select("overall").groupBy("overall").count().collect()
unique_label = [row["overall"] for row in label_freq]
bin_count = [row["count"] for row in label_freq]
total_label = sum(bin_count)
unique_label_count = len(label_freq)

balanced_weights = total_label / (unique_label_count * np.array(bin_count))
label_weights = dict(zip(unique_label, balanced_weights))
print(label_weights)

#   Flatten {label: weight} into [label, weight, label, weight, ...] literals for create_map
map_literals = [f.lit(item) for item in chain.from_iterable(label_weights.items())]
mapping_expr = f.create_map(map_literals)

#   Look up every row's weight by its rating
df_weighted = df_cleaned.withColumn("weight", mapping_expr.getItem(f.col("overall")))


###########################################################################################################################

#   Custom transformers

class PolarityTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Add a column holding the TextBlob sentiment polarity of a string column.

    The value is produced by ``TextBlob (text).sentiment.polarity`` and
    stored as a double.

    Params:
        inputCol: name of the string column to score.
        outputCol: name of the double column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (PolarityTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the polarity column appended.

        A null input string yields a null polarity instead of failing inside
        the UDF.
        """

        def polarity_of (text):
            if text is None:
                return None
            return float (TextBlob (text).sentiment.polarity)

        # Declare the UDF as double directly rather than returning a string
        # and casting it back to double afterwards.
        polarity_udf = udf (polarity_of, 'double')
        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, polarity_udf (in_col))


class WordSentimentTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Count the positive, negative and neutral words of a token-array column.

    Each word is scored with ``TextBlob (word).sentiment.polarity``: a score
    above zero counts as positive, below zero as negative, and exactly zero
    as neutral. The three counts are returned as a dense vector in the order
    (positive, negative, neutral).

    Params:
        inputCol: name of the array-of-strings column to inspect.
        outputCol: name of the vector column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (WordSentimentTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the word-sentiment count vector appended."""

        def WordsFreq (array):
            posWord = 0
            negWord = 0
            neutWord = 0

            # A null array is treated as an empty list of words.
            for word in array or []:
                # Score each word once and reuse the value for both comparisons.
                score = TextBlob (word).sentiment.polarity

                if score > 0:
                    posWord += 1
                elif score < 0:
                    negWord += 1
                else:
                    neutWord += 1

            return posWord, negWord, neutWord

        word_freq = udf (lambda a: Vectors.dense (WordsFreq (a)), VectorUDT ())

        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, word_freq (in_col))

###########################################################################################################################

#   Set transformers 

#   Tokenize the review text with the regex \W (non-word characters) and lower-case the tokens
tokenizer = RegexTokenizer (inputCol = "reviewText", outputCol = "words", pattern = "\\W", toLowercase = True)

#   Drop stop words from the token list
remover = StopWordsRemover (inputCol = tokenizer.getOutputCol (), outputCol = "filtered")

#   Count positive / negative / neutral words among the remaining tokens
wordsentiment = WordSentimentTransformer (inputCol = remover.getOutputCol (), outputCol = 'word_sentiment')

#   Polarity of the full, untokenized review text
polarity = PolarityTransformer (inputCol = 'reviewText', outputCol = 'polarity')


#   Combine the polarity score and the word counts into a single feature vector
assembler = VectorAssembler (inputCols = ['polarity', 'word_sentiment'], outputCol = 'features')

###########################################################################################################################

#   Random Forest classifier

randforest = RandomForestClassifier (labelCol = 'overall', featuresCol = assembler.getOutputCol (), numTrees = 10)

#   The evaluators must exist before the first evaluation in this section; without them the
#   script stops with a NameError right after the model has been fitted.
evaluatorAcc = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = "accuracy")
evaluatorf1 = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = 'f1')


#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

pipeline = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, randforest])

rf_model = pipeline.fit (training)

#   overwrite () keeps a re-run from failing at the save step when the path already exists
rf_model.write ().overwrite ().save ('models/rf_model_unbalanced_amazon')


rf_predictions = rf_model.transform (testing)

accuracy = evaluatorAcc.evaluate (rf_predictions)
print (f'Test Error of random forest classification with unbalanced dataset: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (rf_predictions)
print (f'F1-score of random forest classification with unbalanced dataset: {fscore}.')

###########################################################################################################################

#   Logistic regression with weighted classes
#   Multinomial family; each row is weighted by its value in the 'weight' column.
logreg = LogisticRegression (labelCol = 'overall', featuresCol = assembler.getOutputCol (), maxIter = 10, 
                             weightCol = 'weight', family = 'multinomial')

#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

pipeline2 = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, logreg])

lr_weight_model = pipeline2.fit (training)
#   overwrite () keeps a re-run from failing at the save step when the path already exists
lr_weight_model.write ().overwrite ().save ('models/lr_weighted_model_amazon')


evaluatorAcc = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = "accuracy")
evaluatorf1 = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = 'f1')

lr_predictions = lr_weight_model.transform (testing)

accuracy = evaluatorAcc.evaluate (lr_predictions)
print (f'Test Error of weighted logistic regression: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (lr_predictions)
print (f'F1-score of weighted logistic regression: {fscore}.')



print (f'Program took {time.time () - start} seconds.')



### Logistic Regression with cross-validation

#### Script file to submit Slurm job

This job was not able to be submitted.

In [None]:
%%file amazon_model_fitting_spark_crossval.script
#!/bin/bash
#SBATCH -J slurm-spark
#SBATCH -t 1440 # runtime to request, in minutes (1440 min = 24 h)
#SBATCH -o slurm-spark-%J.log # log file that receives the job's output
#SBATCH -n 1 # number of tasks to request

# Load the Spark/JupyterHub module and source the conda shell setup.
module load spark/jupyterhub
. /global/software/jupyterhub-spark/anaconda3/etc/profile.d/conda.sh

# Run the cross-validation script; its standard output is written to the results file.
python amazon_model_fitting_spark_crossval.py > amazon_model_fitting_spark_crossval_results.txt


#### .py file for fitting of unbalanced dataset with cross-validation.

In [None]:
%%file 

#   Spark 

import os, atexit, sys, findspark, sparkhpc, pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext

findspark.init()

# Slurm partition the Spark cluster job is submitted to (omit to use the default partition).
os.environ['SBATCH_PARTITION']='cpu32-bigmem'

# Describe the Spark cluster to request through sparkhpc.
sj = sparkhpc.sparkjob.sparkjob(
    ncores = 24,                          # total number of cores
    cores_per_executor = 24,              # all 24 cores in a single executor
    memory_per_core = 1100 * 1024 // 24,  # memory per core in MB (1100 * 1024 MB shared by 24 cores)
    walltime = "13:0"                      # hh:mm format
)

# Wait for the job to start, then create the SparkContext and the SQLContext used below.
sj.wait_to_start()
sc = sj.start_spark()
scq = SQLContext(sc)

def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job.

    Each resource is stopped independently, so a failure while stopping
    ``sc`` never prevents ``sj`` from being stopped. Failures are reported
    on stdout instead of being silently discarded.

    Args:
        sj: Spark job handle exposing ``stop()``.
        sc: Spark context exposing ``stop()``.
    """
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception as err:
        # Cleanup stays best-effort, but the reason for the failure is shown.
        print(f'Could not stop Spark Context: {err!r}')
    finally:
        # Always attempt to release the job, even if stopping the context was interrupted.
        try:
            print('Trapped Exit cleaning up Spark Job')
            sj.stop()
        except Exception as err:
            print(f'Could not stop Spark Job: {err!r}')
# Run exitHandler(sj, sc) automatically when the interpreter exits.
atexit.register(exitHandler,sj,sc);


###########################################################################################################################


from textblob import TextBlob

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf, concat
from pyspark.sql import functions as f

from pyspark.ml import Pipeline

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *

from pyspark.sql.functions import split, lower

from pyspark.ml.feature import NGram, FeatureHasher, CountVectorizer, RegexTokenizer

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler

from itertools import chain
import numpy as np
import pandas as pd


from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql.functions import udf, create_map, lit
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import time

###########################################################################################################################

#   Wall-clock start, used for the run-time report at the end of the script
start = time.time ()


#   Bring in amazon_reviews.parquet, keeping only the rating and the review text
df = scq.read.parquet ('Files/amazon_reviews.parquet').select ('overall', 'reviewText')

#   Cleaning the dataset: keep rows with a positive rating, drop exact duplicate rows,
#   then drop rows whose reviewText is null
df_cleaned = df.where (df.overall > 0).dropDuplicates ().na.drop (subset = ['reviewText'])

print (f'After cleaning there are {df_cleaned.count ()} entries remaining.')



#   Class weights for the weighted multinomial logistic regression

#   Every class gets the weight
#       total_rows / (number_of_classes * rows_in_class)
#   which up-weights the rare ratings and down-weights the frequent ones.

#   Rows per rating, collected on the driver
label_freq = df_cleaned.select("overall").groupBy("overall").count().collect()
unique_label = [row["overall"] for row in label_freq]
bin_count = [row["count"] for row in label_freq]
total_label = sum(bin_count)
unique_label_count = len(label_freq)

balanced_weights = total_label / (unique_label_count * np.array(bin_count))
label_weights = dict(zip(unique_label, balanced_weights))
print(label_weights)

from itertools import chain

#   Flatten {label: weight} into [label, weight, label, weight, ...] literals for create_map
map_literals = [f.lit(item) for item in chain.from_iterable(label_weights.items())]
mapping_expr = f.create_map(map_literals)

#   Look up every row's weight by its rating
df_weighted = df_cleaned.withColumn("weight", mapping_expr.getItem(f.col("overall")))


###########################################################################################################################

#   Custom transformers

class PolarityTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Add a column holding the TextBlob sentiment polarity of a string column.

    The value is produced by ``TextBlob (text).sentiment.polarity`` and
    stored as a double.

    Params:
        inputCol: name of the string column to score.
        outputCol: name of the double column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (PolarityTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the polarity column appended.

        A null input string yields a null polarity instead of failing inside
        the UDF.
        """

        def polarity_of (text):
            if text is None:
                return None
            return float (TextBlob (text).sentiment.polarity)

        # Declare the UDF as double directly rather than returning a string
        # and casting it back to double afterwards.
        polarity_udf = udf (polarity_of, 'double')
        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, polarity_udf (in_col))


class WordSentimentTransformer (Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Count the positive, negative and neutral words of a token-array column.

    Each word is scored with ``TextBlob (word).sentiment.polarity``: a score
    above zero counts as positive, below zero as negative, and exactly zero
    as neutral. The three counts are returned as a dense vector in the order
    (positive, negative, neutral).

    Params:
        inputCol: name of the array-of-strings column to inspect.
        outputCol: name of the vector column to add.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super (WordSentimentTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol`` / ``outputCol`` from the keyword arguments given."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the word-sentiment count vector appended."""

        def WordsFreq (array):
            posWord = 0
            negWord = 0
            neutWord = 0

            # A null array is treated as an empty list of words.
            for word in array or []:
                # Score each word once and reuse the value for both comparisons.
                score = TextBlob (word).sentiment.polarity

                if score > 0:
                    posWord += 1
                elif score < 0:
                    negWord += 1
                else:
                    neutWord += 1

            return posWord, negWord, neutWord

        word_freq = udf (lambda a: Vectors.dense (WordsFreq (a)), VectorUDT ())

        out_col = self.getOutputCol ()
        in_col = dataset[self.getInputCol ()]
        return dataset.withColumn (out_col, word_freq (in_col))

###########################################################################################################################

#   Set transformers 

#   Tokenize the review text with the regex \W (non-word characters) and lower-case the tokens
tokenizer = RegexTokenizer (inputCol = "reviewText", outputCol = "words", pattern = "\\W", toLowercase = True)

#   Drop stop words from the token list
remover = StopWordsRemover (inputCol = tokenizer.getOutputCol (), outputCol = "filtered")

#   Count positive / negative / neutral words among the remaining tokens
wordsentiment = WordSentimentTransformer (inputCol = remover.getOutputCol (), outputCol = 'word_sentiment')

#   Polarity of the full, untokenized review text
polarity = PolarityTransformer (inputCol = 'reviewText', outputCol = 'polarity')


#   Combine the polarity score and the word counts into a single feature vector
assembler = VectorAssembler (inputCols = ['polarity', 'word_sentiment'], outputCol = 'features')



#   Evaluators comparing the 'prediction' column with the 'overall' label: accuracy and F1
evaluatorAcc = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = "accuracy")
evaluatorf1 = MulticlassClassificationEvaluator (predictionCol = 'prediction', labelCol = 'overall', metricName = 'f1')


###########################################################################################################################

#   Logistic Regression with cross-validation

#   80/20 train/test split; the fixed seed makes the split repeatable across runs
(training, testing) = df_weighted.randomSplit ([0.8, 0.2], seed = 42)

#   Base estimator: weighted multinomial logistic regression
logreg2 = LogisticRegression (labelCol = 'overall', featuresCol = assembler.getOutputCol (), maxIter = 10, 
                             weightCol = 'weight', family = 'multinomial')



#cross validate
#   Grid: 3 maxIter values x 6 regParam values x 5 elasticNetParam values = 90 candidates,
#   each scored with the F1 evaluator over 3 folds.
grid = ParamGridBuilder().addGrid(logreg2.maxIter, [10, 20, 25]) \
                                .addGrid(logreg2.regParam, [0, 0.01, 0.05, 0.1, 0.5, 1]) \
                                .addGrid(logreg2.elasticNetParam, [0.0, 0.1, 0.5, 0.8, 1]) \
                                .build()

logreg_cv = CrossValidator(estimator=logreg2, estimatorParamMaps=grid, \
                        evaluator=evaluatorf1, numFolds=3)

pipeline = Pipeline (stages = [polarity, tokenizer, remover, wordsentiment, assembler, logreg_cv])


#   Fit the cross-validation pipeline built just above (this script defines no `pipeline2`)
lr_cv_model = pipeline.fit (training)

#lr_cv_model.save ('models/lr_weighted_model_crossval')


#get performance metrics

#   lr_cv_model is the fitted PipelineModel, which has no bestModel attribute. Its transform ()
#   runs the feature stages and then the fitted CrossValidator stage, which predicts with the
#   best model found during the search.
cv_predictions = lr_cv_model.transform (testing)

accuracy = evaluatorAcc.evaluate (cv_predictions)
print (f'Test Error of logistic regression with cross valiation: {1.0 - accuracy}.')


fscore = evaluatorf1.evaluate (cv_predictions)
print (f'F1-score of logistic regression with cross validation: {fscore}.')



print (f'Program took {time.time () - start} seconds.')

