In [1]:
# Data
import torch
import torch.nn as nn
import math
import matplotlib.pyplot as plt
import seaborn as sns  # makes heatmap look better
from datasets import load_dataset
import sentencepiece as spm
import os

vocab_size = 10000 # english has 26 * 2 + punctuation 

try:
    sp = spm.SentencePieceProcessor(model_file='./stories.model')
    print('sp', sp)
    text_data = [sp.id_to_(id) for id in range(sp.GetPieceSize())]

    dataset = load_dataset("roneneldan/TinyStories")

    train_dataset = dataset['train'][:50000]
    text_data = [entry['text'] for entry in train_dataset]
    print('successfully retrieved training data from previous model')
except:
    dataset = load_dataset("roneneldan/TinyStories")

    train_dataset = dataset['train']
    text_data = [entry['text'] for entry in train_dataset]
    text_data = text_data[:50000]
    print('len(text_data', len(text_data))
    text_data_str = '\n'.join(text_data)

    with open('temp.txt', 'w', encoding='utf-8') as f:
        f.write(text_data_str)


    spm.SentencePieceTrainer.train(
            f'--input=temp.txt --model_prefix=stories --vocab_size={vocab_size} --character_coverage=1.0 --model_type=unigram'
        )  
    sp = spm.SentencePieceProcessor(model_file='./stories.model')

    print('successfully trained sp')


  from .autonotebook import tqdm as notebook_tqdm


sp <sentencepiece.SentencePieceProcessor; proxy of <Swig Object of type 'sentencepiece::SentencePieceProcessor *' at 0x17e642b70> >


Repo card metadata block was not found. Setting CardData to empty.


