**`LSTM-Cell`**

In [93]:
import numpy as np 

class Tensor(object):
    """NumPy-backed array with a minimal reverse-mode autograd.

    Every operation returns a new ``Tensor``. When the operands have
    ``autograd`` enabled, the result remembers its ``creators`` and the name
    of the operation (``creation_op``) so that ``backward`` can push
    gradients back through the graph.

    Attributes:
        data: the wrapped values as a ``numpy.ndarray`` (copied from the input).
        creators: list of tensors this tensor was computed from, or ``None``.
        creation_op: name of the operation that produced this tensor.
        grad: accumulated gradient (a ``Tensor`` without autograd) or ``None``.
        autograd: whether gradients are tracked for this tensor.
        id: identifier used as key in the creators' ``children`` dicts.
        children: maps a child's id to the number of gradients still expected
            from that child.
    """

    def __init__(self, data, creators=None, creation_op=None, autograd=False, id=None):
        """Wrap ``data`` and register this tensor as a child of its creators.

        Args:
            data: anything ``numpy.array`` accepts.
            creators: tensors this tensor was computed from (``None`` for leaves).
            creation_op: name of the producing operation (e.g. ``"add"``).
            autograd: enable gradient tracking.
            id: explicit identifier; a random one is drawn when omitted.
        """
        self.data = np.array(data)
        self.creators = creators
        self.creation_op = creation_op
        self.grad = None
        self.autograd = autograd
        if id is None:
            # NOTE: ids are random, so two tensors may share one; the per-id
            # counters in `children` simply add up in that case.
            id = np.random.randint(0, 100000)
        self.id = id
        self.children = {}
        if creators is not None:
            # each creator expects one gradient per use of itself in this tensor
            for creator in creators:
                creator.children[self.id] = creator.children.get(self.id, 0) + 1

    def backward(self, grad=None, grad_origin=None):
        """Accumulate ``grad`` and propagate it to the creators.

        Args:
            grad: incoming gradient as a ``Tensor`` without autograd. When
                omitted, a tensor of ones shaped like ``data`` is used (start
                of the backpropagation chain).
            grad_origin: the child tensor the gradient comes from. When given,
                the matching counter in ``children`` is decremented and
                propagation waits until every counter is zero. When omitted,
                the accumulated gradient is propagated immediately.

        The local derivatives are computed on the raw arrays so that the
        backward pass never creates autograd-tracked tensors (which would
        register spurious children on tensors of the forward graph).
        """
        if not self.autograd:
            return

        if grad_origin is not None:
            # a child that already delivered all its gradients is ignored
            if self.children[grad_origin.id] == 0:
                return
            self.children[grad_origin.id] -= 1

        # beginning of the backpropagation chain
        if grad is None:
            grad = Tensor(np.ones_like(self.data))

        # gradients must not track gradients of their own
        assert not grad.autograd

        # accumulate gradients from all the children
        if self.grad is None:
            self.grad = grad
        else:
            self.grad = self.grad + grad

        if self.creators is None:
            return
        # propagate only once every child has reported, unless the gradient
        # did not originate from another node
        if not (self.received_grads_from_all_children() or grad_origin is None):
            return

        op = self.creation_op

        if op == "add":
            new_grad = Tensor(self.grad.data)
            self.creators[0].backward(new_grad, self)
            self.creators[1].backward(new_grad, self)

        elif op == "neg":
            self.creators[0].backward(-self.grad)

        elif op == "sub":
            self.creators[0].backward(Tensor(self.grad.data), self)
            self.creators[1].backward(Tensor((-self.grad).data), self)

        elif op == "mul":
            # self.grad has no autograd, so these products are plain tensors
            new_grad = self.grad * self.creators[1]
            self.creators[0].backward(new_grad, self)
            new_grad = self.creators[0] * self.grad
            self.creators[1].backward(new_grad, self)

        elif op == "mm":
            left, right = self.creators[0], self.creators[1]
            # d(left @ right)/d(left) = grad @ right^T
            new_grad = Tensor(np.dot(self.grad.data, right.data.transpose()))
            left.backward(new_grad)
            # d(left @ right)/d(right) = left^T @ grad
            new_grad = Tensor(np.dot(left.data.transpose(), self.grad.data))
            right.backward(new_grad)

        elif op == "transpose":
            self.creators[0].backward(self.grad.transpose())

        elif op == "sigmoid":
            # sigmoid derivative expressed through its output: s * (1 - s)
            new_grad = Tensor(self.grad.data * (self.data * (1.0 - self.data)))
            self.creators[0].backward(new_grad)

        elif op == "tanh":
            # tanh derivative expressed through its output: 1 - t^2
            new_grad = Tensor(self.grad.data * (1.0 - self.data * self.data))
            self.creators[0].backward(new_grad)

        elif op == "relu":
            # relu derivative: 1 where the input was positive, 0 elsewhere
            new_grad = Tensor(self.grad.data * (self.creators[0].data > 0))
            self.creators[0].backward(new_grad)

        elif op == "cross_entropy":
            # derivative w.r.t. the logits: softmax output minus one-hot target
            new_grad = Tensor(self.softmax_output - self.target_dist)
            self.creators[0].backward(new_grad)

        elif op == "index_select":
            # gradient of the embedding matrix: only the selected rows receive
            # gradient; np.add.at accumulates correctly for repeated indices
            new_grad = np.zeros_like(self.creators[0].data)
            indices_ = self.index_select_indices.data.flatten()
            grad_ = self.grad.data.reshape(len(indices_), -1)
            np.add.at(new_grad, indices_, grad_)
            self.creators[0].backward(Tensor(new_grad))

        # the axis is encoded in the op name ("sum_<axis>" / "expand_<axis>")
        if "sum" in op:
            dim = int(op.split("_")[1])
            ds = self.creators[0].data.shape[dim]
            self.creators[0].backward(self.grad.expand(dim, ds))

        if "expand" in op:
            dim = int(op.split("_")[1])
            self.creators[0].backward(self.grad.sum(dim))

    def received_grads_from_all_children(self):
        """Return True when every counter in ``children`` has reached zero."""
        return all(count == 0 for count in self.children.values())

    # Note: operations always return a new tensor object

    def __add__(self, other):
        """Element-wise addition; tracked only if both operands use autograd."""
        if self.autograd and other.autograd:
            return Tensor(self.data + other.data, creators=[self, other], creation_op="add", autograd=True)
        return Tensor(self.data + other.data)

    def __neg__(self):
        """Element-wise negation."""
        if self.autograd:
            return Tensor(-1 * self.data, creators=[self], creation_op="neg", autograd=True)
        return Tensor(-1 * self.data)

    def __sub__(self, other):
        """Element-wise subtraction; tracked only if both operands use autograd."""
        if self.autograd and other.autograd:
            return Tensor(self.data - other.data, creators=[self, other], creation_op="sub", autograd=True)
        return Tensor(self.data - other.data)

    def __mul__(self, other):
        """Element-wise multiplication; tracked only if both operands use autograd."""
        if self.autograd and other.autograd:
            return Tensor(self.data * other.data, creators=[self, other], creation_op="mul", autograd=True)
        return Tensor(self.data * other.data)

    def sum(self, axis):
        """Sum the elements along ``axis``."""
        if self.autograd:
            return Tensor(self.data.sum(axis), creators=[self], creation_op="sum_" + str(axis), autograd=True)
        return Tensor(self.data.sum(axis))

    def expand(self, axis, copies):
        """Repeat the tensor ``copies`` times along a new axis inserted at ``axis``."""
        n_dims = len(self.data.shape)
        # the copies are first laid out along a new trailing axis ...
        new_shape = list(self.data.shape) + [copies]
        new_data = self.data.repeat(copies).reshape(new_shape)
        # ... which is then moved to position `axis`
        trans_cmd = list(range(0, n_dims))
        trans_cmd.insert(axis, n_dims)
        new_data = new_data.transpose(trans_cmd)

        if self.autograd:
            return Tensor(new_data, autograd=True, creators=[self], creation_op="expand_" + str(axis))
        return Tensor(new_data)

    def transpose(self):
        """Return the transposed tensor."""
        if self.autograd:
            return Tensor(self.data.transpose(), creators=[self], creation_op="transpose", autograd=True)
        return Tensor(self.data.transpose())

    def mm(self, other):
        """Matrix product ``self @ other``; tracked only if both operands use autograd."""
        if self.autograd and other.autograd:
            return Tensor(np.dot(self.data, other.data), creators=[self, other], creation_op="mm", autograd=True)
        return Tensor(np.dot(self.data, other.data))

    def __str__(self):
        return str(self.data.__str__())

    def __repr__(self):
        return str(self.data.__repr__())

    # Non-linearity functions

    def sigmoid(self):
        """Element-wise logistic sigmoid."""
        if self.autograd:
            return Tensor(1.0 / (1.0 + np.exp(-self.data)), creators=[self], creation_op="sigmoid", autograd=True)
        return Tensor(1.0 / (1.0 + np.exp(-self.data)))

    def tanh(self):
        """Element-wise hyperbolic tangent."""
        if self.autograd:
            return Tensor(np.tanh(self.data), creators=[self], creation_op="tanh", autograd=True)
        return Tensor(np.tanh(self.data))

    def relu(self):
        """Element-wise rectified linear unit."""
        if self.autograd:
            return Tensor(self.data * (self.data > 0), creators=[self], creation_op="relu", autograd=True)
        return Tensor(self.data * (self.data > 0))

    def _shifted_exp(self):
        """Return ``(shifted, exp(shifted), sum(exp(shifted)))`` over the last axis.

        The per-row maximum is subtracted first; softmax is invariant to that
        shift and it keeps ``exp`` from overflowing on large logits.
        """
        last_axis = len(self.data.shape) - 1
        shifted = self.data - np.max(self.data, axis=last_axis, keepdims=True)
        ex = np.exp(shifted)
        return shifted, ex, np.sum(ex, axis=last_axis, keepdims=True)

    def softmax(self):
        """Return the softmax over the last axis as a plain ``numpy.ndarray``."""
        _, ex, total = self._shifted_exp()
        return ex / total

    def cross_entropy(self, target_indices):
        """Mean softmax cross-entropy between these logits and the target indices.

        Args:
            target_indices: tensor of integer class indices; it is flattened
                and the logits are reshaped to one row per target.

        Returns:
            A scalar ``Tensor`` holding the loss. With autograd enabled it also
            carries ``softmax_output`` and ``target_dist`` for ``backward``.
        """
        shifted, ex, total = self._shifted_exp()
        softmax_output = ex / total

        t = target_indices.data.flatten()
        # log-softmax computed directly so that a probability that underflows
        # to zero does not turn into log(0)
        log_p = (shifted - np.log(total)).reshape(len(t), -1)
        target_dist = np.eye(log_p.shape[1])[t]
        loss = -(log_p * target_dist).sum(1).mean()

        if self.autograd:
            out = Tensor(loss, creators=[self], creation_op="cross_entropy", autograd=True)
            out.softmax_output = softmax_output
            out.target_dist = target_dist
            return out
        return Tensor(loss)

    def index_select(self, indices):
        """Select rows of this tensor (word-embedding lookup).

        Args:
            indices: tensor whose ``data`` holds the row numbers to select.
        """
        if self.autograd:
            selected_rows = Tensor(self.data[indices.data], creators=[self], creation_op="index_select", autograd=True)
            # remembered so backward knows which rows receive gradient
            selected_rows.index_select_indices = indices
            return selected_rows
        return Tensor(self.data[indices.data])

