In [1]:
!pip install pyspark
# !pip install demoji

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 56 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=ca651891a93a3430e2500eda7916062380393a5d40b8a3285d95b792addb6c3a
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.sql.types import *

from pyspark.sql import DataFrame
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

import re
import string
# import demoji


import pickle



In [3]:
class TextTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that normalises a text column with a Python UDF.

    For every row the value of ``input_col`` is converted to ``str``, runs of
    digits are removed, every character in ``string.punctuation`` is replaced
    by a space, surrounding whitespace is stripped and the result is
    lower-cased. The cleaned text is written to ``output_col`` (which may be
    the same column as ``input_col``, in which case it is replaced).

    Null inputs produce an empty string.

    Params:
        input_col: name of the column to read (default ``"input"``).
        output_col: name of the column to write (default ``"output"``).
    """

    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output", ):
        super(TextTransformer, self).__init__()
        # `keyword_only` forwards only the keyword arguments the caller passed
        # explicitly, so the defaults shown in the signature must also be
        # registered here; otherwise `TextTransformer()` would resolve both
        # column names to None.
        self._setDefault(input_col="input", output_col="output")
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        """Set the explicitly supplied params on this instance."""
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        """Return the configured input column name (or its default)."""
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        """Return the configured output column name (or its default)."""
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame):
        """Return ``df`` with the cleaned text stored in the output column."""
        # Build the punctuation -> space table once per transform call rather
        # than once per row inside the UDF.
        punct_to_space = str.maketrans(string.punctuation, ' ' * len(string.punctuation))

        def preprocess_text(text, ) -> str:
            # A null cell would otherwise go through str(None) and come out as
            # the literal token "none".
            if text is None:
                return ''
            # Emoji-to-description expansion via `demoji` is currently disabled.
            without_digits = re.sub(r'\d+', '', str(text))
            return without_digits.translate(punct_to_space).strip().lower()

        input_col = self.get_input_col()
        output_col = self.get_output_col()
        # Wrap the Python cleaner as a string-returning Spark UDF.
        transform_udf = udf(preprocess_text, StringType())
        return df.withColumn(output_col, transform_udf(input_col))


In [4]:
# Start (or reuse) a local Spark session with three worker threads.
spark = (
    SparkSession.builder
    .appName("examples")
    .master('local[3]')
    .getOrCreate()
)



In [5]:
# Read the train/test CSV splits stored under uit-hsd/ with an explicit schema:
# a free-text column and a numeric label column.
schema = StructType([StructField('free_text', StringType(), True), StructField('label_id', DoubleType(), True)])


def load_binary_split(path):
    """Load one CSV split and collapse its labels to two classes.

    Rows whose ``label_id`` is not 0, 1 or 2 (including rows where it is
    null) are dropped, then label 2 is merged into label 1 so that only the
    values 0 and 1 remain.

    Args:
        path: location of the CSV file (with a header row).

    Returns:
        A Spark DataFrame with columns ``free_text`` and ``label_id``.
    """
    frame = spark.read.schema(schema).option('header', True).csv(path)
    frame = frame.where(frame.label_id.isin(0, 1, 2))
    return frame.withColumn('label_id', when(frame.label_id == 2, 1).otherwise(frame.label_id))


train = load_binary_split('/content/drive/MyDrive/bigdata/uit-hsd/train.csv')
test = load_binary_split('/content/drive/MyDrive/bigdata/uit-hsd/test.csv')

In [6]:
# Oversample the training set to counter class imbalance: every row with
# label 1 is emitted four times, every other row once. The helper column `n`
# holds the repeat count, is exploded into that many rows, and is then dropped.
new_train = (
    train
    .withColumn('n', when(train.label_id == 1, 4).otherwise(1))
    .withColumn('n', expr('explode(array_repeat(n,int(n)))'))
    .select(['free_text', 'label_id'])
)
train = new_train

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.feature import IDF, StringIndexer, StopWordsRemover, CountVectorizer, RegexTokenizer

# Index labels in value order ("0.0" -> 0, "1.0" -> 1) instead of the default
# frequency order. The test-set evaluation compares predictions against the raw
# `label_id`, so the index must not depend on which class happens to be the
# majority after oversampling.
label_stringIdx = StringIndexer(inputCol="label_id", outputCol="label", stringOrderType="alphabetAsc")

# Clean the raw text in place, then split it on anything that is not a
# lower-case letter, digit, '#', '+' or '_'.
text_transformer = TextTransformer(input_col='free_text', output_col='free_text')
regex_tokenizer = RegexTokenizer(inputCol='free_text', outputCol="words", pattern="[^0-9a-z#+_]+")

# TF-IDF features: terms must appear in at least 5 documents to enter the vocabulary.
count_vectorizer = CountVectorizer(inputCol='words', outputCol="countFeatures", minDF=5)
idf = IDF(inputCol='countFeatures', outputCol="features")
lr = LogisticRegression(featuresCol='features', labelCol="label", )

# The label indexer is deliberately left out of the pipeline and is fitted and
# applied to `train` separately, so the saved model contains only the
# text -> prediction stages.
pipeline = Pipeline(stages=[
    # label_stringIdx,
    text_transformer,
    regex_tokenizer,
    count_vectorizer,
    idf,
    lr])

In [9]:
# Fit the label indexer on the (oversampled) training frame.
label_fitted = label_stringIdx.fit(train)

In [10]:
# Add the indexed `label` column that the classifier trains on.
# NOTE(review): this rebinds `train`, so the cell is not idempotent; re-running
# it will likely fail because the `label` output column already exists - confirm.
train = label_fitted.transform(train)

In [11]:
# Fit every pipeline stage (text cleaning, tokenising, TF-IDF, logistic regression) on the training frame.
model = pipeline.fit(train)

In [12]:
# Persist the fitted pipeline. Overwrite any copy left by a previous run so the
# notebook survives Restart & Run All; a plain save() raises if the path exists.
model.write().overwrite().save('text_classifier')

In [14]:
# Reload the pipeline from disk to check that it round-trips, including the custom TextTransformer stage.
loaded_model = PipelineModel.load('text_classifier')

In [15]:
# Score the training frame with the reloaded model and pull the result to the driver as pandas.
train_scored = loaded_model.transform(train)
predictions = train_scored.toPandas()

In [16]:
from sklearn.metrics import classification_report

# Per-class metrics on the training frame (indexed `label` vs. model `prediction`).
# These are in-sample numbers computed on the oversampled data.
train_true = predictions['label'].values
train_pred = predictions['prediction'].values
print(classification_report(train_true, train_pred))

              precision    recall  f1-score   support

         0.0       0.78      0.87      0.82     19861
         1.0       0.81      0.70      0.75     16536

    accuracy                           0.79     36397
   macro avg       0.80      0.78      0.79     36397
weighted avg       0.79      0.79      0.79     36397



In [18]:
# Score the held-out test frame with the reloaded model and collect it as pandas.
test_scored = loaded_model.transform(test)
test_predictions = test_scored.toPandas()

In [22]:
from sklearn.metrics import classification_report

# `test` was never passed through the label indexer, so it has no `label`
# column; the raw `label_id` is used as ground truth instead. This is only
# valid while the indexer maps 0.0 -> 0.0 and 1.0 -> 1.0.
test_true = test_predictions['label_id'].values
test_pred = test_predictions['prediction'].values
print(classification_report(test_true, test_pred))

              precision    recall  f1-score   support

         0.0       0.91      0.86      0.88      5536
         1.0       0.44      0.56      0.49      1123

    accuracy                           0.81      6659
   macro avg       0.67      0.71      0.69      6659
weighted avg       0.83      0.81      0.81      6659

