### Credit: https://www.youtube.com/watch?v=kCc8FmEb1nY&t=6s
### This is an LLM that will be trained to generate poetry, hopefully haikus. This is a fun experiment. The training haikus were created with OpenAI GPT-4.

In [None]:
import os
import glob
import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader

In [None]:
# Load the whole haiku corpus into memory as one string.
file_path = './haikus.txt'
# Decode explicitly as UTF-8 so the character vocabulary built from this text
# does not depend on the platform's default locale encoding.
with open(file_path, 'r', encoding='utf-8') as file:
    contents = file.read()

# Bare expression: displays the raw text when run as a notebook cell.
contents

In [None]:
# Character-level vocabulary: every distinct character of the corpus in
# sorted order, together with the vocabulary size.
unique_chars = sorted(set(contents))
unique_chars_len = len(unique_chars)
# Show the vocabulary on one line and its size on the next.
print(''.join(unique_chars), unique_chars_len, sep='\n')

In [None]:
#Let's try one-hot encoding and print out the vocab dictionary

def one_hot_encode(data_input, char_set):
    """Turn each symbol of ``data_input`` into a one-hot list over ``char_set``.

    Args:
        data_input: Iterable of symbols (e.g. a string) to encode.
        char_set: Ordered collection of allowed symbols; a symbol's position
            in it is the position of the 1 in that symbol's vector.

    Returns:
        A list with one ``len(char_set)``-long list of 0/1 ints per symbol.

    Raises:
        ValueError: If a symbol of ``data_input`` is not in ``char_set``.
    """
    width = len(char_set)
    # Symbol -> column lookup, built once up front.
    position_of = dict(zip(char_set, range(width)))

    rows = []
    for symbol in data_input:
        if symbol not in position_of:
            raise ValueError(f"Character '{symbol}' not in character set")
        row = [0] * width
        row[position_of[symbol]] = 1
        rows.append(row)
    return rows


In [None]:
#Decoder
def one_hot_decode(encoded_data, char_set):
    """Map a sequence of one-hot lists back to the string they encode.

    Args:
        encoded_data: Iterable of lists, each holding a 1 at the position of
            the symbol it stands for.
        char_set: Ordered collection of symbols used when encoding.

    Returns:
        The decoded symbols concatenated into a single string.

    Raises:
        ValueError: If a vector contains no 1 (raised by ``list.index``).
        KeyError: If the 1 sits at a position beyond ``char_set``.
    """
    symbol_at = dict(enumerate(char_set))
    # ``index(1)`` finds the first 1 in each vector, i.e. the symbol's position.
    return "".join(symbol_at[one_hot.index(1)] for one_hot in encoded_data)

# Example usage
# Hard-coded character set used only for this demo; it is separate from the
# corpus vocabulary `unique_chars`.
char_set = ',.ABCDGHILMNOPRSTUWabcdefghiklmnopqrstuvwyz'
data_input = "Hello"
# One one-hot list per character of "Hello", each len(char_set) long.
# NOTE: `encoded_data` is reassigned later when the whole corpus is encoded.
encoded_data = one_hot_encode(data_input, char_set)
print(encoded_data)


In [None]:
#test the encoding
# Encoding the vocabulary against itself gives one row per vocabulary entry,
# with row i carrying its 1 at position i.
# NOTE: `encoding` is passed to generate_text as `char_to_index` further down,
# where only its length is used.
encoding = one_hot_encode(unique_chars, unique_chars)
print(encoding)

In [None]:
#Test the decoding
# Round trip: decoding the rows produced above should give back the vocabulary
# as one string.
# NOTE: `decoding` (a str) is passed to generate_text as `index_to_char`
# further down, where it is indexed by predicted class index.
decoding = one_hot_decode(encoding, unique_chars)
print(decoding)

In [None]:
#Let's Encode our Data
# The corpus is used as-is; the space-stripping variant is left disabled.
prepped_data = contents#.replace(" ", "")
# Bare expression: has no visible effect because it is not the last line of the cell.
prepped_data
# One one-hot row per corpus character, over the corpus vocabulary.
encoded_data = one_hot_encode(prepped_data, unique_chars)
# NOTE(review): this prints the entire encoded corpus, which may be very long.
print(encoded_data)

