In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[398, 383, 394, 631, 168]

In [2]:
import io
import csv

In [3]:
def parse_line(line):
    csv_reader = csv.reader(io.StringIO(line))
    data_line = next(csv_reader)
    return data_line[0], data_line[-1]

In [4]:
tweet_rdd = sc.textFile('./data/train.csv') \
    .map(parse_line) \
    .map(lambda record: (int(record[0]), record[1].split()))

### Tweet Processing Functions

Below are the definitions of the functions that are used to process the tweets before being sent on to feature extraction

In [5]:
!pip install --upgrade nltk
!pip install matplotlib

Collecting nltk
  Downloading nltk-3.2.1.tar.gz (1.1MB)
[K    100% |████████████████████████████████| 1.1MB 437kB/s 
[?25hBuilding wheels for collected packages: nltk
  Running setup.py bdist_wheel for nltk ... [?25l- \ | / - \ | / done
[?25h  Stored in directory: /home/jovyan/.cache/pip/wheels/55/0b/ce/960dcdaec7c9af5b1f81d471a90c8dae88374386efe6e54a50
Successfully built nltk
Installing collected packages: nltk
Successfully installed nltk-3.2.1


In [6]:
%matplotlib inline

import re
import matplotlib.pyplot as plt
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk import bigrams



In [7]:
with open('./data/stopwords.txt') as stopword_file:
    stopwords = {stopword.strip(): 1 for stopword in stopword_file.readlines()}
    
porter_stemmer = PorterStemmer()

In [8]:
def basic_cleaning(tweet):
    clean_tweet = [word.lower().strip().strip('\'').strip('"') for word in tweet]
    clean_tweet = [word.replace('.', '').replace('?', '').replace(',', '') for word in clean_tweet]
    clean_tweet = [word.replace('#', '').replace('!', '').replace('\'', '') for word in clean_tweet]
    clean_tweet = [word.replace('"', '').replace('...', ' ').replace('..', ' ') for word in clean_tweet]
    clean_tweet = [word.replace('-', '').replace('.', ' ') for word in clean_tweet]
    clean_tweet = list(filter(lambda word: word != '', clean_tweet))
    
    return clean_tweet

In [9]:
def remove_non_alpha_starting_words(tweet):
    non_alpha_start_words_regex = '(^|\s)[^a-zA-Z]\w*($|\s)'
    clean_tweet = [re.sub(non_alpha_start_words_regex, '', word) for word in tweet]
    clean_tweet = list(filter(lambda word: word != '', clean_tweet))

    return clean_tweet

In [10]:
def remove_stopwords(tweet):
    tweet_without_stopwords = []
    for word in tweet:
        if word.find(' ') == -1:
            new_word = word if word not in stopwords else ''
        else:
            new_word = ' '.join([w for w in word.split() if w not in stopwords])
        tweet_without_stopwords.append(new_word)
    tweet_without_stopwords = list(filter(lambda word: word != '', tweet_without_stopwords))
    tweet_without_stopwords = list(set(tweet_without_stopwords))
    return tweet_without_stopwords

In [11]:
def replace_urls(tweet):
    http_url_regex = 'http(?s):\/\/.*'
    www_url_regex = 'www\.\w+.*'
    clean_tweet = [re.sub(http_url_regex, 'URL', word) for word in tweet]
    clean_tweet = [re.sub(www_url_regex, 'URL', word) for word in clean_tweet]
    clean_tweet = list(filter(lambda word: word != '', clean_tweet))
    
    return clean_tweet

In [12]:
def replace_user_handles(tweet):
    user_handle_regex = '@.*'
    clean_tweet = [re.sub(user_handle_regex, 'AT_USER', word) for word in tweet]
    clean_tweet = list(filter(lambda word: word != '', clean_tweet))

    return clean_tweet

In [13]:
def replace_repeated_characters(tweet):
    repeated_character_regex = '(\w)\\1{2,}'
    clean_tweet = [re.sub(repeated_character_regex, r'\1\1', word) for word in tweet]
    clean_tweet = list(filter(lambda word: word != '', clean_tweet))

    return clean_tweet

In [14]:
def generate_bigrams(tweet):
    bigram_tweet = bigrams(tweet)
    bigram_tweet = [bigram[0] + ' ' + bigram[1] for bigram in bigram_tweet]
    bigram_tweet.extend(tweet)
    return bigram_tweet

In [15]:
def stem_tweet(tweet, stemmer):
    stemmed_tweet = []
    for word in tweet:
        if word.find(' ') == -1:
            stemmed_word = stemmer.stem(word)
        else:
            stemmed_word = ' '.join([stemmer.stem(w) for w in word.split()])
        stemmed_tweet.append(stemmed_word)

    return stemmed_tweet

In [16]:
clean_tweet_sentiment_rdd = tweet_rdd.map(lambda record: (record[0], replace_urls(record[1]))) \
    .map(lambda record: (record[0], replace_user_handles(record[1]))) \
    .map(lambda record: (record[0], basic_cleaning(record[1]))) \
    .map(lambda record: (record[0], remove_non_alpha_starting_words(record[1]))) \
    .map(lambda record: (record[0], replace_repeated_characters(record[1]))) \
    .map(lambda record: (record[0], generate_bigrams(record[1]))) \
    .map(lambda record: (record[0], remove_stopwords(record[1]))) \
    .map(lambda record: (record[0], stem_tweet(record[1], porter_stemmer)))

### Feature Extraction using HashingTF and IDF

This section consists of extracting features to be fed into our algorithms to generate the classifier

In [17]:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel

In [18]:
clean_tweet_rdd = clean_tweet_sentiment_rdd.map(lambda record: record[1])

In [19]:
hashingtf = HashingTF(100000)

In [20]:
tf = hashingtf.transform(clean_tweet_rdd)
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)

In [21]:
tf_idf_sentiment = clean_tweet_sentiment_rdd.map(lambda record: record[0]).zip(tfidf) \
    .map(lambda record: LabeledPoint(record[0], record[1]))

In [22]:
training = tf_idf_sentiment

### Naive Bayes

This section consists of the training of a Naive Bayes model and checking its accuracy value

In [23]:
nb_model = NaiveBayes.train(training, 1.0)

In [24]:
nb_labels_and_preds = training.map(lambda record: (nb_model.predict(record.features), record.label))

In [25]:
nb_accuracy = nb_labels_and_preds.filter(lambda x: x[0] == x[1]).count() / training.count()
nb_accuracy

0.8819625

### Logistic Regression

This following section describes the Logistic Regression steps

In [26]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

In [27]:
lr_model = LogisticRegressionWithLBFGS.train(training, regType='l2')

In [28]:
lr_labels_and_preds = training.map(lambda record: (record.label, lr_model.predict(record.features)))
trainErr = lr_labels_and_preds.filter(lambda record: record[0] != record[1]).count() / float(training.count())

In [29]:
trainErr

0.0254375

In [893]:
def k_fold_cross_validation(k_value, model_type, data):
    split_ratio = [0.1 for i in range(0, k_value)]
    data_split = data.randomSplit(split_ratio)
    test_index = 0
    avg_accuracy = 0
    best_model = None
    for test_index in range(0, k_value):
        print(str(test_index) + 'th iteration')
        training_list = [i for index, i in enumerate(data_split) if index != test_index]
        training_rdd = sc.emptyRDD()
        for training in training_list:
            training_rdd.union(training)
        model = model_type.train(training, 1.0)
        accuracy = test_with_model(model, data_split[test_index])
        avg_accuracy += accuracy
    
    avg_accuracy /= k_value
    
    return avg_accuracy


def test_with_model(model, test):
    predictionAndLabel = test.map(lambda p: (model.predict(p.features), p.label))
    accuracy = predictionAndLabel.filter(lambda x: x[0] == x[1]).count() / test.count()
    return accuracy

In [894]:
NB_avg = k_fold_cross_validation(10, NaiveBayes, tf_idf_sentiment)
NB_avg

0th iteration
1th iteration
2th iteration
3th iteration
4th iteration
5th iteration
6th iteration
7th iteration
8th iteration
9th iteration


0.6454920921937479

In [895]:
LR_avg = k_fold_cross_validation(10, LogisticRegressionWithLBFGS, tf_idf_sentiment)
LR_avg

0th iteration
1th iteration
2th iteration
3th iteration
4th iteration
5th iteration
6th iteration
7th iteration
8th iteration
9th iteration


0.6784774653707852

### Running classifiers on Test Data

This section shows the results obtained by running the generated models on the test data

In [30]:
test_rdd = sc.textFile('./data/test.csv') \
    .map(parse_line) \
    .map(lambda record: (int(record[0]), record[1].split()))

In [31]:
test_sentiment_rdd = test_rdd.map(lambda record: (record[0], replace_urls(record[1]))) \
    .map(lambda record: (record[0], replace_user_handles(record[1]))) \
    .map(lambda record: (record[0], basic_cleaning(record[1]))) \
    .map(lambda record: (record[0], remove_non_alpha_starting_words(record[1]))) \
    .map(lambda record: (record[0], replace_repeated_characters(record[1]))) \
    .map(lambda record: (record[0], generate_bigrams(record[1]))) \
    .map(lambda record: (record[0], remove_stopwords(record[1]))) \
    .map(lambda record: (record[0], stem_tweet(record[1], porter_stemmer)))

In [32]:
clean_test_rdd = test_sentiment_rdd.map(lambda record: record[1])

In [33]:
test_tf = hashingtf.transform(clean_test_rdd)
test_tf.cache()
test_tfidf = idf.transform(test_tf)

In [34]:
test_tfidf_sentiment = test_sentiment_rdd.map(lambda record: record[0]).zip(test_tfidf) \
    .map(lambda record: LabeledPoint(record[0], record[1]))

In [35]:
nb_labels_and_preds = test_tfidf_sentiment.map(lambda p: (nb_model.predict(p.features), p.label))

In [36]:
accuracy = nb_labels_and_preds.filter(lambda x: x[0] == x[1]).count() / test_tfidf_sentiment.count()
accuracy

0.7103064066852368

In [37]:
lr_model = LogisticRegressionWithLBFGS.train(training, regType='l2')

In [38]:
lr_labels_and_preds = test_tfidf_sentiment.map(lambda record: (record.label, lr_model.predict(record.features)))
trainErr = lr_labels_and_preds.filter(lambda record: record[0] != record[1]).count() / float(test_tfidf_sentiment.count())

In [39]:
trainErr

0.2618384401114206

### Evaluation of Classifiers

This section covers capturing of the various performance metrics that the algorithms achieve on the test set. 

In [40]:
def precision(labels_and_preds):
    classified_1s = list(filter(lambda line: line[1] == 1, labels_and_preds))
    classified_0s = list(filter(lambda line: line[1] == 0, labels_and_preds))
    correctly_predicted_1s = len(list(filter(lambda line: line[0] == line[1], classified_1s)))
    correctly_predicted_0s = len(list(filter(lambda line: line[0] == line[1], classified_0s)))
    precision_1s = correctly_predicted_1s / len(classified_1s)
    precision_0s = correctly_predicted_0s / len(classified_0s)
    
    return {'p1': precision_1s, 'p0': precision_0s}

In [41]:
def recall(labels_and_preds):
    actual_1s = list(filter(lambda line: line[0] == 1, labels_and_preds))
    actual_0s = list(filter(lambda line: line[0] == 0, labels_and_preds))
    correctly_predicted_1s = len(list(filter(lambda line: line[0] == line[1], actual_1s)))
    correctly_predicted_0s = len(list(filter(lambda line: line[0] == line[1], actual_0s)))
    recall_1s = correctly_predicted_1s / len(actual_1s)
    recall_0s = correctly_predicted_0s / len(actual_0s)
    
    return {'r1': recall_1s, 'r0': recall_0s}

In [42]:
def generate_confusion_matrix(labels_and_preds):
    actual_1s = list(filter(lambda line: line[0] == 1, labels_and_preds))
    actual_0s = list(filter(lambda line: line[0] == 0, labels_and_preds))
    correctly_predicted_1s = len(list(filter(lambda line: line[0] == line[1], actual_1s)))
    correctly_predicted_0s = len(list(filter(lambda line: line[0] == line[1], actual_0s)))
    incorrectly_predicted_1s = len(list(filter(lambda line: line[0] != line[1], actual_1s)))
    incorrectly_predicted_0s = len(list(filter(lambda line: line[0] != line[1], actual_0s)))
    
    return {'correct_1s': correctly_predicted_1s, 'correct_0s': correctly_predicted_0s, \
            'incorrect_1s': incorrectly_predicted_1s, 'incorrect_0s': incorrectly_predicted_0s}

In [44]:
nb_precisions = precision(nb_labels_and_preds.collect())

In [45]:
lr_precisions = precision(lr_labels_and_preds.collect())

In [46]:
nb_recalls = recall(nb_labels_and_preds.collect())

In [47]:
lr_recalls = recall(lr_labels_and_preds.collect())

In [51]:
nb_precisions['p1'], nb_precisions['p0']

(0.6538461538461539, 0.768361581920904)

In [52]:
nb_recalls['r1'], nb_recalls['r0']

(0.74375, 0.6834170854271356)

In [50]:
lr_precisions['p1'], lr_precisions['p0']

(0.7417582417582418, 0.7344632768361582)

In [53]:
lr_recalls['r1'], lr_recalls['r0']

(0.7417582417582418, 0.7344632768361582)

In [55]:
nb_conf_matrix = generate_confusion_matrix(nb_labels_and_preds.collect())

In [56]:
lr_conf_matrix = generate_confusion_matrix(lr_labels_and_preds.collect())

In [57]:
nb_conf_matrix

{'correct_0s': 136, 'correct_1s': 119, 'incorrect_0s': 63, 'incorrect_1s': 41}

In [58]:
lr_conf_matrix

{'correct_0s': 130, 'correct_1s': 135, 'incorrect_0s': 47, 'incorrect_1s': 47}

### Calculating Tweets with the Highest Prediction Probabilities

This section calculates the Tweets with the highest prediction probabilities for Logistic Regression

In [59]:
from operator import itemgetter

In [60]:
lr_model = LogisticRegressionWithLBFGS.train(training, regType='l2')

In [61]:
lr_labels_and_preds = test_tfidf_sentiment.map(lambda record: (record.label, lr_model.predict(record.features)))
trainErr = lr_labels_and_preds.filter(lambda record: record[0] != record[1]).count() / float(test_tfidf_sentiment.count())

In [62]:
lr_lnpred_list = lr_labels_and_preds.collect()

In [63]:
lr_model.clearThreshold()

In [64]:
lr_labels_and_probs = test_tfidf_sentiment.map(lambda record: (record.label, lr_model.predict(record.features)))

In [65]:
lr_lnprob_list = lr_labels_and_probs.collect()

In [68]:
def calculate_highest_probs(labels_and_preds, labels_and_probs, tweets_list):
    lr_lnpredprob_list = [(r1[0], r1[1], r2[1])for r1, r2 in zip(labels_and_preds, labels_and_probs)]
    lr_lnpredprob_list = [(index, val[0], val[1], val[2]) for index, val in enumerate(lr_lnpredprob_list)]
        
    lr_pred_prob_correct = list(sorted(filter(lambda record: record[1] == record[2], lr_lnpredprob_list), key=itemgetter(3), \
                           reverse=True))
    lr_pred_prob_incorrect = list(sorted(filter(lambda record: record[1] != record[2], lr_lnpredprob_list), key=itemgetter(3), \
                                    reverse=True))
    
    tweets_with_pred_probs = {'correct': [], 'incorrect': []}

    for record in lr_pred_prob_correct[:5]:
        tweets_with_pred_probs['correct'].append({'text': ' '.join(tweets_list[record[0]][1]), 'prob': record[3]})

    for record in lr_pred_prob_incorrect[:5]:
        tweets_with_pred_probs['incorrect'].append({'text': ' '.join(tweets_list[record[0]][1]), 'prob': record[3]})
    
    return tweets_with_pred_probs

In [70]:
highest_prob_tweets = calculate_highest_probs(lr_lnpred_list, lr_lnprob_list, tweet_rdd.collect())

In [71]:
highest_prob_tweets

{'correct': [{'prob': 0.9999999988837434,
   'text': '@David_Henrie *thats people mag haha i couldnt fit it all in.. i dont think those pictures ever made it in the magazine tho! haha'},
  {'prob': 0.999999392838995,
   'text': 'i had 7 hours of sleep and now i cant go back to sleeping im thirsty'},
  {'prob': 0.9999984609403177,
   'text': '@duchess_rebecca Man... intervention is soo sad'},
  {'prob': 0.9999971741610747,
   'text': 'iiiii havent slept yet and i have to be at work in 40 minutes. boo'},
  {'prob': 0.9999955435455488,
   'text': 'going, going, aaand gone. poor moosie fell asleep in class http://twitpic.com/2y82y'}],
 'incorrect': [{'prob': 0.9996191051874458,
   'text': 'Stuck at home I watch way to many border patrol programs..watching a new zeland one now. What the hell is MAF?'},
  {'prob': 0.9992751969020601,
   'text': "@griffmiester no exchanging for me, my laptop hasn't arrived"},
  {'prob': 0.9987971855520478,
   'text': 'Feeling soree, bad idea to go running whe