In [1]:
import pandas as pd  
import numpy as np
import matplotlib.pyplot as plt
cols = ['sentiment','id','date','query_string','user','text']
df = pd.read_csv("tweetproc.csv",header=None, names=cols, encoding='latin-1')
df.drop(['id','date','query_string','user'],axis=1,inplace=True)


# Cleaning the data

In [2]:
df['pre_clean_len'] = [len(t) for t in df.text]
from pprint import pprint
data_dict = {
    'sentiment':{
        'type':df.sentiment.dtype,
        'description':'sentiment class - 0:negative, 1:positive'
    },
    'text':{
        'type':df.text.dtype,
        'description':'tweet text'
    },
    'pre_clean_len':{
        'type':df.pre_clean_len.dtype,
        'description':'Length of the tweet before cleaning'
    },
    'dataset_shape':df.shape
}

# cleaning process: 
### 1.using beautiful soup to clean the data i.e. to remove html encoded data
### 2.using re to clear person-tags and urls
### 3.removing text encoding
### 4.removing #tags

In [3]:
from bs4 import BeautifulSoup
import re
from nltk.tokenize import WordPunctTokenizer
tok = WordPunctTokenizer()
urlcheck = r'https?://[A-Za-z0-9./]+'
cleaner = r'@[A-Za-z0-9]+'
combined_pat = r'|'.join((cleaner, urlcheck))
def tweet_cleaner(text):
    implement_soup = BeautifulSoup(text, 'lxml')
    done = implement_soup.get_text()
    data_stripped = re.sub(combined_pat, '', done)
    try:
        clean = data_stripped.decode("utf-8-sig").replace(u"\ufffd", "?")
    except:
        clean = data_stripped
    letters_only = re.sub("[^a-zA-Z]", " ", clean)
    lower_case = letters_only.lower()
    words = tok.tokenize(lower_case)
    return (" ".join(words)).strip()
testing = df.text[:100]
test_result = []
for t in testing:
    test_result.append(tweet_cleaner(t))

# the above function is used to clean all the data and then we append it to the dataframe
## the below code will take a lot of time to execute because of the file size

In [4]:
import warnings
warnings.filterwarnings("ignore")
print("this might take sometime.... wait!")
ranger = [0,400000,800000,1200000,1600000]
clean_tweet_texts = []
for i in range(ranger[0],ranger[1]):                                                                   
    clean_tweet_texts.append(tweet_cleaner(df['text'][i]))


this might take sometime.... wait!


In [5]:
clean_df = pd.DataFrame(clean_tweet_texts,columns=['text'])
clean_df['target'] = df.sentiment

In [6]:
clean_df.to_csv('tweet2.csv',encoding='utf-8')
csv = 'tweet2.csv'
my_df = pd.read_csv(csv,index_col=0)

In [7]:
my_df.target.value_counts()

0    400000
Name: target, dtype: int64

# Now we will use the cleaned dataset for further processing..

In [8]:
#import findspark
#findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [9]:
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Spark Context Created!!")
except ValueError:
    warnings.warn("Refering existing spark context")

Spark Context Created!!


In [10]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('tweet2.csv')
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df = df.dropna()
#this drops all the useless(NA) containing data
df.count()

399266

In [12]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)

In [13]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
from pyspark.ml.feature import Tokenizer 
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [14]:

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) 
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])
Train_set_to_pipelineFit = pipeline.fit(train_set)
train_df = Train_set_to_pipelineFit.transform(train_set)
val_df = Train_set_to_pipelineFit.transform(val_set)
train_df.show()

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|awww that s a bum...|     0|[awww, that, s, a...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|  1|is upset that he ...|     0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|  2|i dived many time...|     0|[i, dived, many, ...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
|  3|my whole body fee...|     0|[my, whole, body,...|(65536,[158,11650...|(65536,[158,11650...|  0.0|
|  4|no it s not behav...|     0|[no, it, s, not, ...|(65536,[1968,4488...|(65536,[1968,4488...|  0.0|
|  5|  not the whole crew|     0|[not, the, whole,...|(65536,[8026,3398...|(65536,[8026,3398...|  0.0|
|  6|          need a hug|     0|      [need, a, hug]|(65536,[17625,308..

In [15]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
e1 = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
e1.evaluate(predictions)

0.0

In [17]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

1.0

In [18]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = e1.evaluate(predictions)

print ("Accuracy Score: {0:.4f}".format(accuracy))
print ("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 1.0000
ROC-AUC: 0.0000
Wall time: 20.3 s


In [19]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

def build_trigrams(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="rawFeatures"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+selector+lr)

In [None]:
%%time
trigram_train_set_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_train_set_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(train_set.count())

In [None]:
# print accuracy, roc_auc
print ("Accuracy Score: {0:.4f}".format(accuracy))

In [None]:

from pyspark.ml.feature import NGram, VectorAssembler

def assemblyTweet(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

In [None]:
%%time

assembly_pipelineFit = assemblyTweet().fit(train_set)
predictions_wocs = assembly_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = e1.evaluate(predictions_wocs)

# print accuracy, roc_auc
print ("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print ("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

In [None]:
#applying transform on test_set

test_predictions = assembly_pipelineFit.transform(test_set)
#accuracy is calculated for test set
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = e1.evaluate(test_predictions)

# print accuracy, roc_auc
print ("Accuracy Score: {0:.4f}".format(test_accuracy))
test1=test_accuracy*100
print ("Accuracy is: ", test1,"%")