In [19]:
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass

In [20]:
# def create_mask(n_context: int) -> Float[torch.Tensor, "n_context n_context"]:
def create_mask(n_context: int) -> torch.Tensor:
    """Build the additive causal mask for a sequence of ``n_context`` positions.

    Args:
        n_context: Number of positions in the sequence.

    Returns:
        A float tensor of shape ``(n_context, n_context)`` that is ``-inf``
        strictly above the main diagonal and ``0`` on and below it. Adding it
        to a matrix of attention scores (rows = queries, columns = keys)
        removes every key that lies after the query once softmax is applied.
    """
    blocked = torch.full((n_context, n_context), float('-inf'))
    # Keep only the strictly-upper triangle; everything else is reset to zero.
    return torch.triu(blocked, diagonal=1)

In [21]:

# Configuration class for our transformer
@dataclass
class GPTConfig:
	"""Hyperparameters read by the model classes in this notebook.

	Field order is part of the interface (positional construction), so new
	fields should only ever be appended.
	"""

	# Vocabulary size: rows of the token embedding, outputs of the unembedding.
	d_vocab: int = 10_000
	# Width of the residual stream.
	d_model: int = 128
	# Width of the hidden layer inside each MLP.
	d_mlp: int = 512
	# Number of attention heads per layer.
	n_heads: int = 4
	# Width of the query/key/value projection inside each head.
	d_head: int = 32
	# Number of (attention, MLP) layers.
	n_layers: int = 6
	# Number of rows in the positional embedding table.
	max_ctx: int = 512


class AttentionHead(nn.Module):
	"""A single causal self-attention head.

	Projects the input to queries, keys and values of width ``cfg.d_head``,
	computes scaled dot-product attention under a causal mask, and returns a
	result of width ``cfg.d_model``.
	"""

	def __init__(self, cfg: GPTConfig):
		"""Create the query, key, output and value projections from ``cfg``."""
		super().__init__()

		# Define learned attention matrices
		# (creation order is kept as W_Q, W_K, W_O, W_V so parameter order and
		# seeded initialisation stay the same)
		self.W_Q = nn.Linear(cfg.d_model, cfg.d_head)
		self.W_K = nn.Linear(cfg.d_model, cfg.d_head)
		self.W_O = nn.Linear(cfg.d_head, cfg.d_model)
		self.W_V = nn.Linear(cfg.d_model, cfg.d_head)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Apply causal self-attention to ``x``.

		Args:
			x: Tensor whose last two axes are ``(n_context, d_model)``; any
				leading axes are carried through by broadcasting.

		Returns:
			Tensor with the same shape as ``x``.
		"""
		queries = self.W_Q(x)
		keys = self.W_K(x)
		# Values are projected back to d_model before being mixed, as in the
		# original formulation of this head.
		values_out = self.W_O(self.W_V(x))

		# Scale by sqrt(d_head) so the score variance does not grow with head
		# width, which would otherwise saturate the softmax.
		scale = queries.size(-1) ** 0.5
		# transpose(-2, -1) instead of .T so leading batch axes are handled.
		scores = queries @ keys.transpose(-2, -1) / scale

		# Make sure the mask is on the same device (and dtype) as the scores.
		M = create_mask(x.size(-2)).to(device=scores.device, dtype=scores.dtype)

		# Normalise over the key axis; an explicit dim avoids the deprecated
		# implicit-dimension behaviour of F.softmax.
		pattern = F.softmax(scores + M, dim=-1)
		return pattern @ values_out
		


class MultiHeadedAttention(nn.Module):
	"""Runs ``cfg.n_heads`` independent attention heads and adds their outputs."""

	def __init__(self, cfg: GPTConfig):
		"""Build one ``AttentionHead`` per configured head."""
		super().__init__()
		self.cfg = cfg

		# Registered through ModuleList so every head's parameters are tracked.
		head_modules = []
		for _ in range(cfg.n_heads):
			head_modules.append(AttentionHead(cfg))
		self.heads = nn.ModuleList(head_modules)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Return the element-wise sum of every head's output for ``x``."""
		# Start from the integer 0, exactly as the built-in sum() does.
		combined = 0
		for head in self.heads:
			combined = combined + head(x)
		return combined