# stochastic gradient descent optimizer    
class SGD_Optimizer(object):
    """Plain stochastic gradient descent over a list of parameters.

    Each parameter is expected to expose ``data`` (a NumPy array) and ``grad``
    (an object with a ``data`` array, or ``None`` if no gradient has been
    accumulated yet).
    """

    def __init__(self, parameters, alpha) -> None:
        """Store the parameters to optimize and the learning rate ``alpha``."""
        self.parameters = parameters
        self.alpha = alpha

    def zero(self):
        """Reset the accumulated gradient of every parameter to zero in place."""
        for p in self.parameters:
            # a parameter that never received a gradient has nothing to reset
            if p.grad is not None:
                p.grad.data *= 0

    def step(self, zero=True):
        """Apply one update ``data -= alpha * grad`` to every parameter.

        Args:
            zero: when true, the gradient is zeroed right after it is applied.
        """
        for p in self.parameters:
            # skip parameters that did not take part in the last backward pass
            if p.grad is None:
                continue
            p.data -= self.alpha * p.grad.data

            if zero:
                p.grad.data *= 0

# layer base class
class Layer(object):
    """Common base of all layers: owns the list of trainable parameters."""

    def __init__(self) -> None:
        # subclasses append their trainable tensors here
        self.parameters = list()

    def get_parameters(self):
        """Return the list of this layer's parameters (the list itself, not a copy)."""
        return self.parameters
    
