In [2]:
import math
import os
import pickle
import re
import string
from collections import Counter

import nltk
import numpy as np
import sklearn.metrics
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from sklearn.metrics import confusion_matrix, accuracy_score

In [3]:
# Fetch the NLTK stop-word corpus that stopwords.words('english') reads below.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/amanmehmood/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# Shared Porter stemmer instance used by process_text.
stemmer = PorterStemmer()

# English stop words, held in a set for fast membership tests.
stop_words = set(stopwords.words('english'))

# Simple emoticons: eyes (: = ;), an optional nose (o, O or -), then one mouth character.
# NOTE(review): the character class lists \] twice; harmless but redundant.
emoticon_pattern = r'[:=;][oO\-]?[D\)\]\(\]/\\OpP]'

In [5]:
def process_text(text):
    """Normalise raw text into a list of stemmed, stop-word-free tokens.

    Steps: lowercase, strip ``<...>`` tags, split on runs of non-word
    characters, drop empty/punctuation tokens and stop words, then
    Porter-stem what is left.

    Stop words are removed *before* stemming. The stop-word list holds
    surface forms, so filtering only after stemming lets words such as
    "this" -> "thi", "was" -> "wa" or "very" -> "veri" slip through. The
    post-stem check is kept as well, so every token the previous version
    removed is still removed.

    Args:
        text: Raw document text.

    Returns:
        list[str]: Stemmed tokens in their original order.
    """
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)

    words = []
    for token in re.split(r'\W+', text):
        # Substring test against string.punctuation: rejects the empty strings
        # re.split yields at the text boundaries as well as a lone '_'.
        if token in string.punctuation:
            continue
        # Filter on the surface form first (see docstring).
        if token in stop_words:
            continue

        stem = stemmer.stem(token)
        # NOTE(review): tokens come from a \W+ split, so they can never begin
        # with ':', '=' or ';' and the emoticon test cannot match here. Kept
        # unchanged; decide whether emoticons should be stripped from (or
        # preserved in) the raw text before tokenising.
        if stem in stop_words or re.match(emoticon_pattern, stem):
            continue
        words.append(stem)

    return words

In [6]:
def create_vocab(directory):
    """Count every processed token found in the files under ``directory``.

    The tree is walked recursively with os.walk; each file is read in full,
    passed through process_text, and its tokens are added to the tally.

    Files are decoded explicitly as UTF-8. Without an ``encoding`` argument
    open() uses the platform locale, so the same corpus could decode
    differently (or fail) from one machine to another.

    Args:
        directory: Root directory of the corpus.

    Returns:
        collections.Counter: Token -> total number of occurrences.
    """
    vocab = Counter()

    for path, _, files in os.walk(directory):
        for file in files:
            with open(os.path.join(path, file), 'r', encoding='utf-8') as f:
                vocab.update(process_text(f.read()))

    return vocab

In [10]:
# Build the token-count vocabulary from the training tweets.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
vocab = create_vocab('/Users/amanmehmood/AIT 726/programming_assignment_2/data/raw/tweet/train')

In [11]:
def calculate_tf(document_words, vocab):
    """Relative term frequency of each vocabulary entry in one document.

    Args:
        document_words: Sequence of tokens making up the document.
        vocab: Iterable of vocabulary terms; its iteration order becomes the
            key order of the result.

    Returns:
        dict: term -> occurrences / len(document_words) for terms present in
        the document, and 0 for every other vocabulary term.
    """
    total = len(document_words)
    occurrences = Counter(document_words)

    # The division is only reached for terms that occur at least once, so an
    # empty document never divides by zero.
    return {
        term: occurrences[term] / total if term in occurrences else 0
        for term in vocab
    }

