# Markov-Transformer
## About
This notebook outlines experimentation regarding the Bachelor Thesis "On Markov Sequence Compression and Summarization in Transformers and Large Language Models" at the Goethe-University, Frankfurt (Semester 24/25).
It consists of a simple binary Transformer model (based on "Attention is All You Need" by Vaswani et al., 2017) that is trained on binary Markov-generated sequences. The Markov-sequences are randomly created from Markov chains of order 1-6. Each Markov chain is defined as a transition matrix with dimensions (2^k * 2) with k being the order of the chain.

In [152]:
import numpy as np
import json
import torch
import random
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from pathlib import Path
import seaborn as sns

In [153]:
# I used mps as a device for faster training. Alternatively, CUDA or CPU (not recommended) are possible
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device {device}")

Using device mps


## Parameters
The training parameters are oriented on "Transformer Models on Markov Data: Constant Depth Suffices" (Rajaraman et al., 2024). It has been shown that Transformer models with constant depth of 3 layers and only one head per leayer are able to perform low test loss on Markov sequences of order up to 8. These fixed parameters were chosen to best combine simplicitly and performance across the Markov orders 1 to 6.

In [154]:
block_size = 512
batch_size = 16
learning_rate = 0.001
weight_decay = 1e-3
dropout = 0.0

In [155]:
# binary vocabulary
vocabulary_size = 2
chars = [0, 1] # notice how there es no encoding (string to integer) needed because the index is the token (0 and 1)

## Markov Chains
These functions are needed to create transition matrices and generate and estimate Markov sequences:

In [156]:
def generate_transition_matrix(order: int, seed: int):
    """
    generate a random transition matrix for a given order
    """
    if seed is not None:
        torch.manual_seed(seed)
        
    # calculate number of states (2^order)
    n_states = 2 ** order
    
    # generate random probabilities
    matrix = torch.rand((n_states, 2))
    
    # normalize rows to sum to 1
    row_sums = matrix.sum(dim=1, keepdim=True)
    matrix = matrix / row_sums
    
    return matrix

def generate_sequence(transition_matrix: torch.tensor, order: int, length: int, seed: int = None) -> list:
    """
    generate a binary sequence (array) from a given Markov transition matrix
    """
    if seed is not None:
        torch.manual_seed(seed)
    
    sequence = torch.zeros(length, dtype=torch.long)
    
    # Generate first 'order' elements randomly (starting point)
    for i in range(order):
        sequence[i] = torch.randint(0, 2, (1,))
    
    # generate remaining sequence using transition probabilities
    for i in range(order, length):
        # calculate current state index from previous 'order' elements
        state_idx = 0
        for j in range(order):
            state_idx = state_idx * 2 + sequence[i-order+j]
            
        # generate next bit based on transition probabilities
        probs = transition_matrix[int(state_idx)]
        sequence[i] = torch.bernoulli(probs[1])
    
    return sequence.tolist()

def estimate_matrix(sequence: list, 
                      order: int) -> torch.Tensor:
    """
    estimate a Markov transition matrix from a given binary sequence
    """
    num_states = 2 ** order
    counts = torch.zeros((num_states, 2))
    
    # count transitions
    for i in range(len(sequence) - order):
        # get current state (last k bits)
        state = sequence[i:i+order]
        state_idx = int(''.join(map(str, state)), 2)
        
        # get next bit
        next_bit = sequence[i+order]
        
        # update counts
        counts[state_idx, next_bit] += 1
    
    # convert counts to probabilities
    row_sums = counts.sum(dim=1, keepdim=True)
    # handle zero counts by setting uniform probabilities
    row_sums[row_sums == 0] = 2
    transition_matrix = counts / row_sums
    
    return transition_matrix

## Training and Validation data
We can now generate binary sequences for the training and split these sequences into batches for training and validation.

In [157]:
def generate_training_data(transition_matrix, order, sequence_length, num_sequences):
    """
    generate a number sequences of a given length for a transition matrix
    """
    sequences = []
    for i in range(num_sequences):
        sequence = generate_sequence(transition_matrix, order, sequence_length, i)
        sequences.append(torch.tensor(sequence, dtype=torch.long))
    return sequences

def get_batch(sequences, split='train'):
    """
    split sequences into batches for training and validation
    """
    n = int(0.9 * len(sequences)) # 90% training - 10% validation
    data = sequences[:n] if split == 'train' else sequences[n:]
    
    while True:  # keep trying until we get a valid batch
        seq_indices = torch.randint(len(data), (batch_size,))
        x_batch = []
        y_batch = []
        
        for idx in seq_indices:
            sequence = data[idx]
            if len(sequence) >= block_size + 1:
                start_idx = torch.randint(0, len(sequence) - block_size, (1,))
                x = sequence[start_idx:start_idx + block_size]
                y = sequence[start_idx + 1:start_idx + block_size + 1]
                x_batch.append(x)
                y_batch.append(y)
        
        if len(x_batch) > 0:  # only proceed if there is at least one valid sequence
            x_batch = torch.stack(x_batch)
            y_batch = torch.stack(y_batch)
            return x_batch.to(device), y_batch.to(device)

In [158]:
@torch.no_grad()
def estimate_loss(model, sequences):
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(sequences, split)
            logits = model(X)
            loss = model.compute_loss(logits, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

## Model
We can now define the Transformer model that consists of layers of Self-Attention Heads and a Multi-Layer-Perceptron (Feed-Forward):

In [159]:
class Head(nn.Module):
    """
    A single self-attention head
    """
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)   
        q = self.query(x)  
        v = self.value(x)  
        
        att = q @ k.transpose(-2,-1) * C**-0.5
        att = att.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.dropout(att)
        
        out = att @ v
        return out

In [160]:
class MultiHeadAttention(nn.Module):
    """
    Multiple attention-heads in parallel
    """
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [161]:
class FeedForward(nn.Module):
    """
    A linear layer followed by a non-linear layer
    """
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [162]:
class Block(nn.Module):
    """
    Transformer block: communication followed by computation
    """
    
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        
    def forward(self, x):
        # add residual connections around both layers
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [163]:
class BinaryTransformer(nn.Module):
    """
    Our transformer model for binary sequence prediction. 
    This model implements a transformer architecture designed for binary sequences, using token and positional embeddings followed by transformer 
    blocks with self-attention mechanisms.
    """
    
    def __init__(self):
        super().__init__()
        # create embedding tables for tokens and positions
        self.token_embedding_table = nn.Embedding(vocabulary_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        
        # create transformer blocks
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) 
                                    for _ in range(n_layer)])
        # final layer normalization and prediction head
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocabulary_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        # get token and position embeddings
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))
        
        # combine embeddings and process through transformer blocks
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        return logits  # return only the logits, no loss calculation here
    
    def compute_loss(self, logits, targets):
        """compute cross entropy loss"""
        B, T, C = logits.shape
        logits = logits.view(B*T, C)
        targets = targets.view(B*T)
        loss = F.cross_entropy(logits, targets, reduction='mean')
        return loss
    
    def generate(self, idx, max_new_tokens):
        """
        Generate new tokens based on a context sequence.
        """
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

