In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torchtext
from torch.utils.data import Dataset,DataLoader
import requests
from collections import Counter
import numpy as np
import random
from torch.nn.utils.rnn import pad_sequence
from torch import Tensor

In [None]:
url_csv = "https://raw.githubusercontent.com/paulskeie/stadnamn/main/stadnamn.csv"
def get_stadnamn(url_csv, timeout=30):
  """Download the place-name file and return its contents as text.

  Args:
    url_csv: URL of the file to fetch.
    timeout: Seconds to wait for the server before giving up.

  Returns:
    The decoded response body as one string.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status.
    requests.RequestException: On connection problems or a timeout.
  """
  r = requests.get(url_csv, timeout=timeout)
  # Fail loudly on HTTP errors; otherwise the text of an error page would be
  # tokenized and trained on as if it were place names.
  r.raise_for_status()
  return r.text

In [None]:
# Download the raw text once; the vocabulary and the list of place names below are derived from it.
data = get_stadnamn(url_csv)

In [None]:

def tokenize(txt,tokens,start_stop_chars):
  """Encode a string as a list of token ids framed by start and stop ids.

  Args:
    txt: The text to encode, one token per character.
    tokens: Mapping from character to integer token id.
    start_stop_chars: Two-element sequence holding the start character
      (index 0) and the stop character (index 1).

  Returns:
    ``[start id, id of each character of txt..., stop id]``.

  Raises:
    KeyError: If a character (or a start/stop character) is not in ``tokens``.
  """
  return [
    tokens[start_stop_chars[0]],
    *(tokens[ch] for ch in txt),
    tokens[start_stop_chars[1]],
  ]

# Character frequencies over the whole downloaded text (any line-break
# characters in the text are counted too and therefore get a token id).
counter=Counter(data)

# Find the longest place name; max_len_sn keeps the name itself for inspection.
# default="" keeps the cell from failing when the text has no lines at all.
max_len_sn=max(data.splitlines(), key=len, default="")
# Add one to account for the start token, the end token shall not be a feature, only a target
max_len=len(max_len_sn)+1


# Use characters that don't occur in the place names as start and stop characters.
start_stop_chars=['@','$']

# Dictionary mapping each character to its integer token id. Later we will zero pad,
# which is why ids start at 1. Building the alphabet as a set guarantees one id per
# character even if a start/stop character also occurs in the data; with duplicates
# the largest id would exceed len(tokens) and vocab_size would be too small.
alphabet=sorted(set(start_stop_chars) | set(counter))
tokens={ch: i for i, ch in enumerate(alphabet, start=1)}
# +1 for the padding id 0
vocab_size = len(tokens)+1

In [None]:
# One entry per line. splitlines() matches how max_len was computed above and does not
# leave a trailing empty string when the text ends with a line break; empty lines are
# dropped so they do not become training examples.
placenames=[name for name in data.splitlines() if name]

In [None]:
def build_placename_index(placenames):
  """Enumerate every (place name, position) training example.

  A name with ``k`` characters contributes ``k + 1`` examples, numbered
  ``0..k`` within that name.

  Args:
    placenames: Sequence of place-name strings.

  Returns:
    A tuple ``(n_examples, placename_index)`` where ``placename_index`` maps
    a global example number to ``(index of the name, position within it)``.
  """
  pairs = [
    (name_idx, char_pos)
    for name_idx, name in enumerate(placenames)
    for char_pos in range(len(name) + 1)
  ]
  return len(pairs), dict(enumerate(pairs))

In [None]:

def train_test_split_indices(length, train_fraction=0.8, seed=None):
    """
    Generates shuffled training and testing indices for a given dataset size.

    Args:
    - length (int): The total number of items in the dataset.
    - train_fraction (float): Fraction of the dataset to be used for training.
    - seed (int | None): If given, the shuffle uses its own generator seeded
      with this value, so the split is reproducible. If None, the module-level
      ``random`` state is used.

    Returns:
    - index_train (list): Indices for the training set.
    - index_test (list): Indices for the testing set.
    """
    # Number of training samples, rounded down
    train_size = int(length * train_fraction)

    indices = list(range(length))

    if seed is None:
        random.shuffle(indices)
    else:
        # A private generator leaves the global random state untouched
        random.Random(seed).shuffle(indices)

    return indices[:train_size], indices[train_size:]


In [None]:
# Split at the level of whole place names (default 80/20), not individual character examples.
index_train, index_test = train_test_split_indices(len(placenames))

In [None]:
# Materialise the two name lists from the shuffled index lists.
placenames_train = [placenames[idx] for idx in index_train]
placenames_test = [placenames[idx] for idx in index_test]

In [None]:
def get_jth_example_in_placename(j,placename, tokens, start_stop_chars):
  """Build the j-th next-character example for one place name.

  The name is tokenized with start and stop ids added. Example ``j`` uses the
  first ``j + 1`` ids as features and the id that follows them as target.

  Args:
    j: Position of the example within the name.
    placename: The place-name string.
    tokens: Mapping from character to token id.
    start_stop_chars: Start and stop characters, passed on to ``tokenize``.

  Returns:
    ``(features, target)`` with ``features`` a list of ids and ``target`` a
    single id, or ``None`` when ``j`` is past the last position that still
    has a following id.
  """
  ids = tokenize(placename, tokens, start_stop_chars)
  # The last id (stop token) can only ever be a target, never the end of a prefix.
  last_feature_pos = len(ids) - 2
  if j > last_feature_pos:
    return None
  return ids[:j + 1], ids[j + 1]

In [None]:
# Sanity check: the result should begin with the start id and end with the stop id.
tokenize("Bulken", tokens, start_stop_chars)

In [None]:
# Example 4 of "Bulken": features are the ids for start, B, u, l, k and the target is the id for 'e'.
get_jth_example_in_placename(4,"Bulken",tokens, start_stop_chars)

In [None]:
# Characters known to the tokenizer.
tokens.keys()

In [None]:
# Their integer ids; ids start at 1 because 0 is used for padding.
tokens.values()

In [None]:

class PlaceNameDataset(Dataset):
    """Dataset of next-character examples derived from a list of place names.

    Each item is a ``(features, target)`` pair as produced by
    ``get_jth_example_in_placename``: a variable-length list of token ids and
    the id that follows them. ``max_len`` is stored but not used by the class
    itself.
    """

    def __init__(self, placenames, tokens, start_stop_chars, max_len):
        n_examples, lookup = build_placename_index(placenames)
        self.placenames = placenames
        self.tokens = tokens
        self.start_stop_chars = start_stop_chars
        self.max_len = max_len
        # Total number of examples and the map: example number -> (name index, position)
        self.n_examples = n_examples
        self.placename_index = lookup

    def __len__(self):
        return self.n_examples

    def __getitem__(self, index):
        # Negative indices are rejected rather than wrapped around.
        if not 0 <= index < len(self):
            raise IndexError("Index out of range")

        name_pos, char_pos = self.placename_index[index]
        features, target = get_jth_example_in_placename(
            char_pos,
            self.placenames[name_pos],
            self.tokens,
            self.start_stop_chars,
        )
        return features, target


In [None]:
# Both datasets share one vocabulary; they differ only in which place names they draw examples from.
train_dataset = PlaceNameDataset(placenames_train,tokens,start_stop_chars,max_len)
test_dataset = PlaceNameDataset(placenames_test,tokens,start_stop_chars,max_len)

In [None]:
# Fixed input length: collate_fn pads/truncates every feature sequence to this many tokens,
# and the models size their first linear layer from it.
max_seq_length=30