# layer inherited classes
class Linear(Layer):
    """Fully connected layer computing ``input @ weight`` plus an optional bias."""

    def __init__(self, n_inputs, n_outputs, bias=True) -> None:
        """Create the weight matrix and, when ``bias`` is truthy, a zero bias.

        After construction ``self.bias`` is either the bias tensor or the
        original falsy ``bias`` argument.
        """
        super().__init__()

        # weights drawn from a normal distribution scaled by sqrt(2 / n_inputs)
        scale = np.sqrt(2.0/n_inputs)
        initial_weights = np.random.randn(n_inputs, n_outputs) * scale
        self.weight = Tensor(initial_weights, autograd=True)
        self.parameters.append(self.weight)

        self.bias = bias
        if bias:
            self.bias = Tensor(np.zeros(n_outputs), autograd=True)
            self.parameters.append(self.bias)

    def forward(self, input):
        """Apply the layer to ``input`` (one row per sample)."""
        projected = input.mm(self.weight)
        if not self.bias:
            return projected
        # the bias vector is repeated once per input row before being added
        return projected + self.bias.expand(0, len(input.data))


# embedding layer inherited class
class Embedding(Layer):
    """Lookup table mapping vocabulary indices to embedding vectors."""

    def __init__(self, vocab_size, hidden_neurons) -> None:
        """Create a ``(vocab_size, hidden_neurons)`` embedding matrix."""
        super().__init__()
        self.vocab_size, self.hidden_neurons = vocab_size, hidden_neurons

        # uniform values centred on zero, scaled down by the embedding width
        uniform = np.random.rand(vocab_size, hidden_neurons)
        self.weight = Tensor((uniform - 0.5) / hidden_neurons, autograd=True)
        self.parameters.append(self.weight)

    def forward(self, input):
        """Return the embedding rows selected by the indices in ``input``."""
        return self.weight.index_select(input)
        

