## Coding RNN

We'll build a simple RNN (Recurrent Neural Network) for text generation in PyTorch by training it to predict the next word in a sequence from the words that precede it.

#### A Recurrent Neural Network (RNN)

* An RNN is a type of neural network designed to handle sequential data by maintaining a memory of previous inputs.
* Traditional feedforward neural networks process each input independently of the others.
* RNNs have loops that allow information to persist from one time step to the next.
* This makes them particularly suited to tasks involving sequences, such as time series analysis, language modeling, or speech recognition.
* Hidden state: an RNN maintains a hidden state, which serves as memory and carries information from previous time steps.
    
Mathematically:

$$h_t = \tanh(W_h \cdot h_{t-1} + W_x \cdot x_t)$$

where:

* $h_t$ is the hidden state at time $t$.
* $x_t$ is the input at time $t$.
* $W_h$ and $W_x$ are weight matrices.
* $\tanh$ is a non-linear activation function.
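
The update rule can be checked numerically. The sketch below is a minimal illustration, assuming a bias-free `nn.RNNCell` as the reference; the sizes (4 inputs, 3 hidden units) are arbitrary and not taken from this notebook. It computes one step by hand and compares it with PyTorch's own result.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

input_size, hidden_size = 4, 3  # illustrative sizes only
cell = nn.RNNCell(input_size, hidden_size, bias=False, nonlinearity="tanh")

W_x = cell.weight_ih  # input weights, shape (hidden_size, input_size)
W_h = cell.weight_hh  # recurrent weights, shape (hidden_size, hidden_size)

x_t = torch.randn(1, input_size)      # input at time t (batch of 1)
h_prev = torch.zeros(1, hidden_size)  # h_{t-1}, starting from zeros

with torch.no_grad():
    # h_t = tanh(W_h . h_{t-1} + W_x . x_t), written for row vectors
    h_manual = torch.tanh(h_prev @ W_h.T + x_t @ W_x.T)
    h_cell = cell(x_t, h_prev)

print(torch.allclose(h_manual, h_cell, atol=1e-6))
```

By default PyTorch's RNN layers also add bias terms inside the `tanh`; they are switched off here only so that the code matches the formula exactly.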
    
#### Problems with RNNs
* Vanishing gradients: During backpropagation, gradients can shrink exponentially as they are propagated backward through time. This makes it difficult for RNNs to learn long-term dependencies.
* Slow training: Because inputs are processed one time step at a time, the steps of a sequence cannot be computed in parallel.
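
To get a feel for how quickly gradients can shrink, here is a deliberately simplified sketch: a scalar recurrence `h_t = tanh(w * h_{t-1})` with no input term. The function name `gradient_through_time`, the weight `0.9`, and the starting value `0.5` are all assumptions chosen for illustration. The gradient of the final state with respect to the initial state is a product of one factor per time step, and each factor is smaller than `w`.

```python
import math

def gradient_through_time(w, steps, h0=0.5):
    """Return d h_T / d h_0 for the scalar recurrence h_t = tanh(w * h_{t-1})."""
    h, grad = h0, 1.0
    for _ in range(steps):
        h = math.tanh(w * h)
        # Chain rule: d tanh(w * h_prev) / d h_prev = w * (1 - tanh(w * h_prev)^2)
        grad *= w * (1.0 - h * h)
    return grad

for steps in (1, 5, 20, 50):
    print(steps, gradient_through_time(w=0.9, steps=steps))
```

With `|w| < 1` the product is bounded by `w ** steps`, so it decays at least exponentially in the number of time steps. A real RNN uses matrices rather than scalars, but the same multiplicative structure is what makes long-range dependencies hard to learn.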


## Set up the environment

In [3]:
# !pip install torch

### Import libraries

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim 
import numpy as np

### Define Dataset

In [6]:
text = """
Recurrent neural networks are a type of neural network. They are used for tasks that involve sequences, 
like text generation and time series prediction. An RNN works by maintaining a hidden state that updates 
with each new input.
"""


* Now we'll split the paragraph into individual words and create a vocabulary (word-to-index and index-to-word mappings).

In [7]:
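words = text.lower().split()  # Split the text into lowercase, whitespace-separated words
vocab = sorted(set(words))    # Unique words; sorted so the index mapping is the same on every run

# Create the word-to-index and index-to-word mappings
# (punctuation stays attached, so "network." and "network" would be different entries)
word_to_idx = {word: i for i, word in enumerate(vocab)}
idx_to_word = {i: word for i, word in enumerate(vocab)}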

vocab_size = len(vocab)
print(f"vocabulary size: {vocab_size}")

vocabulary size: 34


### Prepare data for training

* RNNs work with sequences of inputs. For this, we'll create input sequences (X) and their corresponding target outputs (Y). Each X will be a sequence of words, and Y will be the word following that sequence.
* For simplicity, we'll take a window size of 3 (3 words to predict the next word).



In [18]:
def create_sequences(words, word_to_idx, seq_length=3):
    sequences = []
    targets = []

    # Slide a window of `seq_length` words over the text;
    # the word right after each window is its target
    for i in range(len(words) - seq_length):
        seq = words[i:i + seq_length]
        target = words[i + seq_length]
        sequences.append([word_to_idx[w] for w in seq])
        targets.append(word_to_idx[target])

    return torch.tensor(sequences), torch.tensor(targets)

seq_length = 3  # Predict the next word based on the previous 3 words
X, Y = create_sequences(words, word_to_idx, seq_length)
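
To make the windowing concrete, the following sketch applies the same sliding-window idea to a short phrase and keeps the words as strings instead of indices. The helper name `sliding_windows` and the toy phrase are illustrative assumptions, not part of the notebook's pipeline.

```python
def sliding_windows(items, seq_length=3):
    """Pair every run of `seq_length` items with the item that follows it."""
    return [
        (items[i:i + seq_length], items[i + seq_length])
        for i in range(len(items) - seq_length)
    ]

toy_words = "an rnn works by maintaining a hidden state".split()
pairs = sliding_windows(toy_words)

for context, target in pairs:
    print(context, "->", target)
```

A sequence of `n` words yields `n - seq_length` pairs, so the 8-word phrase gives 5. By the same rule, the 38-word paragraph used in this notebook gives 35 training pairs.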

### Define the RNN model
Now we’ll define a simple RNN model using PyTorch's nn.Module. The model has three key components:

* Embedding layer: Converts the input word indices into dense vector representations.
* RNN layer: Handles the recurrent connections.
* Fully connected (linear) layer: Maps the RNN outputs to vocabulary size for prediction.


In [22]:
class SimpleRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super().__init__()

        # Embedding layer: word index -> dense vector
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Simple RNN layer
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        # Fully connected layer to map the hidden state to the output
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        # x: input sequences of word indices, shape (batch_size, seq_length)
        x = self.embedding(x)

        # Pass through the RNN. `out` holds the hidden state for each time step
        out, hidden = self.rnn(x, hidden)
        # Only use the output of the last time step for the final prediction
        out = self.fc(out[:, -1, :])
        return out, hidden

    def init_hidden(self, batch_size):
        # Initialize the hidden state with zeros.
        # Sizes come from the RNN layer, not from a global variable.
        return torch.zeros(self.rnn.num_layers, batch_size, self.rnn.hidden_size)

# Hyperparameters
embed_size = 10  # Embedding size for each word
hidden_size = 20  # Number of hidden units in RNN
output_size = vocab_size  # Output size equals vocabulary size

# Instantiate the model
model = SimpleRNN(vocab_size, embed_size, hidden_size, output_size)
print(model)

SimpleRNN(
  (embedding): Embedding(34, 10)
  (rnn): RNN(10, 20, batch_first=True)
  (fc): Linear(in_features=20, out_features=34, bias=True)
)
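
It can help to trace tensor shapes through the three layers. The sketch below rebuilds the layers on their own with the sizes printed above (34, 10, 20) and feeds in random word indices. The batch size of 5 is an arbitrary choice for illustration.

```python
import torch
import torch.nn as nn

vocab_size, embed_size, hidden_size = 34, 10, 20  # sizes printed above
batch_size, seq_length = 5, 3                     # batch size is arbitrary here

embedding = nn.Embedding(vocab_size, embed_size)
rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
fc = nn.Linear(hidden_size, vocab_size)

x = torch.randint(0, vocab_size, (batch_size, seq_length))  # random word indices
h0 = torch.zeros(1, batch_size, hidden_size)                # (num_layers, batch, hidden)

with torch.no_grad():
    embedded = embedding(x)       # (batch, seq, embed)
    out, h_n = rnn(embedded, h0)  # out: (batch, seq, hidden), h_n: (1, batch, hidden)
    logits = fc(out[:, -1, :])    # (batch, vocab): one score per word

print(embedded.shape, out.shape, h_n.shape, logits.shape)
```

For a single-layer, unidirectional RNN like this one, the last time step of `out` is the same tensor as the final hidden state `h_n[0]`, which is why `out[:, -1, :]` is enough for the prediction.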


### Training the model
* For training, we will use the cross-entropy loss function, which is suitable for multi-class classification problems (like predicting one word out of the entire vocabulary). We'll also use the Adam optimizer for updating the model weights.
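
A useful reference point when reading the loss values: if the model gives every word the same score, the cross-entropy equals the natural log of the vocabulary size, which for 34 words is about 3.53. The sketch below checks this with `nn.CrossEntropyLoss`; the target indices are arbitrary placeholders.

```python
import math
import torch
import torch.nn as nn

vocab_size = 34
criterion = nn.CrossEntropyLoss()

logits = torch.zeros(4, vocab_size)     # equal scores for every word
targets = torch.tensor([0, 5, 12, 33])  # arbitrary target indices

loss = criterion(logits, targets).item()
print(f"{loss:.4f} vs ln({vocab_size}) = {math.log(vocab_size):.4f}")
```

Note that `nn.CrossEntropyLoss` expects raw scores (logits) and applies the softmax internally, which is why the model's final linear layer has no activation after it.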

In [28]:
criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(model.parameters() , lr=0.001)

# Number of training epochs
epochs=500

# Training loop
for epoch in range(epochs):
    hidden = model.init_hidden(X.size(0))
    # zero gradients
    optimizer.zero_grad() 
    # Forward pass
    output , hidden = model(X,hidden)
    # Compute loss
    loss = criterion(output , Y)
    # backward pass  and optimization
    loss.backward() 
    optimizer.step()
    
    
    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

Epoch [100/500], Loss: 0.2728
Epoch [200/500], Loss: 0.1130
Epoch [300/500], Loss: 0.0618
Epoch [400/500], Loss: 0.0398
Epoch [500/500], Loss: 0.0282


In [35]:
def predict_next_word(model, input_seq, hidden):
    with torch.no_grad():
        output, hidden = model(input_seq, hidden)
        predicted_idx = torch.argmax(output, dim=1).item()
        return predicted_idx, hidden

# Start the sequence with an initial set of words
initial_words = ['recurrent','neural', 'networks']
input_seq = torch.tensor([[word_to_idx[word] for word in initial_words]])

# Generate the next 10 words
model.eval()
hidden = model.init_hidden(input_seq.size(0))

for _ in range(10):
    predicted_idx, hidden = predict_next_word(model, input_seq, hidden)
    predicted_word = idx_to_word[predicted_idx]
    
    # Print the generated word
    print(predicted_word, end=' ')
    
    # Update input sequence by removing the first word and adding the predicted word
    input_seq = torch.cat([input_seq[:, 1:], torch.tensor([[predicted_idx]])], dim=1)


are a type of neural generation and of neural network. 
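
One thing to notice about the loop above: it carries `hidden` over from one iteration to the next while also re-feeding the last words through the sliding window, whereas training started every 3-word window from a zero hidden state. A variant that resets the hidden state at each step may be closer to how the model was trained. The sketch below shows that variant under stated assumptions: `TinyRNN` is an untrained stand-in with the same interface as `SimpleRNN`, and `generate_with_reset` is a hypothetical helper name. It returns word indices rather than words.

```python
import torch
import torch.nn as nn

class TinyRNN(nn.Module):
    """Stand-in with the same interface as SimpleRNN (untrained, for illustration)."""
    def __init__(self, vocab_size, embed_size=10, hidden_size=20):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        out, hidden = self.rnn(self.embedding(x), hidden)
        return self.fc(out[:, -1, :]), hidden

def generate_with_reset(model, seed_ids, n_steps):
    """Greedy generation that starts every window from a zero hidden state."""
    window = torch.tensor([seed_ids])
    generated = []
    model.eval()
    with torch.no_grad():
        for _ in range(n_steps):
            # Fresh hidden state for each window, as in training
            hidden = torch.zeros(model.rnn.num_layers, 1, model.rnn.hidden_size)
            logits, _ = model(window, hidden)
            next_id = torch.argmax(logits, dim=1).item()
            generated.append(next_id)
            # Slide the window: drop the oldest index, append the prediction
            window = torch.cat([window[:, 1:], torch.tensor([[next_id]])], dim=1)
    return generated

torch.manual_seed(0)
ids = generate_with_reset(TinyRNN(vocab_size=34), seed_ids=[0, 1, 2], n_steps=10)
print(ids)
```

With the trained model from this notebook, the returned indices could be mapped back to words through `idx_to_word`.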

### Handling the out-of-vocabulary problem using the BERT tokenizer

Out-of-vocabulary (OOV) words can be handled with a pre-trained tokenizer such as BertTokenizer from the Hugging Face transformers library. The BERT tokenizer uses WordPiece tokenization, which splits unseen words into subword units rather than mapping them to a single unknown token such as <UNK>.

Let’s walk through how the BERT tokenizer handles unseen words and how it can be used with the create_sequences function.

#### Key Points:
* WordPiece tokenization: if a word is not in the vocabulary, it is split into subwords that are. This makes the tokenizer more robust to unseen or rare words.
* Example: a word that is missing from the vocabulary is broken into a leading piece plus continuation pieces marked with a `##` prefix, so `tokenization` would typically become `token` and `##ization`. The exact split depends on the tokenizer's vocabulary.
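
The splitting rule itself is a greedy longest-match-first search over the vocabulary. The sketch below is a toy re-implementation for illustration only: the function name `wordpiece` and the six-entry vocabulary are hypothetical, and the real BERT tokenizer additionally lowercases text, splits on punctuation, and uses a vocabulary of about 30,000 pieces.

```python
def wordpiece(word, vocab, unk="[UNK]"):
    """Greedy longest-match-first split of one word into vocabulary pieces."""
    pieces, start = [], 0
    while start < len(word):
        end = len(word)
        match = None
        while start < end:
            piece = word[start:end]
            if start > 0:
                piece = "##" + piece  # continuation pieces carry a ## prefix
            if piece in vocab:
                match = piece
                break
            end -= 1                  # try a shorter candidate
        if match is None:
            return [unk]              # no piece fits: the whole word is unknown
        pieces.append(match)
        start = end
    return pieces

toy_vocab = {"token", "##ization", "##s", "un", "##seen", "rnn"}  # hypothetical vocabulary
print(wordpiece("tokenization", toy_vocab))
print(wordpiece("unseen", toy_vocab))
print(wordpiece("xyz", toy_vocab))
```

Only when no prefix of the remaining characters is in the vocabulary does the word fall back to the unknown token, which is why subword vocabularies rarely produce it in practice.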

In [47]:
from transformers import BertTokenizer
import torch

# Load pre-trained BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')



# Tokenize the text into a list of token IDs using the BERT tokenizer.
# add_special_tokens=True wraps the IDs in [CLS] ... [SEP].
tokenized_sentence = tokenizer.encode(text, add_special_tokens=True)

# Create sequences and targets for next-word prediction
def create_sequences_bert(tokenized_sentence, seq_length=3):
    sequences = []
    targets = []

    # Generate sequences and their corresponding next token (target)
    for i in range(len(tokenized_sentence) - seq_length):
        seq = tokenized_sentence[i:i + seq_length]  # Current sequence of token IDs
        target = tokenized_sentence[i + seq_length]  # Next token ID to predict
        sequences.append(seq)
        targets.append(target)

    return torch.tensor(sequences), torch.tensor(targets)

# Define sequence length
seq_length = 3  # Predict the next token based on the previous 3 tokens

# Generate sequences and targets using BERT tokenization
X, Y = create_sequences_bert(tokenized_sentence, seq_length)

In [54]:
class SimpleRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, output_size):
        super().__init__()

        # Embedding layer: token index -> dense vector
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Simple RNN layer
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        # Fully connected layer to map the hidden state to the output
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        # x: input sequences of token indices, shape (batch_size, seq_length)
        x = self.embedding(x)

        # Pass through the RNN. `out` holds the hidden state for each time step
        out, hidden = self.rnn(x, hidden)
        # Only use the output of the last time step for the final prediction
        out = self.fc(out[:, -1, :])
        return out, hidden

    def init_hidden(self, batch_size):
        # Initialize the hidden state with zeros.
        # Sizes come from the RNN layer, not from a global variable.
        return torch.zeros(self.rnn.num_layers, batch_size, self.rnn.hidden_size)

# Hyperparameters
embed_size = 10  # Embedding size for each word
hidden_size = 20  # Number of hidden units in RNN
vocab_size = tokenizer.vocab_size
output_size = vocab_size  # Output size equals vocabulary size

# Instantiate the model
model = SimpleRNN(vocab_size, embed_size, hidden_size, output_size)
print(model)

SimpleRNN(
  (embedding): Embedding(30522, 10)
  (rnn): RNN(10, 20, batch_first=True)
  (fc): Linear(in_features=20, out_features=30522, bias=True)
)
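
Switching to the full BERT vocabulary changes the size of the model considerably, because both the embedding table and the output layer scale with the vocabulary. The sketch below counts parameters for the two configurations printed in this notebook. The helper name `simple_rnn_param_count` is hypothetical, and the formula assumes a single-layer `nn.RNN` with its default bias terms.

```python
def simple_rnn_param_count(vocab_size, embed_size, hidden_size):
    """Parameter count for Embedding -> single-layer nn.RNN -> Linear."""
    embedding = vocab_size * embed_size
    # nn.RNN keeps W_ih, W_hh and two bias vectors (b_ih, b_hh)
    rnn = hidden_size * embed_size + hidden_size * hidden_size + 2 * hidden_size
    fc = hidden_size * vocab_size + vocab_size  # weights + bias
    return embedding + rnn + fc

small = simple_rnn_param_count(34, 10, 20)     # word-level vocabulary
large = simple_rnn_param_count(30522, 10, 20)  # BERT WordPiece vocabulary

print(small)  # 1694
print(large)  # 946822
```

The recurrent layer has 640 parameters in both cases; almost all of the growth sits in rows of the embedding and output layers for tokens that never appear in the short training paragraph.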


In [71]:
criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(model.parameters() , lr=0.001)

# Number of training epochs
epochs=500

# Training loop
for epoch in range(epochs):
    hidden = model.init_hidden(X.size(0))
    # zero gradients
    optimizer.zero_grad() 
    # Forward pass
    output , hidden = model(X,hidden)
    # Compute loss
    loss = criterion(output , Y)
    # backward pass  and optimization
    loss.backward() 
    optimizer.step()
    
    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

Epoch [100/500], Loss: 0.0435
Epoch [200/500], Loss: 0.0227
Epoch [300/500], Loss: 0.0133
Epoch [400/500], Loss: 0.0087
Epoch [500/500], Loss: 0.0063


In [74]:
def predict_next_word(model, input_seq, hidden):
    with torch.no_grad():
        output, hidden = model(input_seq, hidden)
        # Get the predicted token index (the word with the highest score)
        predicted_idx = torch.argmax(output, dim=1).item()
        return predicted_idx, hidden

# Start the sequence with an initial set of words
initial_words = ['recurrent', 'neural', 'networks']

# Convert the words to their token indices using the tokenizer
input_seq = torch.tensor([tokenizer.encode(' '.join(initial_words), add_special_tokens=False)])

# Generate the next 10 words
model.eval()  # Set the model to evaluation mode
hidden = model.init_hidden(input_seq.size(0))  # Initialize hidden state

# Loop to predict the next 10 words
for _ in range(10):
    predicted_idx, hidden = predict_next_word(model, input_seq, hidden)
    
    # Convert the predicted token index back to a word
    predicted_word = tokenizer.decode([predicted_idx])
    
    # Print the predicted word
    print(predicted_word, end=' ')
    
    # Update the input sequence by removing the first token and adding the predicted token
    input_seq = torch.cat([input_seq[:, 1:], torch.tensor([[predicted_idx]])], dim=1)


a a type of neural networks a a type of 