In [None]:
def collate_fn(batch, max_len=max_seq_length):
    """Turn a list of ``(features, target)`` pairs into two batch tensors.

    Every feature sequence is brought to exactly ``max_len`` entries: shorter
    ones are padded with zeros on the left, longer ones keep only their last
    ``max_len`` entries.

    Args:
        batch: Iterable of ``(features, target)`` pairs.
        max_len: Length of each row of the returned feature tensor. The
            default is the value ``max_seq_length`` had when this function
            was defined.

    Returns:
        ``(features, targets)``: a stacked tensor with one row per example and
        a 1-D tensor of targets.
    """
    def fit_to_length(token_ids):
        seq = torch.tensor(token_ids)
        missing = max_len - len(seq)
        if missing > 0:
            # Left-pad so the most recent characters sit at the end of the row
            return torch.cat([torch.full((missing,), 0), seq])
        return seq[-max_len:]

    feature_batch, target_batch = zip(*batch)

    features = torch.stack([fit_to_length(ids) for ids in feature_batch])
    targets = torch.tensor(target_batch)
    return features, targets

In [None]:

# Create DataLoaders with custom collate_fn.
# Only the training loader shuffles; the test loader keeps the dataset order.
train_dataloader = DataLoader(train_dataset, batch_size=64, collate_fn=collate_fn,shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, collate_fn=collate_fn)

In [None]:
# Number of test batches.
len(test_dataloader)

In [None]:
# Number of training batches.
len(train_dataloader)

