In [1]:
import os
import re
import string

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from nltk.corpus import stopwords, twitter_samples
from nltk.stem import PorterStemmer
from nltk.tokenize import TweetTokenizer
from rich import print
from sklearn.model_selection import train_test_split
from tqdm import tqdm

nltk.download("twitter_samples")
%load_ext rich

[nltk_data] Downloading package twitter_samples to C:\Users\Lakshya
[nltk_data]     Agarwal\AppData\Roaming\nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!


## Problem 1

$$
L(\beta) = \sum_{i=1}^n (y_i - \beta_0 - \sum_{j=1}^d\beta_j x_{ij})^2 + \lambda \sum_{j=1}^d \beta_j^2
$$


### Analytical estimation of $\beta$

Expressed in matrix format, the loss function can be written as:

$$
\begin{align*}
L(\beta) = (y - X\beta)^T(y - X\beta) + \lambda (\beta^T\beta)
\end{align*}
$$

where $X$ is a $n \times d+1$ matrix ($d$ dimensions and the intercept term)

Taking the derivative with respect to $\beta$ and setting it to zero,

$$
\begin{align*}
\frac{\partial L(\beta)}{\partial \beta} &= -2X^T(y-X\beta) + 2\lambda\beta \\
0 &= -2X^T(y-X\beta) + 2\lambda\beta \\
2\lambda\beta &= 2X^T(y-X\beta) \\
\lambda \beta &= X^Ty - X^TX\beta \\
(\lambda I + X^TX)\beta  &= X^Ty \\
\beta &= (\lambda I + X^TX)^{-1} X^Ty
\end{align*}
$$

Thus, the analytical solution for $\beta$ here, after a bit of matrix algebra, comes out to be similar to the OLS solution but with the addition of the $\lambda$ term as a regularization parameter


### Gradient of the loss function $\nabla L(\beta)$

The gradient $L(\beta)$ is the vector of partial derivatives for each $B_j$, i.e.,

$$
\begin{align*}

\nabla L(\beta) &= \begin{bmatrix}
\frac{\partial L(\beta)}{\partial \beta_0}
        \\ \frac{\partial L(\beta)}{\partial \beta_1}
        \\ \vdots
        \\ \frac{\partial L(\beta)}{\partial \beta_d}
        \end{bmatrix}
\end{align*}
$$

Solving each expression and simplifying to matrix notation, we get the same result as above:

$$
\begin{align*}
\nabla L(\beta) &= -2X^T(y-X\beta) + 2\lambda\beta \\
\end{align*}
$$


### Update step

The update step for the gradient descent, assuming a constant learning rate of $\gamma$, becomes:

$$
\beta_{t+1} = \beta_{t} - \gamma \nabla L(\beta_{t})
$$


### Psuedo-code for stochastic gradient descent

Input: Initial guess $\beta_I$, learning rate of $\gamma_t$

For: $t = 0, 1, \dots, T-1$, do

> Choose $i \in {1, 2, \dots, n}$ unifoirmly at random

> $\beta_{t+1} = \beta_{t} - \gamma_t \nabla L_i(\beta_t)$

end For loop

return $\beta_T$


## Problem 2


### Setting up the problem


#### Loading the data


In [2]:
all_positive_tweets = twitter_samples.strings("positive_tweets.json")
all_negative_tweets = twitter_samples.strings("negative_tweets.json")

tweets = all_positive_tweets + all_negative_tweets
labels = np.append(
    np.ones((len(all_positive_tweets), 1)),
    np.zeros((len(all_negative_tweets), 1)),
    axis=0,
)

X = np.array(tweets)
y = np.array(labels)

indices = np.random.choice(range(len(X)), size=7000, replace=False)

X_sampled = X[indices]
y_sampled = y[indices]

print(X_sampled.shape, y_sampled.shape)

In [3]:
# Looking at sample tweets

print("-" * 100)
print("Positive tweet examples")
print("-" * 100)
print(X_sampled[y_sampled.flatten() == 1][:5])
print("-" * 100)
print("Negative tweet examples")
print("-" * 100)
print(X_sampled[y_sampled.flatten() == 0][:5])


#### Preprocessing the data

