In [1]:
import numpy as np, re

In [24]:
class Vocabulary:
    def __init__(self) -> None:
        self.word2index = {}
        self.word2count = {}
        self.index2word = {}
        self.sentences = []
        self.tokens = []
        self.num_words = 0
        self.num_sentences = 0

    def _add_word(self, word):
        if word not in self.word2index:
            self.tokens.append(word)
            self.word2count[word] = 1
            self.word2index[word] = self.num_words
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def _add_sentence(self, sentence):
        sentence = sentence.lower()
        new = self._clean_sentence(sentence=sentence)
        new = new.replace('\n', '')
        self.sentences.append(new)
        
        for word in new.split(' '):
            if word != '':
                self._add_word(word)
            else:
                continue
      
        self.num_sentences += 1
        
    def pad_sequences(self, sequence, length=None):
        """
        Default: Pad an input sequence to be the same as self.seq_length
        
        Alternative: Pad an input sequence to the 'length' param
        
        Keras: Pads input sequences with length of longest sequence
        
        Params:
        sequence --> np.array[numpy.array], integer matrix of tokenized words
        
        Returns:
        padded_sequence --> np.array[numpy.array], integer matrix of tokenized words with padding
        """
        return_arr = []
        
        for s in sequence:
            new = list(s)
            
            if not length:
                missing = self.seq_length - len(new)
            else:
                missing = length - len(new)
                
            new.extend([0]*missing)
            return_arr.append(new)
            
        return np.vstack(return_arr)
    
    def _sort_by_frequency(self):
        sorted_count = dict(sorted(self.word2count.items(), key=lambda x:x[1], reverse=True))

        self.word2index = {}
        
        count = 0 ## start at 1 to copy keras --> 0 is reserved for padding (this is how keras does it)
        for k,v in sorted_count.items():
            self.word2index[k] = count
            count += 1
        
        self.index2word = {v:k for k,v in self.word2index.items()}
        
        return self
    
    def _compile_vocab(self, corpus):
        """
        Creates vocabulary

        Params:
        Corpus --> List[str]
        
        Returns:
        self
        """
        for s in corpus:
            self._add_sentence(s)

        assert len(self.word2count) == len(self.word2index) == len(self.index2word)
        self.size = len(self.word2count)
        
        self._sort_by_frequency()
        
    def tokenize(self, corpus, seq_length):
        """
        Creates sequences of tokens

        Params:
        Corpus --> List[str]
        
        Returns:
        Token Sequences --> List[str]
        """
        self._compile_vocab(corpus)
        self.seq_length = seq_length
        self.token_sequences = []
        
        for i in range(seq_length, self.size):
            seq = self.tokens[i-seq_length:i]
            seq = [self.word2index[i] for i in seq]
            self.token_sequences.append(seq)
        
        return np.array(self.token_sequences)

    def _clean_sentence(self, sentence):
        new_string = re.sub(r'[^\w\s]', '', sentence)
        return new_string

    def to_word(self, index):
        return self.index2word[index]

    def to_index(self, word):
        return self.word2index[word]

In [25]:
class Dense:
    def __init__(self, neurons, activation='softmax'):
        """
        Initializes a simple dense layer

        Args:
            'neurons': int, num of output dimensions
        """
        self.neurons = neurons
        self.name = 'Dense'
        self.activation = activation
        
    def softmax(self, inputs):
        """
        Softmax Activation Function used to copute multi-class output probabilities
        """
        exp_scores = np.exp(inputs)
        probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
        return probs
        
    def forward(self, inputs):
        """
        Compute forward pass of single (output) layer
        """
        self.weights = np.random.randn(inputs.shape[1], self.neurons)
        self.bias = np.zeros((1, self.neurons))
        
        y = np.dot(inputs, self.weights) + self.bias
        
        return self.softmax(y)
    
class EmbeddingLayer:
    def __init__(self, vocab_size, hidden_dim):
        self.name = 'Embedding'
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.weights = np.random.randn(vocab_size, hidden_dim) ## (vocab_size, hidden_dim)

    def predict(self, array):
        """
        PARAMS:
          array: 
           -- integer matrix of batch_size x seq_length

        RETURNS:
          array:
           -- integer matrix of batch_size x seq_length x hidden_dim
           -- the word vectors for each word in the tokenized input
        """
        assert np.max(array) <= self.vocab_size

        return np.array([self.weights[i] for i in array])    