class MLP(nn.Module):
	"""Position-wise feed-forward block: Linear -> GELU -> Linear.

	Expands from ``cfg.d_model`` to ``cfg.d_mlp`` and projects back to
	``cfg.d_model``.
	"""

	def __init__(self, cfg: GPTConfig):
		"""Create the hidden and output projections from ``cfg``."""
		super().__init__()
		self.cfg = cfg

		# Define layers of MLP
		self.Hidden = nn.Linear(cfg.d_model, cfg.d_mlp)
		self.Output = nn.Linear(cfg.d_mlp, cfg.d_model)

		# Using GELU activation function
		self.gelu = nn.GELU()

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Apply the feed-forward block to the last axis of ``x``.

		Args:
			x: Tensor whose last axis has size ``cfg.d_model``.

		Returns:
			Tensor with the same shape as ``x``.
		"""
		hidden = self.gelu(self.Hidden(x))
		# No activation after the output projection: GELU is bounded below
		# (about -0.17), so applying it here would stop the block from writing
		# arbitrary negative updates into the residual stream.
		return self.Output(hidden)



class Transformer(nn.Module):
	"""Decoder-style transformer mapping token ids to next-token logits.

	Token and learned positional embeddings are summed, passed through
	``cfg.n_layers`` residual (attention, MLP) layers with pre-layer-norm, and
	projected to ``cfg.d_vocab`` logits.

	NOTE(review): one LayerNorm instance (``self.norm``) is shared by every
	sub-layer of every layer, and no normalisation is applied before the
	unembedding. This is kept as-is so the parameter layout does not change;
	confirm whether per-sub-layer norms were intended.
	"""

	def __init__(self, cfg: GPTConfig):
		"""Create embeddings, the shared LayerNorm and the layer stack."""
		super().__init__()
		self.cfg = cfg

		# Creates positional embedding matrix that covers all possible context lengths
		self.pos_embedding = nn.Embedding(cfg.max_ctx, cfg.d_model)

		# Embedding and unembedding matrices
		self.embedding = nn.Embedding(cfg.d_vocab, cfg.d_model)
		self.unembedding = nn.Linear(cfg.d_model, cfg.d_vocab)

		# Layernorm
		self.norm = nn.LayerNorm(cfg.d_model)

		# One {'attn', 'mlp'} module pair per layer of transformer depth
		self.layers = nn.ModuleList(
			nn.ModuleDict({
				'attn': MultiHeadedAttention(cfg),
				'mlp': MLP(cfg)
			}) for _ in range(cfg.n_layers)
		)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Compute logits for every position of a token-id sequence.

		Args:
			x: Integer token ids. The notebook passes a 1-D tensor of shape
				``(n_context,)``. Positions are taken from the last axis, so
				the positional embedding broadcasts over leading axes; batched
				input additionally requires the attention heads to accept
				leading axes.

		Returns:
			Logits of shape ``(..., n_context, d_vocab)``.

		Raises:
			ValueError: If the sequence is longer than the positional
				embedding table.
		"""
		n_context = x.size(-1)
		max_ctx = self.pos_embedding.num_embeddings
		# Fail early with a clear message instead of an out-of-range embedding
		# lookup (which surfaces as a device-side assert on CUDA).
		if n_context > max_ctx:
			raise ValueError(
				f"sequence length {n_context} exceeds max_ctx {max_ctx}"
			)

		positions = torch.arange(n_context, device=x.device)  # (n_context,)
		pos_emb = self.pos_embedding(positions)  # (n_context, d_model)

		x = self.embedding(x)  # (..., n_context, d_model)
		x = x + pos_emb

		for layer in self.layers:

			# Residual connections for each layer
			x = x + layer['attn'](self.norm(x))
			x = x + layer['mlp'](self.norm(x))

		return self.unembedding(x)

