# Pyspark: Natural Language Processing with Disaster Tweets

https://www.kaggle.com/c/nlp-getting-started

In [1]:
import time
import numpy as np

In [2]:
import pyspark
# Configure the session builder: application name, the SynapseML package
# coordinates and the extra Maven repository to resolve them from.
sessionBuilder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", "com.microsoft.azure:synapseml:0.9.1")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
)
spark = sessionBuilder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x7fae1819e6d0>


In [3]:
trainPath = 'nlp-getting-started/train.csv'
testPath = 'nlp-getting-started/test.csv'

# Both files are read with identical CSV options; a fresh reader is created
# for each path.
csvOptions = dict(header='true', inferSchema='true', multiLine=True)
trainData, testData = (
    spark.read.format('csv').options(**csvOptions).load(csvPath)
    for csvPath in (trainPath, testPath)
)

print('Number of row in Training:', trainData.count())
print('Number of row in Test:    ', testData.count())

Number of row in Training: 7613
Number of row in Test:     3263


## Preprocessing

Note that we only create a few new features from the existing ones. Adding more features would clearly help to better understand the dataset.

In [4]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasInputCols, HasOutputCols, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.ml import Pipeline 
import pyspark.sql.functions as F
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec, StringIndexer,OneHotEncoder, VectorAssembler, RobustScaler

### Processing Null

We create a custom transformer to replace nulls. For "keyword" and "location", we replace null values with a symbol, e.g. "\$", rather than an empty string "", since OneHotEncoder raises an error on an empty string.

