In [1]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [2]:
import pandas as pd  
import numpy as np
# The CSV is read with header=None, so the column names are supplied explicitly.
cols = ['Target', 'id', 'date', 'query', 'user', 'tweet']
df = pd.read_csv(
    "../trainingandtestdata/training.1600000.processed.noemoticon.csv",
    header=None,
    names=cols,
    encoding="ISO-8859-1",
)
df.head()

Unnamed: 0,Target,id,date,query,user,tweet
0,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,0,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,0,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,0,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."


# PreProcessing

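This section drops the unused metadata columns and defines a cleaning function that strips HTML markup, @mentions and hyperlinks, expands negative contractions (e.g. "can't" -> "can not"), and removes non-alphabetic characters and single-letter tokens. Tweets that end up as null values after cleaning are dropped later, when the data is loaded into Spark.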

In [3]:
# Keep only the label and the tweet text; the other columns are not used below.
# Reassigning instead of inplace=True, together with errors='ignore', makes this
# cell safe to re-run: a second execution is a no-op rather than a KeyError.
df = df.drop(['id', 'date', 'query', 'user'], axis=1, errors='ignore')

In [4]:
# Show the first ten rows whose Target equals 0.
df.loc[df['Target'] == 0].head(10)

Unnamed: 0,Target,tweet
0,0,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,is upset that he can't update his Facebook by ...
2,0,@Kenichan I dived many times for the ball. Man...
3,0,my whole body feels itchy and like its on fire
4,0,"@nationwideclass no, it's not behaving at all...."
5,0,@Kwesidei not the whole crew
6,0,Need a hug
7,0,@LOLTrish hey long time no see! Yes.. Rains a...
8,0,@Tatiana_K nope they didn't have it
9,0,@twittera que me muera ?


In [5]:
from nltk.tokenize import WordPunctTokenizer
import re
from bs4 import BeautifulSoup

# Shared tokenizer instance; tweet_preprocessing calls token.tokenize() on every tweet.
token = WordPunctTokenizer()
# Regular expressions and lookup table used by tweet_preprocessing.

# @mentions. Handles may contain underscores (e.g. @Tatiana_K); without "_" in the
# character class the tail of such a handle ("_K") was left behind in the text.
patern1 = r'@[A-Za-z0-9_]+'
# http(s) links. Consume up to the next whitespace so that dashes, query strings,
# etc. do not leak into the cleaned text as stray tokens.
patern2 = r'https?://\S+'
combinedpatern = r'|'.join((patern1, patern2))
# Bare "www." links. The dot is escaped and a word boundary is required so that
# ordinary text such as "awwww man" is no longer mistaken for a link and deleted.
www_patern = r'\bwww\.\S+'
# Negative contractions and their expanded forms. Matching is done on lower-cased
# text, so the keys are lower-case.
negativedic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
# One alternation over all contractions; keys are escaped so that any regex
# metacharacter added to the table later is still matched literally.
negativepatern = re.compile(r'\b(' + '|'.join(re.escape(key) for key in negativedic) + r')\b')


def tweet_preprocessing(tweet):
    """Clean one raw tweet and return it as a lower-case, space-separated string.

    Steps, in order: strip HTML markup with ``BeautifulSoup(...).get_text()``,
    remove @mentions and http(s)/www links, lower-case, expand the negative
    contractions listed in ``negativedic``, replace every non-letter with a
    space, tokenize, and drop single-character tokens.

    Parameters
    ----------
    tweet : str
        Raw tweet text. A missing value (``None`` / NaN) is accepted and
        treated as empty text.

    Returns
    -------
    str
        The cleaned text; may be the empty string.
    """
    # Missing values would make BeautifulSoup raise; treat them as empty text.
    if pd.isnull(tweet):
        return ""
    text = BeautifulSoup(tweet, 'lxml').get_text()
    # get_text() already returns decoded text, so the former .decode() call always
    # raised and was hidden by a bare ``except:``. Do the intended cleanup directly:
    # drop a byte-order mark and turn U+FFFD replacement characters into "?".
    text = text.replace(u"\ufeff", u"").replace(u"\ufffd", u"?")
    text = re.sub(combinedpatern, '', text)
    text = re.sub(www_patern, '', text)
    text = text.lower()
    # Expand contractions before punctuation (the apostrophe) is removed.
    text = negativepatern.sub(lambda match: negativedic[match.group()], text)
    text = re.sub("[^a-zA-Z]", " ", text)
    words = [word for word in token.tokenize(text) if len(word) > 1]
    return " ".join(words).strip()

# Clean every tweet in the frame. Iterating over the column itself, instead of
# range(1600000) with df['tweet'][i], keeps the cell correct for any number of rows
# (no KeyError on a smaller file, no silent truncation on a larger one) and avoids
# one label lookup per row.
res = [tweet_preprocessing(tweet) for tweet in df['tweet']]

In [6]:
# Pair each cleaned tweet with its label, write the result to disk, and read it
# back so the following cells work from the saved file.
file = 'tweet.csv'
res = pd.DataFrame(res, columns=['tweet'])
res['Target'] = df.Target
res.to_csv(file, encoding='utf-8')
# index_col=0 restores the row index that to_csv wrote as the first column.
df = pd.read_csv(file, index_col=0)
df.head()

  mask |= (ar1 == a)


Unnamed: 0,tweet,Target
0,awww that bummer you shoulda got david carr of...,0
1,is upset that he can not update his facebook b...,0
2,dived many times for the ball managed to save ...,0
3,my whole body feels itchy and like its on fire,0
4,no it not behaving at all mad why am here beca...,0


# Model Pipeline

This model pipeline consists of a tokenizer, n-grams (1 to 3), CountVectorizer term counts with IDF weighting (TF-IDF), and Logistic Regression.

In [7]:

from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


In [8]:
from pyspark.ml.feature import NGram, VectorAssembler,VectorSizeHint

def build_model(inputCol=["tweet","Target"], n=3, vocab_size=5460):
    """Build the (unfitted) n-gram TF-IDF + logistic regression pipeline.

    Stages, in order: Tokenizer, one NGram per order 1..n, one CountVectorizer
    per order, one IDF per order, one VectorSizeHint per order, a
    VectorAssembler that concatenates the per-order TF-IDF vectors into
    ``features``, a StringIndexer that produces ``label``, and
    LogisticRegression. With the defaults this is the same stage list as before.

    Parameters
    ----------
    inputCol : sequence of two str
        ``[text_column, label_column]``. Previously accepted but ignored; the
        default keeps the original hard-coded column names. The default list is
        never mutated.
    n : int
        Highest n-gram order; orders 1..n are all used.
    vocab_size : int
        ``vocabSize`` of every CountVectorizer and ``size`` of every
        VectorSizeHint.

    Returns
    -------
    pyspark.ml.Pipeline
        The unfitted pipeline; call ``.fit(df)`` on it.
    """
    text_col, label_col = inputCol
    orders = list(range(1, n + 1))

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]
    cv = [
        CountVectorizer(vocabSize=vocab_size, inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in orders
    ]
    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]
    # One hint per n-gram order, generated from n instead of being hard-coded for
    # 1..3, so that n != 3 no longer references a missing column or skips an order.
    # NOTE(review): the hint assumes each CountVectorizer ends up with exactly
    # vocab_size terms - confirm before using this on a small corpus.
    hints = [
        VectorSizeHint(inputCol="{0}_tfidf".format(i), size=vocab_size)
        for i in orders
    ]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf + hints + assembler + label_stringIdx + lr)

# Splitting Data

In [9]:
# `sqlContext` is not created anywhere in this notebook; it only exists when the
# kernel is started through the pyspark shell. Obtain (or reuse) one explicitly so
# the cell also works after Restart & Run All in a plain kernel.
sqlContext = SQLContext.getOrCreate(ps.SparkContext.getOrCreate())
# Read the cleaned tweets with Spark's built-in CSV reader (header row, inferred types).
df = sqlContext.read.csv('tweet.csv', header=True, inferSchema=True)
# Drop rows containing any null value before splitting.
df = df.dropna()
# 98 % / 2 % train / validation split with a fixed seed.
(train_set, val_set) = df.randomSplit([0.98, 0.02], seed = 1996)

In [10]:
# Print the first five rows of the Spark DataFrame.
df.show(n=5)


+---+--------------------+------+
|_c0|               tweet|Target|
+---+--------------------+------+
|  0|awww that bummer ...|     0|
|  1|is upset that he ...|     0|
|  2|dived many times ...|     0|
|  3|my whole body fee...|     0|
|  4|no it not behavin...|     0|
+---+--------------------+------+
only showing top 5 rows



In [11]:

from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluator used for the ROC-AUC score; labelCol and metricName are written out
# here with their default values.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)


In [12]:
# Fit the pipeline on the training split and score the validation split.
ngrampipelineFit = build_model().fit(train_set)
predictions = ngrampipelineFit.transform(val_set)
# Accuracy: share of validation rows whose predicted class equals the indexed label.
n_correct = predictions.filter(predictions.label == predictions.prediction).count()
n_validation = val_set.count()
accuracy = n_correct / float(n_validation)
rocnauc = evaluator.evaluate(predictions)

# Checking Accuracy of the model

In [13]:
# Report both metrics rounded to three decimals.
accuracy_text = "{0:.3f}".format(accuracy)
print("Accuracy Score:" + accuracy_text)
rocauc_text = "ROC-AUC: {0:.3f}".format(rocnauc)
print(rocauc_text)

Accuracy Score:0.809
ROC-AUC: 0.884


# Saving The Model

In [14]:
# Persist the fitted pipeline to ./pipe1, overwriting any previous save at that path.
pipeline_writer = ngrampipelineFit.write().overwrite()
pipeline_writer.save('./pipe1')

In [15]:
from pyspark.ml import PipelineModel
# Reload the fitted pipeline from the pipe1/ directory, replacing the in-memory model.
ngrampipelineFit = PipelineModel.load("pipe1/")

In [16]:
# List the stages of the reloaded pipeline model.
stages = ngrampipelineFit.stages
print(stages)

[Tokenizer_059692fb3911, NGram_f9f9d3a04f9e, NGram_ab4a693cecfa, NGram_2011bf4f0044, CountVectorizer_a5f8d832cbff, CountVectorizer_474478641fae, CountVectorizer_21887e045da3, IDF_95754bb7fafc, IDF_a23c51767eab, IDF_2ad9e4f59fbf, VectorSizeHint_6e75adc414ab, VectorSizeHint_ed92d1d4f8bb, VectorSizeHint_0f5b8d149453, VectorAssembler_c9e9de9283dc, StringIndexer_9d308fb24d55, LogisticRegressionModel: uid = LogisticRegression_5b9e360708ed, numClasses = 2, numFeatures = 16380]
