## LIBRARIES

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.sql.functions import lit
from bs4 import BeautifulSoup
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.sql.functions import when
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from datetime import datetime
from pyspark.ml import Transformer
from pyspark.sql.types import FloatType
from pyspark.ml.param.shared import HasInputCol, HasOutputCol 
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [4]:
spark = SparkSession.builder.appName('Rating').getOrCreate()

### Load Data

In [5]:
df1 = spark.read.csv("comment_final.csv", header=True, inferSchema=True)

In [6]:
df2 = df1.withColumn("class", when(df1['class'] == "negative", "negative") \
      .when(df1['class'] == "neutral", "negative") \
      .otherwise(df1['class']))
df2.show()

+---+--------------------+--------+
|_c0|      processed_text|   class|
+---+--------------------+--------+
|  0|    rất_hài sản_phẩm|positive|
|  1|                null|positive|
|  2|sản_phẩm chất_lượ...|positive|
|  3|sản_phẩm chất_lượ...|positive|
|  4|sản_phẩm chất_lượ...|positive|
|  5|cuộc_sống không_g...|positive|
|  6|viết nhận_xét quá...|negative|
|  7|hàng bị_lỗi màng ...|negative|
|  8|sản_phẩm rất_tốt ...|positive|
|  9|rất_hài sản_phẩm ...|positive|
| 10|băng xài ổn lực c...|positive|
| 11|sản_phẩm sản_phẩm...|negative|
| 12|hàng băng bị_cắt sài|negative|
| 13|mẫu_mã đẹp chất_l...|positive|
| 14|sản_phẩm không_đú...|negative|
| 15|hàng số_lượng đơn...|negative|
| 16|không_nói đừng sả...|negative|
| 17|dán vô rớt hoài v...|negative|
| 18|                băng|positive|
| 19|hàng đúng_hạn đón...|positive|
+---+--------------------+--------+
only showing top 20 rows



In [7]:
df2.count()

196117

In [8]:
df2.groupby('class').count().show()

+--------+------+
|   class| count|
+--------+------+
|positive|162683|
|negative| 33434|
+--------+------+



### Preprocess

In [9]:
df2 = df2.withColumn('length', length(df2['processed_text']))

In [10]:
df2.show(5)

+---+--------------------+--------+------+
|_c0|      processed_text|   class|length|
+---+--------------------+--------+------+
|  0|    rất_hài sản_phẩm|positive|    16|
|  1|                null|positive|  null|
|  2|sản_phẩm chất_lượ...|positive|    85|
|  3|sản_phẩm chất_lượ...|positive|    85|
|  4|sản_phẩm chất_lượ...|positive|    85|
+---+--------------------+--------+------+
only showing top 5 rows



In [11]:
df2.groupBy('class').mean().show()

+--------+------------------+------------------+
|   class|          avg(_c0)|       avg(length)|
+--------+------------------+------------------+
|positive| 96932.46765181365| 37.37301654406584|
|negative|103534.61000777651|47.858651803036366|
+--------+------------------+------------------+



In [12]:
data = df2.select('processed_text','class','length')

In [13]:
data.show()

+--------------------+--------+------+
|      processed_text|   class|length|
+--------------------+--------+------+
|    rất_hài sản_phẩm|positive|    16|
|                null|positive|  null|
|sản_phẩm chất_lượ...|positive|    85|
|sản_phẩm chất_lượ...|positive|    85|
|sản_phẩm chất_lượ...|positive|    85|
|cuộc_sống không_g...|positive|    70|
|viết nhận_xét quá...|negative|   543|
|hàng bị_lỗi màng ...|negative|    69|
|sản_phẩm rất_tốt ...|positive|    57|
|rất_hài sản_phẩm ...|positive|    23|
|băng xài ổn lực c...|positive|    32|
|sản_phẩm sản_phẩm...|negative|    90|
|hàng băng bị_cắt sài|negative|    20|
|mẫu_mã đẹp chất_l...|positive|    21|
|sản_phẩm không_đú...|negative|    30|
|hàng số_lượng đơn...|negative|    47|
|không_nói đừng sả...|negative|    23|
|dán vô rớt hoài v...|negative|    32|
|                băng|positive|     4|
|hàng đúng_hạn đón...|positive|    35|
+--------------------+--------+------+
only showing top 20 rows



### Null

In [14]:
null_data = data.filter(data['processed_text'].isNull())

In [15]:
null_data.count()

21720

In [16]:
data = data.filter(data['processed_text'].isNotNull())

In [17]:
#drop null data
data = data.na.drop()

### Duplicated

In [18]:
data.dropDuplicates(['processed_text']).show()