In [22]:

from get_books import get_many_books

# Import some books and pull them all into one giant string
book_ids = [15, 63, 16, 41, 14, 76, 99, 209, 110, 9]
dataset = get_many_books(book_ids, data_temp="./data/gutenberg_data")

# Join once instead of growing the string with += inside a loop, which can
# re-copy the accumulated text on every iteration.
rawtext = "".join(dataset)

Getting book 15...
	1218778 characters read
Getting book 63...
	103137 characters read
Getting book 16...
	255644 characters read
Getting book 41...
	69837 characters read
Getting book 14...
	1897512 characters read
Getting book 76...
	571137 characters read
Getting book 99...
	45748 characters read
Getting book 209...
	228531 characters read
Getting book 110...
	841270 characters read
Getting book 9...
	21311 characters read


In [None]:
import huffman
from collections import Counter

# Count every character in a single pass over the text. Calling
# rawtext.count(char) for each distinct character rescans the whole text once
# per symbol. Counter also yields symbols in first-occurrence order, so the
# input to the codebook builder no longer depends on set iteration order.
char_counts = Counter(rawtext)
codebook = huffman.codebook(char_counts.items())

print("Codebook:", codebook)

# Encoding: concatenate each character's bit string
encoded = ''.join(codebook[char] for char in rawtext)
print("Encoded data:", encoded[0:50])

# Pad with '0' bits up to a whole number of bytes. padding_length is always in
# 1..8: when the bit string is already byte-aligned a full extra byte is
# appended, so a decoder can always strip exactly padding_length trailing bits.
padding_length = 8 - len(encoded) % 8
encoded_padded = encoded + '0' * padding_length

# Convert the padded binary string into a byte array, one byte per 8-bit chunk
byte_array = bytearray(
    int(encoded_padded[i:i+8], 2) for i in range(0, len(encoded_padded), 8)
)

# Write the binary data to a file
with open("encoded_data.bin", "wb") as f:
    # Write the padding length at the beginning (so we can decode it later)
    f.write(bytes([padding_length]))
    f.write(byte_array)