len(text_data 50000
successfully trained sp


sentencepiece_trainer.cc(177) LOG(INFO) Running command: --input=temp.txt --model_prefix=stories --vocab_size=10000 --character_coverage=1.0 --model_type=unigram
sentencepiece_trainer.cc(77) LOG(INFO) Starts training with : 
trainer_spec {
  input: temp.txt
  input_format: 
  model_prefix: stories
  model_type: UNIGRAM
  vocab_size: 10000
  self_test_sample_size: 0
  character_coverage: 1
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  hard_vocab_limit: 1
  use_all_vocab: 0
  unk_id: 0
  bos_id: 1
  eos_id: 2
  pad_id: -1
  unk_pi

In [3]:
print(len(text_data))


50000


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
from collections import Counter
import json

batch_size = 20
PAD_TOKEN = sp.piece_to_id('<pad>')

def target_story_to_tensor(story):
    tokens = torch.tensor(sp.encode_as_ids(story) + [sp.piece_to_id('<eos>')], dtype=torch.long)
    return tokens

def input_story_to_tensor(story):
    tokens = torch.tensor([sp.piece_to_id('<sos>')] + sp.encode_as_ids(story), dtype=torch.long)
    return tokens

class StoryDataset(Dataset):
    def __init__(self, stories):
        self.stories = stories
    
    def __len__(self):
        return len(self.stories)
    
    def __getitem__(self, idx):
        story = self.stories[idx]
        return input_story_to_tensor(story), target_story_to_tensor(story)

def collate_fn(batch):
    inputs, targets = zip(*batch)
    inputs = pad_sequence(inputs, batch_first=True, padding_value=PAD_TOKEN)
    targets = pad_sequence(targets, batch_first=True, padding_value=PAD_TOKEN)
    return inputs, targets

# Create dataset and dataloader
dataset = StoryDataset(text_data)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)


In [5]:
# Hyperparameters
d_model = 256
dropout = 0.1 # 10% chance that any given neuron will be dropped out
n_heads = 4
n_layer = 4

In [1]:
def create_mask(seq):
    seq_len = seq.size(1)
    mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
    return mask

class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model):
        super(TransformerDecoder, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)

        self.blocks = nn.Sequential(
            *[Block(n_heads) for _ in range(n_layer)]
        )
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, input):
        # print('Im here')
        input = self.embedding(input)
        input = self.pos_encoder(input)
        blocks_output = self.blocks(input)
        
        logits = self.fc(blocks_output)
        return logits

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class Block(nn.Module):
    def __init__(self, n_heads):
        super().__init__()
        head_size = d_model // n_heads
        self.multi_head_attention = MultiHeadAttention(n_heads, head_size)
        self.ffwd = nn.Sequential(
            nn.Linear(d_model, 4*d_model), # expanding and contracting the model for it to learn more intricate patterns
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(4*d_model, d_model)
        )

        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x):
        # actually doing residual connection here by attn1_output + input
        # print('im in block forward')
        x = x + self.multi_head_attention(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        # print('Block shape', x.shape)
        return x

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([SelfAttention(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        mask = create_mask(x).to(x.device)
        # print('im in multiheadattention')
        out = torch.cat([h(x, x, x, mask) for h in self.heads], dim=-1)
        # print('multiheadattention out.shape', out.shape)
        out = self.dropout(self.proj(out))
        # here not printing
        # print('dropout out.shape', out.shape)
        return out
    
class SelfAttention(nn.Module):
    def __init__(self, head_size):
        super(SelfAttention, self).__init__()
        self.query = nn.Linear(d_model, head_size, bias=False)
        self.key = nn.Linear(d_model, head_size, bias=False)
        self.value = nn.Linear(d_model, head_size, bias=False)

    def forward(self, query, key, value, mask=None):
        # print('im in self attention')
        # print('head_size')
        q = self.query(query)
        k = self.key(key)
        v = self.value(value)

        # print('q.shape', q.shape)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_model)
        # print('scores.shape', scores.shape)
        # print('mask', mask)
        if mask is not None:
            scores = scores.masked_fill(mask, float('-inf'))
            
        attention_weights = torch.nn.functional.softmax(scores, dim=-1)
        # print('attention_weights', attention_weights.shape)
        # print('v', v.shape)
        output = torch.matmul(attention_weights, v)
        # print('output.shape of selfattention', output.shape)
        return output

def plot_attention(attention, source_seq, target_seq):
    """
    Plots the attention weights.
    :param attention: Attention weights matrix.
    :param source_seq: Source sequence tokens.
    :param target_seq: Target sequence tokens.
    """
    fig, ax = plt.subplots(figsize=(5, 5))
    sns.heatmap(attention, cmap='viridis', xticklabels=source_seq, yticklabels=target_seq)
    plt.xlabel('Keys (Source)')
    plt.ylabel('Queries (Target)')
    plt.show()

NameError: name 'nn' is not defined

In [None]:
# Define the model, optimizer, and loss
decoder = TransformerDecoder(vocab_size, d_model)
optimizer = Adam(decoder.parameters(), lr=0.0001)
criterion = torch.nn.CrossEntropyLoss()

# Training loop
num_epochs = 50

criterion = torch.nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)

decoder.train()
first_iteration = True

for epoch in range(num_epochs):
    total_loss = 0.0

    for inputs, targets in dataloader:
        optimizer.zero_grad()

        outputs = decoder(inputs)
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
        
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs} - Loss: {avg_loss:.4f}")

Im here
im in block forward
im in multiheadattention
im in self attention
head_size
q.shape torch.Size([20, 416, 64])
scores.shape torch.Size([20, 416, 416])
mask tensor([[False,  True,  True,  ...,  True,  True,  True],
        [False, False,  True,  ...,  True,  True,  True],
        [False, False, False,  ...,  True,  True,  True],
        ...,
        [False, False, False,  ..., False,  True,  True],
        [False, False, False,  ..., False, False,  True],
        [False, False, False,  ..., False, False, False]])
attention_weights torch.Size([20, 416, 416])
v torch.Size([20, 416, 64])
output.shape of selfattention torch.Size([20, 416, 64])
im in self attention
head_size
q.shape torch.Size([20, 416, 64])
scores.shape torch.Size([20, 416, 416])
mask tensor([[False,  True,  True,  ...,  True,  True,  True],
        [False, False,  True,  ...,  True,  True,  True],
        [False, False, False,  ...,  True,  True,  True],
        ...,
        [False, False, False,  ..., False,  True,

: 

: 

Here we use a greedy approach where we always take the highest probable tokens from the softmax output. As you can see we always produce the same name here

In [None]:

print('vocab_size', vocab_size)
def generate_name(model, max_length=10):
    model.eval()
    with torch.no_grad():
        input_token = sp.piece_to_id('<sos>')
        output_sequence = [input_token] # we'll always get the same name because we are using the same model and the same starter token

        for i in range(max_length):
            input_tensor = torch.tensor([output_sequence]).long()
            logit_output, _, _ = model(input_tensor)

            softmax = nn.Softmax(dim=-1)
            softmax_output = softmax(logit_output)
            # Taking the token with the highest probability for prediction

            predicted_token = softmax_output[0, -1, :].argmax().item()

            # Break if we predict the end-of-string token
            if predicted_token == sp.piece_to_id('<eos>'):
                break

            output_sequence.append(predicted_token)
        

        # Convert token IDs back to strings
        print(output_sequence[1:])
        generated_name = sp.decode_ids(output_sequence[1:])
        # generated_name = ''.join([id_to_token.get(token_id, '<UNK>') for token_id in output_sequence[1:]])

    return generated_name

generated_name = generate_name(decoder)
print(generated_name)

vocab_size 61
[3, 19, 17, 7, 9, 8, 11, 7, 14, 10]
Cherileyn


Here we use Top-k sampling: At each step, sample from the top k probable tokens instead of the highest probable token approach where we always take the highest probable tokens from the softmax output. As you can see we consistently produce new names different from the names in the dataset

In [None]:
def top_k_sampling(logits, k=6):  
    # Obtain the top k logits
    values, indices = torch.topk(logits, k)
    # Create a distribution over the top k logits only
    distribution = torch.nn.functional.softmax(values, dim=-1)
    
    # Sample from the distribution
    choice = torch.multinomial(distribution, 1)
    # Choose the actual token from the top k
    token = indices.gather(-1, choice).squeeze().item()
    return token

def generate_name(model, max_length=10, k=10):  # added k parameter
    model.eval()
    with torch.no_grad():
        input_token = sp.piece_to_id('<sos>')
        output_sequence = [input_token]

        for i in range(max_length):
            input_tensor = torch.tensor([output_sequence]).long()
            logit_output, _, _ = model(input_tensor)

            # Use top_k_sampling instead of the greedy approach
            predicted_token = top_k_sampling(logit_output[0, -1, :], k)

            # Break if we predict the end-of-string token
            if predicted_token == sp.piece_to_id('<eos>'):
                break

            output_sequence.append(predicted_token)
        
        # Convert token IDs back to strings
        print(output_sequence[1:])
        generated_name = sp.decode_ids(output_sequence[1:])

    return generated_name

generated_name = generate_name(decoder)
print(generated_name)


[3, 29, 7, 10, 6, 10, 6, 15, 11, 7]
Renanadle


Here we use temperature sampling instead of top k sampling

In [None]:
def temperature_sampling(logits, temperature=1.0):
    # Divide the logits by the temperature
    logits = logits / temperature
    # Create a distribution
    distribution = torch.nn.functional.softmax(logits, dim=-1)
    # Sample from the distribution
    choice = torch.multinomial(distribution, 1)
    token = choice.squeeze().item()
    return token

def generate_name(model, max_length=8, temperature=1.0):  # added temperature parameter
    model.eval()
    with torch.no_grad():
        input_token =  sp.piece_to_id('<sos>')
        output_sequence = [input_token]

        for i in range(max_length):
            input_tensor = torch.tensor([output_sequence]).long()
            logit_output, _, _ = model(input_tensor)

            # Use temperature_sampling instead of the greedy approach
            predicted_token = temperature_sampling(logit_output[0, -1, :], temperature)

            # Break if we predict the end-of-string token
            if predicted_token ==  sp.piece_to_id('<eos>'):
                break

            output_sequence.append(predicted_token)
        
        # Convert token IDs back to strings
        generated_name = sp.decode_ids(output_sequence[1:])

    return generated_name

generated_name = generate_name(decoder, temperature=0.8)  # You can play around with different temperature values
print(generated_name)


Deloryn