In [12]:
def calculate_idf(documents, vocab):
    """Inverse document frequency, log(N / df), for every vocabulary term.

    Args:
        documents: Sequence of documents, each a list of tokens.
        vocab: Iterable of vocabulary terms; its iteration order becomes the
            key order of the result.

    Returns:
        dict: term -> log(N / df), where N is the number of documents and df
        is the number of documents containing the term. Terms that occur in
        no document get 0.0 instead of raising ZeroDivisionError.
    """
    N = len(documents)
    doc_freq = dict.fromkeys(vocab, 0)

    for document in documents:
        # Visit each distinct token once and look it up in the dict, rather
        # than scanning the whole token list once per vocabulary term.
        for word in set(document):
            if word in doc_freq:
                doc_freq[word] += 1

    idf = {}
    for word, df in doc_freq.items():
        # df == 0 happens when the vocabulary was built from a different
        # corpus than `documents`, or when `documents` is empty.
        idf[word] = math.log(N / float(df)) if df else 0.0

    return idf

In [13]:
def calculate_tf_idf(tf, idf, vocab):
    """Multiply term frequency by inverse document frequency per term.

    Args:
        tf: Mapping term -> term frequency for one document.
        idf: Mapping term -> inverse document frequency.
        vocab: Iterable of terms to score; both mappings must contain each one.

    Returns:
        dict: term -> tf[term] * idf[term], keyed in vocab iteration order.
    """
    return {term: tf[term] * idf[term] for term in vocab}

In [14]:
def load_documents(directory):
    """Read and tokenise every file under ``directory``.

    The tree is walked recursively with os.walk; each file is read in full
    and passed through process_text.

    Files are decoded explicitly as UTF-8. Without an ``encoding`` argument
    open() uses the platform locale, so the same corpus could decode
    differently (or fail) from one machine to another.

    Args:
        directory: Root directory of the corpus.

    Returns:
        list[list[str]]: One token list per file, in os.walk order.
    """
    documents = []

    for path, _, files in os.walk(directory):
        for file in files:
            with open(os.path.join(path, file), 'r', encoding='utf-8') as f:
                documents.append(process_text(f.read()))

    return documents

In [15]:
# Tokenised training documents, one token list per file.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
documents = load_documents('/Users/amanmehmood/AIT 726/programming_assignment_2/data/raw/tweet/train')

In [16]:
# Per-document term frequencies over the full vocabulary.
tfs = [calculate_tf(document, vocab) for document in documents]

# Corpus-level inverse document frequencies, computed once from the training documents.
idf = calculate_idf(documents, vocab)

# Per-document TF-IDF dictionaries; every dict has one entry per vocabulary term.
tf_idfs = [calculate_tf_idf(tf, idf, vocab) for tf in tfs]

