# Introduction 

- First we build a baseline model using basic Spark API. 
- Secon we fine tune the model using state of art technique using Spark NLP. 

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification').getOrCreate()

In [32]:
from pyspark.sql.functions import lower, col, udf, lit, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

from pyspark.ml.classification import LogisticRegression, \
                RandomForestClassifier, NaiveBayes, LinearSVC, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
df = spark.read.csv('Tweet_sentiment_clean.csv',\
                     header=True, inferSchema=True)

In [5]:
df1 = df.sample(0.1)

In [6]:
df1.show(3)
print('Train data size:\t', df1.count())

+---------+--------------------+
|sentiment|                text|
+---------+--------------------+
|        0|user dive mani ti...|
|        0|spring break plai...|
|        0|user day didnt ge...|
+---------+--------------------+
only showing top 3 rows

Train data size:	 161067


In [7]:
print('Train and test data schema')
df1.printSchema()

df1.groupby('sentiment').count().show()

Train and test data schema
root
 |-- sentiment: integer (nullable = true)
 |-- text: string (nullable = true)

+---------+-----+
|sentiment|count|
+---------+-----+
|        4|80630|
|        0|80437|
+---------+-----+



In [8]:
df1.where(df1['sentiment'].isNull()).count()

0

In [9]:
index = StringIndexer(inputCol='sentiment', outputCol='target')

In [10]:
df1 = index.fit(df1).transform(df1).select('text', 'target')
df1.show(3)

+--------------------+------+
|                text|target|
+--------------------+------+
|user dive mani ti...|   1.0|
|spring break plai...|   1.0|
|user day didnt ge...|   1.0|
+--------------------+------+
only showing top 3 rows



In [11]:
df1.groupby('target').count().show()

+------+-----+
|target|count|
+------+-----+
|   0.0|80630|
|   1.0|80437|
+------+-----+



# Preprocessing 

In [12]:
def preprocess(column):
    
    # Lowering the capital letters 
    column = lower(column)
    
    # Replacing user name 
    column = regexp_replace(column, r'@[^\s]+', 'USER')
    
    # Replacing url 
    column = regexp_replace(column, r'https?://\S+|www\.\S+', 'URL')
    
    # Replacing ayyyyy -> ayy 
    column = regexp_replace(column, r'(.)\1\1+', r'\1\1')
    
    # Replacing other than alphabets 
    column = regexp_replace(column, r'[^a-zA-Z\d\s]', '')
    column = regexp_replace(column, r'\d+', 'NUM')
    
    return column

In [13]:
df1 = df1.withColumn('text', preprocess(col('text')))
df1.show(3)

+--------------------+------+
|                text|target|
+--------------------+------+
|user dive mani ti...|   1.0|
|spring break plai...|   1.0|
|user day didnt ge...|   1.0|
+--------------------+------+
only showing top 3 rows



# Model building 

In [14]:
train, test = df1.randomSplit([0.7, 0.3])

In [35]:
train.show(3)
test.show(3)
test.count(), train.count()

+--------------------+------+
|                text|target|
+--------------------+------+
|NUM good weekend sad|   1.0|
|      NUM still done|   1.0|
|NUM went fli toy ...|   1.0|
+--------------------+------+
only showing top 3 rows

+--------------------+------+
|                text|target|
+--------------------+------+
|NUM last night ka...|   1.0|
|NUMhh friday fune...|   1.0|
|NUMrgh hate cop n...|   1.0|
+--------------------+------+
only showing top 3 rows



(48129, 112938)

In [17]:
print('Total number of stopwords: \t', len(StopWordsRemover().getStopWords()))
print('Example stopwords: \t\t', StopWordsRemover().getStopWords()[:5])

Total number of stopwords: 	 181
Example stopwords: 		 ['i', 'me', 'my', 'myself', 'we']


In [18]:
evaluator = MulticlassClassificationEvaluator(labelCol='target', metricName='accuracy')

In [19]:
tokenizer = Tokenizer(inputCol='text', outputCol='text_token')
remover = StopWordsRemover(inputCol='text_token', outputCol='text_trim')

hashingTF = HashingTF(inputCol='text_trim', outputCol='raw_feat', numFeatures=10000)
idf = IDF(inputCol='raw_feat', outputCol='features')

In [20]:
def pipeline(model):
    pipeline_model = Pipeline(stages=[tokenizer, remover, hashingTF, idf, model])
    return pipeline_model

In [132]:
ridge = LogisticRegression(labelCol='target', 
                        maxIter=20, 
                        regParam=0.7, 
                        elasticNetParam=0)

pipeline_ridge = pipeline(ridge)

model = pipeline_ridge.fit(train)
pred = model.transform(test)
score = evaluator.evaluate(pred)
print(score)

0.7393231180885089


In [131]:
lasso = LogisticRegression(labelCol='target', 
                        maxIter=20, 
                        regParam=0.002, 
                        elasticNetParam=1)

pipeline_lasso  = pipeline(lasso)

model = pipeline_lasso.fit(train)
pred = model.transform(test)
score = evaluator.evaluate(pred)
print(score)

0.7513849112611577


In [141]:
nb = NaiveBayes(labelCol='target', smoothing=400)

pipeline_nb = pipeline(nb)

model = pipeline_nb.fit(train)
pred = model.transform(test)
score = evaluator.evaluate(pred)
print(score)

0.7439220686915986


In [30]:
rf = RandomForestClassifier(labelCol='target', 
                            maxDepth=30,
                            maxBins=5,
                            numTrees=10)

pipeline_rf = pipeline(rf)

model = pipeline_rf.fit(train)
pred = model.transform(test)
score = evaluator.evaluate(pred)
print(score)

0.6990172245423757


In [33]:
gb = GBTClassifier(labelCol='target')

pipeline_gb = pipeline(gb)

model = pipeline_gb.fit(train)
pred = model.transform(test)
score = evaluator.evaluate(pred)
print(score)

0.6752269941199692


# Grid search 

In [38]:
nb = NaiveBayes(labelCol='target')
pipeline_nb = pipeline(nb)

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10000, 100000]) \
    .addGrid(nb.smoothing, [200, 400, 600]) \
    .build()

In [39]:
model_names = {'Naive Bayes' : pipeline_nb}

for name in model_names:
    crossval = CrossValidator(estimator=model_names[name],
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5)
    
    model = crossval.fit(df1)
    pred = model.transform(df1)
    score = evaluator.evaluate(pred)
    print(name.ljust(30), score)

Naive Bayes                    0.7753605642372429


# Future direction 

- Emoji and emoticons 
- Stacking in pipeline 