In [1]:
import torch
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.optim as optim
import re
%matplotlib inline

In [2]:
# Model
"""
y = b + Wx + U * tanh(d + Hx)

x = concat of all input sequence feature vectors(words)
b = biases for W
d = biases for H
W = direct representation matrix
H = hidden layer matrix
U = another hidden to output layer matrix

y = (Wx + b) + (U * tanh(d+Hx))
y =  (1,|V|) +   (1, |V|) 
     
goes to two different models, addition = (1,|V|) + (1, |V|) = (1,|V|)
|V| -> length of vocabulary

then (1,|V|) -> softmax -> probabilities for each word in vocab
"""

'\ny = b + Wx + U * tanh(d + Hx)\n\nx = concat of all input sequence feature vectors(words)\nb = biases for W\nd = biases for H\nW = direct representation matrix\nH = hidden layer matrix\nU = another hidden to output layer matrix\n\ny = (Wx + b) + (U * tanh(d+Hx))\ny =  (1,|V|) +   (1, |V|) \n     \ngoes to two different models, addition = (1,|V|) + (1, |V|) = (1,|V|)\n|V| -> length of vocabuluary\n\nthen (1,|V|) -> softmax -> probabilities for each word in vocab\n'

In [40]:
class NPL:
    """Neural probabilistic language model built from plain tensors.

    The indexes of ``context_size`` consecutive words are mapped to feature
    vectors through the lookup table ``C``, concatenated into one vector ``x``
    per example, and turned into one logit per vocabulary entry:

        logits = tanh(x @ hidden_layer + b) @ output_layer
                 (+ x @ direct_representation + d   when has_direct_rep)

    In this class ``b`` is the hidden-layer bias and ``d`` is the bias of the
    direct input-to-output connection.

    Attributes:
        parameters: list of every trainable tensor (all have
            ``requires_grad=True``); pass it directly to an optimizer.
    """

    def __init__(self, vocab, hidden_units=100, context_size=3, feature_word_len=10, has_direct_rep=True):
        """Allocate randomly initialised parameters.

        Args:
            vocab: number of words in the vocabulary (an int, not the list).
            hidden_units: width of the tanh hidden layer.
            context_size: number of context words fed to the model.
            feature_word_len: length of each word's feature vector.
            has_direct_rep: also add the direct input-to-output connection.
        """
        self.hidden_units = hidden_units
        self.feature_word_len = feature_word_len
        self.has_direct_rep = has_direct_rep
        self.context_size = context_size
        self.vocab = vocab

        self.C = torch.randn(self.vocab, feature_word_len)
        self.hidden_layer = torch.randn((self.context_size*self.feature_word_len), self.hidden_units)
        self.b = torch.randn(self.hidden_units)
        self.output_layer = torch.randn(self.hidden_units, self.vocab)

        self.parameters = [self.C, self.hidden_layer, self.b, self.output_layer]

        if has_direct_rep:
            self.direct_representation = torch.randn((self.context_size*self.feature_word_len), self.vocab)
            self.d = torch.randn(self.vocab)
            self.parameters.extend([self.direct_representation, self.d])

        self.tanh = nn.Tanh()
        self.softmax = nn.Softmax(dim=1)
        self.CLE = nn.CrossEntropyLoss()

        # Set parameters gradient to true
        for p in self.parameters:
            p.requires_grad = True

    def get_feature_vectors(self, x, y=None):
        """Look up feature vectors for word indexes.

        Args:
            x: word indexes of shape [B, context_size] (tensor or nested list).
            y: optional target word indexes of shape [B].

        Returns:
            ``(x_features, y_features)`` where ``x_features`` is
            [B, context_size*feature_word_len] and ``y_features`` is
            [B, feature_word_len], or ``None`` when ``y`` was not given.
        """
        # as_tensor makes nested Python lists index C the same way a tensor does.
        x = self.C[torch.as_tensor(x)]

        # concat all input feature vectors into one
        x = x.view(x.shape[0], x.shape[1]*x.shape[2])

        if y is not None:
            y = self.C[torch.as_tensor(y)]

        return x, y

    def forward(self, x, y=None):
        """Compute logits of shape [B, vocab] for a batch of contexts.

        ``y`` is accepted for backward compatibility only; the logits do not
        depend on it, so it is not looked up.
        """
        x, _ = self.get_feature_vectors(x)

        # Hidden layer tanh(b + Hx)
        H = self.tanh(torch.matmul(x, self.hidden_layer) + self.b)
        O = torch.matmul(H, self.output_layer)

        if self.has_direct_rep:
            # Direct representation layer (Wx + d)
            D = torch.matmul(x, self.direct_representation) + self.d
            logits = O + D
        else:
            logits = O

        return logits

    def __call__(self, x, y=None):
        """Alias for :meth:`forward` so the model can be called like a function."""
        return self.forward(x, y)

    def generate(self, start_context, length, words_to_i=None, i_to_words=None):
        """Greedily generate ``length`` words that continue ``start_context``.

        Args:
            start_context: whitespace-separated words used as the first context.
            length: number of words to generate.
            words_to_i: mapping word -> index used when the model was trained.
            i_to_words: the inverse mapping index -> word.

        Returns:
            The generated words joined by single spaces.

        Raises:
            TypeError: ``start_context`` is not a string.
            ValueError: the mappings are missing, the context has fewer than
                ``context_size`` words, or it contains an unknown word.
        """
        if not isinstance(start_context, str):
            raise TypeError("Context has to be a string")

        if words_to_i is None or i_to_words is None:
            # The model only stores the vocabulary size, so it cannot map words by itself.
            raise ValueError("generate needs both the words_to_i and i_to_words mappings")

        context_words = start_context.split()

        if len(context_words) > self.context_size:
            print("input string larger than context size, might lead to improper responses\n")
        if len(context_words) < self.context_size:
            raise ValueError(
                f"Context needs at least {self.context_size} words, got {len(context_words)}"
            )

        try:
            # Only the most recent context_size words fit into the model input.
            context = [words_to_i[word] for word in context_words[-self.context_size:]]
        except KeyError as err:
            raise ValueError(f"word {err.args[0]!r} is not in the vocabulary") from err

        generated = []
        with torch.no_grad():  # inference only, no graph needed
            for _ in range(length):
                logits = self.forward(torch.tensor([context]))
                next_index = int(torch.argmax(logits, dim=1).item())
                generated.append(i_to_words[next_index])
                # Slide the window: drop the oldest word, append the prediction.
                context = context[1:] + [next_index]

        return " ".join(generated)
 