## Training
Now that the model is defined we can script the training. The models will be saved as .pt files while their parameters (order, number, Markov chain, training history, etc.) will be saved in a json file.

In [164]:
def train_model(order, number, num_iterations, seed, sequence_length, num_sequences):
    """
    Initializes and trains a Transformer model using the above methods
    """
    model = BinaryTransformer()
    m = model.to(device)
    print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters') # size of the model

    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_iterations)

    training_history = { # for easier interpretation later
        'train_losses': [],
        'val_losses': [],
        'iterations': []
    }

    transition_matrix = generate_transition_matrix(order, seed)
    print("Transition Matrix: \n", transition_matrix)
    data = generate_training_data(transition_matrix, order, sequence_length, num_sequences)

    for iter in range(num_iterations):

        if iter % eval_interval == 0 or iter == num_iterations-1:
            losses = estimate_loss(model, data)
            training_history['train_losses'].append(losses['train'])
            training_history['val_losses'].append(losses['val'])
            training_history['iterations'].append(iter)
            print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        # get batch
        xb, yb = get_batch(data, 'train')
        
        # forward pass
        logits = model(xb)
        loss = model.compute_loss(logits, yb)
        
        # backward pass
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        scheduler.step()

    # save the model info
    model_info = {
        'order': order,
        'number': number,
        'seed': seed,
        'num_iterations': num_iterations,
        'training_history': training_history,
        'transition_matrix': transition_matrix,
        'final_loss': loss.item()
    }

    # save checkpoint to access the model later
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'loss': loss,
        'iter': iter,
        'model_info': model_info
    }, f'models2/model{order}_{number}.pt')

    return model_info

## Training the models
Now, the models can be trained. We will train models from Markov order 1 to 6 with 5 models per order (= 30 models in total) to ensure quantitative robustness of the experiments. Every model will be trained on an individual seed to ensure reproducability. The variable parameters for different orders (e.g. n_heads, n_layers) have been chosen experimentally.

In [334]:
my_models = [] # the models will be stored here for easier evaluation later