- Tokenizing the string
- Convert the tweet into lowercase and split the tweets into tokens(words)
- Removing stop words and punctuation
- Removing commonly used words on the twitter platform like the hashtag, retweet marks, hyperlinks, numbers, and email address
- Stemming


In [4]:
class Twitter_Preprocessor:
    def __init__(self):
        self.stemmer = PorterStemmer()
        self.stopwords_english = stopwords.words("english")
        self.tokenizer = TweetTokenizer(
            preserve_case=False, strip_handles=True
        )
        self.punctuation = string.punctuation

    def remove_chars(self, tweet):
        # remove old style retweet text "RT"
        tweet = re.sub(r"^RT[\s]+", "", tweet)

        # remove hyperlinks
        tweet = re.sub(r"https?:\/\/.*[\r\n]*", "", tweet)

        # remove hashtags
        tweet = re.sub(r"#", "", tweet)

        # remove email addresses
        tweet = re.sub(r"\S*@\S*\s?", "", tweet)

        # remove numbers
        tweet = re.sub(r"\d+", "", tweet)

        return tweet

    def tokenize(self, tweet):
        # tokenize tweets
        return self.tokenizer.tokenize(tweet)

    def remove_stopwords(self, tweet_tokens):
        tweets_clean = []
        for word in tweet_tokens:
            if word in self.stopwords_english or word in self.punctuation:
                continue

            tweets_clean.append(word)

        return tweets_clean

    def stem(self, tweet_tokens):
        tweets_stem = []
        for word in tweet_tokens:
            stem_word = self.stemmer.stem(word)
            tweets_stem.append(stem_word)

        return tweets_stem

    def process_tweet(self, tweet):
        tweet = self.remove_chars(tweet)
        tweet_tokens = self.tokenize(tweet)
        tweet_tokens = self.remove_stopwords(tweet_tokens)
        tweet_tokens = self.stem(tweet_tokens)

        return tweet_tokens

    def process_tweets(self, tweets):
        processed_tweets = []

        for _, tweet in tqdm(enumerate(tweets)):
            processed_tweets.append(self.process_tweet(tweet))

        return processed_tweets


In [5]:
preprocessor = Twitter_Preprocessor()

X_sampled_processed = preprocessor.process_tweets(X_sampled)
print("Samples")
print(X_sampled[:5])
print("Processed")
print(X_sampled_processed[:5])

7000it [00:01, 4033.27it/s]


#### Feature extraction

To predict the sentiment of a tweet, we will build a logistic regression model. The features that will be used as inputs to the model are:

- **Bias term:** This is a constant term that is added to the feature vector, represented as $x_0$
- **Number of positive words in the tweet:** This is a count of the number of positive words (defined through a bag-of-words model) in the tweet, represented as $x_1$
- **Number of negative words in the tweet:** This is a count of the number of negative words (defined through a bag-of-words model) in the tweet, represented as $x_2$
- **Number of words in the tweet**: This is a  count of the number of words in the tweet, represented as $x_3$

To build these features, we use a Bag of Words (BoW) model to count the number of times each word appears for each class.

In [6]:
def build_bag_of_words(tweets, labels):
    word_freq = {}

    for tweet, label in list(zip(tweets, labels)):
        for word in tweet:
            label = int(label)
            word_freq[(word, label)] = word_freq.get((word, label), 0) + 1

    return word_freq


bow_dict = build_bag_of_words(X_sampled_processed, y_sampled)


In [7]:
def extract_features(tweet, bow_dict):
    features = np.zeros((1, 4))

    # bias term is set to 1
    features[0, 0] = 1

    for word in tweet:
        # Positive word count
        features[0, 1] += bow_dict.get((word, 1), 0)

        # Negative word count
        features[0, 2] += bow_dict.get((word, 0), 0)

    # Log of total word count
    features[0, 3] = len(tweet)

    return features


#### Train / Test split


In [8]:
train_X_tweet, test_X_tweet, train_y_tweet, test_y_tweet = train_test_split(
    X_sampled_processed, y_sampled, test_size=0.4, random_state=42
)

print(f"Train set: {len(train_X_tweet)}")
print(f"Test set: {len(test_X_tweet)}")

