In [1]:
from json import loads
from pyspark.sql import SparkSession
import warnings
import pandas as pd
warnings.filterwarnings("ignore")
from pyspark.sql.functions import col,from_json,udf,split,explode,lit,array,lower
from pyspark.ml.feature import NGram
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,MapType,FloatType,ArrayType
import numpy as np
import pickle 

from pyspark.sql import functions as F
from itertools import chain
from sklearn.metrics import classification_report

In [2]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import classification_report,accuracy_score,precision_score,recall_score,f1_score
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
spark = SparkSession.\
        builder.\
        appName("ml").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "2048m").\
        getOrCreate()

23/01/28 15:44:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [57]:
class SentimentModel:
    
    def __init__(self):
        self.df_test = spark.read.parquet('hdfs://namenode:9000/ml/test_data')
        self.df_train = spark.read.parquet('hdfs://namenode:9000/ml/train_data')
        self.clean_data()
        self.split_content()
        self.convert_feature()
        self.model = {}
        
    def getNGram(self,df,n):
        ngram = NGram(n=n)
        ngram.setInputCol("comment_term")
        ngram.setOutputCol("nGrams")
        df_nGram = ngram.transform(df)
        result_nGram = df_nGram.withColumn('word',explode(df_nGram.nGrams))\
            .groupBy(['word'])\
            .count()
        return result_nGram
        
    def clean_data(self):
    
        df = self.df_train.withColumn('comment_term',split(self.df_train.clean_content, ' ', -1))

        result_nGram = self.getNGram(df,1)
        result_nGram.createOrReplaceTempView('result_nGram')
        
        stop_word = spark.sql("""
            select word from result_nGram
            where count < 10
        """).toPandas()
        stop_word = stop_word['word'].to_list()
        
        dict_stop_word = {x:1 for x in stop_word}
        self.dict_stop_word = dict_stop_word
        self.df_test.createOrReplaceTempView('df_test')
        self.df_train.createOrReplaceTempView('df_train')
        
        
        def remove_stop_word(txt):
            txt = txt.strip()
            ls_words = txt.split()
            ls_new_words = []
            for word in ls_words:
                if dict_stop_word.get(word) == None:
                    ls_new_words.append(word)
            return ' '.join(ls_new_words)
        spark.udf.register("remove_stop_word", remove_stop_word,StringType())
        
        self.df_test = spark.sql("""
            select remove_stop_word(clean_content) clean_content,rating,sentiment,true_label,label 
            from df_test
        """)
        
        self.df_train = spark.sql("""
            select remove_stop_word(clean_content) clean_content,rating,sentiment,label 
            from df_train
        """)
        

    
    def set_weight(self, w_a = 5,w_b = 5, w_c = 1):
        class_weights_spark = {0:w_a,1:w_b,2:w_c}
        mapping_expr = F.create_map([F.lit(x) for x in chain(*class_weights_spark.items())])
        self.train_idf = self.train_idf.withColumn("weight", mapping_expr.getItem(F.col("label")))
        
    def split_content(self):
        self.train_set = self.df_train.select(split(self.df_train.clean_content, ' ').alias('cmt_token'),'clean_content','rating', 'label')
        self.test_set = self.df_test.select(split(self.df_test.clean_content, ' ').alias('cmt_token'),'clean_content','rating', 'label','true_label')
    
    def convert_feature(self):
        count = CountVectorizer(inputCol="cmt_token", outputCol="rawFeatures")
        idf = IDF(inputCol="rawFeatures", outputCol="featuresTFIDF")
        pipeline = Pipeline(stages=[count, idf])
        self.model_tfidf = pipeline.fit(self.train_set)
        self.train_idf = self.model_tfidf.transform(self.train_set)
        self.test_idf = self.model_tfidf.transform(self.test_set)
    
    def model_logistic(self,weight):
        lr = LogisticRegression(featuresCol = "featuresTFIDF")

        if weight == True:
            paramGrid = ParamGridBuilder()\
                        .addGrid(lr.maxIter, [10, 20, 50])\
                        .addGrid(lr.regParam, [0.1,0.3,0.5])\
                        .addGrid(lr.weightCol,  ["weight"])\
                        .build()
            
        else:
            paramGrid = ParamGridBuilder()\
                        .addGrid(lr.maxIter, [10, 20, 50])\
                        .addGrid(lr.regParam, [0.1,0.3,0.5])\
                        .build()

        evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

        crossval = CrossValidator(estimator=lr,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=evaluator,
                                  numFolds=5) 
        
        model = crossval.fit(self.train_idf)
        if weight == True:
            self.model['lr_yes'] = model
        else:
            self.model['lr_no'] = model
        predictions = model.transform(self.test_idf)
        return predictions
    
    def model_rf(self,weight):
        trainer = RandomForestClassifier(featuresCol = "featuresTFIDF")
        if weight == True:
            paramGrid = ParamGridBuilder()\
                        .addGrid(trainer.numTrees, [10,20,50])\
                        .addGrid(trainer.maxDepth, [2,6,8])\
                        .addGrid(trainer.minInstancesPerNode, [1,3,5])\
                        .addGrid(trainer.weightCol,  ["weight"])\
                        .build()
            
        else:
            paramGrid = ParamGridBuilder()\
                        .addGrid(trainer.numTrees, [10,20,50])\
                        .addGrid(trainer.maxDepth, [2,6,8])\
                        .addGrid(trainer.minInstancesPerNode, [1,3,5])\
                        .build()
        
        evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

        crossval = CrossValidator(estimator=trainer,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=evaluator,
                                  numFolds=5) 
        model = crossval.fit(self.train_idf)
        if weight == True:
            self.model['rf_yes'] = model
        else:
            self.model['rf_no'] = model
        predictions = model.transform(self.test_idf)
        return predictions
    
    def evaluate(self,predictions):
        result = predictions.select('true_label', 'prediction')
        result = result[['true_label','prediction']].toPandas()
        
        print(f'accuracy_score: ',accuracy_score(result.true_label, result.prediction))
        print(f'prediction: ',precision_score(result.true_label, result.prediction, average='weighted'))
        print(f'recall_score: ',recall_score(result.true_label, result.prediction, average='weighted'))
        print(f'f1_score: ',f1_score(result.true_label, result.prediction, average='weighted'))
        print(classification_report(result.true_label, result.prediction))
        
    def save_model(self):
        list_model = ['lr_yes','lr_no','rf_yes','rf_no']
        for model_name in list_model:
            self.model[model_name].write().overwrite().save(f'hdfs://namenode:9000/save_model/{model_name}')
        
        self.model_tfidf.write().overwrite().save(f'hdfs://namenode:9000/save_model/model_tfidf')
        
        with open('data/dict_stop_word.pkl', 'wb') as f:
            pickle.dump(self.dict_stop_word, f)

