# Introduction 

- First we build a baseline model using basic Spark API. 
- Secon we fine tune the model using state of art technique using Spark NLP. 

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[K     |████████████████████████████████| 204.7 MB 17 kB/s s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 42.9 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=152b4fbb687bdbc2080572683e7292efbb1c87669adcc2fb5c6708376a0aaa5f
  Stored in directory: /root/.cache/pip/wheels/4e/c5/36/aef1bb711963a619063119cc032176106827a129c0be20e301
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.[0m


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification').getOrCreate()

In [33]:
from pyspark.sql.functions import lower, col, udf, lit, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [51]:
schema = StructType([StructField('id', IntegerType(), True), 
                      StructField('keyword', StringType(), True), 
                      StructField('location', StringType(), True), 
                      StructField('text', StringType(), True),
                      StructField('target', IntegerType(), True)
                     ])

df1 = spark.read.csv('/kaggle/input/nlp-getting-started/train.csv', header=True, schema=schema)
df1 = df1.where(~df1['id'].isNull()).select('target', 'text')

df2 = spark.read.csv('/kaggle/input/nlp-getting-started/test.csv', header=True, schema=schema)
df2 = df2.where(~df2['id'].isNull()).select('id', 'text')

print('Train data head')
df1.show(3)

print('Test data head')
df2.show(3)

print('Train data size:\t', df1.count(), 
      '\nTrain data size:\t', df2.count())

Train data head
+------+--------------------+
|target|                text|
+------+--------------------+
|     1|Our Deeds are the...|
|     1|Forest fire near ...|
|     1|All residents ask...|
+------+--------------------+
only showing top 3 rows

Test data head
+---+--------------------+
| id|                text|
+---+--------------------+
|  0|Just happened a t...|
|  2|Heard about #eart...|
|  3|there is a forest...|
+---+--------------------+
only showing top 3 rows

Train data size:	 7613 
Train data size:	 3263


In [52]:
print('Train and test data schema')
df1.printSchema(), df2.printSchema()

df1.groupby('target').count().show()

Train and test data schema
root
 |-- target: integer (nullable = true)
 |-- text: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)

+------+-----+
|target|count|
+------+-----+
|  null|  437|
|     1| 3081|
|     0| 4095|
+------+-----+



In [53]:
df1 = df1.where(~df1['target'].isNull())
df1.count()
df1.show(3)

+------+--------------------+
|target|                text|
+------+--------------------+
|     1|Our Deeds are the...|
|     1|Forest fire near ...|
|     1|All residents ask...|
+------+--------------------+
only showing top 3 rows



# Preprocessing 

In [54]:
def preprocess(column):
    
    # Lowering the capital letters 
    column = lower(column)
    
    # Replacing user name 
    column = regexp_replace(column, r'@[^\s]+', 'USER')
    
    # Replacing url 
    column = regexp_replace(column, r'https?://\S+|www\.\S+', 'URL')
    
    # Replacing ayyyyy -> ayy 
    column = regexp_replace(column, r'(.)\1\1+', r'\1\1')
    
    # Replacing other than alphabets 
    column = regexp_replace(column, r'[^a-zA-Z\d\s]', '')
    column = regexp_replace(column, r'\d+', 'NUM')
    
    return column

In [55]:
df1 = df1.withColumn('text', preprocess(col('text')))
df2 = df2.withColumn('text', preprocess(col('text')))

df1.show(3)
df2.show(3)

+------+--------------------+
|target|                text|
+------+--------------------+
|     1|our deeds are the...|
|     1|forest fire near ...|
|     1|all residents ask...|
+------+--------------------+
only showing top 3 rows

+---+--------------------+
| id|                text|
+---+--------------------+
|  0|just happened a t...|
|  2|heard about earth...|
|  3|there is a forest...|
+---+--------------------+
only showing top 3 rows



# Model building 

In [56]:
train, test = df1.randomSplit([0.7, 0.3])

In [57]:
print('Total number of stopwords: \t', len(StopWordsRemover().getStopWords()))
print('Example stopwords: \t\t', StopWordsRemover().getStopWords()[:5])

Total number of stopwords: 	 181
Example stopwords: 		 ['i', 'me', 'my', 'myself', 'we']


In [58]:
evaluator = MulticlassClassificationEvaluator(labelCol='target', metricName='accuracy')

In [59]:
tokenizer = Tokenizer(inputCol='text', outputCol='text_token')
remover = StopWordsRemover(inputCol='text_token', outputCol='text_trim')

hashingTF = HashingTF(inputCol='text_trim', outputCol='raw_feat', numFeatures=10000)
idf = IDF(inputCol='raw_feat', outputCol='features')

In [60]:
rf = RandomForestClassifier(labelCol='target', 
                            maxDepth=16, 
                            numTrees=100)

nb = NaiveBayes(labelCol='target', smoothing=200)
svc = LinearSVC(labelCol='target', regParam=1, maxIter=20)

In [61]:
pipeline_rf = Pipeline(stages=[tokenizer, remover, hashingTF, idf, rf])
pipeline_nb = Pipeline(stages=[tokenizer, remover, hashingTF, idf, nb])
pipeline_svc = Pipeline(stages=[tokenizer, remover, hashingTF, idf, svc])

In [62]:
model_names = {'Random forest': pipeline_rf, 
               'Naive Bayes' : pipeline_nb, 
               'SVM classifier' : pipeline_svc}

for name in model_names:
    model = model_names[name].fit(train)
    pred = model.transform(test)
    score = evaluator.evaluate(pred)
    print(name.ljust(30), score)

Random forest                  0.6693434104865376
Naive Bayes                    0.7808219178082192
SVM classifier                 0.7529522909777988


In [63]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10000, 100000]) \
    .addGrid(nb.smoothing, [100, 200, 300]) \
    .build()

In [64]:
model_names = {'Naive Bayes' : pipeline_nb}

for name in model_names:
    crossval = CrossValidator(estimator=model_names[name],
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5)
    
    model = crossval.fit(df1)
    pred = model.transform(df1)
    score = evaluator.evaluate(pred)
    print(name.ljust(30), score)

Naive Bayes                    0.8663600891861761


# Null text ? 

In [65]:
df1.where(df1['text'].isNull()).show()
df2.where(df2['text'].isNull()).show()

+------+----+
|target|text|
+------+----+
+------+----+

+-----+----+
|   id|text|
+-----+----+
| 7524|null|
|10103|null|
+-----+----+



In [66]:
pred_null = df2.where(df2['text'].isNull()).\
            withColumn('target', lit(0)).select('id', 'target')
pred_null.show()

df2 = df2.dropna()

+-----+------+
|   id|target|
+-----+------+
| 7524|     0|
|10103|     0|
+-----+------+



# Prediction 

In [67]:
pred_test = model.transform(df2).select('id', 'prediction')

predictions = pred_test.\
                withColumn('target', pred_test['prediction'].\
                cast('integer')).drop('prediction')

predictions.show(4)

+---+------+
| id|target|
+---+------+
|  0|     1|
|  2|     0|
|  3|     1|
|  9|     1|
| 11|     1|
+---+------+
only showing top 5 rows



In [74]:
predictions = predictions.union(pred_null)
predictions.count()

3265

In [27]:
predictions.toPandas().to_csv('submission.csv', index=False)

In [73]:
import pandas as pd
pd.read_csv('submission.csv').sample(5)

Unnamed: 0,id,target
1717,5788,1
2391,7997,0
395,1281,0
2739,9127,1
1840,6214,1


# Future direction 

- Emoji and Emoticons 
- Lementation 