In [135]:

def prepare_text(filename):
    """Read a text file and return its word sequence and sorted vocabulary.

    The text is lower-cased and every character that is neither an ASCII
    letter nor whitespace is removed before splitting on whitespace.

    Args:
        filename: path of the text file to read.

    Returns:
        ``(words, vocab)``: ``words`` is the list of all words in reading
        order; ``vocab`` is the sorted list of distinct words.
    """
    # The context manager closes the file even if read() fails.
    with open(filename, "r") as text_file:
        text = text_file.read()

    text = text.lower()
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    words = text.split()

    # Create vocabulary by removing all duplicates
    vocab = sorted(set(words))

    return words, vocab
    
def create_pairs(words, context_size):
    """Slide a window over ``words`` and build (context, target) pairs.

    Args:
        words: sequence of words.
        context_size: number of consecutive words in each context.

    Returns:
        ``(x, y)`` as two new lists of equal length:
        x[i] -> ["asd", "Asd", "aw"]  (context_size words)
        y[i] -> ["fgds"]              (one-element list with the next word)
    """
    # Fresh local lists: the result must not depend on, or accumulate into,
    # whatever x and y happen to exist at module level.
    x, y = [], []
    for i in range(len(words) - context_size):
        x.append(words[i:i + context_size])
        y.append(words[i + context_size:i + context_size + 1])

    return x, y


def get_index_vectors(x, y, words_to_i):
    """Translate word contexts and targets into vocabulary indexes.

    Args:
        x: list of contexts, each a list of words.
        y: list of targets, each a one-element list holding a word.
        words_to_i: mapping word -> index.

    Returns:
        ``(x_indexes, y_indexes)`` as new lists:
        x_indexes[i] -> [12312, 1231, 1]  (one index per context word)
        y_indexes[i] -> 5                 (index of the target word)

    Raises:
        KeyError: a word is missing from ``words_to_i``.
    """
    # Build new lists instead of overwriting the inputs, so the function can
    # be called again on the same word lists.
    x_indexes = [[words_to_i[word] for word in context] for context in x]
    y_indexes = [words_to_i[target[0]] for target in y]

    return x_indexes, y_indexes

def get_word_dict(vocab):
    """Build the two lookup tables between words and their positions.

    Args:
        vocab: iterable of words; a word's index is its position in it.

    Returns:
        ``(words_to_i, i_to_words)``: word -> index and index -> word.
    """
    # (index, 'word') pairs straight from enumerate give the index -> word table.
    i_to_words = dict(enumerate(vocab))
    # Inverting it gives word -> index.
    words_to_i = {word: index for index, word in i_to_words.items()}

    return words_to_i, i_to_words
    
