In [6]:
import findspark

findspark.init()

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import IDF, HashingTF, StopWordsRemover

from utils import normalizeContentDf, tokenizeDf

In [7]:
# Start (or reuse) the Spark session for this notebook.
# The driver is given 6g of memory via "spark.driver.memory".
spark = (
    SparkSession.builder
    .appName('comment_nlp')
    .config("spark.driver.memory", "6g")
    .getOrCreate()
)

In [8]:
# Load the raw CSV splits (first row is the header; column types are inferred).
rawData = spark.read.csv('./input/train.csv', header=True, inferSchema=True)
rawValidData = spark.read.csv('./input/validation.csv', header=True, inferSchema=True)
rawTestData = spark.read.csv('./input/test.csv', header=True, inferSchema=True)

# Rewrite each split as 20 partitions, laid out on disk by "status" value,
# overwriting whatever a previous run left in the target directory.
for raw_split, split_dir in (
    (rawData, './splitted/train'),
    (rawValidData, './splitted/valid'),
    (rawTestData, './splitted/test'),
):
    raw_split.repartition(20).write.partitionBy("status").csv(split_dir, mode="overwrite")

# Load the re-partitioned splits back using an explicit schema rather than inference.
schemaDdl = "content STRING, status INTEGER"
sparkReader = spark.read.schema(schemaDdl)
trainingData, validationData, testData = (
    sparkReader.csv(split_path)
    for split_path in ('./splitted/train', './splitted/valid', './splitted/test')
)



In [9]:
# Clean up the text of every split with the project helper `normalizeContentDf`
# (defined in utils; its exact rules are not visible here), drop rows whose
# content ends up empty, keep only the two columns used downstream, and
# shrink each split to 3 partitions.
trainingData = (
    normalizeContentDf(trainingData)
    .filter("content != ''")
    .select("content", "status")
    .coalesce(3)
)
validationData = (
    normalizeContentDf(validationData)
    .filter("content != ''")
    .select("content", "status")
    .coalesce(3)
)
testData = (
    normalizeContentDf(testData)
    .filter("content != ''")
    .select("content", "status")
    .coalesce(3)
)

# Word-segment the Vietnamese text with the project helper `tokenizeDf`.
# NOTE(review): presumably this adds the "content1" column that the pipeline
# reads as its input -- confirm against utils.
trainingData = tokenizeDf(trainingData)
validationData = tokenizeDf(validationData)
testData = tokenizeDf(testData)

In [10]:
# Read the Vietnamese stop-word list, one entry per line.
# The file is opened in a `with` block so the handle is always closed, and is
# decoded explicitly as UTF-8 so the accented Vietnamese words are read
# correctly regardless of the platform's default encoding.
# NOTE: splitting on '\n' is kept as-is, so a trailing newline in the file
# still yields one empty-string entry, exactly as before.
with open("./vietnamese-stopwords.txt", "r", encoding="utf-8") as stopwords_file:
    stopwords_list = stopwords_file.read().split('\n')

# Define the pipeline stages.
# 1) Remove stop words from the token column "content1" -> "content2".
stopwords_remover = StopWordsRemover(
    inputCol="content1",
    outputCol="content2",
    stopWords=stopwords_list,
)
# 2) Hash the remaining tokens into a term-frequency vector.
hashing_tf = HashingTF(
    inputCol="content2",
    outputCol="term_frequency",
)
# 3) Re-weight term frequencies by inverse document frequency; terms that
#    appear in fewer than 5 documents are ignored (minDocFreq=5).
idf = IDF(
    inputCol="term_frequency",
    outputCol="features",
    minDocFreq=5
)
# 4) Logistic regression on the "features" column (the estimator's default
#    features column), using "status" as the label.
lr = LogisticRegression(labelCol="status")

# Chain the stages; the pipeline is fitted in the following cell.
sematic_analysis_pipeline = Pipeline(
    stages=[stopwords_remover, hashing_tf, idf, lr]
)

In [11]:
# Fit every pipeline stage on the training split; the result is the fitted
# pipeline model used for scoring and saving below.
model = sematic_analysis_pipeline.fit(dataset=trainingData)

21/09/18 23:00:26 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:05:52 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:07:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/09/18 23:07:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/09/18 23:11:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:52 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:52 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:53 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:54 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
21/09/18 23:11:

In [None]:
# Score all three splits with the fitted pipeline.
trained_df, val_df, test_df = (
    model.transform(split_df)
    for split_df in (trainingData, validationData, testData)
)

# Preview the first rows of each scored split.
for scored_df in (trained_df, val_df, test_df):
    scored_df.show()

# Accuracy against the "status" label, on the validation and test splits only.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="status")
accuracy_val = evaluator.evaluate(val_df)
accuracy_test = evaluator.evaluate(test_df)

print(f"Validation accuracy: {accuracy_val*100:.5f}%")
print(f"Test accuracy: {accuracy_test*100:.5f}%")

In [13]:
# Persist the fitted pipeline to ./model_lr/, replacing any earlier save.
model_writer = model.write().overwrite()
model_writer.save('./model_lr/')

21/09/18 23:13:28 WARN TaskSetManager: Stage 141 contains a task of very large size (4184 KiB). The maximum recommended task size is 1000 KiB.
