In [1]:
import findspark
findspark.init()

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.sql.functions import lit
from bs4 import BeautifulSoup
from pyspark import keyword_only
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import MulticlassMetrics
from datetime import datetime
from pyspark.ml import Transformer
from pyspark.sql.types import FloatType
from pyspark.ml.param.shared import HasInputCol, HasOutputCol 
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [3]:
spark = SparkSession.builder.appName('Rating').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/28 01:26:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load Data

In [4]:
df1 = spark.read.csv("shopee_final.csv",header=True
                      ,inferSchema=True)

[Stage 1:===>                                                     (1 + 15) / 16]                                                                                

In [5]:
#df1 = df1.withColumn('class',lit('fake'))

In [6]:
df1.show(2)

23/03/28 01:27:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , processed_text, class
 Schema: _c0, processed_text, class
Expected: _c0 but found: 
CSV file: file:///Users/tony.ng/Documents/DS-ML/final_project/project1/shopee_final.csv
+---+--------------------+-------+
|_c0|      processed_text|  class|
+---+--------------------+-------+
|  0|miếng dán hơi_dầy...|neutral|
|  1|miếng dán rất_tồi...|neutral|
+---+--------------------+-------+
only showing top 2 rows



In [7]:
df1.count()

934543

In [8]:
df1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- processed_text: string (nullable = true)
 |-- class: string (nullable = true)



In [9]:
df1.groupby('class').count().show()

+--------+------+
|   class| count|
+--------+------+
|positive|636052|
| neutral|112092|
|negative|186399|
+--------+------+



### Precprocess

In [10]:
df1 = df1.withColumn('length', length(df1['processed_text']))

In [11]:
df1.show(5)

23/03/28 01:27:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , processed_text, class
 Schema: _c0, processed_text, class
Expected: _c0 but found: 
CSV file: file:///Users/tony.ng/Documents/DS-ML/final_project/project1/shopee_final.csv
+---+--------------------+--------+------+
|_c0|      processed_text|   class|length|
+---+--------------------+--------+------+
|  0|miếng dán hơi_dầy...| neutral|    28|
|  1|miếng dán rất_tồi...| neutral|    57|
|  2|cường_lực trắng m...|negative|    23|
|  3|hàng cảm_quan đầu...|negative|    63|
|  4|chất_lượng cường_...|negative|   101|
+---+--------------------+--------+------+
only showing top 5 rows



In [12]:
df1.groupBy('class').mean().show()

23/03/28 01:27:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , processed_text, class
 Schema: _c0, processed_text, class
Expected: _c0 but found: 
CSV file: file:///Users/tony.ng/Documents/DS-ML/final_project/project1/shopee_final.csv
+--------+------------------+------------------+
|   class|          avg(_c0)|       avg(length)|
+--------+------------------+------------------+
|positive| 477209.3752444769|44.500778346973405|
| neutral| 449924.0700585233| 38.89566112827364|
|negative|443789.80439272744| 43.35922194469124|
+--------+------------------+------------------+



In [13]:
data = df1.select('processed_text','class','length')

In [14]:
data.show()

+--------------------+--------+------+
|      processed_text|   class|length|
+--------------------+--------+------+
|miếng dán hơi_dầy...| neutral|    28|
|miếng dán rất_tồi...| neutral|    57|
|cường_lực trắng m...|negative|    23|
|hàng cảm_quan đầu...|negative|    63|
|chất_lượng cường_...|negative|   101|
|kính chất_lượng k...|negative|    41|
|kính cường_lực dở...|negative|   126|
|bị_vỡ bắt_làm hoà...|negative|    32|
|kính bụi dính kín...|negative|    33|
|đóng hàng cường_l...| neutral|    32|
|hàng cường_lực xi...|negative|    61|
|không_vừa gần_chấ...|negative|    22|
|                  bé|negative|     2|
|         đo thử chán|negative|    11|
|   hàng miếng bảo_vệ|negative|    17|
|sản_phẩm miết rất...|negative|    89|
|chính_xác sản_phẩ...|negative|   151|
|     hàng hãng lưu_ý|negative|    15|
|chất_lượng sản_ph...|negative|   122|
|cường_lực mô_tả b...|negative|    82|
+--------------------+--------+------+
only showing top 20 rows



In [15]:
null_data = data.filter(data['processed_text'].isNull())

In [16]:
null_data.count()

33669