+--------------------+--------+------+
|      processed_text|   class|length|
+--------------------+--------+------+
|aaaaaaaaaaaaaaaaa...|positive|    41|
|          abhhgycycy|negative|    10|
|       ahas việt nam|positive|    13|
|airpod máy case t...|positive|    31|
|akg bàn rùi ổn_áp...|positive|    49|
|      ammo dòng mack|positive|    14|
|anessa hàng_xách ...|positive|   109|
|anhận hàng rất_th...|positive|    79|
|anker vọng bền lâ...|positive|    53|
|     anten tốc_độ ul|positive|    15|
|app báo hàng hàng...|positive|    57|
|apply mặt bị_đen ...|positive|    80|
|basss kết_nối hộp...|positive|    70|
|bhjo bhuio bhtt b...|negative|    31|
|bkav nhiêu rẻ tổn...|positive|    22|
|boot android copy...|positive|    65|
|bphone tích_hợp s...|positive|    85|
|brush rẻ tốt khôn...|positive|    29|
|bus hàng nhập_khẩ...|negative|    58|
|buôn_bán mất_dạy ...|negative|    50|
+--------------------+--------+------+
only showing top 20 rows



In [19]:
data.show()

+--------------------+--------+------+
|      processed_text|   class|length|
+--------------------+--------+------+
|    rất_hài sản_phẩm|positive|    16|
|sản_phẩm chất_lượ...|positive|    85|
|sản_phẩm chất_lượ...|positive|    85|
|sản_phẩm chất_lượ...|positive|    85|
|cuộc_sống không_g...|positive|    70|
|viết nhận_xét quá...|negative|   543|
|hàng bị_lỗi màng ...|negative|    69|
|sản_phẩm rất_tốt ...|positive|    57|
|rất_hài sản_phẩm ...|positive|    23|
|băng xài ổn lực c...|positive|    32|
|sản_phẩm sản_phẩm...|negative|    90|
|hàng băng bị_cắt sài|negative|    20|
|mẫu_mã đẹp chất_l...|positive|    21|
|sản_phẩm không_đú...|negative|    30|
|hàng số_lượng đơn...|negative|    47|
|không_nói đừng sả...|negative|    23|
|dán vô rớt hoài v...|negative|    32|
|                băng|positive|     4|
|hàng đúng_hạn đón...|positive|    35|
|gọn bỏ_túi quảng_...|positive|    71|
+--------------------+--------+------+
only showing top 20 rows



In [20]:
data.count()

174397

In [21]:
data.groupby('class').count().show()

+--------+------+
|   class| count|
+--------+------+
|positive|141924|
|negative| 32473|
+--------+------+



### Resampling

In [22]:
from pyspark.sql.functions import col, explode, array, lit
major_df = data.filter(col("class") == 'positive')
minor_df = data.filter(col("class") == 'negative')
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))


ratio: 4


In [23]:
sampled_majority_df = major_df.sample(False, 1/ratio)
combined_df_2 = sampled_majority_df.unionAll(minor_df)
combined_df_2.show()

+--------------------+--------+------+
|      processed_text|   class|length|
+--------------------+--------+------+
|cuộc_sống không_g...|positive|    70|
|dính tốt tội bị_b...|positive|    33|
|cũng_rất dính tội...|positive|    34|
|          chất_lượng|positive|    10|
|         độ dính tốt|positive|    11|
|      máy hút xài ổn|positive|    14|
|                 tốt|positive|     3|
|                 bám|positive|     3|
| sản_phẩm dính kháng|positive|    19|
|   băng xài tốt hàng|positive|    17|
|kích_thước quá_nh...|positive|    30|
|         rất_rất tốt|positive|    11|
|                dính|positive|     4|
|        dất hài_lòng|positive|    12|
|sản_phẩm chất_lượ...|positive|    85|
|    hàng đóng_gói kỹ|positive|    16|
|       hàng đóng_gói|positive|    13|
|            hàng tốt|positive|     8|
|sản_phẩm cụng gấp...|positive|    45|
|                  ổn|positive|     2|
+--------------------+--------+------+
only showing top 20 rows



In [24]:
combined_df_2.groupby('class').count().show()

+--------+-----+
|   class|count|
+--------+-----+
|positive|35841|
|negative|32473|
+--------+-----+



### Feature & Transform

In [25]:
class BsTextExtractor(Transformer, HasInputCol, HasOutputCol):
    
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(BsTextExtractor, self).__init__() 
        kwargs = self._input_kwargs 
        self.setParams(**kwargs)
        
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None): 
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    def _transform(self, dataset):
        def f(s):
            cleaned_text = BeautifulSoup(s).text 
            return cleaned_text
        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [26]:
text_extractor = BsTextExtractor(inputCol="processed_text", outputCol="cleaned_text")
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='token_text')
stopremove= StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label', handleInvalid='keep')

In [27]:
clean_up = VectorAssembler(inputCols =['tf_idf','length'],
                           outputCol='features')

In [28]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,
                                  text_extractor,
                                  tokenizer,
                                  stopremove,
                                  count_vec,
                                  idf,
                                  clean_up])

In [29]:
cleaner = data_prep_pipe.fit(combined_df_2)

