In [5]:
 class ShakespeareDataset(torch_Dataset):
    """Token-level dataset over one split of the tiny-Shakespeare corpus.

    The corpus is read from ``input.txt`` in the working directory (downloaded
    once if it is missing), split with ``split_data`` and the selected split is
    tokenized through ``Encoding`` using the supplied tokenizer.

    Args:
        mode: ``'train'`` or ``'val'`` select those splits; any other value
            selects the test split.
        bpe_re: tokenizer object handed to ``Encoding``; it is used through
            its ``encode(text)`` method.

    Attributes set on the instance:
        encoding: the ``Encoding`` helper built for the selected split.
        tokenized_data: result of ``encoding.map_token()``.
        x: result of ``encoding.transform_type(tokenized_data)``; indexing and
            ``len()`` of the dataset are delegated to it.
    """

    # Source and local cache location of the raw corpus.
    DATA_URL = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
    DATA_PATH = 'input.txt'

    def __init__(self, mode, bpe_re):
        # Local imports keep this cell runnable without touching the
        # notebook's import cell.
        import os
        import urllib.request

        # NOTE: this seeds torch's *global* RNG as a side effect of building
        # the dataset (kept from the original behaviour).
        torch.manual_seed(10)

        # Download only when the file is not already present. The previous
        # `!wget` shell magic fetched the file again for every split and only
        # worked inside IPython. Download to a temporary name first so an
        # interrupted transfer never leaves a truncated `input.txt` behind.
        if not os.path.exists(self.DATA_PATH):
            partial_path = self.DATA_PATH + '.part'
            urllib.request.urlretrieve(self.DATA_URL, partial_path)
            os.replace(partial_path, self.DATA_PATH)

        with open(self.DATA_PATH, 'r', encoding='utf-8') as f:
            text = f.read()

        train_text, val_text, test_text = split_data(text)

        if mode == 'train':
            processed_text = train_text
        elif mode == 'val':
            processed_text = val_text
        else:
            # Every other mode value falls back to the test split.
            processed_text = test_text

        # 8 is the number of worker processes used for tokenization.
        self.encoding = Encoding(processed_text, bpe_re, 8)

        self.tokenized_data = self.encoding.map_token()
        self.x = self.encoding.transform_type(self.tokenized_data)

    def __len__(self):
        """Return the number of entries in ``self.x``."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the entry of ``self.x`` at position ``idx``."""
        return self.x[idx]