In [17]:
data = data.filter(data['processed_text'].isNotNull())

### Feature & Transform

In [18]:
class BsTextExtractor(Transformer, HasInputCol, HasOutputCol):
    
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(BsTextExtractor, self).__init__() 
        kwargs = self._input_kwargs 
        self.setParams(**kwargs)
        
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None): 
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    def _transform(self, dataset):
        def f(s):
            cleaned_text = BeautifulSoup(s).text 
            return cleaned_text
        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [19]:
text_extractor = BsTextExtractor(inputCol="processed_text", outputCol="cleaned_text")
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='token_text')
stopremove= StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label', handleInvalid='keep')

In [20]:
clean_up = VectorAssembler(inputCols =['tf_idf','length'],
                           outputCol='features')

In [21]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,
                                  text_extractor,
                                  tokenizer,
                                  stopremove,
                                  count_vec,
                                  idf,
                                  clean_up])

In [22]:
cleaner = data_prep_pipe.fit(data)

                                                                                

23/03/28 01:27:34 WARN DAGScheduler: Broadcasting large task binary with size 1157.6 KiB




23/03/28 01:27:59 WARN DAGScheduler: Broadcasting large task binary with size 1158.6 KiB


                                                                                

In [23]:
clean_data = cleaner.transform(data)

In [24]:
clean_data = clean_data.select('label','features')

In [25]:
clean_data.show()

