In [209]:
import nltk
import re
import string
import random
from typing import List, Dict, Tuple

import numpy as np
from nltk.stem import PorterStemmer
from nltk.corpus import twitter_samples
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer

In [210]:
nltk.download("twitter_samples")
nltk.download("stopwords")

[nltk_data] Downloading package twitter_samples to
[nltk_data]     /Users/masgari/nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/masgari/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

### Preporcess the tweet

In [211]:
def process_tweet(tweet: str):
    stemer = PorterStemmer()
    tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)

    # remove stock market tickers like $GE
    tweet = re.sub(r'\$\w*', '', tweet)
    # remove old style retweet text "RT"
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    # remove hyperlinks
    tweet = re.sub(r'https?://[^\s\n\r]+', '', tweet)
    # remove hashtags
    # only removing the hash # sign from the word
    tweet = re.sub(r'#', '', tweet)

    stopwords_english = stopwords.words("english")
    tokens = tokenizer.tokenize(tweet)

    result = []
    for token in tokens:
        if token not in stopwords_english and \
            token not in string.punctuation and \
            not token.isdigit():
            token = stemer.stem(token)
            result.append(token)
    return result

### Count word frequncies in each class

In [212]:
def build_freq(processed_tweets: np.ndarray, class_label: np.ndarray) -> Dict[Tuple[str, int], int]:
    freq = dict()    
    for processed_tweet, label in zip(processed_tweets, class_label):
        for token in processed_tweet:            
            count = freq.get((token, label), 0)
            freq[(token, label)] = count + 1
    return freq

In [213]:
positive_tweets_raw = twitter_samples.strings('positive_tweets.json')
negative_tweets_raw = twitter_samples.strings('negative_tweets.json')
positive_tweets_processed = list(map(lambda tweet: process_tweet(tweet), positive_tweets_raw))
negative_tweets_processed = list(map(lambda tweet: process_tweet(tweet), negative_tweets_raw))
all_tweets_processed = positive_tweets_processed + negative_tweets_processed
Y = np.concatenate((np.ones(len(positive_tweets_raw), dtype=int), np.zeros(len(negative_tweets_raw), dtype=int)), axis=0)

NUM_FEATURES = 3
word_freq = build_freq(all_tweets_processed, Y)

In [214]:
def extract_features(tweets_processed: List[List[str]], word_freq: Dict):
    m = len(tweets_processed)
    features = np.zeros((m, NUM_FEATURES))

    for i in range(m):
        tweet = tweets_processed[i]

        tweet_features = np.zeros(NUM_FEATURES)
        tweet_features[0] = 1.0 # Bias

        for token in tweet:
            # Positive count
            tweet_features[1] += word_freq.get((token, 1), 0)        
            # Negative count
            tweet_features[2] += word_freq.get((token, 0), 0)
        features[i] = tweet_features
        
    return features

In [215]:
dataset_X = extract_features(all_tweets_processed, word_freq)
dataset = np.concatenate((dataset_X, Y.reshape(-1,1)), axis=1)
np.random.shuffle(dataset)

In [216]:
train_size = int(0.8 * len(dataset))

trainX = dataset[:train_size, :NUM_FEATURES]
trainY = dataset[:train_size, NUM_FEATURES:NUM_FEATURES+1]
testX = dataset[train_size:, :NUM_FEATURES]
testY = dataset[train_size:, NUM_FEATURES:NUM_FEATURES+1]

print(f"TrainX shape: {trainX.shape}")
print(f"TrainY shape: {trainY.shape}")
print(f"TestX shape: {testX.shape}")
print(f"TestY shape: {testY.shape}")

TrainX shape: (8000, 3)
TrainY shape: (8000, 1)
TestX shape: (2000, 3)
TestY shape: (2000, 1)


In [217]:
def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z)) 

In [218]:
def train(x, y, theta, learning_rate, num_iters):
    m = x.shape[0]
    for i in range(num_iters):
        z = np.dot(x, theta)
        y_hat = sigmoid(z)

        # cost function
        J = np.sum(y * np.log(y_hat) + (y-1) * np.log(1-y_hat))
        J *= -1/m

        if i % 100 == 0:
            print(f"Cost: {J}")
        
        gholi = y_hat - y
        grads = np.dot(x.T, gholi)
        theta = theta - (grads * learning_rate) / m
    return theta

### Train the model

In [228]:
# Set theta to random values
theta = np.zeros((NUM_FEATURES, 1))
theta = train(trainX, trainY, theta, 1e-9, 3700)
theta

Cost: -0.00571846423961955
Cost: 0.03850607039953177
Cost: 0.056567496206624775
Cost: 0.06325654123129627
Cost: 0.06493874144147337
Cost: 0.06440079911211058
Cost: 0.06290600719513659
Cost: 0.06104297617263544
Cost: 0.05908945327930741
Cost: 0.057175114745481426
Cost: 0.055357381744936804
Cost: 0.05365797062221184
Cost: 0.05208101281806302
Cost: 0.050622223545285706
Cost: 0.04927359541106453
Cost: 0.0480258018009412
Cost: 0.04686940752154669
Cost: 0.04579545161136744
Cost: 0.04479569893877493
Cost: 0.04386271871335302
Cost: 0.04298987503885662
Cost: 0.04217127549181102
Cost: 0.041401702436168114
Cost: 0.040676540117089544
Cost: 0.0399917041520908
Cost: 0.039343576508503605
Cost: 0.03872894713991522
Cost: 0.0381449624404506
Cost: 0.037589080166667115
Cost: 0.0370590302478452
Cost: 0.03655278082922442
Cost: 0.03606850889697692
Cost: 0.03560457487759335
Cost: 0.035159500665099486
Cost: 0.034731950594708315
Cost: 0.034320714944661275
Cost: 0.03392469560603129


array([[ 1.30493439e-07],
       [ 8.27405344e-04],
       [-7.57676736e-04]])

In [229]:
def accuracy(testX, testY, theta):
    y_hat = sigmoid(np.dot(testX, theta))
    y_hat = (y_hat > 0.5).astype(int)
    return np.sum((y_hat == testY).astype(int)) / testY.shape[0]

### Model accuracy

In [242]:
accuracy(testX, testY, theta)

0.9925

In [231]:
def predict(tweet: str, theta, word_freq):
    tweet_processed = process_tweet(tweet)
    x = extract_features([tweet_processed], word_freq)
    return np.squeeze(sigmoid(np.dot(x, theta)))


### Predict

In [243]:
emoji = " :)"
tweet = "I'm exciting to announce that I'm joining Tesla"
y_hat = predict(tweet + emoji, theta, word_freq)
print(y_hat)
if y_hat > 0.5:
    print("Positive")
else:
    print("Negative")

0.9463323595959156
Positive


#### Although model does not predict correctly if there is no emoji

In [244]:
y_hat = predict(tweet, theta, word_freq)
print(y_hat)
if y_hat > 0.5:
    print("Positive")
else:
    print("Negative")

0.4544629936023083
Negative