In [58]:
model = SentimentModel()

23/01/28 16:42:00 WARN SimpleFunctionRegistry: The function remove_stop_word replaced a previously registered function.
                                                                                

# Oversampling

In [59]:
lb0_cnt = model.train_set.filter(col('label') == 0).count()
lb1_cnt = model.train_set.filter(col('label') == 1).count()
lb2_cnt = model.train_set.filter(col('label') == 2).count()

In [60]:
w_a = int(lb2_cnt/lb0_cnt)
w_b = int(lb2_cnt/lb1_cnt)

In [61]:
model.set_weight(w_a,w_b)

# LogicRegression

## Weight balance

In [None]:
predictions_wb = model.model_logistic(weight=True)

In [63]:
res = model.model['lr_yes'].transform(model.train_idf).select(['label','prediction'])

In [64]:
res.createOrReplaceTempView('res')

In [65]:
spark.sql("""
    select sum(if(label == prediction,1,0))/count(1) acc
    from res
""").show()

23/01/28 16:53:20 WARN DAGScheduler: Broadcasting large task binary with size 1106.5 KiB

+------------------+
|               acc|
+------------------+
|0.7811439082250107|
+------------------+



                                                                                

In [66]:
model.evaluate(predictions_wb)

23/01/28 16:53:28 WARN DAGScheduler: Broadcasting large task binary with size 1101.6 KiB