In [26]:
class LSTM:
    def __init__(self, units, features, seq_length):
        """
        Initializes the LSTM layer
        
        Args:
            Units: int (num of LSTM units in layer)
            features: int (dimensionality of token embeddings)
        """
        self.name = 'LSTM'
        self.hidden_dim = units
        self.dimensionality = features
        self.seq_length = seq_length
        self.init_h = np.zeros((self.hidden_dim,))
        self.init_c = np.zeros((self.hidden_dim,))
        self.caches = []
        self.states = []
        self.cache_grads = []
        self.state_grads = []
        
    def _init_orthogonal(self, param):
        """
        Initializes weight parameters orthogonally.

        Refer to this paper for an explanation of this initialization:
        https://arxiv.org/abs/1312.6120
        """
        if param.ndim < 2:
            raise ValueError("Only parameters with 2 or more dimensions are supported.")

        rows, cols = param.shape

        new_param = np.random.randn(rows, cols)

        if rows < cols:
            new_param = new_param.T

        # Compute QR factorization
        q, r = np.linalg.qr(new_param)

        # Make Q uniform according to https://arxiv.org/pdf/math-ph/0609050.pdf
        d = np.diag(r, 0)
        ph = np.sign(d)
        q *= ph

        if rows < cols:
            q = q.T

        new_param = q

        return new_param
    
    def sigmoid(self, x, derivative=False):
        """
        Computes the element-wise sigmoid activation function for an array x.

        Args:
         `x`: the array where the function is applied
         `derivative`: if set to True will return the derivative instead of the forward pass
        """
        x_safe = x + 1e-12
        f = 1 / (1 + np.exp(-x_safe))

        if derivative: # Return the derivative of the function evaluated at x
            return f * (1 - f)
        else: # Return the forward pass of the function at x
            return f
    
    def tanh(self, x, derivative=False):
        """
        Computes the element-wise tanh activation function for an array x.

        Args:
         `x`: the array where the function is applied
         `derivative`: if set to True will return the derivative instead of the forward pass
        """
        x_safe = x + 1e-12
        f = (np.exp(x_safe)-np.exp(-x_safe))/(np.exp(x_safe)+np.exp(-x_safe))

        if derivative: # Return the derivative of the function evaluated at x
            return 1-f**2
        else: # Return the forward pass of the function at x
            return f
    
    def softmax(self, x):
        """
        Computes the softmax for an array x.

        Args:
         `x`: the array where the function is applied
         `derivative`: if set to True will return the derivative instead of the forward pass
        """
        x_safe = x + 1e-12
        f = np.exp(x_safe) / np.sum(np.exp(x_safe))

        # Return the forward pass of the function at x
        return f
    
    def _init_params(self):
        """
        Initializes the weight and biases of the layer
        
            -- Initialize weights according to https://arxiv.org/abs/1312.6120 (_init_orthogonal)
            -- Initialize weights according to https://github.com/keras-team/keras/blob/master/keras/layers/rnn/lstm.py
            -- Assumptions: Batch_First=True (PyTorch) or time_major=False (keras)
        """
        self.kernel = self._init_orthogonal(np.random.randn(self.dimensionality, self.hidden_dim * 4))
        self.recurrent_kernel = self._init_orthogonal(np.random.randn(self.hidden_dim, self.hidden_dim * 4))
        self.bias = np.random.randn(self.hidden_dim * 4, )

        self.kernel_i = self.kernel[:, :self.hidden_dim]
        self.kernel_f = self.kernel[:, self.hidden_dim: self.hidden_dim * 2]
        self.kernel_c = self.kernel[:, self.hidden_dim * 2: self.hidden_dim * 3]
        self.kernel_o = self.kernel[:, self.hidden_dim * 3:]

        self.recurrent_kernel_i = self.recurrent_kernel[:, :self.hidden_dim]
        self.recurrent_kernel_f = self.recurrent_kernel[:, self.hidden_dim: self.hidden_dim * 2]
        self.recurrent_kernel_c = self.recurrent_kernel[:, self.hidden_dim * 2: self.hidden_dim * 3]
        self.recurrent_kernel_o = self.recurrent_kernel[:, self.hidden_dim * 3:]

        self.bias_i = self.bias[:self.hidden_dim]
        self.bias_f = self.bias[self.hidden_dim: self.hidden_dim * 2]
        self.bias_c = self.bias[self.hidden_dim * 2: self.hidden_dim * 3]
        self.bias_o = self.bias[self.hidden_dim * 3:]

    def forward(self, inputs, t, state):
        self._init_params()
        
        inputs_i = inputs
        inputs_f = inputs
        inputs_c = inputs
        inputs_o = inputs
       
        h_tm1_i = state['h']
        h_tm1_f = state['h']
        h_tm1_c = state['h']
        h_tm1_o = state['h']

        x_i = np.dot(inputs_i, self.kernel_i) + self.bias_i
        x_f = np.dot(inputs_f, self.kernel_f) + self.bias_f
        x_c = np.dot(inputs_c, self.kernel_c) + self.bias_c
        x_o = np.dot(inputs_o, self.kernel_o) + self.bias_o

        f = self.sigmoid(x_f + np.dot(h_tm1_f, self.recurrent_kernel_f))
        i = self.sigmoid(x_i + np.dot(h_tm1_i, self.recurrent_kernel_i))
        o = self.sigmoid(x_o + np.dot(h_tm1_o, self.recurrent_kernel_o))
        cbar = self.sigmoid(x_c + np.dot(h_tm1_c, self.recurrent_kernel_c))
        
        c = (f * state['c']) + (i * cbar)
            
        ht = o * self.tanh(c)
        
        cache = {'i':i, 'f':f, 'cbar':cbar, 'o':o, 'inputs':inputs}
        state = {'h':ht, 'c':c}

        return cache, state
        
    def backward(self, prediction, actual, t):
        if t == 0:
            dh_next, dc_next = np.zeros_like(self.states[t]['h']), np.zeros_like(self.states[t]['c'])
            c_prev = np.zeros_like(self.states[t]['c'])
        else:
            dh_next, dc_next = self.grad_states[t-1]['h'], self.grad_states[t-1]['c']
            c_prev = self.states[t-1]['c']
        
        dscores = np.copy(prediction)
        dscores[range(self.seq_length), actual] -= 1

        i, f, cbar, o = self.caches[t]['i'], self.caches[t]['f'], self.caches[t]['cbar'], self.caches[t]['o']
        h, c = self.states[t]['h'], self.states[t]['c']

        # Hidden to output gradient
        dWy = np.dot(h.T, dscores)
        dby = dscores
        dh = np.dot(dscores, dense.weights.T) + dh_next
        
        # Gradient for o
        do = self.tanh(c) * dh
        do = self.sigmoid(o, derivative=True) * do

        # Gradient for cbar
        dcbar = o * dh * self.tanh(c, derivative=True)
        dcbar = dcbar + dc_next
            
        # Gradient for f
        df = c_prev * dcbar
        df = self.sigmoid(f, derivative=True) * df
        
        # Gradient for i
        di = c * dcbar
        di = self.sigmoid(i, derivative=True) * di
        
        # Gradient for c
        dc = i * dcbar
        dc = self.tanh(c, derivative=True) * dc
        
        # Gate gradients, just a normal fully connected layer gradient
        # We backprop into the kernel, recurrent_kernel, bias, inputs (embedding), & hidden state
        dWf = np.dot(self.caches[t]['inputs'].T, df) # --> kernel
        dXf = np.dot(df, self.kernel_f.T) # --> embedding
        dUf = np.dot(h.T, df) # --> recurrent kernel
        dhf = np.dot(df, self.recurrent_kernel_f) # --> hidden state
        dbf = df # --> bias

        dWi = np.dot(self.caches[t]['inputs'].T, di)
        dXi = np.dot(di, self.kernel_i.T)
        dUi = np.dot(h.T, di)
        dhi = np.dot(di, self.recurrent_kernel_i)
        dbi = di
        
        dWo = np.dot(self.caches[t]['inputs'].T, do)
        dXo = np.dot(do, self.kernel_o.T)
        dUo = np.dot(h.T, do)
        dho = np.dot(do, self.recurrent_kernel_o)
        dbo = do
        
        dWc = np.dot(self.caches[t]['inputs'].T, dc)
        dXc = np.dot(dc, self.kernel_c.T)
        dUc = np.dot(h.T, dc)
        dhc = np.dot(dc, self.recurrent_kernel_c)
        dbc = dc
        
        # As X was used in multiple gates, the gradient must be accumulated here
        dX = dXo + dXc + dXi + dXf

        # As h was used in multiple gates, the gradient must be accumulated here
        dh_next = dho + dhc + dhi + dhf

        # Gradient for c_old in c = hf * c_old + hi * hc
        dc_next = f * dc

        kernel_grads = dict(Wf=dWf, Wi=dWi, Wc=dWc, Wo=dWo, Wy=dWy, bf=dbf, bi=dbi, bc=dbc, bo=dbo, by=dby)
        recurrent_kernel_grads = dict(Uf=dUf, Ui=dUi, Uc=dUc, Uo=dUo)
        state_grads = (dh_next, dc_next)

