<a href="https://colab.research.google.com/github/msaleem-aisci/FYP/blob/main/LSTM_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [34]:
import numpy as np
import re
from collections import Counter

In [35]:
class Tanh:
    def forward(self, z):
        return np.tanh(z)

    def backward(self, delta, z):
        return delta * (1 - np.tanh(z)**2)

class Sigmoid:
    def forward(self, z):
        return 1 / (1 + np.exp(-z))

    def backward(self, delta, z):
        sig = self.forward(z)
        return delta * (sig * (1 - sig))

class Softmax:
    def forward(self, z):
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def backward(self, delta, z):
        return delta

In [36]:
class Dense:
    def __init__(self, units, input_dim):
        self.weights = np.random.randn(units, input_dim) * 0.01
        self.bias = np.zeros((units, 1))

    def forward(self, x):
        self.inputs = x
        self.z = np.dot(self.weights, x) + self.bias

        exp_z = np.exp(self.z - np.max(self.z))
        self.probs = exp_z / np.sum(exp_z)
        return self.probs

    def backward(self, target_idx, lr):
        dy = self.probs.copy()
        dy[target_idx] -= 1

        dW = np.dot(dy, self.inputs.T)
        db = dy
        dX = np.dot(self.weights.T, dy)

        self.weights -= lr * dW
        self.bias -= lr * db
        return dX

In [38]:
class LSTM:
    def __init__(self, units, input_dim):
        self.units = units
        self.input_dim = input_dim

        def init(r, c): return np.random.randn(r, c) * np.sqrt(2.0/(r+c))

        self.params = {
            'Wf': init(units, input_dim), 'Uf': init(units, units), 'bf': np.zeros((units, 1)),
            'Wi': init(units, input_dim), 'Ui': init(units, units), 'bi': np.zeros((units, 1)),
            'Wc': init(units, input_dim), 'Uc': init(units, units), 'bc': np.zeros((units, 1)),
            'Wo': init(units, input_dim), 'Uo': init(units, units), 'bo': np.zeros((units, 1))
        }
        self.cache = []

    def sigmoid(self, x): return 1 / (1 + np.exp(-x))
    def tanh(self, x): return np.tanh(x)

    def forward(self, X):
        self.h = np.zeros((self.units, 1))
        self.c = np.zeros((self.units, 1))
        self.cache = []

        for x_t in X:
            x_t = x_t.reshape(-1, 1)
            p = self.params

            f = self.sigmoid(np.dot(p['Wf'], x_t) + np.dot(p['Uf'], self.h) + p['bf'])
            i = self.sigmoid(np.dot(p['Wi'], x_t) + np.dot(p['Ui'], self.h) + p['bi'])
            c_tilde = self.tanh(np.dot(p['Wc'], x_t) + np.dot(p['Uc'], self.h) + p['bc'])

            self.c = (f * self.c) + (i * c_tilde)
            o = self.sigmoid(np.dot(p['Wo'], x_t) + np.dot(p['Uo'], self.h) + p['bo'])
            self.h = o * self.tanh(self.c)

            self.cache.append({'x':x_t, 'h_prev': self.cache[-1]['h'] if self.cache else np.zeros_like(self.h),
                               'c_prev': self.cache[-1]['c'] if self.cache else np.zeros_like(self.c),
                               'f':f, 'i':i, 'c_tilde':c_tilde, 'o':o, 'c':self.c, 'h':self.h})
        return self.h

    def backward(self, dh, lr):
        grads = {k: np.zeros_like(v) for k, v in self.params.items()}
        dh_next = np.zeros_like(self.h)
        dc_next = np.zeros_like(self.c)

        for step in reversed(self.cache):
            dh_total = dh + dh_next
            tanh_c = self.tanh(step['c'])

            do = dh_total * tanh_c
            do_raw = do * step['o'] * (1 - step['o'])
            grads['Wo'] += np.dot(do_raw, step['x'].T); grads['Uo'] += np.dot(do_raw, step['h_prev'].T); grads['bo'] += do_raw

            dc = (dh_total * step['o'] * (1 - tanh_c**2)) + dc_next

            dc_tilde = dc * step['i']
            dc_tilde_raw = dc_tilde * (1 - step['c_tilde']**2)
            grads['Wc'] += np.dot(dc_tilde_raw, step['x'].T); grads['Uc'] += np.dot(dc_tilde_raw, step['h_prev'].T); grads['bc'] += dc_tilde_raw

            di = dc * step['c_tilde']
            di_raw = di * step['i'] * (1 - step['i'])
            grads['Wi'] += np.dot(di_raw, step['x'].T); grads['Ui'] += np.dot(di_raw, step['h_prev'].T); grads['bi'] += di_raw

            df = dc * step['c_prev']
            df_raw = df * step['f'] * (1 - step['f'])
            grads['Wf'] += np.dot(df_raw, step['x'].T); grads['Uf'] += np.dot(df_raw, step['h_prev'].T); grads['bf'] += df_raw

            dc_next = dc * step['f']
            dh_next = (np.dot(self.params['Uo'].T, do_raw) + np.dot(self.params['Uc'].T, dc_tilde_raw) +
                       np.dot(self.params['Ui'].T, di_raw) + np.dot(self.params['Uf'].T, df_raw))

        for k in self.params:
            np.clip(grads[k], -1, 1, out=grads[k])
            self.params[k] -= lr * grads[k]
        return dh_next