In [9]:
# Build the feature matrix for the training set
train_X = np.zeros((len(train_X_tweet), 4))
for i, tweet in enumerate(train_X_tweet):
    train_X[i, :] = extract_features(tweet, bow_dict)

# Build the feature matrix for the test set
test_X = np.zeros((len(test_X_tweet), 4))
for i, tweet in enumerate(test_X_tweet):
    test_X[i, :] = extract_features(tweet, bow_dict)

print(f"Train X: {train_X.shape}")
print(f"Test X: {test_X.shape}")

### Black-box classifier


In [10]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

blackbox_model = LogisticRegression(random_state=42, max_iter=1000, fit_intercept=False)


print(f"Training blackbox model on {train_X.shape[0]} samples...")

blackbox_model.fit(train_X, train_y_tweet.ravel())

print(f"Coefficents: {blackbox_model.coef_}")

print(f"Evaluating blackbox model on {test_X.shape[0]} samples...")

test_y_pred = blackbox_model.predict(test_X)


print(f"Accuracy: {accuracy_score(test_y_tweet, test_y_pred)}")

print(f"Precision: {precision_score(test_y_tweet, test_y_pred)}")

print(f"Recall: {recall_score(test_y_tweet, test_y_pred)}")

print(f"F1: {f1_score(test_y_tweet, test_y_pred)}")


### Mathematical formulation

Given a tweet $x$ (I just know Elon Musk is smiling somewhere!), the probability of the tweet being positive ($y=1$) is given by:

$$
P(y=1|x, \beta) = \sigma(\beta \cdot x)
$$

where

- $\sigma$ is the sigmoid function, defined as:

  $$
  \sigma(z) = \frac{1}{1 + e^{-z}}
  $$

- $\beta$ is the parameter vector $[\beta_0, \beta_1, \beta_2]$

Therefore, the probability function can be written as:

$$
\begin{align*}
P(y=1|x, \beta) & &= \sigma(\beta \cdot x) &= \frac{1}{1 + e^{-\beta \cdot x}} \\
P(y=0|x, \beta) &= 1 - P(y=1|x, \beta) &= \sigma(-\beta \cdot x) &= \frac{1}{1 + e^{\beta \cdot x}}
\end{align*}
$$

The vector $\beta$ is learned from the training data by minimizing the negative log-likelihood loss function, and then used to predict the sentiment of the test data.


### Likelihood function

The likelihood function for the logistic regression model is the likelihood of observing labels $y_i$ given the features $x_i$ and the parameters $\beta$. It is defined as the product of individual probabilities for each observation:

$$
L(\beta) = \prod_{i=1}^n P(y_i | x_i, \beta) = \prod_{i=1}^n (\sigma(\beta \cdot x_i))^{y_i} (1 - \sigma(\beta \cdot x_i))^{1 - y_i}
$$

The estimated probability can be written as:

$$
p_i = \sigma(\beta \cdot x_i)
$$

In practice, it is easier to work with the log-likelihood function, which is the logarithm of the likelihood function:

$$
\log L(\beta) = \sum_{i=1}^n y_i \log p_i + (1 - y_i) \log (1 - p_i)
$$

To optimize, we consider the negative log-likelihood function, and minimize it:

$$
\text{NLL} = -\sum_{i=1}^n \left[ y_i \log (\sigma(\beta \cdot x_i)) + (1 - y_i) \log (1 - \sigma(\beta \cdot x_i)) \right]
$$


### Minimzing the negative log-likelihood function using stochastic gradient descent

#### Gradient

The gradient of the negative log-likelihood function is given by:

$$
\nabla_{\beta} \text{NLL} = -(y - \sigma(\beta \cdot x))x
$$

#### Objective function

Given samples $S = {(x^{(i)}, y^{(i)})}_{i=1}^n$, the objective is to find $\beta$ that minimizes the negative log-likelihood:

$$
J_S(\beta) = -\frac{1}{n} \sum_{i=1}^n \log P(y^{(i)} | x^{(i)}, \beta)
$$

The gradient becomes:

$$
\nabla_{\beta} J_S(\beta) = \frac{1}{n} \sum_{i=1}^n (y^{(i)} - \sigma(\beta \cdot x^{(i)}))x^{(i)}
$$

