In [15]:
import numpy as np
import string
from nltk.corpus import stopwords
import nltk

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to C:\Users\Granth
[nltk_data]     Bagadia\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [16]:
class CBOW(object):
    def __init__(self, N, window_size):
        self.N = N
        self.X_train = []
        self.y_train = []
        self.window_size = window_size
        self.alpha = 0.001
        self.words = []
        self.word_index = {}

    def initialize(self, V, data):
        self.V = V
        self.W1 = np.random.uniform(-0.8, 0.8, (self.V, self.N))
        self.W2 = np.random.uniform(-0.8, 0.8, (self.N, self.V))
        self.words = data
        for i in range(len(data)):
            self.word_index[data[i]] = i

    def preprocessing(self, corpus):
        stop_words = set(stopwords.words('english'))
        training_data = []
        sentences = corpus.split(".")
        for i in range(len(sentences)):
            sentences[i] = sentences[i].strip()
            sentence = sentences[i].split()
            x = [word.strip(string.punctuation) for word in sentence if word not in stop_words]
            x = [word.lower() for word in x]
            training_data.append(x)
        return training_data

    def prepare_data_for_training(self, corpus):
        sentences = self.preprocessing(corpus)
        data = {}
        for sentence in sentences:
            for word in sentence:
                if word not in data:
                    data[word] = 1
                else:
                    data[word] += 1
        V = len(data)
        data = sorted(list(data.keys()))
        vocab = {}
        for i in range(len(data)):
            vocab[data[i]] = i

        for sentence in sentences:
            for i in range(len(sentence)):
                context = [0 for x in range(V)]
                center_word = [0 for x in range(V)]
                center_word[vocab[sentence[i]]] = 1
                for j in range(i - self.window_size, i + self.window_size + 1):
                    if i != j and j >= 0 and j < len(sentence):
                        context[vocab[sentence[j]]] += 1
                self.X_train.append(context)
                self.y_train.append(center_word)

        self.initialize(V, data)
        return self.X_train, self.y_train

    def softmax(self, x):
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum(axis=0)

    def feed_forward(self, X):
        self.h = np.dot(self.W1.T, X).reshape(self.N, 1)
        self.u = np.dot(self.W2.T, self.h)
        self.y = self.softmax(self.u)
        return self.y

    def backpropagate(self, x, t):
        e = self.y - np.asarray(t).reshape(self.V, 1)
        dLdW2 = np.dot(self.h, e.T)
        X = np.array(x).reshape(self.V, 1)
        dLdW1 = np.dot(X, np.dot(self.W2, e).T)
        self.W2 = self.W2 - self.alpha * dLdW2
        self.W1 = self.W1 - self.alpha * dLdW1

    def train(self, epochs):
        for epoch in range(1, epochs):
            self.loss = 0
            for j in range(len(self.X_train)):
                self.feed_forward(self.X_train[j])
                self.backpropagate(self.X_train[j], self.y_train[j])
                C = 0
                for m in range(self.V):
                    if self.y_train[j][m]:
                        self.loss += -1 * self.u[m][0]
                        C += 1
                self.loss += C * np.log(np.sum(np.exp(self.u)))
            if epoch % 1000 == 0:
                print(f"Epoch {epoch}, Loss: {self.loss}")
            self.alpha *= 1 / (1 + self.alpha * epoch)

    def predict(self, context_words, number_of_predictions):
        X = [0 for i in range(self.V)]
        for word in context_words:
            if word in self.words:
                index = self.word_index[word]
                X[index] += 1

        prediction = self.feed_forward(X)
        output = {}
        for i in range(self.V):
            output[prediction[i][0]] = i
        top_predictions = []
        for k in sorted(output, reverse=True):
            top_predictions.append(self.words[output[k]])
            if len(top_predictions) >= number_of_predictions:
                break
        return top_predictions

    def compute_similarity(self, vec1, vec2):
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def rank_words(self, target_vector):
        similarities = {}
        for i in range(self.V):
            word_vector = self.W1[i]
            similarity = self.compute_similarity(target_vector, word_vector)
            similarities[i] = similarity
        ranked_words = sorted(similarities, key=similarities.get, reverse=True)
        return ranked_words

    def compute_mrr_for_window(self, target_word, context_words):
        target_index = self.word_index[target_word]
        target_vector = self.W1[target_index]

        ranked_indices = self.rank_words(target_vector)

        mrr = 0
        for context_word in context_words:
            context_index = self.word_index[context_word]
            rank = ranked_indices.index(context_index) + 1
            mrr += 1 / rank
        return mrr / len(context_words)

    def compute_mrr(self, test_data):
        total_mrr = 0
        for window in test_data:
            target_word, context_words = window[0], window[1:]
            total_mrr += self.compute_mrr_for_window(target_word, context_words)
        average_mrr = total_mrr / len(test_data)
        return average_mrr

    def evaluate_mrr(self, test_corpus):
        test_sentences = self.preprocessing(test_corpus)
        test_data = []
        for sentence in test_sentences:
            for i in range(len(sentence)):
                window = sentence[max(0, i - self.window_size):min(len(sentence), i + self.window_size + 1)]
                test_data.append(window)

        mrr = self.compute_mrr(test_data)
        print("MRR:", mrr)

In [17]:
train_corpus = "The earth revolves around the sun. The moon revolves around the earth."
test_corpus = "The sun revolves around the earth. The earth revolves around the moon."

In [18]:
cbow = CBOW(N=50, window_size=2)
cbow.prepare_data_for_training(train_corpus)
cbow.train(epochs=10000)

Epoch 1000, Loss: 10.351679490353394
Epoch 2000, Loss: 10.279755304409866
Epoch 3000, Loss: 10.256210184958494
Epoch 4000, Loss: 10.244518057426468
Epoch 5000, Loss: 10.237528499758572
Epoch 6000, Loss: 10.232879505935024
Epoch 7000, Loss: 10.229564040792393
Epoch 8000, Loss: 10.227080309512221
Epoch 9000, Loss: 10.225150217536665


In [19]:
cbow.evaluate_mrr(test_corpus)

MRR: 0.2801388888888888


In [20]:
print(cbow.predict(["the", "revolves", "sun"], 3))
print(cbow.W1[cbow.word_index["earth"]])

Word not found in dictionary
None
[-0.53256885 -0.70563596  0.07855461  0.40833322 -0.30250257  0.51874767
  0.44018612  0.51278246 -0.13371677  0.58826988  0.45678772 -0.35019537
 -0.37782976  0.108365    0.61816973  0.04530163 -0.70084048 -0.48972064
 -0.36178548 -0.23688827 -0.17547803 -0.15102303 -0.54871521 -0.44878585
  0.75011751 -0.52981644  0.50228706 -0.71437914  0.10798189 -0.04767139
  0.27967491  0.56509143 -0.08984134 -0.19054663  0.09598199 -0.18236667
 -0.56642973  0.21170609 -0.38328395 -0.19801797  0.63954692 -0.42971867
 -0.51023874  0.42693253 -0.27577159 -0.23057178  0.53139705 -0.02095248
  0.37192606 -0.68750596]