In [5]:
# step 1 -- data
f = open(r"C:\Users\12482\Desktop\opensource\numpy-rnn\data\alice_wonderland.txt", 'r', encoding='utf-8').readlines()

# step 2 -- tokenize
## create vocabulary + tokenize
v = Vocabulary()
token_sequences = v.tokenize(f, 26)

# step 3 -- split into x/y
## create X & Y datasets
X = token_sequences[:,:-1]
y = token_sequences[:,-1]

# step 4 -- embedding layer -- layer 1
## create embedding layer
e = EmbeddingLayer(vocab_size=v.size, hidden_dim=20) ## hidden_dim is a hyper-param
lstm_inputs = e.predict(X)

batch1 = lstm_inputs[0]

lstm = LSTM(units=100, features=20, seq_length=25)
dense = Dense(v.size)

lstm.forward(batch1, 0)
final_out = dense.forward(lstm.states[0]['h'])

lstm.backward(final_out, y[0], t=0)

In [10]:
# Hidden to output gradient
dWy = np.dot(lstm.states[0]['h'].T, final_out)
dby = final_out
dh = np.dot(final_out, dense.weights.T) + np.zeros_like(lstm.states[0]['h'])

i,f,cbar,o = lstm.caches[0]['i'],lstm.caches[0]['f'],lstm.caches[0]['cbar'],lstm.caches[0]['o']
h,c = lstm.states[0]['h'], lstm.states[0]['c']

