<a href="https://colab.research.google.com/github/thedatadj/natural-language-processing/blob/main/sentiment-analysis/logistic_regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sentiment analysis

<table>
    <tr>
        <td>
            <b>Model</b>
        </td>
        <td>
            Logistic regression
        </td>
    </tr>
    <tr>
        <td>
            <b>Task</b>
        </td>
        <td>
            Classify a tweet as having a positive sentiment or a negative sentiment.
        </td>
    </tr>
    <tr>
        <td>
            <b>Main library</b>
        </td>
        <td>
            NLTK
        </td>
    </tr>
    <tr>
        <td>
            <b>Dataset</b>
        </td>
        <td>
            twitter_samples from NLTK datasets.
        </td>
    </tr>
</table>

In [None]:
import nltk
# Download the NLTK resources this notebook reads: the tweet corpus and the
# English stop-word list used during preprocessing.
nltk.download('twitter_samples')
nltk.download('stopwords')
import numpy as np
# NOTE(review): pandas is not referenced in the cells visible here.
import pandas as pd
from nltk.corpus import twitter_samples

[nltk_data] Downloading package twitter_samples to /root/nltk_data...
[nltk_data]   Unzipping corpora/twitter_samples.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


## Load the data

In [None]:
# Load the raw tweet texts for each sentiment class from the NLTK
# twitter_samples corpus (one corpus file per class).
positive_set = twitter_samples.strings("positive_tweets.json")
negative_set = twitter_samples.strings('negative_tweets.json')

## Train and test split

In [None]:
# Features
# The first 4000 tweets of each class are used for training; the remainder of
# each class is held out for testing.
# In both splits the positive tweets come first, followed by the negative
# tweets -- the label arrays are built on that ordering.
train_x = positive_set[:4000] + negative_set[:4000]
test_x = positive_set[4000:] + negative_set[4000:]

In [None]:
# Labels: 1 for positive tweets, 0 for negative tweets.
# The sizes are derived from the data instead of being hard-coded so the
# label arrays always line up with train_x / test_x, in which the positive
# tweets come first.
n_train_pos = len(positive_set[:4000])
train_y = np.zeros((len(train_x), 1))
train_y[:n_train_pos] = 1

n_test_pos = len(positive_set[4000:])
test_y = np.zeros((len(test_x), 1))
test_y[:n_test_pos] = 1

# Preprocessing

In [None]:
import re
import string
import numpy as np

from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import TweetTokenizer

In [None]:
def process_tweet(tweet):
    """Clean, tokenize and stem a single tweet.

    Input:
        tweet: a string containing a tweet
    Output:
        tweets_clean: a list of stemmed tokens; stock tickers, a leading
        "RT", hyperlinks, hash signs, stop words and punctuation tokens
        are removed before stemming
    """
    stemmer = PorterStemmer()
    # A set gives constant-time membership tests; the list returned by
    # stopwords.words() would otherwise be scanned once per token.
    stopwords_english = set(stopwords.words('english'))

    # remove stock market tickers like $GE
    tweet = re.sub(r'\$\w*', '', tweet)
    # remove old style retweet text "RT"
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    # remove hyperlinks
    tweet = re.sub(r'https?://[^\s\n\r]+', '', tweet)
    # remove only the hash sign, keeping the hashtag's word
    tweet = re.sub(r'#', '', tweet)

    # tokenize the tweet (tokenizer configured to lower-case, strip handles
    # and shorten elongated character runs)
    tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True,
                               reduce_len=True)
    tweet_tokens = tokenizer.tokenize(tweet)

    # `in` on string.punctuation is a substring test: single punctuation
    # characters are dropped, while an emoticon token such as ':)' is kept.
    tweets_clean = [
        stemmer.stem(word)
        for word in tweet_tokens
        if word not in stopwords_english and word not in string.punctuation
    ]

    return tweets_clean

In [None]:
def build_freqs(tweets, ys):
    """Build frequencies.
    Input:
        tweets: a list of tweets
        ys: an m x 1 array (or a plain list) with the sentiment label of
            each tweet (either 0 or 1)
    Output:
        freqs: a dictionary mapping each (word, sentiment) pair to its
        frequency
    """
    # Flatten the labels to a 1-D list so they can be zipped with the tweets.
    # reshape(-1) is used instead of squeeze: squeeze turns a single-label
    # input into a 0-d array, whose tolist() is a bare scalar that zip
    # cannot iterate over.
    yslist = np.asarray(ys).reshape(-1).tolist()

    # Count every processed word of every tweet under its tweet's label.
    freqs = {}
    for y, tweet in zip(yslist, tweets):
        for word in process_tweet(tweet):
            pair = (word, y)
            freqs[pair] = freqs.get(pair, 0) + 1

    return freqs

In [None]:
# Count (processed word, label) occurrences over the training tweets only;
# the test tweets are not passed in here.
freqs = build_freqs(train_x, train_y)

# Modeling

In [None]:
def sigmoid(z):
    '''
    Input:
        z: is the input (can be a scalar or an array)
    Output:
        a: the sigmoid of z, 1 / (1 + exp(-z)), with the same shape as z
    '''
    # For large negative z, exp(-z) overflows to inf and the quotient becomes
    # 0.0, which is the correct limit. Only the overflow warning is silenced;
    # the returned values are unchanged.
    with np.errstate(over='ignore'):
        a = 1/(1+np.exp(-z))

    return a