In [None]:
def split_data(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    """Cut a sequence into contiguous train / validation / test pieces.

    Args:
        data: any sliceable sequence with a length (e.g. ``str`` or ``list``).
        train_ratio: fraction of ``data`` assigned to the training piece.
        val_ratio: fraction of ``data`` assigned to the validation piece.
        test_ratio: accepted for symmetry but not used; the test piece is
            always whatever remains after the first two pieces.

    Returns:
        A ``(train, val, test)`` tuple of slices of ``data``, in order and
        without shuffling.
    """
    n_items = len(data)

    # Boundaries are truncated towards zero; the remainder goes to test so
    # that no element is dropped.
    train_end = int(train_ratio * n_items)
    val_end = train_end + int(val_ratio * n_items)

    return data[:train_end], data[train_end:val_end], data[val_end:]


In [None]:
class BPE_nore:
    """Byte-level byte-pair encoder trained on raw UTF-8 bytes (no regex pre-split).

    Token ids 0-255 are the single bytes; every learned merge gets the next
    free id starting at 256.

    Args:
        text: training corpus (``str``).
        vocab_size: target vocabulary size. ``vocab_size - 256`` merges are
            attempted; fewer are learned if the corpus runs out of adjacent
            pairs first.

    Attributes:
        vocab: ``{token id: bytes}`` for every known token.
        merges: ``{(left id, right id): merged id}`` in the order learned.
    """

    def __init__(self, text, vocab_size=3257):
        self.vocab_size = vocab_size
        self.vocab = {idx: bytes([idx]) for idx in range(256)}
        self.merges = {}
        self._build_vocab(text)

    def _get_stats(self, ids):
        """Count how often each adjacent pair of ids occurs in ``ids``.

        Returns a dict keyed by ``(left, right)``; keys appear in order of
        first occurrence, which is what breaks ties in ``_build_vocab``.
        """
        stats = {}
        for pair in zip(ids, ids[1:]):
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def merge(self, ids, pair, idx):
        """Return a copy of ``ids`` with each occurrence of ``pair`` replaced by ``idx``.

        Matching is greedy from left to right and non-overlapping.
        """
        newids = []
        i = 0
        while i < len(ids):
            if i < len(ids) - 1 and ids[i] == pair[0] and ids[i+1] == pair[1]:
                newids.append(idx)
                i += 2
            else:
                newids.append(ids[i])
                i += 1
        return newids

    def _build_vocab(self, text):
        """Learn the merge table from ``text``, filling ``merges`` and ``vocab``."""
        ids = list(text.encode("utf-8"))
        num_merges = self.vocab_size - 256
        for i in range(num_merges):
            stats = self._get_stats(ids)
            if not stats:
                # Fewer than two tokens left: nothing more can be merged.
                # Without this guard max() raises ValueError on short corpora.
                break
            # Most frequent pair; ties go to the pair seen first.
            pair = max(stats, key=stats.get)
            idx = 256 + i
            print(f"merging {pair} into a new token {idx}")
            ids = self.merge(ids, pair, idx)
            self.merges[pair] = idx
            self.vocab[idx] = self.vocab[pair[0]] + self.vocab[pair[1]]

    def encode(self, text):
        """Encode ``text`` into a list of token ids using the learned merges."""
        tokens = list(text.encode("utf-8"))
        while len(tokens) >= 2:
            stats = self._get_stats(tokens)
            # Apply merges in the order they were learned: the present pair
            # with the lowest merge id goes first; unknown pairs rank last.
            pair = min(stats, key=lambda p: self.merges.get(p, float("inf")))
            if pair not in self.merges:
                break
            idx = self.merges[pair]
            tokens = self.merge(tokens, pair, idx)
        return tokens

    def decode(self, ids):
        """Decode token ids back to text.

        Byte sequences that are not valid UTF-8 are replaced rather than
        raising (``errors="replace"``).
        """
        tokens = b"".join(self.vocab[idx] for idx in ids)
        text = tokens.decode("utf-8", errors="replace")
        return text

In [None]:
class BytePairEncoding:
    """Byte-level byte-pair encoder that pre-splits text with a GPT-2 style regex.

    Token ids 0-255 are the single bytes; every learned merge gets the next
    free id starting at 256.

    Args:
        text: optional training corpus. When ``None`` no merges are learned
            and the encoder only knows the 256 byte tokens.
        vocab_size: target vocabulary size. ``vocab_size - 256`` merges are
            attempted; fewer are learned if the corpus runs out of adjacent
            pairs first.

    Attributes:
        vocab: ``{token id: bytes}`` for every known token.
        merges: ``{(left id, right id): merged id}`` in the order learned.
        gpt2pat: compiled splitting pattern used by ``_preprocess_text``.
    """

    def __init__(self, text=None, vocab_size=3257):
        self.vocab_size = vocab_size
        self.vocab = {idx: bytes([idx]) for idx in range(256)}
        self.merges = {}
        self.gpt2pat = regex.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
        if text is not None:
            self._build_vocab(text)

    def _get_stats(self, ids):
        """Count how often each adjacent pair of ids occurs in ``ids``.

        Returns a dict keyed by ``(left, right)``; keys appear in order of
        first occurrence, which is what breaks ties in ``_build_vocab``.
        """
        stats = {}
        for pair in zip(ids, ids[1:]):
            stats[pair] = stats.get(pair, 0) + 1
        return stats

    def merge(self, ids, pair, idx):
        """Return a copy of ``ids`` with each occurrence of ``pair`` replaced by ``idx``.

        Matching is greedy from left to right and non-overlapping.
        """
        newids = []
        i = 0
        while i < len(ids):
            if i < len(ids) - 1 and ids[i] == pair[0] and ids[i+1] == pair[1]:
                newids.append(idx)
                i += 2
            else:
                newids.append(ids[i])
                i += 1
        return newids

    def _preprocess_text(self, text):
        """Split ``text`` with ``gpt2pat`` and return all chunk bytes as one flat list of ints.

        Returns ``[]`` for ``None``.

        NOTE(review): the chunks are concatenated back into a single list, so
        chunk boundaries are not retained and later merges can span them.
        Confirm whether per-chunk merging was intended.
        """
        if text is None:
            return []
        chunks = self.gpt2pat.findall(text)
        return [byte for chunk in chunks for byte in chunk.encode('utf-8')]

    def _build_vocab(self, text):
        """Learn the merge table from ``text``, filling ``merges`` and ``vocab``."""
        ids = list(self._preprocess_text(text))
        num_merges = self.vocab_size - 256
        for i in range(num_merges):
            stats = self._get_stats(ids)
            if not stats:
                # Fewer than two tokens left: nothing more can be merged.
                # Without this guard max() raises ValueError on short corpora.
                break
            # Most frequent pair; ties go to the pair seen first.
            pair = max(stats, key=stats.get)
            idx = 256 + i
            print(f"merging {pair} into a new token {idx}")
            ids = self.merge(ids, pair, idx)
            self.merges[pair] = idx
            self.vocab[idx] = self.vocab[pair[0]] + self.vocab[pair[1]]

    def encode(self, text):
        """Encode ``text`` into a list of token ids using the learned merges."""
        tokens = self._preprocess_text(text)
        while len(tokens) >= 2:
            stats = self._get_stats(tokens)
            # Apply merges in the order they were learned: the present pair
            # with the lowest merge id goes first; unknown pairs rank last.
            pair = min(stats, key=lambda p: self.merges.get(p, float("inf")))
            if pair not in self.merges:
                break
            idx = self.merges[pair]
            tokens = self.merge(tokens, pair, idx)
        return tokens

    def decode(self, ids):
        """Decode token ids back to text.

        Byte sequences that are not valid UTF-8 are replaced rather than
        raising (``errors="replace"``).
        """
        tokens = b"".join(self.vocab[idx] for idx in ids)
        text = tokens.decode("utf-8", errors="replace")
        return text


In [None]:
#by line
class Encoding:
    """Tokenize a text line by line and pack the result into one tensor.

    Args:
        text: the text to tokenize.
        bpe_re: tokenizer exposing ``encode(str) -> list of ints``.
        num_proc: number of worker processes passed to ``Dataset.map``.
    """

    def __init__(self, text, bpe_re, num_proc):
        self.text = text
        self.bpe_re = bpe_re
        self.num_proc = num_proc

    def split_lines(self):
        """Split ``self.text`` into lines, keeping each line's trailing newline.

        A final piece without a newline is kept only if it is non-empty.
        """
        *terminated, remainder = self.text.split('\n')
        segments = [piece + '\n' for piece in terminated]
        if remainder:
            segments.append(remainder)
        return segments

    def tokenize_function(self, examples):
        """Batch callback for ``Dataset.map``: encode every string under ``"text"``."""
        encoded = []
        for segment in examples["text"]:
            encoded.append(self.bpe_re.encode(segment))
        return {"input_ids": encoded}

    def map_token(self):
        """Tokenize all lines and return the mapped dataset.

        The returned dataset has the ``"text"`` column removed and carries the
        ``"input_ids"`` column produced by ``tokenize_function``.
        """
        line_dataset = Dataset.from_dict({"text": self.split_lines()})
        return line_dataset.map(
            self.tokenize_function,
            batched=True,
            num_proc=self.num_proc,
            remove_columns=["text"],
        )

    def transform_type(self, tokenized_datasets):
        """Concatenate every ``input_ids`` entry into a single 1-D ``torch.long`` tensor."""
        per_line = [
            torch.tensor(ids, dtype=torch.long)
            for ids in tokenized_datasets['input_ids']
        ]
        # Join the per-line tensors end to end along dimension 0.
        return torch.cat(per_line, dim=0)


In [None]:
def get_batch(data, batch_size, block_size):
    """Sample a random mini-batch of input windows and next-token targets.

    Args:
        data: indexable sequence of tokens; slices of it are stacked with
            ``torch.stack``.
        batch_size: number of windows to draw.
        block_size: length of every window.

    Returns:
        ``(x, y)`` moved to the global ``device``. Row ``r`` of ``y`` is the
        same window as row ``r`` of ``x`` shifted one position to the right.
    """
    # Start offsets are drawn so that the shifted target window still fits.
    starts = torch.randint(len(data) - block_size, (batch_size,))

    inputs, targets = [], []
    for start in starts:
        inputs.append(data[start:start + block_size])
        targets.append(data[start + 1:start + block_size + 1])

    return torch.stack(inputs).to(device), torch.stack(targets).to(device)

In [None]:
class Head(nn.Module):
    """A single head of causal (masked) self-attention.

    Args:
        head_size: output width of the key / query / value projections.
        n_embd: width of the incoming embeddings.
        block_size: maximum sequence length; sizes the causal mask.
        dropout: dropout probability applied to the attention weights.
    """

    def __init__(self, head_size, n_embd, block_size, dropout):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Lower-triangular mask; a buffer so it follows the module across
        # devices without being a trainable parameter.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, n_embd); returns (B, T, head_size)."""
        _, T, _ = x.shape
        k = self.key(x)   # (B, T, head_size)
        q = self.query(x) # (B, T, head_size)
        # compute attention scores ("affinities"), scaled by 1/sqrt(d_k) where
        # d_k is the key width (head_size). The previous code scaled by the
        # embedding width, which over-damps the scores whenever n_head > 1.
        wei = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        # forbid attending to future positions
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B, T, head_size)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