# Gradient for o
do = lstm.tanh(c) * dh
do = lstm.sigmoid(o, derivative=True) * do

# Gradient for c
dc = o * dh * lstm.tanh(c, derivative=True)
dc = dc + np.zeros_like(lstm.states[0]['c'])

# Gradient for f
df = np.zeros((100,)) * dc
df = lstm.sigmoid(f, derivative=True) * df

# Gradient for i
di = c * dc
di = lstm.sigmoid(i, derivative=True) * di

# Gradient for c
dhc = i * dc
dhc = lstm.tanh(c, derivative=True) * dhc

Wf = lstm.kernel_f
Wi = lstm.kernel_i
Wo = lstm.kernel_o
Wc = lstm.kernel_c

Uf = lstm.recurrent_kernel_f
Ui = lstm.recurrent_kernel_i
Uo = lstm.recurrent_kernel_o
Uc = lstm.recurrent_kernel_c

x = batch1

# Gate gradients, just a normal fully connected layer gradient
dWf = np.dot(x.T, df)
dbf = df
dXf = np.dot(df, Wf.T)

dUf = np.dot(h.T, df)
dhf = np.dot(df, Uf)

dWi = x.T @ di
dbi = di
dXi = di @ Wi.T

dUi = np.dot(h.T, di)
dhi = np.dot(di, Ui)

dWo = x.T @ do
dbo = do
dXo = do @ Wo.T

dUo = np.dot(h.T, do)
dho = np.dot(do, Uo)

dWc = x.T @ dc
dbc = dc
dXc = dc @ Wc.T

dUc = np.dot(h.T, dc)
dhc = np.dot(dc, Uc)

# As X was used in multiple gates, the gradient must be accumulated here
dX = dXo + dXc + dXi + dXf

# As h was used in multiple gates, the gradient must be accumulated here
dh_next = dho + dhc + dhi + dhf

# Gradient for c_old in c = hf * c_old + hi * hc
dc_next = f * dc

kernel_grads = dict(Wf=dWf, Wi=dWi, Wc=dWc, Wo=dWo, Wy=dWy, bf=dbf, bi=dbi, bc=dbc, bo=dbo, by=dby)
recurrent_kernel_grads = dict(Uf=dUf, Ui=dUi, Uc=dUc, Uo=dUo)
state_grads = (dh_next, dc_next)

state_grads[0].shape, lstm.states[0]['h'].shape

((25, 100), (25, 100))

In [20]:
e.weights[X[0]][0]

array([-0.95758776,  2.1121455 , -0.26800323,  0.01459076, -0.47376143,
       -1.48681638, -1.18158921, -1.20269818,  0.3432233 , -1.19742022,
       -0.39271261, -1.82865208,  0.25866743,  0.0479968 ,  0.90222416,
       -1.88538138,  0.19184315,  1.59177148, -0.65775989, -1.67760029])

In [22]:
e.weights[X[0]][0]

array([-0.9571145 ,  2.11214903, -0.26865439,  0.01458452, -0.47344209,
       -1.48669556, -1.18086327, -1.20255205,  0.34382675, -1.1973645 ,
       -0.39347352, -1.82798446,  0.25882647,  0.04762641,  0.90263504,
       -1.88514798,  0.19170229,  1.59246777, -0.65807185, -1.67725589])

In [21]:
e.weights[X[0]] -= 0.01 * dX

In [30]:
X[0]

array([   8,   11,  273,    2,   98,   27,  464,    6,  341,   74,   15,
        416,   17,    0, 1070,    1,  342,  127,   50,  132,   53,  588,
          4,   19,  837])