In [30]:
clean_data = cleaner.transform(combined_df_2)

In [31]:
clean_data = clean_data.select('label','features')

In [32]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(18722,[266,482,9...|
|  0.0|(18722,[2,45,106,...|
|  0.0|(18722,[11,106,21...|
|  0.0|(18722,[3,18721],...|
|  0.0|(18722,[2,59,106,...|
|  0.0|(18722,[7,9,12,46...|
|  0.0|(18722,[2,18721],...|
|  0.0|(18722,[358,18721...|
|  0.0|(18722,[1,106,321...|
|  0.0|(18722,[0,2,9,213...|
|  0.0|(18722,[380,491,8...|
|  0.0|(18722,[2,783,187...|
|  0.0|(18722,[106,18721...|
|  0.0|(18722,[51,16704,...|
|  0.0|(18722,[0,1,3,10,...|
|  0.0|(18722,[0,10,90,1...|
|  0.0|(18722,[0,10,1872...|
|  0.0|(18722,[0,2,18721...|
|  0.0|(18722,[0,1,86,25...|
|  0.0|(18722,[12,18721]...|
+-----+--------------------+
only showing top 20 rows



In [33]:
clean_data.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|35841|
|  1.0|32473|
+-----+-----+



In [34]:
(train, test) = clean_data.randomSplit([0.7,0.3])

## Build Model

### Naive Bayes

In [35]:
nb = NaiveBayes()
prediction = nb.fit(train)
test_results = prediction.transform(test)
start_time = datetime.now()
train_time = datetime.now() - start_time  
test_results.show()
print(train_time)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(18722,[0,1,2,3,4...|[-844.96257594922...|[1.0,3.1249067575...|       0.0|
|  0.0|(18722,[0,1,2,3,4...|[-879.67821422150...|[0.99999999980958...|       0.0|
|  0.0|(18722,[0,1,2,3,6...|[-177.76184014021...|[0.99999999908058...|       0.0|
|  0.0|(18722,[0,1,2,3,7...|[-1364.2679534025...|[1.0,5.4787251881...|       0.0|
|  0.0|(18722,[0,1,2,3,8...|[-543.52451854130...|[0.99999757602871...|       0.0|
|  0.0|(18722,[0,1,2,3,8...|[-611.78993982239...|[1.64882595820722...|       1.0|
|  0.0|(18722,[0,1,2,3,1...|[-596.23661614330...|[4.91595059890390...|       1.0|
|  0.0|(18722,[0,1,2,3,1...|[-762.93620611633...|[0.99999999999990...|       0.0|
|  0.0|(18722,[0,1,2,3,1...|[-479.44299123650...|[1.0,1.6365471699...|       0.0|
|  0.0|(18722,[0

In [36]:
test_results.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0| 1546|
|  0.0|       0.0| 9190|
|  1.0|       1.0| 7687|
|  1.0|       0.0| 2050|
+-----+----------+-----+



### Report

In [37]:
#important: need to cast to float type, and order by prediction, else it won't work
preds_and_labels = test_results.select(['prediction','label']).withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[9190. 1546.]
 [2050. 7687.]]


In [38]:
acc_eva = MulticlassClassificationEvaluator()
acc = acc_eva.evaluate(test_results)
print('Accuracy of model: {}'.format(acc))

Accuracy of model: 0.8240348648449818


### Logistic Regression

In [39]:
lg = LogisticRegression(maxIter=10, regParam=0.3)
pre_lg = lg.fit(train)
result_lg = pre_lg.transform(test)
start_time = datetime.now()
train_time = datetime.now() - start_time 
print(train_time)

0:00:00


In [40]:
result_lg.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0| 1111|
|  0.0|       0.0| 9625|
|  1.0|       1.0| 7552|
|  1.0|       0.0| 2185|
+-----+----------+-----+



### Report

In [41]:
#important: need to cast to float type, and order by prediction, else it won't work
preds_and_labels_lg = result_lg.select(['prediction','label']).withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels_lg = preds_and_labels_lg.select(['prediction','label'])

metrics_lg = MulticlassMetrics(preds_and_labels_lg.rdd.map(tuple))
print(metrics_lg.confusionMatrix().toArray())



[[9625. 1111.]
 [2185. 7552.]]


In [42]:
acc_eva_lg = MulticlassClassificationEvaluator()
acc_lg = acc_eva_lg.evaluate(result_lg)
print('Accuracy of model: {}'.format(acc_lg))

Accuracy of model: 0.8381434568804969


#### Nhận xét:
- đối với việc sử dụng Pyspark, thời gian xử lý nhanh hơn, NB nhanh hơn về tốc độ xử lý so với logistic Regression
- Tuy nhiên kết quả thì logistic lại tốt hơn về độ chính xác

##### Như vậy Pyspark cho kết quả tốt hơn cả về thời gian xử lý lân độ chính xác