class MultiHeadAttention(nn.Module):
    """Several ``Head`` modules run side by side, concatenated and projected.

    Args:
        num_heads: number of attention heads.
        head_size: output width of each head.
        n_embd: embedding width (input to every head, output of the projection).
        block_size: maximum sequence length, forwarded to each head.
        dropout: dropout probability for the heads and for the projected output.
    """

    def __init__(self, num_heads, head_size, n_embd, block_size, dropout):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size, n_embd, block_size, dropout) for _ in range(num_heads)])
        # The concatenated head outputs are num_heads * head_size wide. That
        # equals n_embd only when n_embd is divisible by the head count, so
        # size the projection from the real width instead of assuming it.
        self.proj = nn.Linear(num_heads * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map ``x`` of shape (B, T, n_embd) to an output of the same shape."""
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # (B, T, num_heads * head_size)
        out = self.dropout(self.proj(out))
        return out


class GELU(nn.Module):
    """Tanh approximation of the GELU activation, applied element-wise."""

    def forward(self, x):
        """Return ``0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))``."""
        cubic_term = 0.044715 * torch.pow(x, 3.0)
        inner = math.sqrt(2.0 / math.pi) * (x + cubic_term)
        half_x = 0.5 * x
        return half_x * (1.0 + torch.tanh(inner))

class FeedFoward(nn.Module):
    """Position-wise MLP: expand to 4x the width, GELU, project back, dropout.

    Args:
        n_embd: input and output width.
        dropout: dropout probability applied after the second linear layer.
    """

    def __init__(self, n_embd, dropout):
        super().__init__()
        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),
            nn.GELU(),  # torch's built-in GELU
            nn.Linear(hidden, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        return self.net(x)



class Block(nn.Module):
    """One transformer block: self-attention, then a feed-forward network.

    Each sub-layer is preceded by its own LayerNorm and wrapped in a residual
    connection.

    Args:
        n_embd: embedding width.
        n_head: number of attention heads; each head is ``n_embd // n_head`` wide.
        block_size: maximum sequence length, forwarded to the attention layer.
        dropout: dropout probability forwarded to both sub-layers.
    """

    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        per_head = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, per_head, n_embd, block_size, dropout)
        self.ffwd = FeedFoward(n_embd, dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.sa(self.ln1(x))
        x = x + attended
        transformed = self.ffwd(self.ln2(x))
        return x + transformed

class MiniGPTModel(nn.Module):
    """Small decoder-only transformer language model.

    Args:
        n_embd: embedding width.
        n_layer: number of stacked ``Block`` modules.
        n_head: attention heads per block.
        block_size: maximum context length (size of the position table).
        dropout: dropout probability forwarded to every block.
        vocab_size: number of distinct token ids.
    """

    def __init__(self, n_embd, n_layer, n_head, block_size, dropout,vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head, block_size, dropout) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, if targets are given, the loss.

        Args:
            idx: (B, T) integer tensor of token ids, with T at most the
                ``block_size`` the model was built with.
            targets: optional (B, T) integer tensor of target token ids.

        Returns:
            ``(logits, loss)``. Without targets, ``logits`` is
            (B, T, vocab_size) and ``loss`` is ``None``. With targets,
            ``logits`` is flattened to (B*T, vocab_size) and ``loss`` is the
            cross-entropy against the flattened targets.
        """
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        # Build the position indices on the input's own device rather than on
        # a notebook-level global, so the model works wherever it is placed.
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens, block_size=None):
        """Sample ``max_new_tokens`` tokens autoregressively and append them to ``idx``.

        Args:
            idx: (B, T) integer tensor holding the current context.
            max_new_tokens: number of tokens to sample.
            block_size: context window used for each step. Defaults to the
                size of the position embedding table.

        Returns:
            (B, T + max_new_tokens) tensor: the context followed by the samples.

        The train/eval mode is not changed here; call ``model.eval()`` first
        if dropout should be disabled while sampling.
        """
        if block_size is None:
            block_size = self.position_embedding_table.num_embeddings
        # Sampling needs no gradients; skip building autograd graphs.
        with torch.no_grad():
            # idx is (B, T) array of indices in the current context
            for _ in range(max_new_tokens):
                # crop idx to the last block_size tokens
                idx_cond = idx[:, -block_size:]
                # get the predictions
                logits, _ = self(idx_cond)
                # focus only on the last time step
                logits = logits[:, -1, :] # becomes (B, C)
                # apply softmax to get probabilities
                probs = F.softmax(logits, dim=-1) # (B, C)
                # sample from the distribution
                idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
                # append sampled index to the running sequence
                idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx


