In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('svm-app').getOrCreate()

df = spark.read.csv('/project/dataset.csv', header = True, inferSchema = True)

df = df.sample(withReplacement=True, fraction=0.1, seed=100)

In [None]:
df.printSchema()

In [2]:
df.count()

32229

In [33]:
df = df.select('title', 'summary', 'labels')
# df.printSchema()

In [34]:
map_label = {
    'E-Sport': 5,
    'การเมือง': 1,
    'กีฬา': 5,
    'กีฬาอื่นๆ': 5,
    'ข่าว': None,
    'ข่าวบันเทิง': 8,
    'คนดังนั่งเขียน': None,
    'คอลัมน์': None,
    'ตรวจหวย': 2,
    'ต่างประเทศ': 3,
    'ทองคำ': 2,
    'ทันข่าวเด่น': None,
    'ทั่วไทย': 9,
    'ท่องเที่ยว': 2,
    'นักข่าวพลเมือง': None,
    'บันเทิง': 8,
    'บันเทิงต่างประเทศ': 8,
    'บ้าน': None,
    'ผู้หญิง': None,
    'พระราชสำนัก': 9,
    'ฟุตซอล': 5,
    'ฟุตบอลยุโรป': 5,
    'ฟุตบอลโลก': 5,
    'ฟุตบอลไทย': 5,
    'ภัยพิบัติ': 10,
    'ภูมิภาค': 9,
    'มวย/MMA': 5,
    'ยานยนต์': 7,
    'วอลเลย์บอล': 5,
    'วิทยาศาสตร์เทคโนโลยี': 7,
    'ศิลปะ-บันเทิง': 8,
    'สกู๊ปไทยรัฐ': None,
    'สังคม': 1,
    'สิ่งแวดล้อม': 10,
    'หนัง': 8,
    'หนังสือพิมพ์': None,
    'หน้าแรก': None,
    'หวย': 2,
    'อาชญากรรม': 4,
    'อาหาร': 11,
    'เลือกตั้ง': 1,
    'เศรษฐกิจ': 2,
    'เอเชียนเกมส์': 5,
    'ไทยพีบีเอส อินไซส์': None,
    'ไทยรัฐเชียร์ไทยแลนด์': None,
    'ไทยลีก': 5,
    'ไลฟ์': 12,
    'ไลฟ์สไตล์': 12,
    'ไอที': 7,
    'ไอ้โหด': 4
}

label_enum = {
    1: 'การเมือง',
    2: 'เศรษฐกิจ',
    3: 'ต่างประเทศ',
    4: 'อาชญากรรม',
    5: 'กีฬา',
    # 6: 'การศึกษา'
    7: 'เทคโนโลยี',
    8: 'บันเทิง',
    9: 'ในประเทศ',
    10: 'สิ่งแวดล้อม',
    11: 'อาหาร',
    12: 'ไลฟ์สไตล์'
}

In [36]:
from collections import Counter
import numpy as np

def clean_labels(labels):
    if(labels is None):
        return None
    
    labels = labels.split(',')
    labels = [map_label[label] if label in map_label else None for label in labels ]
    labels = [label_enum[label] for label in labels if label is not None]
    
    if(len(labels) <= 0):
        return None
    
    # pick max freq label
    counter = Counter(labels)
    keys = list(counter.keys())
    values = list(counter.values())
    max_index = np.argmax(values)
    label = keys[max_index]
    
    return label

In [37]:
from pre_processing import pre_process

udf_pre_process = udf(pre_process, ArrayType(StringType()))
udf_clean_labels = udf(clean_labels, StringType())

df = df.withColumn("tokens", udf_pre_process("title", "summary"))

df = df.withColumn("label_pre", udf_clean_labels("labels"))

df = df.na.drop()

df.show()

+--------------------+--------------------+-------------------+--------------------+---------+
|               title|             summary|             labels|              tokens|label_pre|
+--------------------+--------------------+-------------------+--------------------+---------+
|'แซมบ้า' ได้เฮ ทด...|คูตินโญ-เนย์มาร์ ...|     กีฬา,ฟุตบอลโลก|[แซมบ้า, ได้, เฮ,...|     กีฬา|
|"ส.แบดมินตันฯลงโท...| หลังประชุมหารือก...|               กีฬา|[ส, แบดมินตัน, ฯ,...|     กีฬา|
|ผู้ต้องสงสัยสวมชุ...|หน่วยงานความมั่นค...|            ภูมิภาค|[ผู้ต้องสงสัย, สว...| ในประเทศ|
|ผู้ต้องสงสัยสวมชุ...|หน่วยงานความมั่นค...|            ภูมิภาค|[ผู้ต้องสงสัย, สว...| ในประเทศ|
|ยายชาวนาสุรินทร์ ...|คุณยายวัย 67 ปี อ...|       ข่าว,ทั่วไทย|[ยาย, ชาวนา, สุริ...| ในประเทศ|
|คนที่เข้ามาใหม่เข...|สุดซึ้ง! สมาชิกพั...|         ข่าว,สังคม|[คน, ที่, เข้ามา,...| การเมือง|
|คนที่เข้ามาใหม่เข...|สุดซึ้ง! สมาชิกพั...|         ข่าว,สังคม|[คน, ที่, เข้ามา,...| การเมือง|
|คนนับหมื่นแห่รับย...|คนกว่า 2 หมื่นแห่...|       

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, CountVectorizer

countVectors = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=10000, minDF=5)

label_stringIdx = StringIndexer(inputCol = "label_pre", outputCol = "label")

pipeline = Pipeline(stages=[countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)

# dataset.show()


In [39]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

In [40]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("label", "prediction", "probability") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+-----+----------+------------------------------+
|label|prediction|                   probability|
+-----+----------+------------------------------+
|  0.0|       0.0|[0.9907679255633245,0.00182...|
|  0.0|       0.0|[0.9901946217032246,0.00428...|
|  0.0|       0.0|[0.9847903496746296,0.00546...|
|  0.0|       0.0|[0.9840291707636613,0.00245...|
|  3.0|       0.0|[0.980653875618762,0.001361...|
|  3.0|       0.0|[0.9789195826850762,0.00216...|
|  0.0|       0.0|[0.9776416070450392,0.00193...|
|  0.0|       0.0|[0.9775932311262264,0.00778...|
|  0.0|       0.0|[0.9774480271094073,0.00677...|
|  0.0|       0.0|[0.9758049064868581,0.00520...|
+-----+----------+------------------------------+
only showing top 10 rows



In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

evaluator.evaluate(predictions)

0.7051871594396206

In [46]:
lrModel.save('/project/lrmodel')

In [51]:
from pyspark.ml.classification import LogisticRegressionModel

lrModel = LogisticRegressionModel.load('/project/lrmodel')

lrModel

LogisticRegressionModel: uid=LogisticRegression_e9b4b9db04c7, numClasses=10, numFeatures=10000