In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover, IDF
from pyspark.ml.feature import CountVectorizerModel, IDFModel, StringIndexerModel
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.classification import NaiveBayesModel, RandomForestClassificationModel, LogisticRegressionModel, DecisionTreeClassificationModel, GBTClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import string
from pyspark.sql.functions import concat,concat_ws

In [5]:
model_path = "/data/model/"
train = spark.read.format("parquet").option("inferSchema", True).option("header", True).load("/data/train/tf_idf_train")
validate = spark.read.format("parquet").option("inferSchema", True).option("header", True).load("/data/train/tf_idf_valid")

                                                                                

### Random Forest

In [5]:
rf = RandomForestClassifier(featuresCol = 'tf_idf_features', labelCol = 'categoryIndex')
rfModel = rf.fit(train)

                                                                                

In [3]:
rf_save = model_path + "rf_model"
rfModel.save(rf_save)

In [7]:
rfModel = RandomForestClassificationModel.load(rf_save)
rf_predictions = rfModel.transform(validate)
rf_predictions.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+----------+
|     tf_idf_features|categoryIndex|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|(262144,[0,1,2,3,...|          6.0|[2.70147797584139...|[0.13507389879206...|       6.0|
|(262144,[0,1,2,3,...|          0.0|[3.71300106617375...|[0.18565005330868...|       0.0|
|(262144,[0,1,2,3,...|          0.0|[3.21400412377093...|[0.16070020618854...|       6.0|
|(262144,[0,1,2,3,...|          0.0|[2.95372909216025...|[0.14768645460801...|       0.0|
|(262144,[0,1,2,3,...|          5.0|[3.10380177943187...|[0.15519008897159...|       0.0|
+--------------------+-------------+--------------------+--------------------+----------+
only showing top 5 rows



In [8]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print("Accuracy of Random Forest is = %g"% (rf_accuracy))



Accuracy of Random Forest is = 0.300404


                                                                                

In [10]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")
rf_f1 = evaluator.evaluate(rf_predictions)
print("F1 of Random Forest is = %g"% (rf_f1))



F1 of Random Forest is = 0.208187


                                                                                

### Decision Tree

In [11]:
dt = DecisionTreeClassifier(featuresCol = 'tf_idf_features', labelCol = 'categoryIndex', maxDepth = 3)
dtModel = dt.fit(train)

In [13]:
dt_save = model_path + "dt_model"
dtModel.save(dt_save)

In [14]:
dtModel = DecisionTreeClassificationModel.load(dt_save)
dt_predictions = dtModel.transform(validate)
dt_predictions.show(5)

+--------------------+-------------+--------------------+--------------------+----------+
|     tf_idf_features|categoryIndex|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|(262144,[0,1,2,3,...|          6.0|[2374.0,26.0,790....|[0.14384391662627...|       6.0|
|(262144,[0,1,2,3,...|          0.0|[32353.0,3672.0,1...|[0.23033931851514...|       0.0|
|(262144,[0,1,2,3,...|          0.0|[2374.0,26.0,790....|[0.14384391662627...|       6.0|
|(262144,[0,1,2,3,...|          0.0|[2374.0,26.0,790....|[0.14384391662627...|       6.0|
|(262144,[0,1,2,3,...|          5.0|[32353.0,3672.0,1...|[0.23033931851514...|       0.0|
+--------------------+-------------+--------------------+--------------------+----------+
only showing top 5 rows



In [15]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dt_predictions)
print("Accuracy of Decision Tree is = %g"% (dt_accuracy))



Accuracy of Decision Tree is = 0.344771


                                                                                

In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")
dt_f1 = evaluator.evaluate(dt_predictions)
print("F1 of Decision Tree is = %g"% (dt_f1))



F1 of Decision Tree is = 0.252727


                                                                                

### Logistic Regression

In [3]:
lr_save = model_path + "lr_model"
lr = LogisticRegression(labelCol="categoryIndex", featuresCol="tf_idf_features", maxIter=10)
lrModel = lr.fit(train)

2022-07-12 09:13:51,260 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2022-07-12 09:13:51,261 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [4]:
lrModel.save(lr_save)

2022-07-12 09:15:28,056 WARN scheduler.TaskSetManager: Stage 27 contains a task of very large size (48081 KB). The maximum recommended task size is 100 KB.
                                                                                

In [5]:
lrModel = LogisticRegressionModel.load(lr_save)
lr_predictions = lrModel.transform(validate)
lr_predictions.show(5)

                                                                                

+--------------------+-------------+--------------------+--------------------+----------+
|     tf_idf_features|categoryIndex|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|(262144,[0,1,2,3,...|          6.0|[47.3172670847114...|[1.14598360486372...|       6.0|
|(262144,[0,1,2,3,...|          0.0|[15.8609794101976...|[0.99836134720835...|       0.0|
|(262144,[0,1,2,3,...|          0.0|[25.6044959062234...|[0.99999987517797...|       0.0|
|(262144,[0,1,2,3,...|          0.0|[23.9841190843060...|[4.05183770238990...|      17.0|
|(262144,[0,1,2,3,...|          5.0|[-0.3161508569837...|[4.79096324255021...|      11.0|
+--------------------+-------------+--------------------+--------------------+----------+
only showing top 5 rows





In [6]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_predictions)
print("Accuracy of Logistic Regression is = %g"% (lr_accuracy))



Accuracy of Logistic Regression is = 0.815551


                                                                                

In [7]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")
lr_f1 = evaluator.evaluate(lr_predictions)
print("F1 of Logistic Regression is = %g"% (lr_f1))



F1 of Logistic Regression is = 0.812805


                                                                                

In [6]:
test_df = spark.read.format("csv").option("inferSchema", True).option("header", True).option("delimiter","\t").load("/data/test/test_raw")
indexer = StringIndexerModel.load(model_path + "string_indexer")
indexed = indexer.transform(test_df)

fullContents = indexed.select(concat_ws(' ', indexed.Title, indexed.Summary, indexed.Contents).alias("fullContents"), "Category", "categoryIndex")
only_str = fullContents.withColumn("only_str",regexp_replace(col('fullContents'), '\d+|“|”', ''))
only_letter = only_str.withColumn("only_letter",regexp_replace(col('only_str'), '\\p{Punct}', '')) 
regex_tokenizer = RegexTokenizer(inputCol="only_letter", outputCol="words")
raw_words = regex_tokenizer.transform(only_letter)

loadedCv = CountVectorizerModel.load(model_path + "/count_vectorizer")
countVectorizer_train = loadedCv.transform(raw_words)
idfModel = IDFModel.load(model_path + "/idf")
tf_idf_test = idfModel.transform(countVectorizer_train)

lr_save = model_path + "lr_model"
lrModel = LogisticRegressionModel.load(lr_save)
lr_predictions = lrModel.transform(tf_idf_test)
lr_predictions.show(5)

[Stage 16:>                                                         (0 + 1) / 1]

+--------------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        fullContents|Category|categoryIndex|            only_str|         only_letter|               words|            features|     tf_idf_features|       rawPrediction|         probability|prediction|
+--------------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Hỗ trợ người lao ...| Thời sự|          8.0|Hỗ trợ người lao ...|Hỗ trợ người lao ...|[hỗ, trợ, người, ...|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|[2.41615323401068...|[1.35525323396821...|       8.0|
|Đưa Hải Phòng trở...| Thời sự|          8.0|Đưa Hải Phòng trở...|Đưa Hải Phòng trở...|[đưa, hải, phòng,...|(262144,[0,1,2,3,...|(262144,[0,1,2,3,...|[3.81709821665486...|[8.4801554648

                                                                                

In [7]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_predictions)
print("Accuracy of Logistic Regression in test set is = %g"% (lr_accuracy))



Accuracy of Logistic Regression in test set is = 0.75917


                                                                                

In [8]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")
lr_f1 = evaluator.evaluate(lr_predictions)
print("F1 of Logistic Regression in test set is = %g"% (lr_f1))



F1 of Logistic Regression in test set is = 0.755934


                                                                                