In [None]:
#Transform encoded data into Tensor
# float32 tensor built from the nested one-hot lists: one row per corpus
# character, one column per vocabulary entry.
tensor_data = torch.tensor(encoded_data, dtype=torch.float32)
print(tensor_data.shape, tensor_data.dtype)
print(tensor_data)

In [None]:
#Train and Validation Sets
# Sequential (unshuffled) split: the first 80% of the rows are used for
# training and the remaining rows for validation.
n = int(0.8*len(tensor_data))
train = tensor_data[:n]
val = tensor_data[n:]
print(train)
print(val)

In [None]:
# Context length: number of consecutive characters in one training sequence.
block_size = 8
# block_size + 1 rows, enough to cut an input window and its one-step-shifted target window.
train[:block_size + 1]

In [None]:
# Illustrate next-character prediction on the first window of the training data:
# y is x shifted by one position, so y[t] is the character that follows x[:t+1].
x = train[:block_size]
y = train[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"When the input is {context}, the target is {target}")

In [None]:
# Inspect the training tensor (one one-hot row per character).
print(train)

In [None]:
#Introducing the batch dimension
# Fixed seed so the random window offsets drawn by get_batch are reproducible.
torch.manual_seed(1337)
batch_size = 4  # number of windows sampled per batch by get_batch
block_size = 8  # number of consecutive characters in each window


def get_batch(split):
    """Sample a random batch of input windows and their shifted targets.

    Reads the notebook globals ``train``, ``val``, ``batch_size`` and
    ``block_size``.

    Args:
        split: ``'train'`` selects the training tensor; any other value
            selects the validation tensor.

    Returns:
        ``(x, y)`` where both stack ``batch_size`` windows of ``block_size``
        rows and ``y`` is ``x`` shifted one position to the right.
    """
    source = val if split != 'train' else train
    # Random start offsets, chosen so every window and its target stay in range.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s:s + block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return inputs, targets

# Draw one training batch and inspect it.
# NOTE: `xb` / `yb` are reused later as the contents of `train_dataset`.
xb, yb = get_batch('train')
print("inputs:")
print(xb.shape)
print(xb)
print('targets')
print(yb.shape)
print(yb)

print("-----")

# For every sequence in the batch, spell out each (growing context -> next
# character) pair that the window contains.
for b in range(batch_size):
    for t in range(block_size):
        context = xb[b, :t+1]
        target = yb[b,t]
        print(f"when the input is {context.tolist()}, the target is: {target}")

In [None]:
# Same context/target walkthrough as the earlier cell, on the first window of
# the training data: y[t] is the character that follows the context x[:t+1].
x = train[:block_size]
y = train[1:block_size+1]
for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(f"When context is {context}, the target is {target}")

In [None]:
#RNN can handle One-Hot vectors well
class SimpleRNNModel(nn.Module):
    """Single-layer RNN followed by a linear projection to class scores.

    Args:
        input_size: Size of each input vector (the one-hot width).
        hidden_size: Size of the RNN hidden state.
        output_size: Number of output scores produced per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Kept as an attribute because generate_text reads it to size the
        # initial hidden state.
        self.hidden_size = hidden_size

        # RNN layer; batch_first=True means inputs are (batch, seq, feature).
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)

        # Output layer: hidden state -> per-class scores.
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return scores for every time step of every sequence.

        Args:
            x: Tensor of shape ``(batch, seq, input_size)``.

        Returns:
            Tensor of shape ``(batch * seq, output_size)``; the batch and time
            dimensions are flattened so each row is one prediction.
        """
        # Create the zero initial state on the input's device and dtype so the
        # model also works after ``.to(device)`` or ``.double()``.
        h0 = torch.zeros(
            1, x.size(0), self.hidden_size, dtype=x.dtype, device=x.device
        )
        out, _ = self.rnn(x, h0)
        out = out.reshape(-1, self.hidden_size)
        return self.linear(out)

   

        
# Input and output widths both equal the vocabulary size (one-hot in, one
# score per character out); 128 is the hidden-state size.
model = SimpleRNNModel(unique_chars_len, 128, unique_chars_len)

In [None]:
# Loss function
# NOTE(review): the training loop passes one-hot float rows as targets, which
# relies on CrossEntropyLoss accepting class-probability targets — confirm the
# installed PyTorch version supports that.
loss_function = nn.CrossEntropyLoss()

