# sparknlp-bert-train+infer

## Colab Setup

In [1]:
! pip install -q pyspark==3.2.0 spark-nlp==3.4.2

[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[K     |████████████████████████████████| 142 kB 20.8 MB/s 
[K     |████████████████████████████████| 198 kB 49.3 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import sparknlp

spark = sparknlp.start(gpu = True, spark32=True) # for GPU training >> sparknlp.start(gpu = True) # for Spark 2.3 =>> sparknlp.start(spark23 = True)

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd
import os

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

spark

Spark NLP version 3.4.2
Apache Spark version: 3.2.0


## Config

In [3]:
class CFG:
    # Globals #
    EXP_ID = 'EXP001' 
    seed = 111

# Set Direcotry

LOG_DIR   = '/content/log/'
MODEL_DIR = f'/content/model/{CFG.EXP_ID}'
SUBMIT_DIR = '/content/submission.csv'

## Load Dataset

In [4]:
# Download files from PyThaiNLP wisesight-sentiment github

!wget  'https://github.com/PyThaiNLP/wisesight-sentiment/raw/master/kaggle-competition/train.txt' -q
!wget  'https://github.com/PyThaiNLP/wisesight-sentiment/raw/master/kaggle-competition/train_label.txt' -q
!wget  'https://github.com/PyThaiNLP/wisesight-sentiment/raw/master/kaggle-competition/test.txt' -q
!wget  'https://github.com/PyThaiNLP/wisesight-sentiment/raw/master/kaggle-competition/test_label.txt' -q

In [5]:
# Read dataframe

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import row_number,lit,col
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))

textSchema = StructType([StructField("text", StringType(), True)])
labelSchema = StructType([StructField("label", StringType(), True)])

train =       (spark.read.option("header", False)
                    .csv('/content/train.txt', schema=textSchema)
                    .withColumn("id", row_number().over(w)))
train_label = (spark.read.option("header", False)
                    .csv('/content/train_label.txt', schema=labelSchema)
                    .withColumn("id", row_number().over(w)))
test =        (spark.read.option("header", False)
                    .csv('/content/test.txt', schema=textSchema)
                    .withColumn("id", row_number().over(w)))
test_label =  (spark.read.option("header", False)
                    .csv('/content/test_label.txt', schema=labelSchema)
                    .withColumn("id", row_number().over(w)))

In [6]:
# Join text with label

train_df = train.join(train_label, ['id'],'inner')
test_df = test.join(test_label, ['id'],'inner')

print(f"train_df: {train_df.count()}")
train_df.show(5)

print(f"test_df: {test_df.count()}")
test_df.show(5)

train_df: 24063
+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|ประเทศเราผลิตและส...|  neu|
|  2|                  คะ|  neu|
|  3|อิเหี้ยออมทำกูอยา...|  neg|
|  4|              😅😅😅|  neu|
|  5|สวัสดีวันพุธ แนน ...|  neu|
+---+--------------------+-----+
only showing top 5 rows

test_df: 2674
+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|ซื้อแต่ผ้าอนามัยแ...|  neg|
|  2|    ครับ #phithanbkk|  neu|
|  3|การด่าไปเหมือนได้...|  neg|
|  4|Cf clarins 5 ขวด ...|  neu|
|  5|ทานได้ค่ะ น้ำซุป ...|  neu|
+---+--------------------+-----+
only showing top 5 rows



In [7]:
# Count label
# Positive (pos), Neutral (neu), Negative (neg), Question (q)

print('train_df')
train_df.groupBy("label") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

print('test_df')
test_df.groupBy("label") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

train_df
+-----+-----+
|label|count|
+-----+-----+
|  neu|13105|
|  neg| 6140|
|  pos| 4300|
|    q|  518|
+-----+-----+

test_df
+-----+-----+
|label|count|
+-----+-----+
|  neu| 1456|
|  neg|  683|
|  pos|  478|
|    q|   57|
+-----+-----+



In [8]:
# set seed for reproducibility
(trainData, testData) = train_df.randomSplit([0.8, 0.2], seed = CFG.seed)
print("Train Dataset Count: " + str(trainData.count()))
print("Test Dataset Count: " + str(testData.count()))

Train Dataset Count: 19254
Test Dataset Count: 4809


## Model Pipeline

In [9]:

document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

word_segmenter = WordSegmenterModel.pretrained('wordseg_best', 'th')\
        .setInputCols("document")\
        .setOutputCol("token")
  
embeddings = DistilBertEmbeddings.pretrained("distilbert_embeddings_distilbert_base_th_cased","th") \
    .setInputCols(["document", "token"]) \
    .setOutputCol("embeddings")

embeddingsSentence = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")
    
classsifierdl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("label")\
    .setMaxEpochs(40)\
    .setLr(0.001)\
    .setDropout(0.5)\
    .setBatchSize(8)\
    .setEnableOutputLogs(True)\
    .setRandomSeed(CFG.seed)\
    .setOutputLogsPath(LOG_DIR)

# ClassifierDLApproach(Default): lr=0.005, batchSize=64, dropou=0.5, maxEpochs=30

bert_clf_pipeline = Pipeline(stages=[
    document_assembler,
    word_segmenter,
    embeddings,
    embeddingsSentence,
    classsifierdl
])

wordseg_best download started this may take some time.
Approximate size to download 79.2 KB
[OK!]
distilbert_embeddings_distilbert_base_th_cased download started this may take some time.
Approximate size to download 177.1 MB
[OK!]


In [10]:
# Transform
# (Need to clean emoji)

finisher = Finisher() \
    .setInputCols(["token"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

transform_pipeline = Pipeline(stages=[
    document_assembler,
    word_segmenter, 
    finisher
])

transform_pipeline_run = transform_pipeline.fit(trainData)
transform_df = transform_pipeline_run.transform(trainData)

transform_df.show()

+---+--------------------+-----+--------------------+--------------------+--------------------+
| id|                text|label|            document|               token|              tokens|
+---+--------------------+-----+--------------------+--------------------+--------------------+
|  1|ประเทศเราผลิตและส...|  neu|[{document, 0, 48...|[{token, 0, 5, ปร...|[ประเทศ, เรา, ผลิ...|
|  3|อิเหี้ยออมทำกูอยา...|  neg|[{document, 0, 26...|[{token, 0, 6, อิ...|[อิเหี้ย, ออม, ทำ...|
|  5|สวัสดีวันพุธ แนน ...|  neu|[{document, 0, 22...|[{token, 0, 3, สว...|[สวัส, ด, ี, วัน,...|
|  7|เน็ตควายมากกูพูดจ...|  neg|[{document, 0, 45...|[{token, 0, 3, เน...|[เน็ต, ควาย, มาก,...|
|  8|ปากแดงกินฟรีค่ะ 😬😬|  neu|[{document, 0, 19...|[{token, 0, 2, ปา...|[ปาก, แดง, กินฟรี...|
|  9|เหล้าเบลล์รสชาติเ...|  neg|[{document, 0, 37...|[{token, 0, 4, เห...|[เหล้า, เบลล์รสชา...|
| 10|🚗💨💨 ซิ่งเป็นบา...|  neu|[{document, 0, 98...|[{token, 0, 0, ?,...|[?, ?, ?, ?, ?, ?...|
| 11|สนใจ ฟอจูนเนอร์ ส...|  pos|[{document, 0

## Run

In [11]:
# remove the existing logs

! rm -r {LOG_DIR}

rm: cannot remove '/content/log/': No such file or directory


In [12]:
%%time
# training will take some time due to Bert (use GPU runtime when possible)

bert_clf_pipelineModel = bert_clf_pipeline.fit(trainData)

CPU times: user 4.04 s, sys: 447 ms, total: 4.49 s
Wall time: 14min 31s


In [13]:
# Check log file

log_files = os.listdir(LOG_DIR)
log_files

['ClassifierDLApproach_41a1d955c589.log']

In [14]:
# Read log file

log_file_name = os.listdir(LOG_DIR)[0]

with open(f'{LOG_DIR}{log_file_name}', "r") as log_file :
    print(log_file.read())

Training started - epochs: 40 - learning_rate: 0.001 - batch_size: 8 - training_examples: 19254 - classes: 4
Epoch 0/40 - 3.77s - loss: 2797.774 - acc: 0.5729773 - batches: 2407
Epoch 1/40 - 3.45s - loss: 2754.4026 - acc: 0.5964603 - batches: 2407
Epoch 2/40 - 3.38s - loss: 2743.8276 - acc: 0.60390687 - batches: 2407
Epoch 3/40 - 3.36s - loss: 2736.3838 - acc: 0.6062968 - batches: 2407
Epoch 4/40 - 3.43s - loss: 2730.378 - acc: 0.60863465 - batches: 2407
Epoch 5/40 - 3.35s - loss: 2724.9817 - acc: 0.61066085 - batches: 2407
Epoch 6/40 - 3.36s - loss: 2720.3225 - acc: 0.6125312 - batches: 2407
Epoch 7/40 - 3.40s - loss: 2716.4514 - acc: 0.6140378 - batches: 2407
Epoch 8/40 - 3.35s - loss: 2712.6995 - acc: 0.61637574 - batches: 2407
Epoch 9/40 - 3.41s - loss: 2708.9097 - acc: 0.6184019 - batches: 2407
Epoch 10/40 - 3.34s - loss: 2705.147 - acc: 0.62011635 - batches: 2407
Epoch 11/40 - 3.33s - loss: 2701.493 - acc: 0.6214672 - batches: 2407
Epoch 12/40 - 3.39s - loss: 2698.0889 - acc: 0.6

In [15]:
preds = bert_clf_pipelineModel.transform(testData)
preds_df = preds.select('label','text',"class.result").toPandas()

print(preds_df)

     label                                               text result
0      neu                                                 คะ  [neu]
1      neu                                                😅😅😅  [neu]
2      neu  ก้อนขอบพระคุณมากๆนะคร้าบ ที่มาหาก้อนและชมเชยกา...  [neu]
3      pos                           อยากกินบาบีก้อนหรอ555555  [neu]
4      neu                                  เดวลากเจ้ามือไปคน  [neu]
...    ...                                                ...    ...
4804   neg                       คุณก็รู้ "ที่นี่ประเทศไทย"!!  [neu]
4805   neu           อยากได้...L'orealสีขาว 😘😘แดดพัทยาร้อนมาก  [neu]
4806   neu                                   คุนเจ้อยากฉันจัง  [neu]
4807   neg           เพิ่งไปกินมาเอง ไม่อร่อย ร้านอื่นเส้ 555  [neg]
4808   neu                                         กินฟรีป่าว  [neu]

[4809 rows x 3 columns]


In [16]:
# We are going to use sklearn to evalute the results on test dataset
from sklearn.metrics import classification_report

preds_df['result'] = preds_df['result'].apply(lambda x : x[0])

print (classification_report(preds_df['result'], preds_df['label']))

              precision    recall  f1-score   support

         neg       0.63      0.55      0.59      1413
         neu       0.83      0.64      0.72      3396
         pos       0.00      0.00      0.00         0
           q       0.00      0.00      0.00         0

    accuracy                           0.61      4809
   macro avg       0.37      0.30      0.33      4809
weighted avg       0.77      0.61      0.68      4809



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


## Save Model

In [17]:
# Save a Spark NLP pipeline

bert_clf_pipelineModel.save(MODEL_DIR)

In [18]:
# # cd into saved dir and zip
# ! cd "/content/model" ; zip -r {MODEL_DIR}/my_nlp_pipeline.zip *

## Load Model

In [19]:
# Setting (Colab Setup to Load Dataset)

loaded_bert_clf_pipelineModel = PipelineModel.load(MODEL_DIR)

## Infer

In [20]:
# submission

preds = loaded_bert_clf_pipelineModel.transform(test_df)

preds_df = preds.select('id', 'text',"class.result").toPandas()

print(preds_df)

        id                                               text result
0        1  ซื้อแต่ผ้าอนามัยแบบเย็นมาค่ะ แบบว่าอีห่ากูนอนไ...  [neg]
1        2                                   ครับ #phithanbkk  [neu]
2        3  การด่าไปเหมือนได้บรรเทาความเครียดเฉยๆ แต่บีทีเ...  [neg]
3        4                              Cf clarins 5 ขวด 2850  [neu]
4        5  ทานได้ค่ะ น้ำซุป MK ต้มมาจากหัวผักกาด ซีอิ้วขา...  [neu]
...    ...                                                ...    ...
2669  2670  เล็กลงมา แต่ดีเท่าเดิม 😘 แวะมาหาได้ที่เซเว่นน๊...  [neu]
2670  2671  แพนด้าซิ่ง กับมุมมองที่แตกต่างอย่างลงตัวในสไตล...  [neu]
2671  2672                                          ไปเถอะดอม  [neu]
2672  2673          เจมส์ บอนด์ ขับโตโยต้าอัลติส 555555555555  [neg]
2673  2674                                จัดสิค่ะ บาร์บีก้อน  [neu]

[2674 rows x 3 columns]


In [21]:
# create submission_df
submission_df = preds_df[['id', 'result']].copy()

submission_df['result'] = submission_df['result'].str[0]
submission_df = submission_df.rename(columns={"result": "label"})

print(len(submission_df)) # Recheck 2674

submission_df

2674


Unnamed: 0,id,label
0,1,neg
1,2,neu
2,3,neg
3,4,neu
4,5,neu
...,...,...
2669,2670,neu
2670,2671,neu
2671,2672,neu
2672,2673,neg


In [22]:
# Save file

submission_df.to_csv(SUBMIT_DIR, index=False)

print(SUBMIT_DIR)

/content/submission.csv


## Evaluate

In [23]:
# We are going to use sklearn to evalute the results on test dataset
from sklearn.metrics import classification_report

solution = list(test_df.select('label').toPandas()['label'])

print (classification_report(submission_df['label'], solution))

              precision    recall  f1-score   support

         neg       0.62      0.54      0.58       778
         neu       0.82      0.63      0.72      1896
         pos       0.00      0.00      0.00         0
           q       0.00      0.00      0.00         0

    accuracy                           0.61      2674
   macro avg       0.36      0.29      0.32      2674
weighted avg       0.76      0.61      0.67      2674



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