In [None]:
def gradientDescent(x, y, theta, alpha, num_iters):
    '''
    Batch gradient descent for logistic regression.

    Input:
        x: matrix of features which is (m,n+1)
        y: corresponding labels of the input matrix x, dimensions (m,1)
        theta: weight vector of dimension (n+1,1)
        alpha: learning rate
        num_iters: number of iterations you want to train your model for
    Output:
        J: the cost as a Python float. It is evaluated at the start of the
           last iteration, i.e. with the weights as they were before the
           final update. If num_iters is 0 (or negative) it is the cost of
           the initial theta.
        theta: your final weight vector
    '''
    # get 'm', the number of rows in matrix x
    m = x.shape[0]

    def _cost(h):
        # cross-entropy cost for predicted probabilities h
        return -1./m * np.sum(y.T.dot(np.log(h)) + (1-y).T.dot(np.log(1-h)))

    J = None
    for _ in range(num_iters):
        # predicted probabilities for the current weights
        h = sigmoid(x.dot(theta))

        # cost for the current weights (computed before they are updated)
        J = _cost(h)

        # update the weights theta
        theta = theta - alpha * 1./m * (x.T.dot(h - y))

    if J is None:
        # No iteration ran: report the cost of the unchanged weights instead
        # of failing on an unassigned J.
        J = _cost(sigmoid(x.dot(theta)))

    return float(J), theta

## Feature extraction

In [None]:
def extract_features(tweet, freqs, process_tweet=process_tweet):
    '''
    Turn one tweet into the 3-element feature row used by the model.

    Input:
        tweet: a string containing one tweet
        freqs: a dictionary corresponding to the frequencies of each tuple (word, label)
        process_tweet: callable that turns the tweet string into a list of words
    Output:
        x: a feature vector of dimension (1,3) holding
           [bias, summed positive counts, summed negative counts]
    '''
    positive_total = 0.0
    negative_total = 0.0

    # Words missing from freqs contribute 0 to both totals.
    for token in process_tweet(tweet):
        positive_total += freqs.get((token, 1.0), 0)
        negative_total += freqs.get((token, 0.0), 0)

    # Leading 1.0 is the bias term; the outer list adds the batch dimension.
    x = np.array([[1.0, positive_total, negative_total]], dtype=float)
    assert x.shape == (1, 3)
    return x

# Training

In [None]:
# Feature matrix: one (bias, positive count, negative count) row per training tweet
X = np.zeros((len(train_x), 3))
for row_idx, tweet_text in enumerate(train_x):
    X[row_idx, :] = extract_features(tweet_text, freqs)

In [None]:
# Train from zero-initialised weights: learning rate 1e-9, 1500 iterations.
# NOTE(review): the very small learning rate presumably compensates for the
# unscaled count features -- confirm before changing either.
J, theta = gradientDescent(X, train_y, np.zeros((3, 1)), 1e-9, 1500)

In [None]:
# Report the cost returned by gradientDescent, to 8 decimal places.
print(f"The cost after training is {J:.8f}.")

The cost after training is 0.22521264.


# Testing

In [None]:
def predict_tweet(tweet, freqs, theta):
    '''
    Score a single tweet with the trained logistic regression weights.

    Input:
        tweet: a string
        freqs: a dictionary corresponding to the frequencies of each tuple (word, label)
        theta: (3,1) vector of weights
    Output:
        y_pred: the sigmoid of the tweet's features times theta, i.e. the
                model's probability that the tweet is positive
    '''
    features = extract_features(tweet, freqs)
    return sigmoid(features.dot(theta))

In [None]:
# Example prediction on a hand-written tweet; the cell displays the value
# returned by predict_tweet.
my_tweet = "Next I'm buying Coca-Cola to put the cocaine back in"
predict_tweet(my_tweet, freqs, theta)

array([[0.48728224]])

# Evaluation

In [None]:
def test_logistic_regression(test_x, test_y, freqs, theta, predict_tweet=predict_tweet):
    """
    Measure classification accuracy on a labelled set of tweets.

    Input:
        test_x: a list of tweets
        test_y: (m, 1) vector with the corresponding labels for the list of tweets
        freqs: a dictionary with the frequency of each pair (or tuple)
        theta: weight vector of dimension (3, 1)
        predict_tweet: callable returning the prediction for one tweet
    Output:
        accuracy: (# of tweets classified correctly) / (total # of tweets)
    """
    # A prediction strictly above 0.5 counts as positive (1.0), anything
    # else as negative (0.0).
    y_hat = [
        1.0 if predict_tweet(tweet, freqs, theta) > 0.5 else 0.0
        for tweet in test_x
    ]

    # Compare predictions and labels as flat 1-D arrays.
    predicted = np.array(y_hat).flatten()
    expected = np.array(test_y).flatten()
    n_correct = np.sum(predicted == expected)

    accuracy = n_correct/len(test_x)
    return accuracy

In [None]:
# Accuracy on the held-out test tweets, using the weights learned above.
tmp_accuracy = test_logistic_regression(test_x, test_y, freqs, theta)
print(f"Logistic regression model's accuracy = {tmp_accuracy:.4f}")

Logistic regression model's accuracy = 0.9950


# Make your own prediction

In [None]:
# Show the processed tokens, the raw model output and the resulting label
# for a custom tweet.
my_tweet = "It's a new day in America"
print(process_tweet(my_tweet))
y_hat = predict_tweet(my_tweet, freqs, theta)
print(y_hat)
# Same 0.5 decision threshold as the evaluation function.
print('Positive sentiment' if y_hat > 0.5 else 'Negative sentiment')

['new', 'day', 'america']
[[0.5184329]]
Positive sentiment