In [None]:
@torch.no_grad()
def estimate_loss(model, eval_iters, batch_size, block_size, data):
    """Average the model's loss over ``eval_iters`` random batches of ``data``.

    The model is put in eval mode for the measurement and switched to train
    mode afterwards. Gradients are disabled for the whole call.

    Returns:
        A 0-dimensional tensor holding the mean loss.
    """
    model.eval()
    batch_losses = torch.zeros(eval_iters)
    for step in range(eval_iters):
        inputs, targets = get_batch(data, batch_size, block_size)
        _, batch_loss = model(inputs, targets)
        batch_losses[step] = batch_loss.item()
    model.train()
    return batch_losses.mean()

def train_model(config, train_data, val_data, max_iters = 5000):
    """Build a ``MiniGPTModel`` from ``config`` and train it with early stopping.

    Args:
        config: dict providing ``n_embd``, ``n_head``, ``n_layer``,
            ``block_size``, ``dropout``, ``vocab_size``, ``learning_rate``
            and ``batch_size``.
        train_data: token sequence batches are sampled from for training.
        val_data: token sequence used for the validation loss.
        max_iters: upper bound on the number of optimisation steps.

    Returns:
        ``(model, train_losses, val_losses)``. The two loss lists hold
        ``(step, loss)`` pairs recorded at every evaluation, with ``loss``
        being the value returned by ``estimate_loss``.

    The returned model carries the weights of the last step taken, not of
    the step with the best validation loss.
    """
    start_time = time.time()

    model = MiniGPTModel(
        n_embd=config['n_embd'],
        n_head=config['n_head'],
        n_layer=config['n_layer'],
        block_size=config['block_size'],
        dropout =config['dropout'],
        vocab_size = config['vocab_size']
    ).to(device)

    print(sum(p.numel() for p in model.parameters())/1e6, 'M parameters')

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])

    eval_interval = 100  # evaluate every this many steps
    eval_iters = 500     # batches averaged per evaluation
    train_losses = []
    val_losses = []

    # Early stopping: give up after `patience` consecutive evaluations
    # without a strictly lower validation loss.
    patience = 10
    patience_counter = 0
    best_val_loss = float('inf')

    iter_times = []  # wall-clock duration of every completed step
    for step in range(max_iters):
        iter_start_time = time.time()

        # every once in a while evaluate the loss on train and val sets
        if step % eval_interval == 0 or step == max_iters - 1:
            losses_train = estimate_loss(model,eval_iters, config['batch_size'], config['block_size'], train_data)
            losses_val = estimate_loss(model,eval_iters, config['batch_size'], config['block_size'], val_data)

            print(f"step {step}: train loss {losses_train:.4f}, val loss {losses_val:.4f}")

            train_losses.append((step, losses_train))
            val_losses.append((step, losses_val))

            if losses_val < best_val_loss:
                best_val_loss = losses_val
                patience_counter = 0
            else:
                patience_counter += 1
                # The counter only changes here, so this is the only place
                # where the stopping condition can become true.
                if patience_counter >= patience:
                    print(f"Early stopping triggered after {step + 1} iterations.")
                    break

        xb, yb = get_batch(train_data, config['batch_size'], config['block_size'])

        _, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        # Note: on evaluation steps this duration includes the evaluation.
        iter_times.append(time.time() - iter_start_time)

    # Guard against max_iters == 0, where no step ran and the average would
    # divide by zero.
    if iter_times:
        avg_iter_time = sum(iter_times) / len(iter_times)
        print(f"Average iteration time: {avg_iter_time:.4f} seconds")

    end_time = time.time()
    print(f"Total training time: {end_time - start_time:.4f} seconds")

    return model, train_losses, val_losses


