In [1]:
import numpy as np
import pickle
import glob

In [2]:
def accuracy_score(y_true, y_pred):
    return 1 - np.mean(np.abs(y_true - y_pred))

def f1_score(y_true, y_pred):
    precision = np.sum(y_true * y_pred) / np.sum(y_pred)
    recall = np.sum(y_true * y_pred) / np.sum(y_true)
    
    return 2 * precision * recall / (precision + recall)

Generate training and test set, and build vocabulary using only words from the training set.

In [3]:
neg_files = glob.glob("review_polarity/txt_sentoken/neg/*.txt")
pos_files = glob.glob("review_polarity/txt_sentoken/pos/*.txt")
files = neg_files + pos_files

n = len(files)
n_test = 400
n_train = n - n_test

np.random.seed(517)
test_idx = np.random.choice(n, size=n_test, replace = False)
train_idx = np.delete(np.arange(n), test_idx)

vocab = {}
index = 0
for file in np.array(files)[train_idx]:
    with open(file) as f:
        raw_txt = f.read()
    txt_arr = raw_txt.split()
    for token in txt_arr:
        if token not in vocab:
            vocab[token] = index
            index += 1
            
pickle.dump(vocab, open("vocab.pkl", "wb"))

Generate word count feature vectors $x_i$ for each document, i.e. $x_{ij} = $ the number of instances of word $j$ in document $i$. Generate labels $y_i$, which equal $0$ for negative documents and $1$ for positive documents.

In [4]:
vocab_size = len(vocab)

X_train = np.zeros((n_train, vocab_size), dtype=np.int8)
X_test = np.zeros((n_test, vocab_size), dtype=np.int8)

# Later indices are positive (1) and earlier are negative (0).
y_train = (train_idx >= 1000).astype(int)
y_test = (test_idx >= 1000).astype(int)

for i in range(n_train):
    file = files[train_idx[i]]
    with open(file) as f:
        raw_txt = f.read()
    txt_arr = raw_txt.split()
    for token in txt_arr:
        X_train[i, vocab[token]] += 1

for i in range(n_test):
    file = files[test_idx[i]]
    with open(file) as f:
        raw_txt = f.read()
    txt_arr = raw_txt.split()
    for token in txt_arr:
        # Ignore unknown tokens.
        if token in vocab:
            X_test[i, vocab[token]] += 1

In [5]:
print("Train size:", n_train)
print("Test size:", n_test)
print("Vocabulary size:", len(vocab))
print()
print("X_train shape:", X_train.shape)
print("X_test shape:", X_test.shape)
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)

Train size: 1600
Test size: 400
Vocabulary size: 46182

X_train shape: (1600, 46182)
X_test shape: (400, 46182)
y_train shape: (1600,)
y_test shape: (400,)


## Logistic Regression

In [6]:
class LogisticRegressor():
    
    def __init__(self, lambda_, batch_size, epochs, lr, verbose):
        self.lambda_ = lambda_
        self.batch_size = batch_size
        self.epochs = epochs
        self.lr = lr
        self.verbose = verbose
    
    def fit(self, X, y):
        """
        Run mini-batch stochastic gradient descent to fit cross entropy loss objective.
        """
        n, d = X.shape
        X_appended = np.concatenate((np.ones((n, 1)), X), axis=1)
        if not hasattr(self, 'weights'):
            self.weights = np.random.normal(size=d+1)
        checkpoint = self.epochs // 10
        
        for epoch in range(self.epochs):
            
            order = np.random.permutation(n)
            num_batch = n // self.batch_size
            
            for i in range(num_batch):
                indices = order[i:min(i + batch_size, n)]
                X_batch = X_appended[indices]
                y_batch = y[indices]
                
                self.gradient_step(X_batch, y_batch)
                
            # Compute training loss.
            if verbose and (epoch % checkpoint == 0):
                logits = np.dot(X_appended, self.weights)
                probs = self.sigmoid(logits)
                loss = (-(y * np.log(probs) + (1 - y) * np.log(1 - probs)).sum() + 0.5 * self.lambda_ * np.dot(self.weights, self.weights)) / n
                
                y_pred = (1 + np.sign(logits)) / 2
                train_accuracy = accuracy_score(y, y_pred)
                print("Epoch %d \t cross entropy loss: %0.4f train accuracy: %0.3f" % (epoch, loss, train_accuracy))
            
                
        self.fitted = True
        return self
    
    def predict(self, X):
        """
        Predict 1 for positive sentiment and 0 for negative.
        """
        n = len(X)
        X_appended = np.concatenate((np.ones((n, 1)), X), axis=1)
        
        return (1 + np.sign(np.dot(X_appended, self.weights))) / 2
    
    def gradient_step(self, X, y):
        """
        Compute gradient and perform one optimization step.
        """
        # Use l1 regularization.
        
        n = len(X)
        probs = self.sigmoid(np.dot(X, self.weights))
        grad = (np.dot(X.T, (probs - y)) + self.lambda_ * self.weights) / n
        self.weights = self.weights - self.lr * grad
        
    def sigmoid(self, Z):
        Z_clipped = np.clip(Z, -10, 10)
        return 1 / (1 + np.exp(-Z_clipped))

Check if the logistic regression code works on a toy example: class-conditional Gaussians, centered at $-\mu$ for the negative class and $+\mu$ for the positive class.

In [7]:
n_tr = 1000
n_te = 100
d = 10

mu = np.ones(d)
cov = np.eye(d)