# Optimizer (example: using Adam)
# Bound to the parameters of the `model` instance that exists when this cell runs.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
class CustomDataset(Dataset):
    """Pair each input sample with the target stored at the same index.

    Args:
        data: Indexable collection of input samples.
        targets: Indexable collection of targets, aligned with ``data``.
    """

    def __init__(self, data, targets):
        self.targets = targets
        self.data = data

    def __len__(self):
        """Number of samples, taken from ``data``."""
        sample_count = len(self.data)
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair at position ``idx``."""
        sample = self.data[idx]
        target = self.targets[idx]
        return sample, target


In [None]:
# Wrap the single batch drawn earlier (xb inputs, yb targets) as a dataset.
# NOTE(review): this dataset therefore holds only the windows in that one
# batch, not the whole training split — confirm this is intended beyond a
# smoke test.
train_dataset = CustomDataset(xb, yb)

# Create the DataLoader
# Batches of `batch_size` samples, reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size, shuffle=True)


### Test Run

In [None]:
num_epochs = 500
# Lowest loss seen so far; starts at +inf so the first batch always counts as
# an improvement. Stored as a plain float so no autograd graph is kept alive.
previous_loss = float('inf')

model.train()
for epoch in range(num_epochs):
    for x_batch, y_batch in train_loader:  # x_batch is input, y_batch is target labels
        # Flatten targets to (batch * block, vocab) so they line up row by row
        # with the model output, which flattens batch and time the same way.
        y_batch = y_batch.view(-1, unique_chars_len)

        # Forward pass: compute predictions for every time step.
        y_pred = model(x_batch)

        # Compute the loss and pull out its scalar value once.
        current_loss = loss_function(y_pred, y_batch)
        loss_value = current_loss.item()

        # Checkpoint whenever the loss improves. This happens before the
        # optimizer step, so the saved weights are the ones that produced
        # this loss.
        if loss_value < previous_loss:
            previous_loss = loss_value
            torch.save(model.state_dict(), 'best_model.pth')

        print(f"Epoch {epoch}, Loss: {loss_value}")

        # Zero gradients, perform a backward pass, and update the weights.
        optimizer.zero_grad()
        current_loss.backward()
        optimizer.step()


### Put it in a function

In [None]:
def train_function(epochs, train_data_loader, model, b, l):
    """Train ``model`` with Adam and cross-entropy, checkpointing the best step.

    Each time a batch loss is lower than every previous one, the whole model
    object is saved to ``best_model_batch_{b}_layers_{l}_epochs_{epochs}.pth``
    in the current working directory. The save happens before the optimizer
    step, so the file holds the weights that produced that loss.

    Args:
        epochs: Number of passes over ``train_data_loader``.
        train_data_loader: Iterable yielding ``(x_batch, y_batch)`` pairs;
            ``y_batch`` holds one target row per time step in its last
            dimension.
        model: Module mapping ``x_batch`` to one row of scores per target row.
        b: Batch-size label, used only in the checkpoint file name.
        l: Hidden-size label, used only in the checkpoint file name.

    Returns:
        The lowest batch loss observed, as a float (``inf`` if the loader
        yielded no batches).
    """
    previous_loss = float('inf')
    loss_function = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    checkpoint_path = "best_model_batch_{}_layers_{}_epochs_{}.pth".format(b, l, epochs)

    model.train()
    for epoch in range(epochs):
        for x_batch, y_batch in train_data_loader:
            # Flatten targets to (batch * seq, classes). The class count comes
            # from the targets themselves, not from a notebook global.
            y_batch = y_batch.view(-1, y_batch.size(-1))

            # Forward pass: compute predictions for every time step.
            y_pred = model(x_batch)

            # Keep the best loss as a plain float so no autograd graph is
            # retained between steps.
            current_loss = loss_function(y_pred, y_batch)
            loss_value = current_loss.item()

            if loss_value < previous_loss:
                previous_loss = loss_value
                torch.save(model, checkpoint_path)

            print(f"Epoch {epoch}, Loss: {loss_value}")

            # Zero gradients, perform a backward pass, and update the weights.
            optimizer.zero_grad()
            current_loss.backward()
            optimizer.step()

    return previous_loss
  

### Train using different hyperparameters

In [None]:
# Grid of settings to compare: loader batch size, number of epochs and RNN
# hidden-state size.
batch_sizes = [8, 16, 32, 64]
num_epochs = [500, 1000, 10000]
layers = [128, 256, 512]


for batch in batch_sizes:
    # Use the swept value `batch` here; the global `batch_size` would give
    # every run the same loader.
    train_loader = DataLoader(train_dataset, batch, shuffle=True)
    for layer in layers:
        for epochs in num_epochs:
            # Start every configuration from a freshly initialised model so
            # the checkpoint named after `epochs` reflects exactly that many
            # epochs, not training carried over from a previous setting.
            model = SimpleRNNModel(unique_chars_len, layer, unique_chars_len)
            print("Size (hidden state): {} Epoch: {} Batch Size: {}".format(layer, epochs, batch))
            train_function(epochs, train_loader, model, batch, layer)

### Define a function that can generate text

In [None]:
def generate_text(model, start_input, char_to_index, index_to_char, max_length=100):
    """Greedily generate ``max_length`` characters from a one-hot start vector.

    At each step the highest-scoring character is emitted and fed back as the
    next one-hot input. The start character itself is not part of the result.

    Args:
        model: Module exposing ``rnn`` (batch-first), ``linear`` and
            ``hidden_size``; it is switched to eval mode.
        start_input: One-hot vector (sequence of numbers or 1-D tensor) for
            the first input character; its length is the vocabulary size.
        char_to_index: Kept for interface compatibility and not used; the
            vocabulary size is taken from ``start_input``.
        index_to_char: Mapping or sequence from class index to character.
        max_length: Number of characters to generate.

    Returns:
        The generated characters joined into one string.
    """
    model.eval()  # Set the model to evaluation mode

    # Build every tensor on the model's own device/dtype so generation also
    # works for models that were moved or cast.
    reference = model.linear.weight
    tensor_opts = {"dtype": reference.dtype, "device": reference.device}

    # [input_size] -> [1, 1, input_size] (batch of one, sequence of one).
    input_seq = torch.as_tensor(start_input, **tensor_opts).unsqueeze(0).unsqueeze(0)
    vocab_size = input_seq.size(-1)
    hidden = torch.zeros(1, 1, model.hidden_size, **tensor_opts)

    pieces = []
    # Inference only: skip autograd bookkeeping across the generation loop.
    with torch.no_grad():
        for _ in range(max_length):
            # One RNN step, carrying the hidden state forward.
            out, hidden = model.rnn(input_seq, hidden)
            scores = model.linear(out.squeeze(1))  # drop the length-1 sequence dim

            # Greedy choice: the character with the highest score.
            last_char_index = int(torch.argmax(scores, dim=1).item())
            pieces.append(index_to_char[last_char_index])

            # The prediction becomes the next one-hot input.
            input_seq = torch.zeros((1, 1, vocab_size), **tensor_opts)
            input_seq[0, 0, last_char_index] = 1.0

    return "".join(pieces)


### Iterate through model files

In [None]:
# Collect the checkpoints written by the hyperparameter sweep.
model_dir = './'
pattern = "best_model_batch*.pth"
# os.path.join avoids the doubled separator that plain string formatting
# produces when model_dir already ends with '/'.
search_pattern = os.path.join(model_dir, pattern)
# glob returns matches in arbitrary order; sort for a stable listing.
model_files = sorted(glob.glob(search_pattern))
print(model_files)

### Let's see which model performs the best

In [None]:
# One-hot vector of the first input character.
# NOTE(review): the length and the position of the 1 are hard-coded; they must
# match the vocabulary the checkpoints were trained on — verify against unique_chars.
starting_vector = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] #start with a capital letter
for file in model_files:
    # The checkpoints hold whole pickled model objects (saved with
    # torch.save(model, ...)), so weights-only loading must be disabled.
    # Newer PyTorch releases default to weights_only=True and would refuse
    # these files. Unpickling runs arbitrary code: only load checkpoints that
    # this notebook produced itself.
    model = torch.load(file, weights_only=False)
    print("Loading model: {}".format(file))
    # `encoding` supplies the vocabulary size and `decoding` (the vocabulary
    # as a string) maps a predicted index back to its character.
    model_text = generate_text(model, starting_vector, encoding, decoding, max_length=100)
    print(model_text)