In [41]:
class TextProcessor:
    def __init__(self, text):
        text = text.lower()
        text = re.sub(r'[^\w\s]', '', text)
        self.words = text.split()

        self.vocab = sorted(list(set(self.words)))
        self.word_to_ix = { w:i for i,w in enumerate(self.vocab) }
        self.ix_to_word = { i:w for i,w in enumerate(self.vocab) }
        self.vocab_size = len(self.vocab)


    def create_sequences(self, seq_length):
        X, y = [], []
        for i in range(len(self.words) - seq_length):
            seq_in = self.words[i : i + seq_length]
            seq_out = self.words[i + seq_length]

            X.append([self.word_to_ix[w] for w in seq_in])
            y.append(self.word_to_ix[seq_out])

        return X, y

    def one_hot(self, idx):
        vec = np.zeros((self.vocab_size, 1))
        vec[idx] = 1
        return vec

In [24]:
raw_text = """
Deep learning is a subset of machine learning based on artificial neural networks.
We implemented an LSTM from scratch to understand how recurrent neural networks work.
The vanishing gradient problem was solved by the LSTM architecture using gates.
We are now building a next word predictor to test our understanding.
Machine learning requires data and algorithms to make predictions.
Neural networks are inspired by the human brain and its neurons.
Gradients flow backward to update the weights of the model.
The forget gate decides what information to throw away from the cell state.
The input gate decides what new information to store in the cell state.
The output gate decides what the next hidden state should be.
"""

In [25]:
SEQ_LENGTH = 3
HIDDEN_SIZE = 32
LEARNING_RATE = 0.01
EPOCHS = 300

In [33]:
processor = TextProcessor(raw_text)
X_data, y_data = processor.create_sequences(SEQ_LENGTH)

lstm = LSTM(units=HIDDEN_SIZE, input_dim=processor.vocab_size)
dense = Dense(units=processor.vocab_size, input_dim=HIDDEN_SIZE)

Dataset Stats: 117 words, 76 unique tokens.


In [32]:
processor.vocab_size

76

In [27]:

print(f"Training on {len(X_data)} sequences...")
loss_history = []

for epoch in range(EPOCHS):
    total_loss = 0

    for i in range(len(X_data)):
        inputs = [processor.one_hot(idx) for idx in X_data[i]]
        target_idx = y_data[i]

        h_last = lstm.forward(inputs)
        probs = dense.forward(h_last)

        loss = -np.log(probs[target_idx, 0] + 1e-9)
        total_loss += loss

        dh = dense.backward(target_idx, LEARNING_RATE)
        lstm.backward(dh, LEARNING_RATE)

    if epoch % 50 == 0:
        print(f"Epoch {epoch}, Avg Loss: {total_loss / len(X_data):.4f}")

Training on 114 sequences...
Epoch 0, Avg Loss: 4.3246
Epoch 50, Avg Loss: 0.6893
Epoch 100, Avg Loss: 0.1080
Epoch 150, Avg Loss: 0.0749
Epoch 200, Avg Loss: 0.0664
Epoch 250, Avg Loss: 0.0620


In [28]:
def predict_next_words(start_text, n_words=1):

    clean_text = start_text.lower()
    clean_text = re.sub(r'[^\w\s]', '', clean_text)
    words = clean_text.split()

    current_words = words[-SEQ_LENGTH:]

    generated_output = []

    print(f"\n--- Predicting next {n_words} words for: '{start_text}' ---")

    for _ in range(n_words):
        try:
            inputs = [processor.one_hot(processor.word_to_ix[w]) for w in current_words]
        except KeyError:
            print("Error: Word not in vocabulary.")
            return

        h_last = lstm.forward(inputs)
        probs = dense.forward(h_last)

        pred_idx = np.argmax(probs)
        pred_word = processor.ix_to_word[pred_idx]

        generated_output.append(pred_word)

        current_words.append(pred_word)
        current_words.pop(0)

    print(f"Prediction: {' '.join(generated_output)}")

In [43]:
predict_next_words("Machine Learning", n_words=5)


--- Predicting next 5 words for: 'Machine Learning' ---
Prediction: learning update update update update