#### Stochastic gradient descent

The steps for the stochastic gradient descent algorithm are as follows:

- Initialize $\beta$ to some value $\beta^0$
- For $t = 0, 1, \dots, T-1$, do
  - Choose a sample $i \in {1, 2, \dots, n}$ uniformly at random
  - $\beta^{t+1} = \beta^t + \eta \nabla_{\beta} J_S^{i}(\beta^t)$
- Return $\beta^T$

where $\eta$ is the learning rate, and $T$ is the number of iterations


In [11]:
def sigmoid(z):
    return 1 / (1 + np.exp(-z))


def prediction_function(X, beta):
    return sigmoid(np.dot(X, beta))


def loss_function(y, x, beta):
    predictions = prediction_function(x, beta)
    predictions = np.clip(predictions, 1e-9, 1 - 1e-9)

    return -y * np.log(predictions) - (1 - y) * np.log(1 - predictions)


def stochastic_gradient_descent(
    X, y, beta_init, learning_rate=0.0001, epochs=1000, tolerance=1e-6
):
    """
    Performs stochastic gradient descent to optimize the beta coefficients for a logistic regression model.

    Parameters:
    - X (numpy.ndarray): The input features matrix, where each row represents a sample and each column represents a feature.
    - y (numpy.ndarray): The target vector, where each element is the target for a sample.
    - beta_init (numpy.ndarray): The initial values for the beta coefficients.
    - learning_rate (float, optional): The learning rate for the optimization. Defaults to 0.0001.
    - epochs (int, optional): The number of iterations to run the gradient descent. Defaults to 1000.
    - tolerance (float, optional): The tolerance for the loss to declare convergence. Defaults to 1e-6.

    Returns:
    - numpy.ndarray: The optimized beta coefficients after running stochastic gradient descent.
    """
    loss_history = []

    beta = beta_init
    n_samples = X.shape[0]

    for epoch in range(epochs):
        idx = np.random.randint(0, n_samples)
        X_i = X[idx, :].reshape(1, -1)
        y_i = y[idx]

        predictions = prediction_function(X_i, beta)
        errors = y_i - predictions
        gradient = np.dot(X_i.T, errors)

        beta += learning_rate * gradient

        loss = loss_function(y, X, beta).mean()
        loss_history.append(loss)

        if epoch % 1000 == 0:
            print(f"Loss at epoch {epoch}: {loss}")

        if loss < tolerance:
            break

    print(f"Final loss: {loss_history[-1]}")

    return beta


In [12]:
beta_T = stochastic_gradient_descent(
    train_X,
    train_y_tweet,
    np.zeros((train_X.shape[1], 1)),
    learning_rate=0.001,
    epochs=10000,
    tolerance=1e-4,
)

beta_T


  return 1 / (1 + np.exp(-z))



[1;35marray[0m[1m([0m[1m[[0m[1m[[0m [1;36m0.02739916[0m[1m][0m,
       [1m[[0m [1;36m2.92325763[0m[1m][0m,
       [1m[[0m[1;36m-2.10764575[0m[1m][0m,
       [1m[[0m [1;36m0.12694995[0m[1m][0m[1m][0m[1m)[0m

In [13]:
print(np.round(beta_T, 3).reshape(1, -1))

In [14]:
print(np.round(blackbox_model.coef_, 3))

In [15]:
y_pred_T = (prediction_function(test_X, beta_T) > 0.5).astype(int)

print(f"Accuracy: {accuracy_score(test_y_tweet, y_pred_T)}")
print(f"Precision: {precision_score(test_y_tweet, y_pred_T)}")
print(f"Recall: {recall_score(test_y_tweet, y_pred_T)}")
print(f"F1: {f1_score(test_y_tweet, y_pred_T)}")


  return 1 / (1 + np.exp(-z))


### Comparison with the black-box classifier

The trained logistic regression model acheives a similar performance to the black-box classifier, with an accuracy of 0.99. The coefficients for the two models are:

- Black-box classifier: $[[ 1.156, 0.013, -0.012, -0.094]]$
- Manual logistic regression: $[[ 0.027,  2.923, -2.108,  0.127]]$

Here, although the coefficients are different in magnitude, the performance of the two models is similar.