In [None]:
def tensor_to_serializable(obj):
    """
    Convert tensors and other objects to JSON serializable format. Tensors cannot be saved in json files.
    """
    if isinstance(obj, dict):
        return {key: tensor_to_serializable(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [tensor_to_serializable(item) for item in obj]
    elif isinstance(obj, torch.Tensor):
        return obj.tolist()
    elif isinstance(obj, (int, float, str, bool)):
        return obj
    elif obj is None:
        return None
    else:
        return str(obj)

For order 1:

In [None]:
n_head = 1
n_layer = 2
n_embd = 16
sequence_length = 1000
num_sequences = 500
num_iterations = 5000
seeds = [87, 88, 89, 90, 91]
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=1 number={number}")
        model_info = train_model(1, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup (due to issues that might have been connected to cache overflow)
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=1 number=1
0.014754 M parameters
Transition Matrix: 
 tensor([[0.0040, 0.9960],
        [0.3166, 0.6834]])
step 0: train loss 0.7233, val loss 0.7238
step 100: train loss 0.4993, val loss 0.4998
step 200: train loss 0.4821, val loss 0.4826
step 300: train loss 0.4804, val loss 0.4805
step 400: train loss 0.4797, val loss 0.4803
step 500: train loss 0.4800, val loss 0.4803
step 600: train loss 0.4800, val loss 0.4805
step 700: train loss 0.4797, val loss 0.4800
step 800: train loss 0.4797, val loss 0.4802
step 900: train loss 0.4795, val loss 0.4800
step 1000: train loss 0.4798, val loss 0.4799
step 1100: train loss 0.4799, val loss 0.4800
step 1200: train loss 0.4799, val loss 0.4803
step 1300: train loss 0.4798, val loss 0.4801
step 1400: train loss 0.4800, val loss 0.4802
step 1500: train loss 0.4798, val loss 0.4801
step 1600: train loss 0.4796, val loss 0.4801
step 1700: train loss 0.4799, val loss 0.4801
step 1800: train loss 0.4798, val loss 0.4802
step 1900:

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved.json


Order 2:

In [322]:
n_head = 1
n_layer = 2
n_embd = 64
sequence_length = 1000
num_sequences = 500
num_iterations = 5000
seeds = [92, 93, 94, 95, 96]
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=2 number={number}")
        model_info = train_model(2, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=2 number=1
0.132738 M parameters
Transition Matrix: 
 tensor([[0.4424, 0.5576],
        [0.8116, 0.1884],
        [0.9809, 0.0191],
        [0.9278, 0.0722]])
step 0: train loss 0.8664, val loss 0.8662
step 100: train loss 0.5956, val loss 0.5944
step 200: train loss 0.5951, val loss 0.5935
step 300: train loss 0.5922, val loss 0.5911
step 400: train loss 0.5844, val loss 0.5835
step 500: train loss 0.5749, val loss 0.5736
step 600: train loss 0.5645, val loss 0.5635
step 700: train loss 0.5528, val loss 0.5527
step 800: train loss 0.5458, val loss 0.5453
step 900: train loss 0.5406, val loss 0.5402
step 1000: train loss 0.5351, val loss 0.5342
step 1100: train loss 0.5310, val loss 0.5305
step 1200: train loss 0.5272, val loss 0.5268
step 1300: train loss 0.5240, val loss 0.5235
step 1400: train loss 0.5215, val loss 0.5204
step 1500: train loss 0.5177, val loss 0.5178
step 1600: train loss 0.5103, val loss 0.5103
step 1700: train loss 0.5054, val loss 0.5048
step

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved2.json


Order 3:

In [326]:
n_head = 1
n_layer = 3
n_embd = 64
sequence_length = 1000
num_sequences = 1000
num_iterations = 10000
seeds = [97, 98, 99, 100, 101]
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=3 number={number}")
        model_info = train_model(3, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=3 number=1
0.18253 M parameters
Transition Matrix: 
 tensor([[0.4146, 0.5854],
        [0.2918, 0.7082],
        [0.3577, 0.6423],
        [0.5404, 0.4596],
        [0.8489, 0.1511],
        [0.5691, 0.4309],
        [0.5103, 0.4897],
        [0.6480, 0.3520]])
step 0: train loss 0.7743, val loss 0.7743
step 100: train loss 0.6929, val loss 0.6932
step 200: train loss 0.6928, val loss 0.6931
step 300: train loss 0.6924, val loss 0.6926
step 400: train loss 0.6924, val loss 0.6927
step 500: train loss 0.6921, val loss 0.6923
step 600: train loss 0.6923, val loss 0.6926
step 700: train loss 0.6914, val loss 0.6916
step 800: train loss 0.6909, val loss 0.6910
step 900: train loss 0.6896, val loss 0.6898
step 1000: train loss 0.6883, val loss 0.6885
step 1100: train loss 0.6860, val loss 0.6861
step 1200: train loss 0.6837, val loss 0.6838
step 1300: train loss 0.6801, val loss 0.6801
step 1400: train loss 0.6778, val loss 0.6774
step 1500: train loss 0.6755, val loss 

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved2.json


Order 4:

In [331]:
n_head = 1
n_layer = 3
n_embd = 64
sequence_length = 2000
num_sequences = 1000
num_iterations = 10000
seeds = [102, 103, 104, 105, 106]
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=4 number={number}")
        model_info = train_model(4, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=4 number=1
0.18253 M parameters
Transition Matrix: 
 tensor([[0.0083, 0.9917],
        [0.1100, 0.8900],
        [0.5988, 0.4012],
        [0.1800, 0.8200],
        [0.5873, 0.4127],
        [0.2977, 0.7023],
        [0.4502, 0.5498],
        [0.8272, 0.1728],
        [0.9822, 0.0178],
        [0.7752, 0.2248],
        [0.4257, 0.5743],
        [0.8044, 0.1956],
        [0.7702, 0.2298],
        [0.5113, 0.4887],
        [0.7706, 0.2294],
        [0.9386, 0.0614]])
step 0: train loss 0.7542, val loss 0.7536
step 100: train loss 0.6822, val loss 0.6822
step 200: train loss 0.6776, val loss 0.6775
step 300: train loss 0.6702, val loss 0.6707
step 400: train loss 0.6567, val loss 0.6572
step 500: train loss 0.6356, val loss 0.6354
step 600: train loss 0.6052, val loss 0.6054
step 700: train loss 0.5736, val loss 0.5743
step 800: train loss 0.5432, val loss 0.5443
step 900: train loss 0.5260, val loss 0.5248
step 1000: train loss 0.5049, val loss 0.5045
step 1100: trai

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved2.json


Order 5:

In [336]:
n_head = 1
n_layer = 3
n_embd = 64
sequence_length = 2000
num_sequences = 2000
num_iterations = 15000
seeds = [107, 108, 109, 110, 111]
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=5 number={number}")
        model_info = train_model(5, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=5 number=1
0.18253 M parameters
Transition Matrix: 
 tensor([[0.4786, 0.5214],
        [0.4535, 0.5465],
        [0.2829, 0.7171],
        [0.6684, 0.3316],
        [0.3589, 0.6411],
        [0.0818, 0.9182],
        [0.3591, 0.6409],
        [0.9840, 0.0160],
        [0.6488, 0.3512],
        [0.3760, 0.6240],
        [0.4755, 0.5245],
        [0.8907, 0.1093],
        [0.9958, 0.0042],
        [0.9909, 0.0091],
        [0.9526, 0.0474],
        [0.5505, 0.4495],
        [0.1753, 0.8247],
        [0.5390, 0.4610],
        [0.8194, 0.1806],
        [0.1969, 0.8031],
        [0.6243, 0.3757],
        [0.4714, 0.5286],
        [0.7115, 0.2885],
        [0.6734, 0.3266],
        [0.6348, 0.3652],
        [0.2588, 0.7412],
        [0.3997, 0.6003],
        [0.0488, 0.9512],
        [0.6354, 0.3646],
        [0.7367, 0.2633],
        [0.7870, 0.2130],
        [0.4704, 0.5296]])
step 0: train loss 0.7408, val loss 0.7417
step 100: train loss 0.6831, val loss 0.6831
step 

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved2.json


Order 6:

In [340]:
n_head = 1
n_layer = 4
n_embd = 64
sequence_length = 2000
num_sequences = 2000
num_iterations = 15000
seeds = [112, 113, 114, 115, 116] # 114, 116 length variieren
idx = 0
for number in range(1, 6): # 5 models per order
    try:
        print(f"Training model order=6 number={number}")
        model_info = train_model(6, number, num_iterations, seeds[idx], sequence_length, num_sequences)
        my_models.append(model_info)
        idx += 1
        
        if device.type == "mps":
            torch.mps.empty_cache() # cleanup
            
    except Exception as e:
        print(f"Failed training model number={number}: {e}")
        continue

Training model order=6 number=1
0.232322 M parameters
Transition Matrix: 
 tensor([[9.4984e-02, 9.0502e-01],
        [7.1621e-01, 2.8379e-01],
        [2.4665e-01, 7.5335e-01],
        [2.7498e-01, 7.2502e-01],
        [4.9996e-01, 5.0004e-01],
        [2.1048e-01, 7.8952e-01],
        [1.5258e-01, 8.4742e-01],
        [2.5055e-01, 7.4945e-01],
        [8.5798e-01, 1.4202e-01],
        [7.1078e-01, 2.8922e-01],
        [7.0453e-01, 2.9547e-01],
        [3.4466e-01, 6.5534e-01],
        [5.5391e-01, 4.4609e-01],
        [3.3408e-01, 6.6592e-01],
        [6.2882e-02, 9.3712e-01],
        [2.5419e-01, 7.4581e-01],
        [3.9826e-01, 6.0174e-01],
        [4.4364e-01, 5.5636e-01],
        [9.8209e-02, 9.0179e-01],
        [2.5315e-01, 7.4685e-01],
        [5.1722e-01, 4.8278e-01],
        [9.0810e-01, 9.1898e-02],
        [6.3782e-01, 3.6218e-01],
        [6.5414e-01, 3.4586e-01],
        [4.6645e-01, 5.3355e-01],
        [4.9828e-01, 5.0172e-01],
        [4.6347e-01, 5.3653e-01],
       

In [None]:
base_path = Path("models")
base_path.mkdir(exist_ok=True)
# convert and save
serializable_models = tensor_to_serializable(my_models)
filename = f"my_models_saved.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(serializable_models, f, indent=2)

print(f"Saved models to {filepath}")

Saved models to models2/my_models_saved2.json


## Loss curves
We can plot the loss curves of the training process to get insights into:
1. If and when the curves converge.
2. Wether the models achieve a loss close to the optimal loss value.
3. How the chain's order and the complexity affect the loss.

In [None]:
def calculate_optimal_loss(transition_matrix, order):
    """
    Calculate the optimal (theoretical minimum) cross-entropy loss for a Markov chain of order k.
    """
    # Convert to numpy array if not already
    P_next = np.array(transition_matrix)
    n_states = 2**order
    
    # Create the state transition matrix
    P = np.zeros((n_states, n_states))
    
    # Fill the state transition matrix
    for state in range(n_states):
        for next_bit in [0, 1]:
            # Calculate next state by shifting left and adding new bit
            next_state = (state << 1) & (n_states - 1) | next_bit
            # Add transition probability
            P[state, next_state] = P_next[state, next_bit]
    
    # Find stationary distribution using power iteration
    pi = np.ones(n_states) / n_states  # Start with uniform distribution
    
    # Power iteration
    for _ in range(100):  # Usually converges much faster
        pi_next = pi @ P
        if np.allclose(pi, pi_next, rtol=1e-8):
            break
        pi = pi_next
    
    # Calculate entropy rate
    entropy_rate = 0.0
    eps = 1e-10  # Small constant to avoid log(0)
    
    for state in range(n_states):
        for next_bit in [0, 1]:
            p = P_next[state, next_bit]
            if p > eps:
                entropy_rate -= pi[state] * p * np.log(p)
    
    return float(entropy_rate)

In [None]:
def plot_loss_curves():
    """
    Plots the training history for every model relative to optimal loss
    """
    with open('models/my_models_saved.json', 'r') as f:
        models = json.load(f)
    
    fig, axes = plt.subplots(2, 3, figsize=(20, 12))
    axes = axes.flatten()
    
    for order in range(1, 7):
        order_models = [m for m in models if m['order'] == order]
        ax = axes[order-1]
        
        for model in order_models:
            # Convert transition matrix from list to numpy array
            transition_matrix = np.array(model['transition_matrix'])
            optimal_loss = calculate_optimal_loss(transition_matrix, model['order'])
            
            train_losses = np.array(model['training_history']['train_losses'])
            val_losses = np.array(model['training_history']['val_losses'])
            iterations = model['training_history']['iterations']
            
            # Plot absolute difference from optimal
            train_diff = train_losses - optimal_loss
            val_diff = val_losses - optimal_loss
            
            ax.plot(iterations, train_diff, 
                   label=f'Train (Chain={model["order"]}.{model["number"]})')
            ax.plot(iterations, val_diff, 
                   linestyle='--', label=f'Val (Chain={model["order"]}.{model["number"]})')
            
        ax.set_title(f'Order {order}')
        ax.set_xlabel('Iterations')
        ax.set_ylabel('Loss Difference from Optimal')
        ax.grid(True, alpha=0.3)
        ax.legend()
    
    plt.tight_layout()
    plt.savefig('plots/loss_curves.png', dpi=300, bbox_inches='tight')
    plt.close()

plot_loss_curves()

In [None]:
def plot_combined_loss_curves():
    """
    Plots the combined training history for every model relative to optimal loss,
    including variance as shaded regions.
    """
    with open('models/my_models_saved.json', 'r') as f:
        models = json.load(f)
    
    plt.figure(figsize=(10, 6))
    
    for order in range(1, 7):
        order_models = [m for m in models if m['order'] == order]
        
        # Initialize lists to store loss differences
        all_train_diff = []
        all_val_diff = []
        iterations = None
        
        for model in order_models:
            # Convert transition matrix from list to numpy array
            transition_matrix = np.array(model['transition_matrix'])
            optimal_loss = calculate_optimal_loss(transition_matrix, model['order'])
            
            train_losses = np.array(model['training_history']['train_losses'])
            val_losses = np.array(model['training_history']['val_losses'])
            iterations = model['training_history']['iterations']
            
            # Calculate absolute difference from optimal
            train_diff = train_losses - optimal_loss
            val_diff = val_losses - optimal_loss
            
            all_train_diff.append(train_diff)
            all_val_diff.append(val_diff)
        
        # Convert to numpy arrays for easier computation
        all_train_diff = np.array(all_train_diff)
        all_val_diff = np.array(all_val_diff)
        
        # Calculate mean and standard deviation
        mean_train_diff = np.mean(all_train_diff, axis=0)
        std_train_diff = np.std(all_train_diff, axis=0)
        
        mean_val_diff = np.mean(all_val_diff, axis=0)
        std_val_diff = np.std(all_val_diff, axis=0)
        
        # Plot mean with shaded variance
        plt.plot(iterations, mean_train_diff, label=f'order = {order}')
        plt.fill_between(iterations, 
                         mean_train_diff - std_train_diff, 
                         mean_train_diff + std_train_diff, 
                         alpha=0.2)
    
    plt.title('Test Loss Gap from the Optimal')
    plt.xlabel('Iteration')
    plt.ylabel('Test Loss Gap from the Optimal')
    plt.grid(True, alpha=0.3)
    plt.legend()
    
    plt.savefig('plots/combined_loss_curves', dpi=300, bbox_inches='tight')
    plt.close()

plot_combined_loss_curves()

## Convergence
We can identify the average points of training convergence for each order:

In [None]:
def calculate_convergence_point(loss_history, threshold=0.01, window_size=10):
    """
    Calculate the point of convergence for a loss curve
    """
    final_loss = np.mean(loss_history[-window_size:])  # Average of last window
    
    for i in range(len(loss_history) - window_size):
        window_mean = np.mean(loss_history[i:i+window_size])
        relative_change = abs(window_mean - final_loss) / final_loss
        
        if relative_change < threshold:
            return i * 100  # multiply by 100 since eval_iter is 100
    
    return len(loss_history) * 100

def analyze_order_convergence(all_loss_histories):
    """
    Analyze convergence for all chains of a specific order
    """
    convergence_points = []
    final_losses = []
    
    for chain_losses in all_loss_histories:
        conv_point = calculate_convergence_point(chain_losses)
        final_loss = np.mean(chain_losses[-10:])  # Average of last 10 points
        
        convergence_points.append(conv_point)
        final_losses.append(final_loss)
    
    results = {
        'convergence_mean': np.mean(convergence_points),
        'convergence_std': np.std(convergence_points),
        'final_loss_mean': np.mean(final_losses),
        'final_loss_std': np.std(final_losses)
    }
    
    return results

with open('models/my_models_saved.json', 'r') as m:
    models = json.load(m)

for order in range(1, 7): 
    histories = []  
    for model in models:
        if model['order'] == order:
            histories.append(model['training_history']['train_losses'])
    print(analyze_order_convergence(histories))

{'convergence_mean': 80.0, 'convergence_std': 40.0, 'final_loss_mean': 0.6032889360189437, 'final_loss_std': 0.06878144836785413}
{'convergence_mean': 1320.0, 'convergence_std': 1066.5833300778706, 'final_loss_mean': 0.4284674080833793, 'final_loss_std': 0.21281476684534775}
{'convergence_mean': 2760.0, 'convergence_std': 826.0750571225353, 'final_loss_mean': 0.5204345923662186, 'final_loss_std': 0.09792156646814257}
{'convergence_mean': 3320.0, 'convergence_std': 832.8265125462806, 'final_loss_mean': 0.5500198811292648, 'final_loss_std': 0.0726387712344261}
{'convergence_mean': 7320.0, 'convergence_std': 1815.9295140505867, 'final_loss_mean': 0.5719257605075837, 'final_loss_std': 0.04497018930392192}
{'convergence_mean': 7120.0, 'convergence_std': 3642.7462167985295, 'final_loss_mean': 0.628945450782776, 'final_loss_std': 0.041451449633561385}


## Testing
After the models are trained and stored the testing can begin. It has been observed that the context has only an insignificant importance here since the models are trained on sequences of the same Markov chain. The context will have the length 50 for every experiment.

In [165]:
device ="cpu"

In [170]:
def model_output(model_info, max_new_tokens, seed):
    """
    Takes a model and generates output based on a context
    """
    order = model_info['order']
    number = model_info['number']
    
    transition_matrix = torch.tensor(model_info['transition_matrix'])
    
    # Initialize model with same parameters as training
    model = BinaryTransformer().to(device)
    model.eval()
    
    # Load checkpoint
    checkpoint = torch.load(f'models/model{order}_{number}.pt')
    model.load_state_dict(checkpoint['model_state_dict'])

    # Generate context
    context = torch.tensor(generate_sequence(transition_matrix, order, 50, seed))
    context = context.unsqueeze(0).to(device)  # Add batch dimension
    
    # Generate prediction
    with torch.no_grad():
        output = model.generate(context, max_new_tokens)
        
    return output.tolist()[0][50:]

## Metrics
The chosen metrics for this approach are the Kullback-Leibler Divergence (kld) and the Mean Absolute Error (mae). 

In [171]:
def kl_divergence(chain1, chain2, epsilon=1e-10):
    """
    Calculate the KL-Divergence from two markov chains, ignoring undefined states in chain2
    """
    # identify defined states in chain2 (sum of probabilities > 0)
    defined_states = torch.sum(chain2, dim=1) > 0
    
    # extract only the defined states and their corresponding states in chain1
    p = chain1[defined_states]
    q = chain2[defined_states]
    
    # Add epsilon to p and q to avoid log(0)
    p = p + epsilon
    q = q + epsilon
    
    # normalize both distributions
    p = p / p.sum(dim=1, keepdim=True)
    q = q / q.sum(dim=1, keepdim=True)
    
    # calculate the KL-Divergence only for defined states
    kl_div = torch.sum(p * torch.log(p / q))
    
    return kl_div.item()


def mean_error(chain1, chain2):
    """
    Calculate the mean absolute error of two Markov chains, ignoring undefined states in chain2
    """
    # identify defined states in chain2 (sum of probabilities > 0)
    defined_states = torch.sum(chain2, dim=1) > 0
    
    # extract only the defined states and their corresponding states in chain1
    p = chain1[defined_states]
    q = chain2[defined_states]
    
    # Calculate the absolute differences for defined states
    abs_diff = torch.abs(p - q)
        
    # Calculate the mean error across all defined states
    mean_error = torch.mean(abs_diff).item()
    
    return mean_error

In [None]:
with open('models/my_models_saved.json', 'r') as m:
        models = json.load(m)

# let's test the model outputs one these lengths
output_lengths = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000, 4000, 5000]
seeds = [111,112,113,114,115,116,117,118,119,120] # every model output_length is tested 100 times on different contexts
results = []
n_head = 1
n_layer = 2
n_embd = 16
for model in models:
        order = model['order']
        number = model['number']
        if order == 2:
               n_embd = 64
        if order == 3:
               n_layer = 3
        if order == 6:
               n_layer = 4
        print(f"Testing model{order}_{number}")
        transition_matrix = torch.tensor(model['transition_matrix'])
        for output_length in output_lengths:
                print(f"Testing output length {output_length}")
                for seed in seeds:
                        output = model_output(model, output_length, seed) # generate output_length new tokens
                        estimated_matrix = estimate_matrix(output, order) # estimate a Markov chain for the output
                        kld = kl_divergence(transition_matrix, estimated_matrix)
                        mae = mean_error(transition_matrix, estimated_matrix)
                        results.append({'order': order, 'number': number, 'output_length': output_length, 'seed':seed, 'kld': kld, 'mae': mae})

base_path = Path("results")
base_path.mkdir(exist_ok=True)

filename = f"model_results.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(results, f, indent=2)
print(f"Saved results to {filepath}")

Testing model1_1
Testing output length 100
Testing output length 200
Testing output length 300
Testing output length 400
Testing output length 500
Testing output length 600
Testing output length 700
Testing output length 800
Testing output length 900
Testing output length 1000
Testing output length 2000
Testing output length 3000
Testing output length 4000
Testing output length 5000
Testing model1_2
Testing output length 100
Testing output length 200
Testing output length 300
Testing output length 400
Testing output length 500
Testing output length 600
Testing output length 700
Testing output length 800
Testing output length 900
Testing output length 1000
Testing output length 2000
Testing output length 3000
Testing output length 4000
Testing output length 5000
Testing model1_3
Testing output length 100
Testing output length 200
Testing output length 300
Testing output length 400
Testing output length 500
Testing output length 600
Testing output length 700
Testing output length 800
Tes

## Random Process
To evaluate how precise the Transformer model is in predicting next tokens in a Markov sequence we can compare it to a random process.

In [None]:
with open('models/my_models_saved.json', 'r') as m:
        models = json.load(m)

# let's look what the scores for a completely random process (balanced chain) look like
results = []
output_lengths = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000, 4000, 5000]
seeds = [111, 112, 113, 114, 115, 116, 117, 118, 119, 120] # the sequences are generated across 10 seeds
for model in models:
        order = model['order']
        number = model['number']
        transition_matrix = torch.tensor(model['transition_matrix'])
        print(f"Testing chain{order}_{number}")
        for output_length in output_lengths:
               for seed in seeds:
                np.random.seed(seed)
                rand_sequence = np.random.randint(0, 2, output_length) # generates a random array of 0s and 1s with length output_length
                estimated_matrix = estimate_matrix(rand_sequence, order)
                kld = kl_divergence(transition_matrix, estimated_matrix)
                mae = mean_error(transition_matrix, estimated_matrix)
                results.append({'order': order, 'number': number, 'output_length': output_length, 'seed': seed, 'kld': kld, 'mae': mae})

base_path = Path("results")
base_path.mkdir(exist_ok=True)

filename = f"random_results.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(results, f, indent=2)

print(f"Saved models to {filepath}")

Testing chain1_1
Testing chain1_2
Testing chain1_3
Testing chain1_4
Testing chain1_5
Testing chain2_1
Testing chain2_2
Testing chain2_3
Testing chain2_4
Testing chain2_5
Testing chain3_1
Testing chain3_2
Testing chain3_3
Testing chain3_4
Testing chain3_5
Testing chain4_1
Testing chain4_2
Testing chain4_3
Testing chain4_4
Testing chain4_5
Testing chain5_1
Testing chain5_2
Testing chain5_3
Testing chain5_4
Testing chain5_5
Testing chain6_1
Testing chain6_2
Testing chain6_3
Testing chain6_4
Testing chain6_5
Saved models to results/random_results.json


## Lower Bound Score
As a second comparison, we can generate a sequence for every output_length that minimizes the KL-Divergence. The sequence is chosen as one of 100 sequences (each generated with the transition probabilities of the underlying Markov chain). It serves as an orientation to evaluate the model's precision.

In [180]:
def search_sequence(transition_matrix, k, m, num_candidates=500):
    """
    Generate a near-optimal sequence of length m for a kth-order Markov chain. Sampled from num_candidates sequences.
    """

    min_kl_div = float('inf')
    best_sequence = None

    for i in range(num_candidates):
        sequence = generate_sequence(transition_matrix, k, m, i)
        seq_matrix = estimate_matrix(sequence, k)
        kl_div = kl_divergence(transition_matrix, seq_matrix)
        if kl_div < min_kl_div:
            min_kl_div = kl_div
            best_sequence = sequence
            
    return best_sequence

In [None]:
with open('models/my_models_saved.json', 'r') as m:
        models = json.load(m)
        
output_lengths = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000, 4000, 5000]
results = []

for model in models:
    order = model['order']
    number = model['number']
    print(f"Sampling for chain{order}_{number}")
    transition_matrix = torch.tensor(model['transition_matrix'])
    for output_length in output_lengths:
        best = search_sequence(transition_matrix, order, output_length)
        estimated_matrix = estimate_matrix(best, order) # estimate a Markov chain for the best sequence
        kld = kl_divergence(transition_matrix, estimated_matrix)
        mae = mean_error(transition_matrix, estimated_matrix)
        results.append({'order': order, 'number': number, 'output_length': output_length, 'kld': kld, 'mae': mae})

base_path = Path("results")
base_path.mkdir(exist_ok=True)

filename = f"lower_bound.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(results, f, indent=2)
print(f"Saved results to {filepath}")

## Evaluate Results
The results are now evaluated and tested on significance by the use of confidence intervals.

In [183]:
from scipy import stats

def analyze_test_results(results_path):
    with open(results_path, 'r') as f:
        results = json.load(f)
    
    analysis_results = []
    for order in range(1, 7):
        order_results = [r for r in results if r['order'] == order]
        
        # get unique seeds
        seeds = list(set(r['seed'] for r in order_results))
        
        # calculate statistics across seeds
        kld_values = [r['kld'] for r in order_results]
        mae_values = [r['mae'] for r in order_results]
        
        # calculate mean, std, and confidence intervals
        kld_mean = np.mean(kld_values)
        kld_std = np.std(kld_values)
        mae_mean = np.mean(mae_values)
        mae_std = np.std(mae_values)
        
        # 95% confidence intervals
        kld_ci = tuple(round(x, 3) for x in stats.t.interval(0.95, len(kld_values)-1, 
                                                    loc=kld_mean, 
                                                    scale=stats.sem(kld_values)))
        mae_ci = tuple(round(x, 3) for x in stats.t.interval(0.95, len(mae_values)-1, 
                                                    loc=mae_mean, 
                                                    scale=stats.sem(mae_values)))
        
        analysis_results.append({'order': order, 'kld_mean': kld_mean, 'kld_std': kld_std, 'kld_ci':kld_ci, 'mae_mean': mae_mean, 'mae_std': mae_std,
                                  'mae_ci':mae_ci, 'n_samples': len(kld_values)})
        
        print()
        print(order)
        print(f"kld_mean': {round(kld_mean, 3)}")
        print(f"kld_std': {round(kld_std, 3)}")
        print(f"kld_ci': {kld_ci}")
        print(f"mae_mean': {round(mae_mean, 3)}")
        print(f"mae_std': {round(mae_std, 3)}")
        print(f"mae_ci': {mae_ci}")
        print(f"n_samples': {len(kld_values)}")
    
    return analysis_results

analysis = analyze_test_results('results/model_results.json')

base_path = Path("results")
base_path.mkdir(exist_ok=True)

filename = f"results_analysis.json"
filepath = base_path / filename
with open(filepath, 'w') as f:
    json.dump(analysis, f, indent=2)
print(f"Saved results to {filepath}")



1
kld_mean': 0.01
kld_std': 0.021
kld_ci': (0.009, 0.012)
mae_mean': 0.019
mae_std': 0.018
mae_ci': (0.018, 0.021)
n_samples': 700

2
kld_mean': 1.322
kld_std': 3.582
kld_ci': (1.056, 1.588)
mae_mean': 0.09
mae_std': 0.056
mae_ci': (0.086, 0.094)
n_samples': 700

3
kld_mean': 0.612
kld_std': 2.249
kld_ci': (0.445, 0.779)
mae_mean': 0.064
mae_std': 0.043
mae_ci': (0.061, 0.068)
n_samples': 700

4
kld_mean': 3.494
kld_std': 8.25
kld_ci': (2.882, 4.107)
mae_mean': 0.077
mae_std': 0.045
mae_ci': (0.074, 0.08)
n_samples': 700

5
kld_mean': 19.213
kld_std': 26.727
kld_ci': (17.228, 21.198)
mae_mean': 0.143
mae_std': 0.046
mae_ci': (0.139, 0.146)
n_samples': 700

6
kld_mean': 65.063
kld_std': 89.822
kld_ci': (58.392, 71.733)
mae_mean': 0.188
mae_std': 0.077
mae_ci': (0.182, 0.194)
n_samples': 700
Saved results to results/results_analysis.json


## Plot Results
Now that all results are calculated and stored, these results can be plotted. We can plot a heatmap and a line chart to visualize the output length vs. the metric accuracy for the model outputs and the lower bound sequences. Furthermore, a comparison of the two - including the random process as orientation - visualizes how good the Transformer models work as compressors.
First, let's plot the model results:

In [None]:
def plot_model_results(results_file):
    
    with open(results_file, 'r') as f:
        results = json.load(f)
    
    plt.style.use('classic')
    sns.set_palette("husl")
    
    # Create figure with two subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Extract unique values
    orders = sorted(list(set(r['order'] for r in results)))
    lengths = sorted(list(set(r['output_length'] for r in results)))
    numbers = sorted(list(set(r['number'] for r in results)))
    
    # Prepare data structures for plotting
    metrics = {
        'kld': {'name': 'KL Divergence', 'data': {}},
        'mae': {'name': 'Mean Absolute Error', 'data': {}}
    }
    
    for order in orders:
        metrics['kld']['data'][order] = {'means': [], 'stds': []}
        metrics['mae']['data'][order] = {'means': [], 'stds': []}
        
        for length in lengths:
            kld_averages = []
            mae_averages = []
            
            # First average across seeds for each number
            for number in numbers:
                # Get results for this order, length, and number (across all seeds)
                values = [r for r in results if r['order'] == order and 
                         r['output_length'] == length and 
                         r['number'] == number]
                
                # Calculate average across seeds for this combination
                if values:
                    kld_averages.append(np.mean([v['kld'] for v in values]))
                    mae_averages.append(np.mean([v['mae'] for v in values]))
            
            # Calculate final statistics across numbers (after seed averaging)
            metrics['kld']['data'][order]['means'].append(np.mean(kld_averages))
            metrics['kld']['data'][order]['stds'].append(np.std(kld_averages))
            metrics['mae']['data'][order]['means'].append(np.mean(mae_averages))
            metrics['mae']['data'][order]['stds'].append(np.std(mae_averages))
    
    # KL-Divergence
    for order in orders:
        ax1.errorbar(lengths, 
                    metrics['kld']['data'][order]['means'],
                    yerr=metrics['kld']['data'][order]['stds'],
                    label=f'Order {order}',
                    marker='o',
                    capsize=3,
                    markersize=4)
    
    ax1.set_xlabel('Output Length')
    ax1.set_ylabel('KL Divergence')
    ax1.set_title('KL Divergence vs Output Length\n(Averaged over seeds)')
    ax1.set_xscale('log')
    #ax1.set_yscale('log')
    ax1.grid(True, which="both", ls="-", alpha=0.2)
    ax1.legend()
    
    # MAE
    for order in orders:
        ax2.errorbar(lengths,
                    metrics['mae']['data'][order]['means'],
                    yerr=metrics['mae']['data'][order]['stds'],
                    label=f'Order {order}',
                    marker='o',
                    capsize=3,
                    markersize=4)
    
    ax2.set_xlabel('Output Length')
    ax2.set_ylabel('Mean Absolute Error')
    ax2.set_title('MAE vs Output Length\n(Averaged over seeds)')
    ax2.set_xscale('log')
    ax2.grid(True, which="both", ls="-", alpha=0.2)
    ax2.legend()
        
    plt.tight_layout()
    plt.savefig('plots/model_line_chart.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Create heatmap visualization with seed-averaged data
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    heatmap_data = {
        'kld': np.zeros((len(orders), len(lengths))),
        'mae': np.zeros((len(orders), len(lengths)))
    }
    
    for i, order in enumerate(orders):
        for j, length in enumerate(lengths):
            kld_averages = []
            mae_averages = []
            
            for number in numbers:
                values = [r for r in results if r['order'] == order and 
                         r['output_length'] == length and 
                         r['number'] == number]
                if values:
                    kld_averages.append(np.mean([v['kld'] for v in values]))
                    mae_averages.append(np.mean([v['mae'] for v in values]))
            
            heatmap_data['kld'][i,j] = np.mean(kld_averages)
            heatmap_data['mae'][i,j] = np.mean(mae_averages)
    
    sns.heatmap(heatmap_data['kld'], ax=ax1,
                xticklabels=lengths,
                yticklabels=orders,
                cmap='viridis',
                norm='log')
    ax1.set_title('KL Divergence\n(Averaged over seeds)')
    ax1.set_xlabel('Output Length')
    ax1.set_ylabel('Markov Order')
    
    sns.heatmap(heatmap_data['mae'], ax=ax2,
                xticklabels=lengths,
                yticklabels=orders,
                cmap='viridis')
    ax2.set_title('Mean Absolute Error\n(Averaged over seeds)')
    ax2.set_xlabel('Output Length')
    ax2.set_ylabel('Markov Order')
    
    plt.tight_layout()
    plt.savefig('plots/model_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()

plot_model_results("results/model_results.json", )

The same is done for the lower bound results:

In [None]:
def plot_best_results(results_file):
    """
    Create plots showing how metrics vary with output length for different orders
    """
    with open(results_file, 'r') as f:
        results = json.load(f)
    
    plt.style.use('classic')
    sns.set_palette("husl")
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    orders = sorted(list(set(r['order'] for r in results)))
    lengths = sorted(list(set(r['output_length'] for r in results)))
    
    metrics = {
        'kld': {'name': 'KL Divergence', 'data': {}},
        'mae': {'name': 'Mean Absolute Error', 'data': {}}
    }
    
    # Process data for each order
    for order in orders:
        metrics['kld']['data'][order] = {'means': [], 'stds': []}
        metrics['mae']['data'][order] = {'means': [], 'stds': []}
        
        for length in lengths:
            # Get results for this order and length
            values = [r for r in results if r['order'] == order and r['output_length'] == length]
            
            # Calculate statistics
            for metric in ['kld', 'mae']:
                values_metric = [v[metric] for v in values]
                metrics[metric]['data'][order]['means'].append(np.mean(values_metric))
                metrics[metric]['data'][order]['stds'].append(np.std(values_metric))
    
    # KL-Divergence
    for order in orders:
        ax1.errorbar(lengths, 
                    metrics['kld']['data'][order]['means'],
                    yerr=metrics['kld']['data'][order]['stds'],
                    label=f'Order {order}',
                    marker='o',
                    capsize=3,
                    markersize=4)
    
    ax1.set_xlabel('Output Length')
    ax1.set_ylabel('KL Divergence')
    ax1.set_title('KL Divergence vs Output Length')
    ax1.set_xscale('log')
    #ax1.set_yscale('log')
    ax1.grid(True, which="both", ls="-", alpha=0.2)
    ax1.legend()

    
    # MAE
    for order in orders:
        ax2.errorbar(lengths,
                    metrics['mae']['data'][order]['means'],
                    yerr=metrics['mae']['data'][order]['stds'],
                    label=f'Order {order}',
                    marker='o',
                    capsize=3,
                    markersize=4)
    
    ax2.set_xlabel('Output Length')
    ax2.set_ylabel('Mean Absolute Error')
    ax2.set_title('MAE vs Output Length')
    ax2.set_xscale('log')
    ax2.grid(True, which="both", ls="-", alpha=0.2)
    ax2.legend()
        
    plt.tight_layout()
    plt.savefig('plots/lower_bound_line_chart.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Create heatmap visualization
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    heatmap_data = {
        'kld': np.zeros((len(orders), len(lengths))),
        'mae': np.zeros((len(orders), len(lengths)))
    }
    
    for i, order in enumerate(orders):
        for j, length in enumerate(lengths):
            values = [r for r in results if r['order'] == order and r['output_length'] == length]
            heatmap_data['kld'][i,j] = np.mean([v['kld'] for v in values])
            heatmap_data['mae'][i,j] = np.mean([v['mae'] for v in values])
    
    # Plot heatmaps
    sns.heatmap(heatmap_data['kld'], ax=ax1,
                xticklabels=lengths,
                yticklabels=orders,
                cmap='viridis',
                norm='log')
    ax1.set_title('KL Divergence')
    ax1.set_xlabel('Output Length')
    ax1.set_ylabel('Markov Order')
    
    sns.heatmap(heatmap_data['mae'], ax=ax2,
                xticklabels=lengths,
                yticklabels=orders,
                cmap='viridis')
    ax2.set_title('Mean Absolute Error')
    ax2.set_xlabel('Output Length')
    ax2.set_ylabel('Markov Order')
    
    plt.tight_layout()
    plt.savefig('plots/lower_bound_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()

plot_best_results("results/lower_bound.json")

Now, we create a comparison plot on output length vs metric score for both metrics to compare how the models perform (across all orders) vs the ideal results. We include the random results as an orientation.

In [None]:
def create_comparison_plots(model_path, best_path, random_path):
    with open(model_path, 'r') as f:
        model_results = json.load(f)
    with open(best_path, 'r') as f:
        ideal_results = json.load(f)
    with open(random_path, 'r') as f:
        random_results = json.load(f)
    
    # Extract unique output lengths
    lengths = sorted(list(set(r['output_length'] for r in ideal_results)))
    
    ideal_metrics = {'kld': [], 'mae': []}
    model_metrics = {'kld': [], 'mae': []}
    random_metrics = {'kld': [], 'mae': []}
    
    for length in lengths:
        ideal_length = [r for r in ideal_results if r['output_length'] == length]
        model_length = [r for r in model_results if r['output_length'] == length]
        random_length = [r for r in random_results if r['output_length'] == length]
        
        ideal_metrics['kld'].append(np.mean([r['kld'] for r in ideal_length]))
        ideal_metrics['mae'].append(np.mean([r['mae'] for r in ideal_length]))
        model_metrics['kld'].append(np.mean([r['kld'] for r in model_length]))
        model_metrics['mae'].append(np.mean([r['mae'] for r in model_length]))
        random_metrics['kld'].append(np.mean([r['kld'] for r in random_length]))
        random_metrics['mae'].append(np.mean([r['mae'] for r in random_length]))
    
    # Create figure with two subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    width = 0.35
    x = np.arange(len(lengths))
    
    # KLD
    ax1.bar(x - width/2, ideal_metrics['kld'], width, label='Lower Bound', color='skyblue')
    ax1.bar(x + width/2, model_metrics['kld'], width, label='Transformer', color='lightcoral')
    ax1.plot(x, random_metrics['kld'], color='green', linestyle='--', label='Random')
    ax1.set_ylabel('KL Divergence')
    ax1.set_title('Average KL Divergence by Output Length')
    ax1.set_xticks(x)
    ax1.set_xticklabels(lengths, rotation=45)
    ax1.legend()
    ax1.grid(True, alpha=0.2)
    
    # MAE
    ax2.bar(x - width/2, ideal_metrics['mae'], width, label='Lower Bound', color='skyblue')
    ax2.bar(x + width/2, model_metrics['mae'], width, label='Transformer', color='lightcoral')
    ax2.plot(x, random_metrics['mae'], color='green', linestyle='--', label='Random')
    ax2.set_ylabel('Mean Absolute Error')
    ax2.set_title('Average MAE by Output Length')
    ax2.set_xticks(x)
    ax2.set_xticklabels(lengths, rotation=45)
    ax2.legend()
    ax2.grid(True, alpha=0.2)
    
    plt.tight_layout()
    plt.savefig('plots/comparison_total.png', dpi=300, bbox_inches='tight')
    plt.close()

create_comparison_plots('results/model_results.json', 'results/lower_bound.json', 'results/random_results.json')

Finally, we can compare how these results appear under the different Markov orders:

In [None]:
def create_comparison_by_order(model_path, ideal_path, random_path):
    with open(model_path, 'r') as f:
        model_results = json.load(f)
    with open(ideal_path, 'r') as f:
        ideal_results = json.load(f)
    with open(random_path, 'r') as f:
        random_results = json.load(f)
    
    for order in range(1, 7):
        # Create figure with two subplots
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # Filter results for this order
        order_ideal = [r for r in ideal_results if r['order'] == order]
        order_model = [r for r in model_results if r['order'] == order]
        order_random = [r for r in random_results if r['order'] == order]
        
        lengths = sorted(list(set(r['output_length'] for r in order_ideal)))
        
        ideal_metrics = {'kld': [], 'mae': []}
        model_metrics = {'kld': [], 'mae': []}
        random_metrics = {'kld': [], 'mae': []}
        
        for length in lengths:
            ideal_length = [r for r in order_ideal if r['output_length'] == length]
            model_length = [r for r in order_model if r['output_length'] == length]
            random_length = [r for r in order_random if r['output_length'] == length]
            
            ideal_metrics['kld'].append(np.mean([r['kld'] for r in ideal_length]))
            ideal_metrics['mae'].append(np.mean([r['mae'] for r in ideal_length]))
            model_metrics['kld'].append(np.mean([r['kld'] for r in model_length]))
            model_metrics['mae'].append(np.mean([r['mae'] for r in model_length]))
            random_metrics['kld'].append(np.mean([r['kld'] for r in random_length]))
            random_metrics['mae'].append(np.mean([r['mae'] for r in random_length]))
        
        width = 0.35
        x = np.arange(len(lengths))
        
        # Plot KLD (left plot)
        ax1.bar(x - width/2, ideal_metrics['kld'], width, label='Lower Bound', color='skyblue')
        ax1.bar(x + width/2, model_metrics['kld'], width, label='Transformer', color='lightcoral')
        ax1.plot(x, random_metrics['kld'], color='green', linestyle='--', label='Random')
        ax1.set_title(f'Order {order} - KL Divergence')
        ax1.set_xticks(x)
        ax1.set_xticklabels(lengths, rotation=45)
        ax1.set_xlabel('Output Length')
        ax1.set_ylabel('KL Divergence')
        ax1.legend()
        ax1.grid(True, alpha=0.2)
        
        # Plot MAE (right plot)
        ax2.bar(x - width/2, ideal_metrics['mae'], width, label='Lower Bound', color='skyblue')
        ax2.bar(x + width/2, model_metrics['mae'], width, label='Transformer', color='lightcoral')
        ax2.plot(x, random_metrics['mae'], color='green', linestyle='--', label='Random')
        ax2.set_title(f'Order {order} - Mean Absolute Error')
        ax2.set_xticks(x)
        ax2.set_xticklabels(lengths, rotation=45)
        ax2.set_xlabel('Output Length')
        ax2.set_ylabel('Mean Absolute Error')
        ax2.legend()
        ax2.grid(True, alpha=0.2)
        
        plt.tight_layout()
        plt.savefig(f'plots/order_{order}_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()


create_comparison_by_order('results/model_results.json', 'results/lower_bound.json', 'results/random_results.json')

We can also plot the error distribution across Markov orders:

In [None]:
plt.style.use('classic')
sns.set_palette("husl")

orders = range(1, 7)
kld_means = [0.001, 0.894, 0.154, 0.203, 3.068, 5.158]
kld_stds = [0.001, 2.088, 0.286, 0.302, 2.655, 3.632]
mae_means = [0.006, 0.128, 0.056, 0.045, 0.132, 0.043]
mae_stds = [0.004, 0.055, 0.015, 0.039, 0.043, 0.071]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# KLD
ax1.errorbar(orders, kld_means, yerr=kld_stds, 
            fmt='o-', capsize=3, capthick=1, 
            color='skyblue', label='KLD',
            markersize=6, linewidth=1.5)
ax1.set_xlabel('Markov Order')
ax1.set_ylabel('KLD')
ax1.grid(True, linestyle='-', alpha=0.7)
ax1.set_title('Kullback-Leibler Divergence')

# MAE
ax2.errorbar(orders, mae_means, yerr=mae_stds, 
            fmt='s-', capsize=3, capthick=1,
            color='lightcoral', label='MAE',
            markersize=6, linewidth=1.5)
ax2.set_xlabel('Markov Order')
ax2.set_ylabel('MAE')
ax2.grid(True, linestyle='-', alpha=0.7)
ax2.set_title('Mean Absolute Error')

ax1.legend()
ax2.legend()

plt.tight_layout()

plt.savefig('plots/error_distribution.png', dpi=300, bbox_inches='tight')
plt.close()