In [17]:
def sigmoid(x):
    """Logistic function 1 / (1 + e^-x), applied element-wise via NumPy."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

def sigmoid_derivative(x):
    """Return x * (1 - x).

    This is the sigmoid's derivative expressed in terms of the sigmoid's
    *output*, so callers pass activations rather than pre-activations.
    """
    complement = 1 - x
    return x * complement

def mse(y_true, y_pred):
    """Mean of the squared element-wise differences between targets and predictions."""
    residual = y_true - y_pred
    return np.mean(np.square(residual))

def mse_derivative(y_true, y_pred):
    """Gradient of the mean squared error with respect to the predictions.

    Computed as 2 * (y_pred - y_true) / y_true.size, so y_true must expose a
    NumPy-style ``size`` attribute.
    """
    difference = y_pred - y_true
    return 2 * difference / y_true.size

In [18]:
class NeuralNetwork:
    """Two-layer fully connected network with sigmoid activations.

    Trained by full-batch gradient descent on mean squared error. Samples are
    stored as columns: inputs are shaped (input_size, n_samples) and targets
    and outputs (output_size, n_samples).
    """

    def __init__(self, input_size, hidden_size, output_size, seed=None):
        """Initialise weights from a standard normal and biases to zero.

        Args:
            input_size: Number of input features.
            hidden_size: Number of hidden units.
            output_size: Number of output units.
            seed: Optional seed for reproducible weights. When None (the
                default) the weights come from NumPy's global generator.
        """
        # A private RandomState is created only when a seed is requested, so
        # the default path consumes the global generator as it did before.
        rng = np.random if seed is None else np.random.RandomState(seed)
        self.W1 = rng.randn(hidden_size, input_size)
        self.b1 = np.zeros((hidden_size, 1))
        self.W2 = rng.randn(output_size, hidden_size)
        self.b2 = np.zeros((output_size, 1))

    def forward(self, X):
        """Run a forward pass and cache the intermediates for backward().

        Args:
            X: Input array shaped (input_size, n_samples).

        Returns:
            Output activations shaped (output_size, n_samples).
        """
        self.Z1 = np.dot(self.W1, X) + self.b1
        self.A1 = sigmoid(self.Z1)
        self.Z2 = np.dot(self.W2, self.A1) + self.b2
        self.A2 = sigmoid(self.Z2)
        return self.A2

    def backward(self, X, Y, output, learning_rate=0.0001):
        """Back-propagate the MSE loss and take one gradient-descent step.

        Relies on self.A1 cached by the forward() call that produced `output`.

        Args:
            X: Inputs used for that forward pass, (input_size, n_samples).
            Y: Targets, (output_size, n_samples).
            output: Value returned by forward(X).
            learning_rate: Step size for the parameter update.
        """
        # sigmoid_derivative takes activations, hence `output` / self.A1.
        dZ2 = mse_derivative(Y, output) * sigmoid_derivative(output)
        dW2 = np.dot(dZ2, self.A1.T)
        db2 = np.sum(dZ2, axis=1, keepdims=True)
        dZ1 = np.dot(self.W2.T, dZ2) * sigmoid_derivative(self.A1)
        dW1 = np.dot(dZ1, X.T)
        db1 = np.sum(dZ1, axis=1, keepdims=True)

        self.W1 -= learning_rate * dW1
        self.b1 -= learning_rate * db1
        self.W2 -= learning_rate * dW2
        self.b2 -= learning_rate * db2

    def train(self, X, Y, epochs, learning_rate=0.0001):
        """Run `epochs` full-batch updates, printing the loss before each one.

        Args:
            X: Inputs, (input_size, n_samples).
            Y: Targets, (output_size, n_samples).
            epochs: Number of gradient-descent steps.
            learning_rate: Step size forwarded to backward(). The default
                matches backward()'s own default, so existing calls behave
                exactly as before.
        """
        for epoch in range(epochs):
            output = self.forward(X)
            loss = mse(Y, output)
            print(f'Epoch {epoch}, Loss: {loss}')
            self.backward(X, Y, output, learning_rate)

    def predict(self, X):
        """Return 0/1 integer predictions by thresholding the output at 0.5."""
        output = self.forward(X)
        return (output > 0.5).astype(int)

In [19]:
# One input unit per vocabulary term, 20 hidden units, a single sigmoid output.
nn = NeuralNetwork(input_size=len(vocab), hidden_size=20, output_size=1)

# One row per training document; columns follow each TF-IDF dict's key order.
X_train = np.array([list(tf_idf.values()) for tf_idf in tf_idfs])

# NOTE(review): these targets are random 0/1 draws, not labels read from the
# dataset, so the network is fitted to noise. Replace with the real sentiment
# labels, aligned with the order in which load_documents returned the files.
# NOTE(review): no random seed is set, so weights and targets differ on every run.
Y_train = np.random.randint(0, 2, (1, len(tf_idfs)))

# Transposed so samples are columns, matching np.dot(W1, X) in NeuralNetwork.forward.
nn.train(X_train.T, Y_train, epochs=100)

Epoch 0, Loss: 0.3253301769055179
Epoch 1, Loss: 0.3253274401374648
Epoch 2, Loss: 0.3253247034620452
Epoch 3, Loss: 0.32532196687926457
Epoch 4, Loss: 0.3253192303891283
Epoch 5, Loss: 0.325316493991642
Epoch 6, Loss: 0.32531375768681103
Epoch 7, Loss: 0.325311021474641
Epoch 8, Loss: 0.32530828535513723
Epoch 9, Loss: 0.3253055493283053
Epoch 10, Loss: 0.3253028133941505
Epoch 11, Loss: 0.3253000775526784
Epoch 12, Loss: 0.3252973418038946
Epoch 13, Loss: 0.3252946061478043
Epoch 14, Loss: 0.32529187058441317
Epoch 15, Loss: 0.3252891351137265
Epoch 16, Loss: 0.3252863997357498
Epoch 17, Loss: 0.3252836644504886
Epoch 18, Loss: 0.3252809292579483
Epoch 19, Loss: 0.3252781941581344
Epoch 20, Loss: 0.32527545915105227
Epoch 21, Loss: 0.32527272423670744
Epoch 22, Loss: 0.32526998941510527
Epoch 23, Loss: 0.32526725468625134
Epoch 24, Loss: 0.325264520050151
Epoch 25, Loss: 0.3252617855068097
Epoch 26, Loss: 0.325259051056233
Epoch 27, Loss: 0.32525631669842614
Epoch 28, Loss: 0.3252535

In [20]:
# Tokenised test documents, one token list per file.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
test_documents = load_documents('/Users/amanmehmood/AIT 726/programming_assignment_2/data/raw/tweet/test')

In [21]:
# Term frequencies for the test documents over the training vocabulary.
test_tfs = [calculate_tf(document, vocab) for document in test_documents]


# Reuses the idf computed from the training documents; it is not recomputed on the test set.
test_tf_idfs = [calculate_tf_idf(tf, idf, vocab) for tf in test_tfs]

In [22]:
# One row per test document; columns follow each TF-IDF dict's key order.
X_test = np.array([list(tf_idf.values()) for tf_idf in test_tf_idfs])

# NOTE(review): random 0/1 targets rather than the real test labels, so any
# metric computed against Y_test is not a measure of model quality.
Y_test = np.random.randint(0, 2, (1, len(test_tf_idfs)))

In [23]:
# Transposed so samples are columns; predict() returns 0/1 integers thresholded at 0.5.
predictions = nn.predict(X_test.T)

In [24]:
# Score the predictions against Y_test (both flattened to 1-D).
accuracy = accuracy_score(Y_test.flatten(), predictions.flatten())
# The result keeps the name `confusion_matrix` because the report cell below
# interpolates it. Calling the function through its module, not through the
# imported name this assignment rebinds, keeps the cell re-runnable; before,
# a second run raised TypeError because the name already held an array.
confusion_matrix = sklearn.metrics.confusion_matrix(Y_test.flatten(), predictions.flatten())

In [25]:
# Write the accuracy and confusion matrix to the evaluation log, overwriting any existing file.
# NOTE(review): absolute, machine-specific path; consider a configurable reports directory.
with open('/Users/amanmehmood/AIT 726/programming_assignment_2/reports/sa_evaluation.log', 'w') as f:
    f.write(f'Accuracy: {accuracy}\n')
    f.write(f'Confusion Matrix:\n{confusion_matrix}\n')

In [57]:
# Serialise the trained network object with pickle.
# NOTE(review): this cell's execution count (57) is out of sequence and it uses a
# Windows path while every other cell uses a macOS path, so it was presumably run
# in a different session; confirm it still works under Restart & Run All.
# NOTE(review): only `nn` is saved. `vocab` and `idf`, which are needed to build
# inputs for it, are not persisted anywhere in the visible cells.
with open('C:\\Users\\AMEHMOOD\\Documents\\Repos\\programming_assignment_2\\models\\trained_sa_nn.pkl', 'wb') as f:
    pickle.dump(nn, f)