class RNNcell(Layer):
    """Single recurrent cell: input and previous hidden state -> prediction and new hidden state."""

    def __init__(self, input_neurons, hidden_neurons, output_neurons, activation = "sigmoid") -> None:
        """Build the three linear maps and the chosen non-linearity.

        Raises:
            Exception: if ``activation`` is not "sigmoid", "tanh" or "relu".
        """
        super().__init__()
        self.input_neurons = input_neurons
        self.hidden_neurons = hidden_neurons
        self.output_neurons = output_neurons

        # pick the non-linearity layer matching the requested name
        known_activations = (("sigmoid", Sigmoid), ("tanh", Tanh), ("relu", Relu))
        for name, layer_class in known_activations:
            if activation == name:
                self.activation = layer_class()
                break
        else:
            raise Exception("ERROR: Non-linearity function not found!")

        # input->hidden, hidden->hidden and hidden->output projections
        self.w_ih = Linear(input_neurons, hidden_neurons)
        self.w_hh = Linear(hidden_neurons, hidden_neurons)
        self.w_ho = Linear(hidden_neurons, output_neurons)

        for projection in (self.w_ih, self.w_hh, self.w_ho):
            self.parameters += projection.get_parameters()

    def forward(self, input, prev_hidden):
        """Return ``(prediction, hidden)`` for one time step."""
        from_input = self.w_ih.forward(input)
        from_hidden = self.w_hh.forward(prev_hidden)
        # new hidden state: non-linearity over the summed projections
        hidden = self.activation.forward(from_input + from_hidden)
        # prediction read out from the new hidden state
        return self.w_ho.forward(hidden), hidden

    def init_hidden(self, batch_size = 1):
        """Return an all-zero hidden state of shape ``(batch_size, hidden_neurons)``."""
        zeros = np.zeros(shape=(batch_size, self.hidden_neurons))
        return Tensor(zeros, autograd=True)
  
        