accuracy_score:  0.8171589310829818
prediction:  0.8456354036964362
recall_score:  0.8171589310829818
f1_score:  0.8279409056626836
              precision    recall  f1-score   support

           0       0.78      0.65      0.71       805
           1       0.36      0.54      0.43       538
           2       0.93      0.90      0.91      3634

    accuracy                           0.82      4977
   macro avg       0.69      0.70      0.68      4977
weighted avg       0.85      0.82      0.83      4977



## No balance

In [67]:
predictions_no_wb = model.model_logistic(weight=False)

23/01/28 17:12:07 WARN DAGScheduler: Broadcasting large task binary with size 1139.8 KiB
23/01/28 17:12:26 WARN DAGScheduler: Broadcasting large task binary with size 1139.8 KiB
23/01/28 17:12:32 WARN DAGScheduler: Broadcasting large task binary with size 1139.8 KiB
23/01/28 17:12:43 WARN DAGScheduler: Broadcasting large task binary with size 1139.9 KiB
23/01/28 17:12:52 WARN DAGScheduler: Broadcasting large task binary with size 1139.9 KiB
23/01/28 17:13:02 WARN DAGScheduler: Broadcasting large task binary with size 1139.9 KiB
23/01/28 17:13:24 WARN DAGScheduler: Broadcasting large task binary with size 1140.1 KiB
23/01/28 17:13:46 WARN DAGScheduler: Broadcasting large task binary with size 1140.1 KiB
23/01/28 17:14:03 WARN DAGScheduler: Broadcasting large task binary with size 1140.0 KiB
23/01/28 17:14:26 WARN DAGScheduler: Broadcasting large task binary with size 1139.8 KiB
23/01/28 17:14:44 WARN DAGScheduler: Broadcasting large task binary with size 1139.8 KiB
23/01/28 17:14:49 WAR

In [68]:
model.evaluate(predictions_no_wb)

accuracy_score:  0.8109302792847096
prediction:  0.7753315801526328
recall_score:  0.8109302792847096
f1_score:  0.7595490082817656
              precision    recall  f1-score   support

           0       0.88      0.52      0.65       805
           1       0.41      0.02      0.04       538
           2       0.81      0.99      0.89      3634

    accuracy                           0.81      4977
   macro avg       0.70      0.51      0.53      4977
weighted avg       0.78      0.81      0.76      4977



23/01/28 17:23:14 WARN DAGScheduler: Broadcasting large task binary with size 1101.7 KiB


# RandomForestClassifier

## Weight balance

In [None]:
rf_predictions_wb = model.model_rf(weight=True)

In [70]:
model.evaluate(rf_predictions_wb)

23/01/29 00:20:57 WARN DAGScheduler: Broadcasting large task binary with size 1000.3 KiB


accuracy_score:  0.7305605786618445
prediction:  0.7534757256019623
recall_score:  0.7305605786618445
f1_score:  0.65939196428338
              precision    recall  f1-score   support

           0       1.00      0.03      0.06       805
           1       0.27      0.21      0.24       538
           2       0.77      0.96      0.86      3634

    accuracy                           0.73      4977
   macro avg       0.68      0.40      0.38      4977
weighted avg       0.75      0.73      0.66      4977



## No balance

In [None]:
rf_predictions_no_wb = model.model_rf(weight=False)

In [72]:
model.evaluate(rf_predictions_no_wb)

23/01/29 07:20:43 WARN DAGScheduler: Broadcasting large task binary with size 1025.4 KiB


accuracy_score:  0.7303596544102873
prediction:  0.6949829343597914
recall_score:  0.7303596544102873
f1_score:  0.6167536903259228
              precision    recall  f1-score   support

           0       1.00      0.00      0.00       805
           1       0.00      0.00      0.00       538
           2       0.73      1.00      0.84      3634

    accuracy                           0.73      4977
   macro avg       0.58      0.33      0.28      4977
weighted avg       0.69      0.73      0.62      4977



# Analysis result

## Save model 

In [73]:
model.save_model()

                                                                                

## Best param LR

In [74]:
model.model['lr_yes'].getEstimatorParamMaps()[ np.argmax(model.model['lr_yes'].avgMetrics) ]