In [None]:
# Look at the first test batch only. X and y stay in the namespace afterwards and
# are reused by the embedding demo and the single training step further down.
for X, y in test_dataloader:
    print(f"Shape of X [batch_size, sequence_length]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Demo: an untrained 2-dimensional embedding applied to the batch X left over from the
# cell above; the displayed size shows the extra trailing embedding dimension.
emb=nn.Embedding(vocab_size,2)
out=emb(X)
out.size()

In [None]:

class SimpleEmbedding(nn.Module):
  """Embedding followed by a two-layer classifier over the flattened sequence.

  Pipeline: token ids -> embedding -> flatten -> linear -> batch norm -> ReLU
  -> linear, producing one logit per vocabulary entry.

  Args:
    vocab_size: Number of token ids (also the number of output logits).
    embedding_dim: Size of each token embedding.
    max_seq_length: Number of tokens in every input row; the first linear
      layer is sized for exactly this length.
    fan_out_dim_linear1: Width of the hidden layer.
  """
  def __init__(self,vocab_size,embedding_dim,max_seq_length,fan_out_dim_linear1):
    super().__init__()
    flat_dim = max_seq_length*embedding_dim
    self.emb = nn.Embedding(vocab_size,embedding_dim)
    self.linear1 = nn.Linear(flat_dim,fan_out_dim_linear1)
    self.bn1 = nn.BatchNorm1d(fan_out_dim_linear1)
    self.linear2 = nn.Linear(fan_out_dim_linear1,vocab_size)

    self.init_weights()

  def init_weights(self) -> None:
    """Draw embedding and linear weights uniformly from [-0.1, 0.1]; zero the biases."""
    initrange = 0.1
    with torch.no_grad():
      self.emb.weight.uniform_(-initrange, initrange)
      for layer in (self.linear1, self.linear2):
        layer.bias.zero_()
        layer.weight.uniform_(-initrange, initrange)

  def forward(self,x):
    """Map a (batch, max_seq_length) tensor of token ids to (batch, vocab_size) logits."""
    embedded = self.emb(x)
    # Concatenate the embeddings of all positions into one vector per example
    flat = embedded.view(embedded.size(0),-1)
    hidden = F.relu(self.bn1(self.linear1(flat)))
    return self.linear2(hidden)


In [None]:
# Use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Baseline model: 8-dimensional embeddings and a hidden layer of width 120.
model = SimpleEmbedding(vocab_size,8,max_seq_length,120)

In [None]:
# Move the parameters to the selected device.
model = model.to(device)

In [None]:
# Cross-entropy over the vocabulary logits; loss_fn is reused by every training loop below.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

In [None]:
# One optimisation step on the batch X, y left over from the inspection loop above.
optimizer.zero_grad()
logits = model(X.to(device))
loss = loss_fn(logits,y.to(device))
loss.backward()
optimizer.step()

In [None]:
# Loss of that single step as a Python float.
loss.item()

In [None]:
# Replace the optimizer with a fresh Adam instance using a much smaller learning rate
# (1e-5 instead of 1e-2); the previous optimizer object is discarded.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

In [None]:
# BatchNorm behaves differently in eval mode; select training mode explicitly so this
# cell also works when it is re-run after an evaluation cell.
model.train()

log_every = 1000     # batches between progress reports
max_batches = 40000  # stop once this many batches have been processed

correct = 0
total = 0
batch_number = 0
for X,y in train_dataloader:
  X=X.to(device)
  y=y.to(device)
  optimizer.zero_grad()
  logits = model(X)
  loss = loss_fn(logits,y)
  loss.backward()
  optimizer.step()
  batch_number+=1

  # Convert logits to predicted class indices
  predicted = logits.detach().argmax(dim=1)
  total += y.size(0)
  correct += (predicted == y).sum().item()

  if batch_number % log_every == 0:
    # Accuracy covers the batches since the previous report; the loss is that of the
    # current batch only. (The earlier second print repeated the same numbers.)
    accuracy = 100 * correct / total
    print(f"Batch: {batch_number}, Loss: {loss.item()}, Accuracy: {accuracy}%")
    correct = 0
    total = 0

  if batch_number >= max_batches:
    break

In [None]:
# Learned embedding matrix as a NumPy array, one row per token id.
embeddings = model.emb.weight.detach().cpu().numpy()
# Inverse of `tokens`: token id -> character.
token_to_char = {token: char for char, token in tokens.items()}

In [None]:
# Embedding vector of the token with id 10.
embeddings[10]

In [None]:
import matplotlib.pyplot as plt

# Plot each embedding
plt.figure(figsize=(10, 10))
for token, char in token_to_char.items():
    # Draw only the first two embedding dimensions. The model above uses more than
    # two, so unpacking the whole vector into two names would raise a ValueError.
    # Separate names are used so the batch variable `y` is not overwritten.
    emb_x, emb_y = embeddings[token][:2]
    plt.scatter(emb_x, emb_y)
    plt.annotate(char, (emb_x, emb_y), textcoords="offset points", xytext=(0,10), ha='center')

plt.xlabel('Embedding Dimension 1')
plt.ylabel('Embedding Dimension 2')
plt.title('Character Embeddings')
plt.show()


In [None]:

class SimpleTransformerModel(nn.Module):
    """Embedding, Transformer encoder stack, then a two-layer classifier.

    The encoder output for all positions is flattened and fed through
    linear -> batch norm -> ReLU -> linear. No positional encoding is added
    and no padding mask is passed to the encoder.

    Args:
        vocab_size: Number of token ids (also the number of output logits).
        embedding_dim: Embedding size, used as the encoder's ``d_model``.
        max_seq_length: Number of tokens in every input row.
        fan_out_dim_linear1: Width of the hidden classifier layer.
        nhead: Attention heads per encoder layer.
        num_encoder_layers: Number of stacked encoder layers.
        dim_feedforward: Width of the feed-forward part of each encoder layer.
    """

    def __init__(self, vocab_size, embedding_dim, max_seq_length, fan_out_dim_linear1, nhead, num_encoder_layers, dim_feedforward):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, embedding_dim)
        layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
        )
        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers=num_encoder_layers)
        self.linear1 = nn.Linear(max_seq_length * embedding_dim, fan_out_dim_linear1)
        self.bn1 = nn.BatchNorm1d(fan_out_dim_linear1)
        self.linear2 = nn.Linear(fan_out_dim_linear1, vocab_size)

        self.init_weights()

    def init_weights(self) -> None:
        """Draw embedding and linear weights uniformly from [-0.1, 0.1]; zero the biases."""
        initrange = 0.1
        with torch.no_grad():
            self.emb.weight.uniform_(-initrange, initrange)
            for head in (self.linear1, self.linear2):
                head.bias.zero_()
                head.weight.uniform_(-initrange, initrange)

    def forward(self, x):
        """Map a (batch, max_seq_length) tensor of token ids to (batch, vocab_size) logits."""
        # The encoder is built without batch_first, so it takes (L, N, E)
        seq_first = self.emb(x).permute(1, 0, 2)
        # Back to (N, L, E) before flattening each example
        encoded = self.transformer_encoder(seq_first).permute(1, 0, 2)
        flat = encoded.reshape(encoded.size(0), -1)
        hidden = F.relu(self.bn1(self.linear1(flat)))
        return self.linear2(hidden)