23/03/28 01:28:00 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(83245,[81,235,44...|
|  2.0|(83245,[81,139,23...|
|  1.0|(83245,[16,40,56,...|
|  1.0|(83245,[1,2,49,55...|
|  1.0|(83245,[2,25,61,9...|
|  1.0|(83245,[2,49,271,...|
|  1.0|(83245,[12,24,33,...|
|  1.0|(83245,[3,180,553...|
|  1.0|(83245,[81,166,27...|
|  2.0|(83245,[1,22,45,3...|
|  1.0|(83245,[0,1,108,2...|
|  1.0|(83245,[49,175,14...|
|  1.0|(83245,[45,83244]...|
|  1.0|(83245,[88,95,504...|
|  1.0|(83245,[1,235,889...|
|  1.0|(83245,[0,10,156,...|
|  1.0|(83245,[0,1,53,81...|
|  1.0|(83245,[1,221,367...|
|  1.0|(83245,[0,2,6,25,...|
|  1.0|(83245,[3,43,53,8...|
+-----+--------------------+
only showing top 20 rows



[Stage 26:>                                                         (0 + 1) / 1]                                                                                

In [26]:
clean_data.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|  0.0|614122|
|  1.0|178959|
|  2.0|107793|
+-----+------+



In [27]:
(train, test) = clean_data.randomSplit([0.7,0.3])

## Buil Model

### Naive Bayes

In [30]:
nb = NaiveBayes()
prediction = nb.fit(train)
test_results = prediction.transform(test)
start_time = datetime.now()
train_time = datetime.now() - start_time  
test_results.show()
print(train_time)

23/03/28 01:29:29 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB




23/03/28 01:29:56 WARN DAGScheduler: Broadcasting large task binary with size 6.8 MiB


                                                                                

23/03/28 01:29:57 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB


[Stage 36:>                                                         (0 + 1) / 1]

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(83245,[0,1,2,3,5...|[-427.43693111719...|[1.0,5.7782993051...|       0.0|
|  0.0|(83245,[0,1,2,3,5...|[-461.99208736574...|[0.99999999999939...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-519.25518085957...|[1.0,3.9799456631...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-798.61314764267...|[1.0,2.7528084083...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-399.71776716824...|[1.0,1.2549581970...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-393.95755093125...|[1.0,9.2940590066...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-150.78717162810...|[1.0,4.7692103554...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-150.78717162810...|[1.0,4.7692103554...|       0.0|
|  0.0|(83245,[0,1,2,3,6...|[-150.78717162810...|[1.0,4.7692103554...|       0.0|
|  0.0|(83245,[0

                                                                                

In [31]:
test_results.groupBy('label', 'prediction').count().show()

23/03/28 01:30:46 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB




23/03/28 01:31:11 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  2.0|       0.0|  8628|
|  1.0|       1.0| 32254|
|  0.0|       1.0|  8994|
|  1.0|       0.0|  6893|
|  2.0|       2.0| 15062|
|  2.0|       1.0|  8397|
|  1.0|       2.0| 14556|
|  0.0|       0.0|149839|
|  0.0|       2.0| 25417|
+-----+----------+------+



                                                                                

### Report

In [43]:
#important: need to cast to float type, and order by prediction, else it won't work
preds_and_labels = test_results.select(['prediction','label']).withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

23/03/28 01:34:17 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB


                                                                                

23/03/28 01:34:41 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB




23/03/28 01:35:08 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB


                                                                                

23/03/28 01:35:09 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB


[Stage 45:>                                                         (0 + 1) / 1]

[[149839.   8994.  25417.]
 [  6893.  32254.  14556.]
 [  8628.   8397.  15062.]]


                                                                                

In [54]:
acc_eva_nb = MulticlassClassificationEvaluator()
acc_nb = acc_eva_nb.evaluate(test_results)
print('Accuracy of model: {}'.format(acc_nb))

23/03/28 01:53:06 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB




Accuracy of model: 0.750074693228309




### Logistic Regression

In [55]:
lg = LogisticRegression(maxIter=10, regParam=0.3)
pre_lg = lg.fit(train)
result_lg = pre_lg.transform(test)
start_time = datetime.now()
train_time = datetime.now() - start_time  
print(train_time)
result_lg.groupBy('label', 'prediction').count().show()

23/03/28 01:54:54 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB




23/03/28 01:55:20 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:55:21 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 82:>                                                       (0 + 16) / 16]

23/03/28 01:55:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/28 01:55:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS




23/03/28 01:55:49 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:55:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/28 01:55:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/03/28 01:55:50 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 84:>                                                       (0 + 16) / 16]

23/03/28 01:55:51 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:55:51 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 86:===>                                                    (1 + 15) / 16]

23/03/28 01:55:53 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:55:54 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB




23/03/28 01:55:55 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 89:>                                                         (0 + 4) / 4]                                                                                

23/03/28 01:55:56 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 90:===>                                                    (1 + 15) / 16]

23/03/28 01:55:58 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:55:59 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 92:===>                                                    (1 + 15) / 16]

23/03/28 01:56:00 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:56:01 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 94:===>                                                    (1 + 15) / 16]

23/03/28 01:56:02 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:56:03 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 96:===>                                                    (1 + 15) / 16]

23/03/28 01:56:04 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB




23/03/28 01:56:04 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 98:>                                                       (0 + 16) / 16][Stage 98:===>                                                    (1 + 15) / 16]

23/03/28 01:56:06 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:56:06 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB




23/03/28 01:56:07 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

23/03/28 01:56:08 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 102:===>                                                   (1 + 15) / 16]

23/03/28 01:56:09 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

0:00:00.000140
23/03/28 01:56:10 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB




23/03/28 01:56:37 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  2.0|       0.0| 23868|
|  1.0|       1.0| 25386|
|  0.0|       1.0|  4173|
|  1.0|       0.0| 27467|
|  2.0|       2.0|  1745|
|  2.0|       1.0|  6474|
|  1.0|       2.0|   850|
|  0.0|       0.0|178960|
|  0.0|       2.0|  1117|
+-----+----------+------+



                                                                                

### Report

In [56]:
#important: need to cast to float type, and order by prediction, else it won't work
preds_and_labels_lg = result_lg.select(['prediction','label']).withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels_lg = preds_and_labels_lg.select(['prediction','label'])

metrics_lg = MulticlassMetrics(preds_and_labels_lg.rdd.map(tuple))
print(metrics_lg.confusionMatrix().toArray())

23/03/28 01:56:58 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB


                                                                                

23/03/28 01:57:23 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB




23/03/28 01:57:50 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB


                                                                                

23/03/28 01:57:50 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB


[Stage 112:>                                                        (0 + 1) / 1]

[[178960.   4173.   1117.]
 [ 27467.  25386.    850.]
 [ 23868.   6474.   1745.]]


                                                                                

In [57]:
acc_eva_lg = MulticlassClassificationEvaluator()
acc_lg = acc_eva_lg.evaluate(result_lg)
print('Accuracy of model: {}'.format(acc_lg))

23/03/28 01:58:09 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB




Accuracy of model: 0.7132097976225282


                                                                                

#### Nhận xét:
- đối với việc sử dụng Pyspark, thời gian xử lý nhanh hơn
- Model Naive bayes cũng cho kết quả tốt hơn so với sử dụng ML thông thường cho bài toán.