def train(text_file, **kwargs):
    """Train an NPL model on a text file with minibatch SGD.

    Args:
        text_file: path of the training text.
        **kwargs: overrides for the defaults below:
            hidden_units (100), context_size (3), feature_vector_size (10),
            direct_rep (False), epochs (1), batch_size (128), lr (0.01),
            momentum (0.9).

    Returns:
        ``(model, words_to_i, i_to_words)``: the trained model and the two
        vocabulary mappings needed to encode inputs and decode predictions.

    Raises:
        ValueError: the text is too short to form one (context, target) pair.
    """
    defaults = {
        'hidden_units': 100,
        'context_size': 3,
        'feature_vector_size': 10,
        'direct_rep': False,
        'epochs': 1,
        'batch_size': 128,
        'lr': 0.01,
        'momentum': 0.9,
    }

    defaults.update(kwargs)

    # Prepare data
    words, vocab = prepare_text(text_file)

    # Helper dictionaries mapping words to index and vice versa
    words_to_i, i_to_words = get_word_dict(vocab)

    x, y = create_pairs(words, defaults['context_size'])
    x, y = get_index_vectors(x, y, words_to_i)
    if len(x) == 0:
        raise ValueError("text is too short for the requested context_size")
    x, y = torch.tensor(x), torch.tensor(y)

    # Model
    model = NPL(vocab=len(vocab), hidden_units=defaults['hidden_units'], context_size=defaults['context_size'],
                feature_word_len=defaults['feature_vector_size'], has_direct_rep=defaults['direct_rep'])

    # optimizer and loss
    CLE = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters, lr=defaults['lr'], momentum=defaults['momentum'])

    batch_size = defaults['batch_size']
    for epoch in range(defaults['epochs']):
        # New random example order every epoch.
        order = torch.randperm(len(x))
        total_loss = 0.0
        for start in range(0, len(x), batch_size):
            batch = order[start:start + batch_size]
            logits = model(x[batch], y[batch])
            loss = CLE(logits, y[batch])

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Weight by batch length so the short final batch is averaged correctly.
            total_loss += loss.item() * len(batch)
        print(f"epoch {epoch + 1}: mean loss {total_loss / len(x):.4f}")

    return model, words_to_i, i_to_words

'th'

In [50]:
def prepare_text(filename):
    """Read a text file and return its word sequence and sorted vocabulary.

    The text is lower-cased and every character that is neither an ASCII
    letter nor whitespace is removed before splitting on whitespace.

    Args:
        filename: path of the text file to read.

    Returns:
        ``(words, vocab)``: ``words`` is the list of all words in reading
        order; ``vocab`` is the sorted list of distinct words.
    """
    # The context manager closes the file even if read() fails.
    with open(filename, "r") as text_file:
        text = text_file.read()
    text = text.lower()
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    words = text.split()
    # Sorted so that word indexes derived from vocab are the same on every run
    # (iteration order of a set of strings is not stable across interpreters).
    vocab = sorted(set(words))
    return words, vocab


In [201]:
# Load the corpus: prepare_text returns the cleaned word list and the list of distinct words.
words,vocab = prepare_text("t8.shakespeare.txt")

In [202]:
# Display (number of words in the corpus, number of distinct words).
len(words), len(vocab)

(899836, 28122)

In [203]:
# Sort the vocabulary, then fill both lookup tables in one pass:
# words_to_i maps word -> position, i_to_words maps position -> word.
vocab = sorted(vocab)
words_to_i, i_to_words = {}, {}
for index, word in enumerate(vocab):
    words_to_i[word] = index
    i_to_words[index] = word

In [204]:
# Start with empty x and y lists; later cells rebind x and y to the (context, target) data.
x,y = [], []
# NOTE(review): the pair-building and model cells further down pass 5 explicitly
# instead of reading context_size — confirm which value is intended.
context_size = 3
feature_vector_size = 10 # 3*10 -> 30 input size
# NOTE(review): no later cell shown here reads this C; NPL allocates its own self.C.
C = torch.randn(len(vocab), feature_vector_size)

In [205]:
# Scratch check: in-place concatenation appends each element of the right-hand list.
a = [1, 2]
a += [3, 4]
a

[1, 2, 3, 4]

In [206]:
def create_pairs(words, context_size):
    """Slide a window over ``words`` and build (context, target) pairs.

    Args:
        words: sequence of words.
        context_size: number of consecutive words in each context.

    Returns:
        ``(x, y)`` as two new lists of equal length:
        x[i] -> ["asd", "Asd", "aw"]  (context_size words)
        y[i] -> ["fgds"]              (one-element list with the next word)
    """
    # Fresh local lists: the result must not depend on, or accumulate into,
    # whatever x and y happen to exist at module level.
    x, y = [], []
    for i in range(len(words) - context_size):
        x.append(words[i:i + context_size])
        y.append(words[i + context_size:i + context_size + 1])

    return x, y