In [None]:
# Take the vocabulary size and sequence length from the variables defined earlier rather
# than hard-coded numbers, so the model always matches the tokenizer and collate_fn.
model = SimpleTransformerModel(vocab_size=vocab_size, embedding_dim=8, max_seq_length=max_seq_length, fan_out_dim_linear1=120, nhead=4, num_encoder_layers=4, dim_feedforward=120).to(device)


In [None]:
# Fresh optimizer bound to the parameters of the new model.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# BatchNorm and dropout behave differently in eval mode; select training mode explicitly
# so this cell also works when it is re-run after the evaluation cell.
model.train()

log_every = 1000  # batches between progress reports

correct = 0
total = 0
batch_number = 0
# Runs over the whole training set once (no early stop).
for X,y in train_dataloader:
  X=X.to(device)
  y=y.to(device)
  optimizer.zero_grad()
  logits = model(X)
  loss = loss_fn(logits,y)
  loss.backward()
  optimizer.step()
  batch_number+=1

  # Convert logits to predicted class indices
  predicted = logits.detach().argmax(dim=1)
  total += y.size(0)
  correct += (predicted == y).sum().item()

  if batch_number % log_every == 0:
    # Accuracy covers the batches since the previous report; the loss is that of the
    # current batch only. (The earlier second print repeated the same numbers.)
    accuracy = 100 * correct / total
    print(f"Batch: {batch_number}, Loss: {loss.item()}, Accuracy: {accuracy}%")
    correct = 0
    total = 0

In [None]:
# Evaluation mode: layers such as BatchNorm and dropout switch to inference behaviour
model.eval()

correct = 0
total = 0
# Gradients are not needed for scoring
with torch.no_grad():
    for X, y in test_dataloader:
        X, y = X.to(device), y.to(device)

        logits = model(X)

        # Index of the largest logit in each row is the predicted token id
        predicted = logits.max(dim=1).indices
        total += y.size(0)
        correct += (predicted == y).sum().item()

# Percentage of test examples whose next character was predicted exactly
accuracy = 100 * correct / total
print(f'Accuracy on the test set: {accuracy:.2f}%')

In [None]:

import math

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence, then applies dropout.

    Even feature columns hold sines and odd columns cosines of the position,
    scaled by geometrically decreasing frequencies. The table is stored as the
    non-trainable buffer ``pe`` of shape ``[max_len, 1, d_model]``.

    Args:
        d_model: Feature size of the inputs (even or odd).
        dropout: Dropout probability applied to the sum.
        max_len: Number of positions in the table; inputs must not be longer.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine columns, so the
        # frequency vector is trimmed to fit; for even d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Returns:
            Tensor of the same shape: ``x`` plus the first ``seq_len`` rows of
            the table (broadcast over the batch), passed through dropout.
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


In [None]:
# Plot -log(p) for p from 0.01 up to (but not including) 1: the cross-entropy loss of a
# single example as a function of the probability assigned to the correct class.
p=np.arange(0.01,1,0.01)
cce=-np.log(p)
plt.plot(p,cce);