In [None]:
def finetune_model(model, config, train_data, val_data, max_iters = 5000):
    """Continue training an existing model with early stopping.

    Args:
        model: the model to train; it is updated in place and also returned.
        config: dict providing ``learning_rate``, ``batch_size`` and
            ``block_size``.
        train_data: token sequence batches are sampled from for training.
        val_data: token sequence used for the validation loss.
        max_iters: upper bound on the number of optimisation steps.

    Returns:
        ``(model, train_losses, val_losses)``. The two loss lists hold
        ``(step, loss)`` pairs recorded at every evaluation, with ``loss``
        being the value returned by ``estimate_loss``.

    A fresh AdamW optimizer is created on every call. The returned model
    carries the weights of the last step taken, not of the step with the
    best validation loss.
    """
    start_time = time.time()

    print(sum(p.numel() for p in model.parameters())/1e6, 'M parameters')

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])

    eval_interval = 100  # evaluate every this many steps
    eval_iters = 500     # batches averaged per evaluation
    train_losses = []
    val_losses = []

    # Early stopping: give up after `patience` consecutive evaluations
    # without a strictly lower validation loss.
    patience = 10
    patience_counter = 0
    best_val_loss = float('inf')

    iter_times = []  # wall-clock duration of every completed step
    for step in range(max_iters):
        iter_start_time = time.time()

        # every once in a while evaluate the loss on train and val sets
        if step % eval_interval == 0 or step == max_iters - 1:
            losses_train = estimate_loss(model,eval_iters, config['batch_size'], config['block_size'], train_data)
            losses_val = estimate_loss(model,eval_iters, config['batch_size'], config['block_size'], val_data)

            print(f"step {step}: train loss {losses_train:.4f}, val loss {losses_val:.4f}")

            train_losses.append((step, losses_train))
            val_losses.append((step, losses_val))

            if losses_val < best_val_loss:
                best_val_loss = losses_val
                patience_counter = 0
            else:
                patience_counter += 1
                # The counter only changes here, so this is the only place
                # where the stopping condition can become true.
                if patience_counter >= patience:
                    print(f"Early stopping triggered after {step + 1} iterations.")
                    break

        xb, yb = get_batch(train_data, config['batch_size'], config['block_size'])

        _, loss = model(xb, yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

        # Note: on evaluation steps this duration includes the evaluation.
        iter_times.append(time.time() - iter_start_time)

    # Guard against max_iters == 0, where no step ran and the average would
    # divide by zero.
    if iter_times:
        avg_iter_time = sum(iter_times) / len(iter_times)
        print(f"Average iteration time: {avg_iter_time:.4f} seconds")

    end_time = time.time()
    print(f"Total training time: {end_time - start_time:.4f} seconds")

    return model, train_losses, val_losses