class LSTMcell(Layer):
    """Single LSTM cell with forget, input and output gates plus a candidate update."""

    def __init__(self, input_neurons, hidden_neurons, output_neurons) -> None:
        """Create the gate projections and the output projection."""
        super().__init__()
        self.input_neurons = input_neurons
        self.hidden_neurons = hidden_neurons
        self.output_neurons = output_neurons

        # input -> gate projections (with bias), created in the order f, i, o, c
        self.xf, self.xi, self.xo, self.xc = (
            Linear(input_neurons, hidden_neurons) for _ in range(4)
        )
        # hidden -> gate projections (no bias), same order
        self.hf, self.hi, self.ho, self.hc = (
            Linear(hidden_neurons, hidden_neurons, bias=False) for _ in range(4)
        )
        # hidden -> output projection (no bias)
        self.w_ho = Linear(hidden_neurons, output_neurons, bias=False)

        projections = (
            self.xf, self.xi, self.xo, self.xc,
            self.hf, self.hi, self.ho, self.hc,
            self.w_ho,
        )
        for projection in projections:
            self.parameters += projection.get_parameters()

    def _preactivation(self, from_input, from_hidden, input, prev_hidden):
        """Sum of the input projection and the hidden-state projection of one gate."""
        return from_input.forward(input) + from_hidden.forward(prev_hidden)

    def forward(self, input, hidden):
        """Advance the cell by one time step.

        Args:
            input: input tensor for this step.
            hidden: pair ``(previous hidden state, previous cell state)``.

        Returns:
            ``(prediction, (new hidden state, new cell state))``.
        """
        prev_hidden, prev_cell = hidden[0], hidden[1]

        # gates
        forget_gate = self._preactivation(self.xf, self.hf, input, prev_hidden).sigmoid()
        input_gate = self._preactivation(self.xi, self.hi, input, prev_hidden).sigmoid()
        output_gate = self._preactivation(self.xo, self.ho, input, prev_hidden).sigmoid()
        candidate = self._preactivation(self.xc, self.hc, input, prev_hidden).tanh()

        # new cell state keeps part of the old one and adds the gated candidate
        cell = (forget_gate * prev_cell) + (input_gate * candidate)
        new_hidden = output_gate * cell.tanh()

        # prediction read out from the new hidden state
        pred = self.w_ho.forward(new_hidden)

        return pred, (new_hidden, cell)

    def init_hidden(self, batch_size = 1):
        """Return the initial ``(hidden, cell)`` pair.

        Both states are zero except for their first column, which is set to 1.
        """
        initial = np.zeros(shape=(batch_size, self.hidden_neurons))
        initial[:,0] += 1
        # Tensor copies its input, so the two states do not share memory
        h = Tensor(initial, autograd=True)
        c = Tensor(initial, autograd=True)
        return (h, c)


# a class for a sequence of layers, i.e. a neural network model
class Sequential(Layer):
    """Container that applies a list of layers one after another."""

    def __init__(self, layers = None) -> None:
        """Store ``layers``, or start with a fresh empty list when omitted.

        A ``None`` default is used instead of ``[]`` so that separate
        instances never share (and mutate) the same default list.
        """
        super().__init__()
        self.layers = [] if layers is None else layers

    def add(self, layer):
        """Append ``layer`` to the end of the sequence."""
        self.layers.append(layer)

    def forward(self, input):
        """Feed ``input`` through every layer in order and return the result."""
        for layer in self.layers:
            input = layer.forward(input)
        return input

    def get_parameters(self):
        """Return a new list with the parameters of all contained layers."""
        params = []
        for layer in self.layers:
            params += layer.get_parameters()

        return params
    
# mean squared error loss function layer
class MSELoss(Layer):
    """Squared-error loss: squared differences summed over axis 0.

    The layer has no parameters of its own, so it relies on the inherited
    constructor.
    """

    def forward(self, pred, target):
        """Return the sum over axis 0 of ``(pred - target) ** 2``."""
        # the difference is deliberately computed twice, as two separate tensors
        left_diff = pred - target
        right_diff = pred - target
        squared = left_diff * right_diff
        return squared.sum(0)

# cross entropy loss function layer    
class CrossEntropyLoss(Layer):
    """Cross-entropy loss layer; parameter-free, uses the inherited constructor."""

    def forward(self, input, target):
        """Delegate to ``input.cross_entropy(target)`` and return the loss."""
        loss = input.cross_entropy(target)
        return loss


# nonlinearity layers
class Sigmoid(Layer):
    """Sigmoid non-linearity layer; parameter-free, uses the inherited constructor."""

    def forward(self, input):
        """Return ``input.sigmoid()``."""
        activated = input.sigmoid()
        return activated

class Tanh(Layer):
    """Tanh non-linearity layer; parameter-free, uses the inherited constructor."""

    def forward(self, input):
        """Return ``input.tanh()``."""
        activated = input.tanh()
        return activated