In [None]:
# Build the encoding here so this cell does not depend on a `pe` object that is only
# created in a later cell (which fails on Restart & Run All). The arguments match the
# ones used for `pe` further down.
pe = PositionalEncoding(8, 0.1, max_seq_length)
positional_embedding=pe.pe.cpu().numpy()

In [None]:
from matplotlib import pyplot as plt
# One curve per encoding dimension (at most the first nine), plotted against position.
plt.plot(positional_embedding[:,0,:9])

In [None]:
# CPU copy of the most recent batch, for the CPU-only demo below.
XX=X.cpu()

In [None]:
# Shape walkthrough: embed a batch, move the sequence axis first, add positional encoding.
embedding_dim=8
emb=nn.Embedding(vocab_size,embedding_dim)
# permute(1,0,2) swaps the first two axes so the sequence axis comes first
out=emb(XX).permute(1,0,2)
print(XX.size())
print(out.size())
pe = PositionalEncoding(embedding_dim,0.1,max_seq_length)
# NOTE: pe is in training mode here, so dropout is applied to peout; only the shape is inspected.
peout=pe(out)
print(peout.size())

In [None]:
# Size after the permute above (sequence axis first).
out.size()

In [None]:
# Swapping the first two axes again restores the batch-first layout.
out.permute(1,0,2).size()

In [None]:
from matplotlib import pyplot as plt



In [None]:

class MediumTransformerModel(nn.Module):
    """Transformer encoder classifier with scaled embeddings and positional encoding.

    Pipeline: token ids -> embedding scaled by sqrt(embedding_dim) -> positional
    encoding (dropout 0.1) -> encoder stack -> flatten -> linear -> batch norm
    -> ReLU -> linear. No padding mask is passed to the encoder.

    Args:
        vocab_size: Number of token ids (also the number of output logits).
        embedding_dim: Embedding size, used as the encoder's ``d_model``.
        max_seq_length: Number of tokens in every input row.
        fan_out_dim_linear1: Width of the hidden classifier layer.
        nhead: Attention heads per encoder layer.
        num_encoder_layers: Number of stacked encoder layers.
        dim_feedforward: Width of the feed-forward part of each encoder layer.
    """

    def __init__(self, vocab_size, embedding_dim, max_seq_length, fan_out_dim_linear1, nhead, num_encoder_layers, dim_feedforward):
        super().__init__()
        self.embedding_dim = embedding_dim

        self.emb = nn.Embedding(vocab_size, embedding_dim)
        layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
        )
        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers=num_encoder_layers)
        self.linear1 = nn.Linear(max_seq_length * embedding_dim, fan_out_dim_linear1)
        self.bn1 = nn.BatchNorm1d(fan_out_dim_linear1)
        self.linear2 = nn.Linear(fan_out_dim_linear1, vocab_size)
        self.pos_encoder = PositionalEncoding(embedding_dim, 0.1, max_seq_length)

        self.init_weights()

    def init_weights(self) -> None:
        """Draw embedding and linear weights uniformly from [-0.1, 0.1]; zero the biases."""
        initrange = 0.1
        with torch.no_grad():
            self.emb.weight.uniform_(-initrange, initrange)
            for head in (self.linear1, self.linear2):
                head.bias.zero_()
                head.weight.uniform_(-initrange, initrange)

    def forward(self, x):
        """Map a (batch, max_seq_length) tensor of token ids to (batch, vocab_size) logits."""
        scaled = self.emb(x) * math.sqrt(self.embedding_dim)  # Scale embedding
        # Positional encoding and the encoder both work on (L, N, E)
        seq_first = self.pos_encoder(scaled.permute(1, 0, 2))
        encoded = self.transformer_encoder(seq_first)
        # Back to (N, L, E), then one flat vector per example
        batch_first = encoded.permute(1, 0, 2)
        flat = batch_first.reshape(batch_first.size(0), -1)
        hidden = F.relu(self.bn1(self.linear1(flat)))
        return self.linear2(hidden)