In [207]:
def get_index_vectors(x, y, words_to_i):
    """Translate word contexts and targets into vocabulary indexes.

    Args:
        x: list of contexts, each a list of words.
        y: list of targets, each a one-element list holding a word.
        words_to_i: mapping word -> index.

    Returns:
        ``(x_indexes, y_indexes)`` as new lists:
        x_indexes[i] -> [12312, 1231, 1]  (one index per context word)
        y_indexes[i] -> 5                 (index of the target word)

    Raises:
        KeyError: a word is missing from ``words_to_i``.
    """
    # Build new lists instead of overwriting the inputs, so the function can
    # be called again on the same word lists.
    x_indexes = [[words_to_i[word] for word in context] for context in x]
    y_indexes = [words_to_i[target[0]] for target in y]

    return x_indexes, y_indexes

In [208]:
# Build (context, target) word pairs with a 5-word context,
# then replace every word by its index in words_to_i.
x,y = create_pairs(words, 5)
x,y = get_index_vectors(x,y, words_to_i)

In [209]:
# Rebind x and y from Python lists to tensors, then display both shapes.
x,y = torch.tensor(x), torch.tensor(y)
x.shape, y.shape

(torch.Size([899831, 5]), torch.Size([899831]))

In [234]:
# NPL signature for reference: vocab, hidden_units=100, context_size=3, feature_word_len=10, has_direct_rep=True
# context_size=5 here matches the 5-word contexts built with create_pairs(words, 5).
model = NPL(vocab=len(vocab), hidden_units=100, context_size=5, feature_word_len=10, has_direct_rep=True)

In [235]:
# Display the shapes of the context tensor and the target tensor.
x.shape, y.shape

(torch.Size([899831, 5]), torch.Size([899831]))

In [236]:
# Display the first context (word indexes) and its target index.
x[:1], y[:1]

(tensor([[24508, 13013, 24393, 24358,  8367]]), tensor([9204]))

In [256]:
# Display the shape of the logits.
# NOTE(review): in the cells shown, logits is first assigned further down (In [303]),
# so this cell fails with NameError on a top-to-bottom run — move it below that cell.
logits.shape

torch.Size([10, 28122])

In [238]:
# Display the target indexes of the first 10 examples.
y[:10]

tensor([ 9204, 18779,  3341, 19033, 11001,   844, 13013, 18779, 12455,  5239])

In [303]:
# Forward pass on the first 10 examples, then the loss of those logits against their targets.
# NOTE(review): in the cells shown, CLE is created further down (In [265]),
# so a top-to-bottom run hits NameError here — define the loss before this cell.
logits = model(x[:10],y[:10])
loss = CLE(logits, y[:10])
loss

tensor(23.9045, grad_fn=<NllLossBackward0>)

In [304]:
# One optimisation step: clear old gradients, backpropagate the loss, update the parameters.
# NOTE(review): in the cells shown, optimizer is created further down (In [264]).
optimizer.zero_grad()
loss.backward()
optimizer.step()
# loss is not recomputed after the step, so this prints the pre-update value.
print(loss)

tensor(23.9045, grad_fn=<NllLossBackward0>)


In [264]:
# NOTE(review): the first cell already has `import torch.optim as optim`,
# which is the name used below; this extra import looks redundant.
import torch.optim

# SGD with momentum over the list of tensors held in model.parameters.
optimizer = optim.SGD(model.parameters, lr=0.01, momentum=0.9)

In [265]:
# softmax normalises along dim=1; CLE is the loss that the loss cell applies
# directly to the logits and the target indexes.
softmax = nn.Softmax(dim=1)
CLE = nn.CrossEntropyLoss()

In [127]:
# Apply softmax (dim=1) to the logits.
prob = softmax(logits)

In [128]:
# Display the shape of prob.
prob.shape

torch.Size([10, 28122])

In [None]:
# NPL signature for reference: vocab, hidden_units=100, context_size=3, feature_word_len=10, has_direct_rep=True
# NOTE(review): context_size=3 here, while x was built with 5-word contexts, so this
# model cannot consume x as is. The optimizer created earlier was given the previous
# model's parameter list and would need to be re-created for this model.
model = NPL(vocab=len(vocab), hidden_units=100, context_size=3, feature_word_len=10, has_direct_rep=True)