class Relu(Layer):
    """ReLU non-linearity layer; parameter-free, uses the inherited constructor."""

    def forward(self, input):
        """Return ``input.relu()``."""
        activated = input.relu()
        return activated



Training next character prediction with Shakespeare dataset

In [54]:
# read the whole training corpus into memory as one string;
# the context manager closes the file even if reading fails
with open('shakespear.txt', 'r') as f:
    raw = f.read()

# build a vocabulary of all distinct characters in the dataset;
# sorted so that the character -> index mapping is the same on every run
# (the iteration order of a set of strings varies between interpreter runs)
vocab = sorted(set(raw))

# create a dictionary mapping each vocabulary character to its index
char_index = {char: i for i, char in enumerate(vocab)}

# convert the entire dataset into character indices
indices = [char_index[char] for char in raw]
data = np.array(indices).reshape(-1)

In [94]:
import math

# training hyper-parameters
niters = 500
batch_size = 50
hidden_neurons = 512
chunk_size = 25

np.random.seed(1)

# the data is one very long string of characters, so it is cut into rows of chunk_size characters each;
# the last row is padded with zeros when the data does not fill it completely
num_inputs = math.ceil(len(data)/chunk_size)
padded = np.zeros(num_inputs * chunk_size)
padded[:len(data)] = data
inputs = padded.reshape(num_inputs, chunk_size).astype(int)

print(f"inputs shape: {inputs.shape}")


# embedding layer turning character indices into vectors of length hidden_neurons
embed = Embedding(len(vocab), hidden_neurons)
# Note: the outputs of the embedding layer are fed into the recurrent cell, so the cell's input size must equal the embedding vector length, i.e. hidden_neurons
model = LSTMcell(hidden_neurons, hidden_neurons, len(vocab))
#model = RNNcell(hidden_neurons, hidden_neurons, len(vocab))

# loss layer applied to the cell's prediction
loss_layer = CrossEntropyLoss()

# the optimizer updates the embedding and the cell parameters together
params = embed.get_parameters() + model.get_parameters()
optim = SGD_Optimizer(params, alpha=0.05)


inputs shape: (4000, 25)


In [56]:
# number of trainable tensors in the embedding layer (it only holds its weight matrix)
len(embed.get_parameters())

1

In [57]:
# number of trainable tensors in the LSTM cell: 4 input projections with weight and bias,
# 4 bias-free hidden projections and 1 bias-free output projection
len(model.get_parameters())

13

In [95]:
import sys

# the number of input rows has to split evenly into batches
if inputs.shape[0] % batch_size:
    raise Exception("ERROR! Input dataset needs to be divisible by batch_size")

num_batches = inputs.shape[0] // batch_size

# train the network to predict the last character of each chunk from the characters before it
for iter in range(niters):

    # per-iteration statistics
    total_loss = 0.0
    correct = 0
    incorrect = 0

    # start every iteration from the initial hidden state
    hidden = model.init_hidden(batch_size)

    # train in batches
    for j in range(num_batches):

        batch_lo = j * batch_size
        batch_hi = min(batch_lo + batch_size, inputs.shape[0])
        batch = inputs[batch_lo:batch_hi]

        # carry the state values over, but wrap them in fresh tensors so that
        # gradients do not flow back into the previous batch's graph
        hidden = tuple(Tensor(state.data, autograd=True) for state in hidden)

        # forward pass: feed all characters of the chunk except the last one
        for k in range(batch.shape[1]-1):

            input = Tensor(batch[:, k], autograd=True)

            # look up the embedding of the current character
            lstm_input = embed.forward(input)

            # advance the cell by one step; only the final prediction is used below
            prediction, hidden = model.forward(lstm_input, hidden)

        # loss between the prediction of the last step and the last character of the chunk
        target = Tensor(batch[:, k+1], autograd=True)
        loss = loss_layer.forward(prediction, target)
        total_loss += loss.data

        # prediction accuracy: rows whose most likely character equals the target
        hits = int(np.sum(np.argmax(prediction.data, axis=1) == target.data))
        correct += hits
        incorrect += batch_size - hits

        # backpropagate the loss gradients
        loss.backward()

        # update the weights (this also zeroes the gradients)
        optim.step()

    # optional exponential decay of the learning rate
    #optim.alpha *= 0.99

    # report after every iteration
    print(f"Iteration# {iter+1}, Loss: {total_loss}, Accuracy: {float(correct)/(float(correct + incorrect))}")


KeyboardInterrupt: 