In [20]:
import numpy as np
import nltk
from nltk import TweetTokenizer
from nltk.corpus import stopwords, twitter_samples
from nltk.stem import PorterStemmer
import string
import matplotlib.pyplot as plt
import re

import warnings
warnings.filterwarnings('ignore')

In [5]:
nltk.download('twitter_samples')
nltk.download('stopwords')

[nltk_data] Downloading package twitter_samples to
[nltk_data]     C:\Users\mukes\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\twitter_samples.zip.
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\mukes\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.


True

In [6]:
pos_tweets = twitter_samples.strings('positive_tweets.json')
neg_tweets = twitter_samples.strings('negative_tweets.json')

In [22]:
def process_tweet(tweet):
    """
        Input:
            string containing tweet
        Output:
            list of words containing processed tweet(tokenize, lowercase, remove punctuation/stopwords, stem the words)
    """
    stemmer = PorterStemmer()
    stopwords_en = stopwords.words('english')

    tweet = re.sub(r'\$\w', '', tweet)
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    tweet = re.sub(r'https?://[^\s\n\r]+', '', tweet)

    tokenizer = TweetTokenizer(strip_handles=True, preserve_case=False, reduce_len=True)

    tweet_tokens = tokenizer.tokenize(tweet)

    tweets_clean = []

    for word in tweet_tokens:
        if word not in stopwords_en and word not in string.punctuation:
            stem_word = stemmer.stem(word)
            tweets_clean.append(stem_word)

    return tweets_clean



In [27]:
def build_freqs(tweets, ys):
    """
        Input:
            tweets: list of tweets
            ys: mx1 array of sentiment labels for each tweet
        Output:
            freqs: dictionary mapping each (word, sentiment) pair to it's frequency
    """
    ys = ys.squeeze()
    freqs = {}
    for y, tweet in zip(ys, tweets):
        for word in process_tweet(tweet):
            pair = (word, y)
            if pair in freqs:
                freqs[pair]+=1
            else:
                freqs[pair]=1

    return freqs

In [14]:
len(pos_tweets), len(neg_tweets)

(5000, 5000)

In [71]:
train_pos, test_pos = pos_tweets[:4000], pos_tweets[4000:]
train_neg, test_neg = neg_tweets[:4000], neg_tweets[4000:]

train_x = train_pos+train_neg
test_x = test_pos+test_neg

train_y = np.append(np.ones((len(train_pos), 1)), np.zeros((len(train_neg), 1)), axis=0)
test_y = np.append(np.ones((len(test_pos), 1)), np.zeros((len(test_neg), 1)), axis=0)

In [72]:
freqs = build_freqs(train_x, train_y)

In [73]:
len(freqs)

11659

# Logistic Regression

In [45]:
def extract_features(tweet, freqs, process_tweet=process_tweet):
    """ 
        Input:
            tweet: input tweet
            freqs: dictionary mapping (word, label) to freq
        Output:
            x: feature vector of dimension (1, 3)
    """
    word_l = process_tweet(tweet)

    x = np.zeros((1, 3))
    x[0, 0] = 1

    for word in word_l:
        x[0, 1] += freqs.get((word, 1), 0)
        x[0, 2] += freqs.get((word, 0), 0)

    return x

extract_features(train_x[2], freqs)

array([[1.000e+00, 3.119e+03, 1.160e+02]])

In [47]:
X = np.zeros((len(train_x), 3))
for i in range(len(train_x)):
    X[i, :] = extract_features(train_x[i], freqs)

Y = train_y

In [49]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [52]:
scaler = StandardScaler()
lr = LogisticRegression()
lg_model = Pipeline([('standardize', scaler),
                    ('log_reg', lr)])

lg_model.fit(X, Y)

pred_y_train = lg_model.predict(X)

print(f"accuracy score on train data is {accuracy_score(Y, pred_y_train)}")

accuracy score on train data is 0.995


In [53]:
X_test = np.zeros((len(test_x), 3))
for i in range(len(test_x)):
    X_test[i, :] = extract_features(test_x[i], freqs)

Y_test = test_y

pred_y_test = lg_model.predict(X_test)

print(f"accuracy score on test data is {accuracy_score(Y_test, pred_y_test)}")

accuracy score on test data is 0.994


In [62]:
#sample test
sample = test_x[1006]
print("sample data: ", sample)
print("processed sample: ", process_tweet(sample))
result = "positive" if lg_model.predict(extract_features(sample, freqs))[0] == 1.0 else "negative"
print("predicted result using Logistic regression: ", result)

sample data:  @Carouselballet Monday? :(
processed sample:  ['monday', ':(']
predicted result using Logistic regression:  negative


# Naive Bayes

In [147]:
def build_freq_table(vocab_dict, freqs):
    """
    Input: 
        vocab_dict: dictionary mapping index, word
        freqs: dictionary mapping (word, label) to freq
    Output:
        freq_tbl: nx2 array containing pos and neg probabilities of words in vocab
    """
    freq_tbl = np.zeros((len(vocab_dict), 2))
    for i, word in vocab_dict.items():
        freq_tbl[i, :] = [freqs.get((word, 0), 0), freqs.get((word, 1), 0)]

    n_unique = np.count_nonzero(freq_tbl, axis=0)
    n_count = np.sum(freq_tbl, axis=0)

    # Laplacian smoothning
    freq_tbl = np.divide(freq_tbl+1, n_unique+n_count)
    return freq_tbl
    
def naive_bayes_predict(tweet, logprior, freq_tbl, vocab_dict_rev):
    """
    Input: 
        tweet: input tweet
        logprior: log of prior prob
        freq_tbl: nx2 array containing pos and neg probabilities of words in vocab
        vocab_dict_rev: dictionary mapping word, index
    Output:
        result: 1/0 indicating if tweet is pos/neg sentiment
    """
    pred = logprior
    for word in process_tweet(tweet):
        if word in vocab_dict_rev.keys():
            p_neg, p_pos = freq_tbl[vocab_dict_rev[word], :]
            pred += (np.log(p_pos)-np.log(p_neg))
    result = 1 if pred>0 else 0
    return result

In [65]:
vocab_dict = {i: word for i, word in enumerate(list(set([word for word, _ in freqs.keys()])))}
vocab_dict_rev = {word: i for i, word in vocab_dict.items()}


In [148]:
freq_tbl = build_freq_table(vocab_dict, freqs)

log_prior_train = float(np.log(sum(train_y)) - np.log(len(train_y)-sum(train_y)))
pred_y_train = [naive_bayes_predict(tweet, log_prior_train, freq_tbl, vocab_dict_rev) for tweet in train_x]

log_prior_test = float(np.log(sum(test_y)) - np.log(len(test_y)-sum(test_y)))
pred_y_test = [naive_bayes_predict(tweet, log_prior_test, freq_tbl, vocab_dict_rev) for tweet in test_x]

print(f"accuracy score on train data is {accuracy_score(train_y, np.array(pred_y_train).reshape(len(pred_y_train), 1))}")
print(f"accuracy score on train data is {accuracy_score(test_y, np.array(pred_y_test).reshape(len(pred_y_test), 1))}")

accuracy score on train data is 0.999375
accuracy score on train data is 0.9955