In [None]:
# Use the vocabulary size computed from the data instead of a hard-coded number, so the
# embedding table and output layer always match the tokenizer.
model = MediumTransformerModel(vocab_size=vocab_size, embedding_dim=4, max_seq_length=max_seq_length, fan_out_dim_linear1=64, nhead=4, num_encoder_layers=4, dim_feedforward=64).to(device)

In [None]:
# Fresh optimizer bound to the parameters of the new model.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

In [None]:
# BatchNorm and dropout behave differently in eval mode; select training mode explicitly
# so this cell also works when it is re-run after an evaluation cell.
model.train()

log_every = 1000     # batches between progress reports
max_batches = 10000  # stop once this many batches have been processed

correct = 0
total = 0
batch_number = 0
for X,y in train_dataloader:
  X=X.to(device)
  y=y.to(device)
  optimizer.zero_grad()
  logits = model(X)
  loss = loss_fn(logits,y)
  loss.backward()
  optimizer.step()
  batch_number+=1

  # Convert logits to predicted class indices
  predicted = logits.detach().argmax(dim=1)
  total += y.size(0)
  correct += (predicted == y).sum().item()

  if batch_number % log_every == 0:
    # Accuracy covers the batches since the previous report; the loss is that of the
    # current batch only. (The earlier second print repeated the same numbers.)
    accuracy = 100 * correct / total
    print(f"Batch: {batch_number}, Loss: {loss.item()}, Accuracy: {accuracy}%")
    correct = 0
    total = 0

  if batch_number >= max_batches:
    break

In [None]:
# Print the element count of every parameter tensor next to the running total.
n_parameters=0
for parameter in model.parameters():
    par=parameter.numel()
    n_parameters+=par
    print(par,n_parameters)

In [None]:
# TODO, try this:
# https://pytorch.org/tutorials/beginner/transformer_tutorial.html

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence, then applies dropout.

    NOTE: this class is also defined in an earlier cell of this notebook; the
    definition here replaces that one for all cells run afterwards.

    Even feature columns hold sines and odd columns cosines of the position,
    scaled by geometrically decreasing frequencies. The table is stored as the
    non-trainable buffer ``pe`` of shape ``[max_len, 1, d_model]``.

    Args:
        d_model: Feature size of the inputs (even or odd).
        dropout: Dropout probability applied to the sum.
        max_len: Number of positions in the table; inputs must not be longer.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine columns, so the
        # frequency vector is trimmed to fit; for even d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Returns:
            Tensor of the same shape: ``x`` plus the first ``seq_len`` rows of
            the table (broadcast over the batch), passed through dropout.
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


In [None]:
# Instantiate PositionalEncoding
d_model = 8  # Embedding dimension
max_len = 30  # Maximum sequence length for visualization (note: reuses the name of the max_len computed from the data earlier)
pe = PositionalEncoding(d_model, 0.1 ,max_len)
# Dropout stays active in training mode even inside torch.no_grad(). Switch to eval
# mode so the heatmap shows the encodings themselves, not a randomly zeroed and
# rescaled copy.
pe.eval()

# Generate a dummy input tensor of zeros, so the output equals the encoding table
seq_length = 30
dummy_input = torch.zeros(seq_length, 1, d_model)

# Get the positional encodings
with torch.no_grad():
    positional_encodings = pe(dummy_input).squeeze(1)

# Convert to numpy for visualization
positional_encodings = positional_encodings.cpu().numpy()

# Plotting
plt.figure(figsize=(15, 10))
plt.pcolormesh(positional_encodings, cmap='RdBu')
plt.xlabel('Embedding Dimensions')
plt.ylabel('Position in Sequence')
plt.colorbar()
plt.title('Positional Encoding Heatmap')
plt.show()