{Param(parent='LogisticRegression_60e8183c199d', name='maxIter', doc='max number of iterations (>= 0).'): 10,
 Param(parent='LogisticRegression_60e8183c199d', name='regParam', doc='regularization parameter (>= 0).'): 0.5,
 Param(parent='LogisticRegression_60e8183c199d', name='weightCol', doc='weight column name. If this is not set or empty, we treat all instance weights as 1.0.'): 'weight'}

In [75]:
model.model['lr_no'].getEstimatorParamMaps()[ np.argmax(model.model['lr_no'].avgMetrics) ]

{Param(parent='LogisticRegression_8bc30e75277a', name='maxIter', doc='max number of iterations (>= 0).'): 20,
 Param(parent='LogisticRegression_8bc30e75277a', name='regParam', doc='regularization parameter (>= 0).'): 0.1}

## Best param Rf

In [76]:
model.model['rf_yes'].getEstimatorParamMaps()[ np.argmax(model.model['rf_yes'].avgMetrics)]

{Param(parent='RandomForestClassifier_5eee6cd5b48a', name='numTrees', doc='Number of trees to train (>= 1).'): 50,
 Param(parent='RandomForestClassifier_5eee6cd5b48a', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
 Param(parent='RandomForestClassifier_5eee6cd5b48a', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='RandomForestClassifier_5eee6cd5b48a', name='weightCol', doc='weight column name. If this is not set or empty, we treat all instance weights as 1.0.'): 'weight'}

In [77]:
model.model['rf_no'].getEstimatorParamMaps()[ np.argmax(model.model['rf_no'].avgMetrics) ]

{Param(parent='RandomForestClassifier_82cd36a37daf', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
 Param(parent='RandomForestClassifier_82cd36a37daf', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 8,
 Param(parent='RandomForestClassifier_82cd36a37daf', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3}

## Avg accuracy

In [78]:
model.model['lr_yes'].avgMetrics

[0.7744775666844111,
 0.7752882396209828,
 0.7754004232794556,
 0.773107796944257,
 0.7699993222521928,
 0.7689681944196578,
 0.7717868728099262,
 0.7698096272012491,
 0.7689979106322342]

In [79]:
model.model['lr_no'].avgMetrics

[0.8421807601195797,
 0.8264344780496538,
 0.816763137594442,
 0.843301753281112,
 0.827020616586316,
 0.8171269056732042,
 0.8431327155892354,
 0.8269242613315927,
 0.8170879598842157]

In [80]:
model.model['rf_yes'].avgMetrics

[0.706762339136425,
 0.706762339136425,
 0.706762339136425,
 0.7196760831496611,
 0.7243176650319815,
 0.7157331072316102,
 0.7236021809443158,
 0.7111343184476644,
 0.7078266258231677,
 0.7438364149915796,
 0.7438364149915796,
 0.7438394905509383,
 0.7380920613199236,
 0.7439621573280522,
 0.734733336678855,
 0.7297544013783568,
 0.7391717123175789,
 0.7308883591316635,
 0.7688883407210081,
 0.7688883396836772,
 0.7688873155394778,
 0.7463913108970772,
 0.7420297298921222,
 0.7535682905868656,
 0.7422187133639224,
 0.7454319671837563,
 0.7494553377675395]

In [81]:
model.model['rf_no'].avgMetrics

[0.7905455877839739,
 0.7905455877839739,
 0.7905455877839739,
 0.7905640469187728,
 0.7905773455705579,
 0.7905507190627796,
 0.7906635443762967,
 0.7907187434711159,
 0.7906132662602084,
 0.7905455877839739,
 0.7905455877839739,
 0.7905455877839739,
 0.7905476374182047,
 0.7905476404927535,
 0.7905455877839739,
 0.7905742712335331,
 0.790590690661206,
 0.7905773436408612,
 0.7905455877839739,
 0.7905455877839739,
 0.7905455877839739,
 0.7905455877839739,
 0.7905455877839739,
 0.7905466145676255,
 0.790552756205904,
 0.7905568560243882,
 0.7905578746992065]