In [5]:
class FillNanTransformer(Transformer, HasInputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that fills nulls in ``inputCols`` with ``nanReplacement``.

    Params:
        inputCols: names of the columns passed to ``na.fill`` as the subset.
        nanReplacement: string replacement value; defaults to "".
    """

    nanReplacement = Param(Params._dummy(), "nanReplacement", "nanReplacement", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCols=None, nanReplacement=None):
        super().__init__()
        self._setDefault(nanReplacement="")
        # Forward exactly the keyword arguments captured by @keyword_only.
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCols=None, nanReplacement=None):
        """Set the params given as keyword arguments and return ``self._set``'s result."""
        return self._set(**self._input_kwargs)

    def getNanReplacement(self):
        """Return the value of ``nanReplacement`` or its default."""
        return self.getOrDefault(self.nanReplacement)

    def _transform(self, dataset):
        """Return ``dataset`` with nulls in the input columns replaced."""
        return dataset.na.fill(value=self.getNanReplacement(), subset=self.getInputCols())

In [6]:
# "keyword"/"location" nulls become "$"; "text" nulls become the empty string.
fillNanTransformer = FillNanTransformer(
    inputCols=["keyword", "location"],
    nanReplacement="$",
)
textFillNanTransformer = FillNanTransformer(
    inputCols=["text"],
    nanReplacement="",
)

### Processing urls in "text"

We remove URLs from "text" and create a new column indicating whether the text contains a URL.

In [7]:
class RemovePatternTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that writes ``inputCol`` with every regex match removed to ``outputCol``.

    Params:
        inputCol: name of the source column.
        outputCol: name of the column receiving the cleaned value.
        pattern: regular expression whose matches are replaced by ""; defaults to "".
    """

    pattern = Param(Params._dummy(), "pattern", "pattern", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, pattern=None):
        super().__init__()
        self._setDefault(pattern="")
        # Forward exactly the keyword arguments captured by @keyword_only.
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, pattern=None):
        """Set the params given as keyword arguments and return ``self._set``'s result."""
        return self._set(**self._input_kwargs)

    def getPattern(self):
        """Return the value of ``pattern`` or its default."""
        return self.getOrDefault(self.pattern)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` holding the input text minus pattern matches."""
        cleaned = F.regexp_replace(F.col(self.getInputCol()), self.getPattern(), "")
        return dataset.withColumn(self.getOutputCol(), cleaned)
    
class CheckPatternTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that flags rows whose ``inputCol`` matches a regex.

    ``outputCol`` is 1.0 where ``inputCol`` matches ``pattern`` (``rlike``) and
    0.0 otherwise.

    The DefaultParamsReadable/DefaultParamsWritable mixins are included, as on
    the other custom transformers in this notebook, so that a pipeline
    containing this stage can be saved and loaded.

    Params:
        inputCol: name of the column to test.
        outputCol: name of the column receiving the 1.0/0.0 flag.
        pattern: regular expression passed to ``rlike``; defaults to "".
    """

    pattern = Param(Params._dummy(), "pattern", "pattern", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, pattern=None):
        super(CheckPatternTransformer, self).__init__()
        self._setDefault(pattern="")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, pattern=None):
        """Set the params given as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def getPattern(self):
        """Return the value of ``pattern`` or its default."""
        return self.getOrDefault(self.pattern)

    def _transform(self, dataset):
        """Return ``dataset`` with the 1.0/0.0 match flag added as ``outputCol``."""
        pattern = self.getPattern()
        isMatch = F.col(self.getInputCol()).rlike(pattern)
        return dataset.withColumn(self.getOutputCol(), F.when(isMatch, 1.).otherwise(0.))

In [8]:
# Raw string: "\S" inside a normal string literal is an invalid escape sequence
# and triggers a warning on recent Python versions. The regex itself is unchanged.
urlPattern = r"(https?://\S+)"

removeUrlTransformer = RemovePatternTransformer(inputCol="text", outputCol="textNoUrl", pattern=urlPattern)
checkUrlTransformer = CheckPatternTransformer(inputCol="text", outputCol="textIsContainedUrl", pattern=urlPattern)

### Get lengths for "keyword" and "text" (without urls)

In [9]:
class GetLengthTransformer(Transformer, HasInputCols, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that adds, for each input column, a column holding its length.

    ``inputCols[i]`` is measured with ``F.length`` and written to
    ``outputCols[i]``.

    The DefaultParamsReadable/DefaultParamsWritable mixins are included, as on
    the other custom transformers in this notebook, so that a pipeline
    containing this stage can be saved and loaded.

    Params:
        inputCols: names of the columns to measure.
        outputCols: names of the length columns, positionally paired with
            ``inputCols``.
    """

    @keyword_only
    def __init__(self, inputCols=None, outputCols=None):
        super(GetLengthTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCols=None, outputCols=None):
        """Set the params given as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with one length column per input column.

        Raises:
            ValueError: if ``inputCols`` and ``outputCols`` differ in length.
        """
        inputCols = self.getInputCols()
        outputCols = self.getOutputCols()
        # zip() would silently drop the unmatched columns, so fail loudly instead.
        if len(inputCols) != len(outputCols):
            raise ValueError(
                f"inputCols ({len(inputCols)}) and outputCols ({len(outputCols)}) must have the same length"
            )
        for inputCol, outputCol in zip(inputCols, outputCols):
            dataset = dataset.withColumn(outputCol, F.length(inputCol))
        return dataset

In [10]:
# Length of "keyword" and of the URL-free text, paired positionally.
getLengthTransformer = GetLengthTransformer(
    inputCols=["keyword", "textNoUrl"],
    outputCols=["keywordLen", "textNoUrlLen"],
)

### Processing discrete features: "keyword", "location" and length features

Indexing "keyword" and "location"

In [11]:
# handleInvalid="keep" is set on both indexers.
keywordIndexer = StringIndexer(
    inputCol="keyword",
    outputCol="keywordIndex",
    handleInvalid="keep",
)
locationIndexer = StringIndexer(
    inputCol="location",
    outputCol="locationIndex",
    handleInvalid="keep",
)

In [12]:
# One-hot encode the two index columns and the URL flag in a single stage.
oneHotInputCols = ["keywordIndex", "locationIndex", "textIsContainedUrl"]
oneHotOutputCols = ["keywordVec", "locationVec", "textIsContainedUrlVec"]

oneHotEncoder = OneHotEncoder(
    inputCols=oneHotInputCols,
    outputCols=oneHotOutputCols,
    handleInvalid="keep",
)

### Processing "text" (without urls)

We remove special characters and stopwords, then use word2vec to obtain a vector of "textNoUrl"

In [13]:
# textNoUrl -> tokens (split on "\W") -> tokens without stopwords -> 50-dim Word2Vec vector.
regexTokenizer = RegexTokenizer(
    inputCol="textNoUrl",
    outputCol="textArrayWord",
    pattern="\\W",
)

stopWordsRemover = StopWordsRemover(
    inputCol="textArrayWord",
    outputCol="textNoSW",
)
word2Vec = Word2Vec(
    vectorSize=50,
    windowSize=10,
    minCount=0,
    inputCol="textNoSW",
    outputCol="textVec",
)

We concatenate strings in "keyword", "location", "textNoUrl" and then apply the same procedure as for "textNoUrl"

In [14]:
class ConcatenateTransformer(Transformer, HasInputCols, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that joins several columns into one string column with '@'.

    ``outputCol`` starts as a copy of the first input column; every further
    input column is appended with ``F.concat_ws('@', ...)``.

    The DefaultParamsReadable/DefaultParamsWritable mixins are included, as on
    the other custom transformers in this notebook, so that a pipeline
    containing this stage can be saved and loaded.

    Params:
        inputCols: names of the columns to join, in order.
        outputCol: name of the column receiving the joined string.
    """

    @keyword_only
    def __init__(self, inputCols=None, outputCol=None):
        super(ConcatenateTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCols=None, outputCol=None):
        """Set the params given as keyword arguments."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` holding the '@'-joined input columns."""
        inputCols = self.getInputCols()
        outputCol = self.getOutputCol()
        # Seed the output with the first column, then append the rest one by one.
        dataset = dataset.withColumn(outputCol, F.col(inputCols[0]))
        for colName in inputCols[1:]:
            dataset = dataset.withColumn(
                outputCol, F.concat_ws('@', F.col(outputCol), F.col(colName)))
        return dataset

In [15]:
# keyword + location + URL-free text -> one string -> tokens -> tokens without stopwords.
concatStringTransformer = ConcatenateTransformer(
    inputCols=["keyword", "location", "textNoUrl"],
    outputCol="concatString",
)
concatStringRegexTokenizer = RegexTokenizer(
    inputCol="concatString",
    outputCol="concatStringArrayWord",
    pattern="\\W",
)
concatStringStopWordsRemover = StopWordsRemover(
    inputCol="concatStringArrayWord",
    outputCol="concatStringArrayWordNoSW",
)

We would like to observe the effects of removing stopwords by generating two vectors: one from "concatStringArrayWord" (containing stopwords) and one from "concatStringArrayWordNoSW" (without stopwords).

In [16]:
# Both models share the same Word2Vec settings and differ only in their input/output columns.
concatWord2VecParams = dict(vectorSize=50, windowSize=10, minCount=0)

concatStringWord2Vec = Word2Vec(
    inputCol="concatStringArrayWord",
    outputCol="concatStringVec",
    **concatWord2VecParams,
)
concatStringNoSWWord2Vec = Word2Vec(
    inputCol="concatStringArrayWordNoSW",
    outputCol="concatStringNoSWVec",
    **concatWord2VecParams,
)

### Combining several features

We would like to observe how these feature sets affect the performance. Note that we scale these feature sets by using RobustScaler.

**"discreteFeatures"**: we only use discrete features

In [17]:
# Assemble the one-hot vectors and the two length columns, then scale the result.
discreteInputCols = [
    "keywordVec",
    "locationVec",
    "textIsContainedUrlVec",
    "keywordLen",
    "textNoUrlLen",
]
discreteFeaturesAssembler = VectorAssembler(
    inputCols=discreteInputCols,
    outputCol="discreteFeatures",
)

discreteFeaturesRobustScaler = RobustScaler(
    inputCol="discreteFeatures",
    outputCol="discreteFeaturesScale",
    withScaling=True,
    withCentering=True,
    lower=0.25,
    upper=0.75,
)

**"discreteAndTextFeatures"**: we add the text features to "discreteFeatures". Note that this could dilute the effects of these discrete features.

In [18]:
# Append the text vector to the (unscaled) discrete features, then scale the combined vector.
discreteAndTextFeaturesAssembler = VectorAssembler(
    inputCols=["discreteFeatures", "textVec"],
    outputCol="discreteAndTextFeatures",
)

discreteAndTextFeaturesRobustScaler = RobustScaler(
    inputCol="discreteAndTextFeatures",
    outputCol="discreteAndTextFeaturesScale",
    withScaling=True,
    withCentering=True,
    lower=0.25,
    upper=0.75,
)

### Combining all preprocessing stages

In [19]:
# Stages are grouped by purpose; concatenating the groups yields the same
# stage order as listing them one by one.
nullStages = [fillNanTransformer, textFillNanTransformer]
textStages = [removeUrlTransformer, regexTokenizer, stopWordsRemover, word2Vec]
discreteStages = [keywordIndexer, locationIndexer, checkUrlTransformer,
                  getLengthTransformer, oneHotEncoder]
concatStages = [concatStringTransformer, concatStringRegexTokenizer,
                concatStringStopWordsRemover,
                concatStringWord2Vec, concatStringNoSWWord2Vec]
assemblerStages = [discreteFeaturesAssembler, discreteAndTextFeaturesAssembler]
scalerStages = [discreteFeaturesRobustScaler, discreteAndTextFeaturesRobustScaler]

preprocessingPipeline = Pipeline(
    stages=(nullStages + textStages + discreteStages
            + concatStages + assemblerStages + scalerStages)
)

## Training

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC
from synapse.ml.lightgbm import LightGBMClassifier

# Name of the label column used by every classifier and by the evaluator.
labelCol = "target"
# NOTE(review): rawPredictionCol is set to the "prediction" column, so the
# areaUnderROC reported below is presumably computed from predicted labels
# rather than from raw scores/probabilities — confirm this is intended.
evaluator = BinaryClassificationEvaluator(labelCol=labelCol, rawPredictionCol="prediction", metricName="areaUnderROC")

In [None]:
# Fit all preprocessing stages on the training data, then apply the fitted
# model to both the training and the test data.
preprocessingModel = preprocessingPipeline.fit(trainData)

trainDataPreprocessed, testDataPreprocessed = (
    preprocessingModel.transform(rawData) for rawData in (trainData, testData)
)

In [None]:
# 90/10 train/validation split with a fixed seed. Both splits are cached
# because the cells below fit and evaluate many models on them; without
# caching every fit/transform replays the whole preprocessing lineage.
trainSet, validSet = (
    split.cache()
    for split in trainDataPreprocessed.randomSplit([0.9, 0.1], seed=2406)
)

### Several observations on our feature sets

**Important features by using RandomForestClassifier**

In [27]:
featuresCol = "discreteIndexFeaturesScale"

# Assemble the raw index/length columns (not the one-hot vectors) so that each
# importance value maps to exactly one input column.
discreteIndexFeaturesAssembler = VectorAssembler(
    inputCols=["keywordIndex", "locationIndex", "textIsContainedUrl",
               "keywordLen", "textNoUrlLen"],
    outputCol="discreteIndexFeatures",
)

discreteIndexFeaturesRobustScaler = RobustScaler(
    inputCol="discreteIndexFeatures",
    outputCol="discreteIndexFeaturesScale",
    withScaling=True,
    withCentering=True,
    lower=0.25,
    upper=0.75,
)

assembledSet = discreteIndexFeaturesAssembler.transform(trainSet)
checkSet = discreteIndexFeaturesRobustScaler.fit(assembledSet).transform(assembledSet)

importanceClassifier = RandomForestClassifier(featuresCol=featuresCol, labelCol=labelCol)
featuresImportanceModel = importanceClassifier.fit(checkSet)

# Print each input column next to its importance.
importances = list(featuresImportanceModel.featureImportances)
for colName, importance in zip(discreteIndexFeaturesAssembler.getInputCols(), importances):
    print(f"{colName:20}: {importance}")

keywordIndex        : 0.18558627096125901
locationIndex       : 0.02047601483628147
textIsContainedUrl  : 0.38809280396066725
keywordLen          : 0.2615534413038534
textNoUrlLen        : 0.14429146893793882


We see that the presence of a URL in "text" has the highest impact on the target.  
*Note that we can generate more features based on n-gram, number of urls, etc.*

**We observe scores based on several feature sets:**
* "discreteFeaturesScale": "keywordVec", "locationVec", "textIsContainedUrlVec", "keywordLen", "textNoUrlLen"
* "discreteAndTextFeaturesScale": "discreteFeatures" and "textVec"
* "textVec" only
* "concatStringVec": concatenate "keyword", "location" and "text" (not containing urls), then apply Word2Vec
* "concatStringNoSWVec": concatenate "keyword", "location" and "text" (not containing urls and stopwords), then apply Word2Vec

In [33]:
featuresCols = ["discreteFeaturesScale", "discreteAndTextFeaturesScale", 
                "textVec", "concatStringVec", "concatStringNoSWVec"]

# The features column is assigned per feature set inside the loop below, so the
# estimators are built without it. This cell therefore no longer depends on
# whatever value `featuresCol` happens to hold from an earlier cell.
algorithmList = {"LR":   LogisticRegression(labelCol=labelCol),
                 "DTC":  DecisionTreeClassifier(labelCol=labelCol),
                 "RFC":  RandomForestClassifier(labelCol=labelCol),
                 "GBTC": GBTClassifier(labelCol=labelCol),
                 "LSVC": LinearSVC(labelCol=labelCol),
                 "LGBMC":LightGBMClassifier(labelCol=labelCol)
                }

# For every feature set, train each algorithm and report the mean validation score.
for param in featuresCols:
    scores = []
    for algorithm in algorithmList.values():
        algorithm.setFeaturesCol(param)
        prediction = algorithm.fit(trainSet).transform(validSet)
        scores.append(evaluator.evaluate(prediction))
    print(f'Param {param:50}: {np.round(np.mean(scores), 5)}')

Param discreteFeaturesScale                             : 0.58467
Param discreteAndTextFeaturesScale                      : 0.69364
Param textVec                                           : 0.71249
Param concatStringVec                                   : 0.69641
Param concatStringNoSWVec                               : 0.7066


**We observe that "textVec" ("text" without urls and stopwords) produces the best result**.  
* Adding "keyword" and "location" into "text" does not improve the performance. An explanation is that "text" can already contain both "keyword" and "location", so adding these words again seems to harm the performance.
* Removing stopwords can lead to a better result.  
* Using more discrete features can help to gain some more points.

### Training on several classification algorithms

We use default hyper-parameters.

In [24]:
featuresCol = "textVec"

# Every estimator shares the same features/label columns; only the multilayer
# perceptron needs an extra argument (its layer sizes).
sharedColumns = dict(featuresCol=featuresCol, labelCol=labelCol)

algorithmList = {
    "LR": LogisticRegression(**sharedColumns),
    "DTC": DecisionTreeClassifier(**sharedColumns),
    "RFC": RandomForestClassifier(**sharedColumns),
    "GBTC": GBTClassifier(**sharedColumns),
    "MPC": MultilayerPerceptronClassifier(layers=[50, 2], **sharedColumns),
    "LSVC": LinearSVC(**sharedColumns),
    "LGBMC": LightGBMClassifier(**sharedColumns),
}

In [25]:
# Fit each algorithm on the training split, score it on the validation split
# and report the wall-clock time of fit + transform + evaluate.
for name, algorithm in algorithmList.items():
    startTime = time.time()
    model = algorithm.fit(trainSet)
    prediction = model.transform(validSet)
    score = evaluator.evaluate(prediction)
    elapsed = time.time() - startTime
    print(f'{name:4}: {np.round(score,5)} in {np.round(elapsed, 3)}s')

LR  : 0.69002 in 16.081s
DTC : 0.69808 in 12.514s
RFC : 0.67623 in 12.112s
GBTC: 0.71931 in 19.075s
MPC : 0.69311 in 12.814s
LSVC: 0.69121 in 21.75s
LGBMC: 0.7213 in 7.369s


We observe that LightGBMClassifier (default hyper-parameters) produces the highest score.  
We train LightGBMClassifier with all data, **the score of test set is 0.74410**.

In [31]:
labelCol = "target"
featuresCol = "textVec"

# LightGBM settings for this run; the column names come from the variables above.
lgbmParams = dict(
    boostingType='dart',
    objective='binary',
    metric='auc',
    isUnbalance=True,
    numIterations=300,
    labelCol=labelCol,
    featuresCol=featuresCol,
)
lgbmc = LightGBMClassifier(**lgbmParams)

lgbmModel = lgbmc.fit(trainSet)
prediction = lgbmModel.transform(validSet)
print(evaluator.evaluate(prediction))

ConnectionRefusedError: [Errno 61] Connection refused

**Tuning LightGBMClassifier** using TuneHyperparameters  
https://microsoft.github.io/SynapseML/docs/documentation/estimators/estimators_core/

Note that we need to convert our DataFrame into a flat DataFrame in which each column represents a single feature, plus one column for the target. Other column types (e.g. vector, array) can lead to an error.

In [26]:
from synapse.ml.automl import *
from synapse.ml.train import *

from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F
import re

# Number of entries expanded from the feature vector; must match the size of
# the vectors stored in `featuresCol` (Word2Vec vectorSize=50 for "textVec").
numTextFeatures = 50


def flattenVectorColumn(dataset, vectorCol, numFeatures, keepCols=()):
    """Expand a vector column into one scalar column per entry.

    Args:
        dataset: DataFrame containing ``vectorCol``.
        vectorCol: name of the vector column to expand.
        numFeatures: number of leading vector entries to extract.
        keepCols: names of columns to keep unchanged in front of the features
            (e.g. the label column).

    Returns:
        DataFrame with ``keepCols`` followed by ``feature0`` ..
        ``feature<numFeatures-1>``. The feature columns are aliased directly
        so their names contain no "[" / "]" characters.
    """
    expanded = dataset.withColumn("feature", vector_to_array(vectorCol))
    featureColumns = [F.col("feature")[i].alias(f"feature{i}") for i in range(numFeatures)]
    return expanded.select(list(keepCols) + featureColumns)


# Labelled sets keep the target column; the test set has features only.
trainSetAllHP = flattenVectorColumn(trainDataPreprocessed, featuresCol, numTextFeatures, [labelCol])
trainSetHP = flattenVectorColumn(trainSet, featuresCol, numTextFeatures, [labelCol])
validSetHP = flattenVectorColumn(validSet, featuresCol, numTextFeatures, [labelCol])
testSetHP = flattenVectorColumn(testDataPreprocessed, featuresCol, numTextFeatures)

In [27]:
from synapse.ml.automl import *
from synapse.ml.train import *
import sklearn.metrics as metrics

# Base estimator whose hyper-parameters are tuned below.
lgbmc = LightGBMClassifier(boostingType='dart',
                           objective= 'binary',
                           metric= 'auc',
                           isUnbalance= True,
                           numIterations= 300)

smlmodels = [lgbmc]
mmlmodels = [TrainClassifier(model=model, labelCol=labelCol) for model in smlmodels]

# NOTE(review): DiscreteHyperParam presumably samples only from the listed
# values, i.e. maxDepth in {1, 30} and numLeaves in {10, 200}. If a range was
# intended, RangeHyperParam may be the right choice — confirm against the
# SynapseML documentation.
paramBuilder = (HyperparamBuilder()
.addHyperparam(lgbmc, lgbmc.learningRate, RangeHyperParam(0.01, 0.5))
.addHyperparam(lgbmc, lgbmc.maxDepth, DiscreteHyperParam([1,30]))
.addHyperparam(lgbmc, lgbmc.numLeaves, DiscreteHyperParam([10,200]))
.addHyperparam(lgbmc, lgbmc.featureFraction, RangeHyperParam(0.1, 1.0))
.addHyperparam(lgbmc, lgbmc.baggingFraction, RangeHyperParam(0.1, 1.0))
.addHyperparam(lgbmc, lgbmc.baggingFreq, RangeHyperParam(0, 3))
)

searchSpace = paramBuilder.build()

randomSpace = RandomSpace(searchSpace)

bestModel = TuneHyperparameters(evaluationMetric="AUC", models=mmlmodels, numFolds=2, 
                                numRuns=len(mmlmodels) * 2, parallelism=2, 
                                paramSpace=randomSpace.space(), seed=0).fit(trainSetHP)


prediction = bestModel.transform(validSetHP)

# Collect labels and predictions in ONE action so each prediction is paired
# with the label of the same row. Two separate collect() calls are two Spark
# jobs whose row pairing relies on positional order.
scoredRows = prediction.select(labelCol, 'scored_labels').collect()
trueLabel = np.array([row[0] for row in scoredRows])
predLabel = np.array([row[1] for row in scoredRows])

# NOTE(review): 'scored_labels' presumably holds hard class labels, so this AUC
# is computed from predicted labels rather than scores — confirm this is intended.
print(metrics.roc_auc_score(trueLabel, predLabel))