Codebook: {'b': '1111010', 'c': '101110', 'f': '011010', 'v': '0100110', ')': '101111100', 'w': '011001', 'k': '0001100', 'P': '101000001', '*': '11110110000010', '4': '01000101', '“': '010011100', '-': '0110000', ']': '111101100000110', 'T': '101111110', '£': '101000000010110111010', ';': '10100011', 'U': '1010100101', 'ח': '1010000000101100101010', 'q': '11110110110', '!': '0001101000', 'N': '010011101', '.': '0100011', 'æ': '10100000001011000', 'i': '0010', '0': '11110111', 'C': '101111000', 'ο': '1010000000101100100011', 'D': '1111011010', 'E': '000111101', '1': '0001110', 'ù': '1010000000101101111110', 'ו': '1010000000101100101001', ':': '01100011', 'Y': '10100000000', 'η': '1010000000101100101011', 's': '0000', 'h': '10110', '—': '1010000001', '$': '10101001110', '/': '101010011110', 'Z': '1010000000100', 't': '1001', '[': '101000000010111', 'o': '0101', 'V': '101010011111', 'G': '1010100110', '6': '00011111', '_': '10101011111', 'L': '1011111010', 'Q': '1010000000110', '’': '101

In [31]:
# Load the encoded file back as raw bytes; the first byte is the padding
# length written by the encoding cell, the rest is the packed bit stream.
with open("encoded_data.bin", "rb") as f:
    byte_data = f.read()

# latin-1 maps every byte value to the code point with the same number, so
# this decode cannot fail and loses no information.
ansi_like_string = str(byte_data, 'latin-1')

print("ANSI-like string:", ansi_like_string[:100])

ANSI-like string: Õ}$ÃÚ+KúD+õV=×ýV7}Q^Õ_~·<5è¦(Ç!^ç@BÃÙJµ¶°öþ½Ò¼Uctðö¤èéV²6ÀZñUÐoÃÚTT# oÖíx¡Óªçl


In [24]:
# from torch.utils.data import DataLoader, TensorDataset

# # Tokenize the entire dataset (ignore warning about sequence length thrown here)
# tokens = tokenizer(rawtext, return_tensors="pt")

# # Reorganize tokens into lengths of chunk_size
# chunk_size = 100
# to_remove = tokens["input_ids"].shape[1] % chunk_size
# new_shape = tokens["input_ids"].shape[1] // chunk_size
# attention_mask = tokens['attention_mask'][0][:-to_remove].reshape(new_shape, chunk_size)
# input_ids = tokens['input_ids'][0][:-to_remove].reshape(new_shape, chunk_size)

# # Format tokens for dataloader and load them in
# tensor = TensorDataset(input_ids, attention_mask)
# dataloader = DataLoader(tensor, batch_size=8, shuffle=True)

In [25]:
# import torch.optim as optim

# # Move model to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = Transformer(model_cfg).to(device)

# # Set up optimizer and loss
# optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# criterion = nn.CrossEntropyLoss()

# # Put model into training mode (switches layers such as dropout to their training behaviour; gradient tracking does not depend on this call)
# model.train()


# n_epochs = 1
# print_interval = 10
# for epoch in range(n_epochs):
#     for step, batch in enumerate(dataloader):
#         # [batch_size, seq_len]
#         input_ids_batch = batch[0].to(device)

#         # We'll accumulate the losses for each sequence in this mini-batch
#         total_loss = 0.0
#         batch_size = input_ids_batch.size(0)
        
#         # Process each sequence individually
#         for i in range(batch_size):
#             # Extract a single sequence of shape [seq_len]
#             seq_ids = input_ids_batch[i]

#             # Next-token language modeling: input is all but last token, target is all but first
#             inp = seq_ids[:-1]   # shape [seq_len - 1]
#             targ = seq_ids[1:]    # shape [seq_len - 1]

#             # Forward pass
#             # Your model returns logits of shape [seq_len-1, d_vocab]
#             logits = model(inp)

#             # Compute loss across this sequence
#             # CrossEntropyLoss expects [batch, vocab], so we can pass [seq_len-1, d_vocab] vs. [seq_len-1]
#             loss = criterion(logits, targ)

#             # Accumulate
#             total_loss += loss

#         # Average across all sequences in the batch
#         total_loss = total_loss / batch_size

#         # Backprop and update
#         optimizer.zero_grad()
#         total_loss.backward()
#         optimizer.step()

#         # Print progress
#         if (step + 1) % print_interval == 0:
#             print(f"Epoch {epoch+1}, Step {step+1}, Loss: {total_loss.item():.4f}")

# # The loss initially drops fast, and then it becomes more gradual over time. 
# # Because each update uses a noisy mini-batch gradient estimate (here with AdamW), the loss doesn't always decrease after every batch.

In [26]:

# # Move the model back to the cpu for inference
# model.to('cpu')
# model.eval()

# # Generate function
# def generate(input_text: str, output_tokens: int, model) -> str:
#     for new_token in range(output_tokens):
        
#         new_tokens = tokenizer(input_text, return_tensors="pt")
#         with torch.no_grad():
#             out_probs = F.softmax(model(new_tokens['input_ids'][0]), dim=-1) 
#         samples = torch.multinomial(out_probs, 1)
#         detokenized_text = tokenizer.decode(samples[-1][0], skip_special_tokens=True)
#         print(detokenized_text,end='')
#         input_text += detokenized_text
    
#     return input_text

# # It's alive... sorta
# test_output = generate('Why are there so many new lines?', 50, model)