X_tr = np.concatenate(
    [
        np.random.multivariate_normal(-mu, cov, size = n_tr // 2),
        np.random.multivariate_normal(mu, cov, size = n_tr // 2)
    ]
)
y_tr = np.concatenate(
    [
        np.repeat(0, n_tr // 2),
        np.repeat(1, n_tr // 2)
    ]
)
X_te = np.concatenate(
    [
        np.random.multivariate_normal(-mu, cov, size = n_te // 2),
        np.random.multivariate_normal(mu, cov, size = n_te // 2)
    ]
)
y_te = np.concatenate(
    [
        np.repeat(0, n_te // 2),
        np.repeat(1, n_te // 2)
    ]
)

batch_size = 32
epochs = 10
verbose = True
lr = 0.03
lambda_ = 0.001

lr = LogisticRegressor(lambda_=lambda_, batch_size=batch_size, epochs=epochs, lr=lr, verbose=verbose)
lr.fit(X_tr, y_tr)
y_pred = lr.predict(X_te)

print("Accuracy:", accuracy_score(y_te, y_pred))

Epoch 0 	 cross entropy loss: 0.2451 train accuracy: 0.908
Epoch 1 	 cross entropy loss: 0.1820 train accuracy: 0.934
Epoch 2 	 cross entropy loss: 0.1231 train accuracy: 0.953
Epoch 3 	 cross entropy loss: 0.1030 train accuracy: 0.958
Epoch 4 	 cross entropy loss: 0.0928 train accuracy: 0.963
Epoch 5 	 cross entropy loss: 0.0737 train accuracy: 0.970
Epoch 6 	 cross entropy loss: 0.0628 train accuracy: 0.978
Epoch 7 	 cross entropy loss: 0.0552 train accuracy: 0.979
Epoch 8 	 cross entropy loss: 0.0477 train accuracy: 0.984
Epoch 9 	 cross entropy loss: 0.0379 train accuracy: 0.986
Accuracy: 0.99


Run on Pang and Lee movie review data. Hyperparameters below.

In [8]:
batch_size = 32
epochs = 200
verbose = True
lr = 0.01
lambda_ = 0

lr = LogisticRegressor(lambda_=lambda_, batch_size=batch_size, epochs=epochs, lr=lr, verbose=verbose)

In [9]:
try:
    lr.fit(X_train, y_train)
except KeyboardInterrupt:
    print('Graceful Exit')

Epoch 0 	 cross entropy loss: 4.2969 train accuracy: 0.498
Epoch 20 	 cross entropy loss: 2.9803 train accuracy: 0.637
Epoch 40 	 cross entropy loss: 2.2397 train accuracy: 0.706
Epoch 60 	 cross entropy loss: 1.8942 train accuracy: 0.740
Epoch 80 	 cross entropy loss: 1.6722 train accuracy: 0.764
Epoch 100 	 cross entropy loss: 1.3735 train accuracy: 0.796
Epoch 120 	 cross entropy loss: 1.1819 train accuracy: 0.824
Epoch 140 	 cross entropy loss: 0.9254 train accuracy: 0.846
Epoch 160 	 cross entropy loss: 0.7978 train accuracy: 0.863
Epoch 180 	 cross entropy loss: 0.8682 train accuracy: 0.848


In [10]:
y_pred = lr.predict(X_test)

print("Accuracy:", accuracy_score(y_test, y_pred))
print("F1 score:", f1_score(y_test, y_pred))

Accuracy: 0.74
F1 score: 0.7425742574257425


## Sentiment Lexicon-Based Classifier

Collect positive and negative sentiment words, and discard those that are not in the vocabulary of the movie review data.

In [11]:
neg_word_file = "opinion-lexicon-English/negative-words.txt"
pos_word_file = "opinion-lexicon-English/positive-words.txt"

vocab = pickle.load(open("vocab.pkl", "rb"))

with open(neg_word_file, encoding="ISO-8859-1") as f:
    neg_words_orig = f.readlines()
print("Original number of negative words:", len(neg_words_orig))
neg_words = []
for word in neg_words_orig[31:]:
    candidate = word[0:-1] 

    # Ignore words that are not in the vocab.
    if candidate in vocab:
        neg_words.append(candidate)

print("Negative words in vocab:", len(neg_words))
            
with open(pos_word_file, encoding="ISO-8859-1") as f:
    pos_words_orig = f.readlines()
print("Original number of positive words:", len(pos_words_orig))
pos_words = []
for word in pos_words_orig[31:]:
    candidate = word[0:-1] 

    # Ignore words that are not in the vocab.
    if candidate in vocab:
        pos_words.append(candidate)

print("Positive words in vocab:", len(pos_words))

Original number of negative words: 4814
Negative words in vocab: 2986
Original number of positive words: 2036
Positive words in vocab: 1445


Assess accuracy on test set, as training set was used at least to generate vocabulary. Subtract "points" from the prediction for the counts all negative words in the document, and add "points" for the counts of all positive words. Predict $1$ for positive score, and $0$ otherwise.

In [12]:
# Generate predictions.
n, d = X_test.shape

scores = np.zeros(n)

for word in neg_words:
    scores -= X_test[:, vocab[word]]
        
for word in pos_words:
    scores += X_test[:, vocab[word]]
    
y_pred = (scores > 0).astype(int)

In [13]:
print("Accuracy:", accuracy_score(y_test, y_pred))
print("F1 score:", f1_score(y_test, y_pred))

Accuracy: 0.69
F1 score: 0.691542288557214
