In [None]:
!pip install x-transformers lightning datasets transformers tensorboard tokenizers sentencepiece

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import lightning as L
from torch.utils.data import DataLoader, Dataset
from x_transformers import TransformerWrapper, Decoder
from x_transformers.autoregressive_wrapper import AutoregressiveWrapper
import numpy as np
import random
import argparse
import os
import shutil
from datasets import load_dataset, interleave_datasets
from transformers import DataCollatorForLanguageModeling,AutoTokenizer

from lightning.pytorch.callbacks import LearningRateMonitor
from lightning.pytorch.loggers import TensorBoardLogger



In [None]:
# Preview the first 100 lines of the raw training corpus.
!head -n 100 data/goethe.txt

In [None]:
# Create the output directory for the tokenizer and the model checkpoint.
# os.makedirs replaces the `!mkdir -p` shell call so the cell does not depend on a POSIX shell;
# exist_ok=True keeps the cell re-runnable.
os.makedirs("models/goethe", exist_ok=True)

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.decoders import BPEDecoder
from transformers import PreTrainedTokenizerFast

# 1. Define the special tokens.
#    The padding id configured below is the position of "[PAD]" in this list; that only holds if
#    the trainer inserts the special tokens first and in list order, so it is asserted after training.
special_tokens = ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]
pad_id = special_tokens.index("[PAD]")

# 2. Create the (still untrained) BPE tokenizer and its trainer.
#    They are named bpe_* so they cannot be mistaken for the `tokenizer` / `trainer`
#    objects that later cells create for the model.
bpe_tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
bpe_trainer = BpeTrainer(special_tokens=special_tokens, min_frequency=2, end_of_word_suffix='</w>')

bpe_tokenizer.pre_tokenizer = Whitespace()
bpe_tokenizer.decoder = BPEDecoder(suffix='</w>')
bpe_tokenizer.enable_padding(pad_token="[PAD]", pad_id=pad_id)

print("Training tokenizer...")
bpe_tokenizer.train(["data/goethe.txt"], bpe_trainer)

# 3. Wrap it in a PreTrainedTokenizerFast with explicit special-token mappings.
goethe_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=bpe_tokenizer,
    unk_token="[UNK]",
    cls_token="[CLS]",
    sep_token="[SEP]",
    pad_token="[PAD]",
    mask_token="[MASK]"
)

# The padding id was fixed before training; fail loudly if the trained vocabulary disagrees.
assert goethe_tokenizer.pad_token_id == pad_id, (
    f"[PAD] ended up with id {goethe_tokenizer.pad_token_id}, expected {pad_id}"
)

# 4. Set the maximum sequence length and persist the tokenizer for the later cells.
goethe_tokenizer.model_max_length = 100
goethe_tokenizer.save_pretrained("models/goethe/goethenizer")

# 5. Report the vocabulary size and the special-token setup.
print("Number of tokens:", goethe_tokenizer.vocab_size)
print("PAD token id:", goethe_tokenizer.pad_token_id)
print("All special tokens:", goethe_tokenizer.all_special_tokens)
print("All special IDs:", goethe_tokenizer.all_special_ids)

# 6. Smoke test: encode with padding to a fixed length, then decode again.
test_text = "Dies ist ein Test."
encoded = goethe_tokenizer.encode(test_text, padding="max_length", max_length=20)
print("Encoded with padding:", encoded)
print("Decoded back:", goethe_tokenizer.decode(encoded, skip_special_tokens=True))

In [None]:
# Round-trip a short sentence (no padding) to see how the tokenizer splits and rejoins it.
sample_sentence = "Dies ist fuer sie."
sample_ids = goethe_tokenizer.encode(sample_sentence)
print("Encoded tokens:", sample_ids)
print("Decoded text:", goethe_tokenizer.decode(sample_ids))

# Define a Model

See the [x-transformers documentation](https://github.com/lucidrains/x-transformers) for the model options used below.

In [None]:
class SequenceDecoder(L.LightningModule):
    """Small GPT-style decoder-only language model built with x-transformers.

    The transformer is wrapped in ``AutoregressiveWrapper``; calling the wrapped
    model on a batch of token ids yields the training loss, and its ``generate``
    method is used to sample continuations.

    Args:
        sequence_length: Maximum sequence length passed to the transformer.
        learn_rate: Learning rate for the AdamW optimizer.
        tokenizer: Hugging Face style tokenizer; must provide ``vocab_size``,
            ``__len__``, ``pad_token_id``, ``encode`` and ``decode``.
    """

    def __init__(self, sequence_length, learn_rate, tokenizer):
        super().__init__()
        # Size the embedding table so that every id the tokenizer can emit is valid.
        # `vocab_size` does not count tokens added on top of the base vocabulary while
        # `len(tokenizer)` does, so take whichever is larger (never less than the
        # original `vocab_size + 1`).
        num_tokens = max(tokenizer.vocab_size + 1, len(tokenizer))

        # instantiate GPT-like decoder model
        transformer_model = TransformerWrapper(
            num_tokens = num_tokens,
            max_seq_len = sequence_length,
            emb_dropout=0.05,
            use_abs_pos_emb=False, # replace absolute pos embeddings with relative ones (rotary_xpos)
            l2norm_embed = True,
            attn_layers = Decoder(
                dim = 256,
                depth = 4,
                heads = 4,
                attn_flash = True, # flash attention
                rotary_xpos = True, # improved version of rotary_pos_emb
                ff_swish = True, #SwiGLU from PaLM and LLaMA
                attn_one_kv_head = True, # improves speed, does not cost much performance
                attn_head_scale = True,
                ff_no_bias = True, # faster and better \o/
            )
        )

        self.model = AutoregressiveWrapper(transformer_model,mask_prob=0.0, pad_value=tokenizer.pad_token_id) # test again with 0.15
        self.learn_rate = learn_rate
        self.tokenizer = tokenizer
        # NOTE(review): called without arguments, so presumably the tokenizer object is stored
        # together with the two scalars — consider ignore=["tokenizer"] if that is unwanted.
        self.save_hyperparameters()

    def forward(self, x):
        """Return the wrapped model's output for ``x["input_ids"]``."""
        # Call the module itself rather than `.forward` so registered hooks are honoured.
        return self.model(x["input_ids"])

    def configure_optimizers(self):
        """Create the AdamW optimizer over all parameters with the configured learning rate."""
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learn_rate,)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        """Compute and log the training loss for one batch."""
        loss = self.model(train_batch["input_ids"])
        self.log('train_loss', loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Compute and log the validation loss for one batch."""
        # Lightning already runs validation in eval mode with gradients disabled, so no
        # manual eval()/no_grad() is needed. Logging the tensor directly also avoids the
        # host-device sync that `.item()` would force on every step.
        loss = self.model(val_batch["input_ids"])
        self.log('val_loss', loss)
        return loss

    def on_validation_epoch_end(self):
        """Print a short sampled continuation of a fixed prompt as a qualitative check."""
        prompt = "Der Mensch ist ein"
        ids = torch.tensor(self.tokenizer.encode(prompt), device=self.device)
        # sample 20 new tokens, restricting sampling via the k=10 logit filter
        sample = self.model.generate(ids, 20, temperature=0.7, filter_kwargs={"k": 10})
        output_str = self.tokenizer.decode(sample.cpu().numpy(),skip_special_tokens=False)
        print(f'prompt: {prompt}\ngeneration:{output_str}')


# Prepare the data

In [None]:
# --- Configuration -----------------------------------------------------------
SEQ_LEN = 100                      # tokens per training window
BATCH_SIZE = 64                    # windows per batch
MODEL_PATH = "./models/goethe"     # where the final checkpoint is written

# --- Tokenizer ---------------------------------------------------------------
# Uses the tokenizer trained above. The author notes that the pretrained
# "LeoLM/leo-hessianai-7b-chat" tokenizer (loaded with trust_remote_code=True)
# works much better as a drop-in alternative.
tokenizer = AutoTokenizer.from_pretrained("models/goethe/goethenizer")
tokenizer.model_max_length = SEQ_LEN

# --- Raw text ----------------------------------------------------------------
raw_dataset = load_dataset("text", data_files={"train": 'data/goethe.txt'}, split="train")


def tokenize_function(examples):
    """Tokenize a batch of text rows without truncating or padding them."""
    return tokenizer(examples["text"], truncation=False, padding=False)


# Tokenize every row; the raw "text" column is dropped afterwards.
tokenized_dataset = raw_dataset.map(tokenize_function, batched=True, remove_columns=["text"])

# Flatten the per-row token lists into one continuous stream of token ids.
all_token_ids = [
    token_id
    for example in tokenized_dataset
    for token_id in example["input_ids"]
]

# Create a custom dataset class for continuous token sequences
class ContinuousTokenDataset(Dataset):
    """Sliding-window view over one continuous stream of token ids.

    Item ``i`` is the window ``token_ids[i : i + seq_length]``; consecutive items
    overlap by ``seq_length - 1`` tokens.

    Args:
        token_ids: Sequence of integer token ids.
        seq_length: Number of tokens per window; must be positive.

    Raises:
        ValueError: If ``seq_length`` is not positive.
    """

    def __init__(self, token_ids, seq_length):
        if seq_length <= 0:
            raise ValueError(f"seq_length must be positive, got {seq_length}")
        self.token_ids = token_ids
        self.seq_length = seq_length
        # A stream shorter than one window yields an empty dataset; without the clamp
        # the count goes negative and len() raises an unrelated ValueError.
        self.total_sequences = max(0, len(token_ids) - seq_length + 1)

    def __len__(self):
        return self.total_sequences

    def __getitem__(self, idx):
        """Return the window starting at ``idx`` as ``input_ids`` and ``labels`` tensors.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        if idx < 0:
            idx += self.total_sequences  # support negative indexing like a list
        # Without this check an out-of-range index silently returns a short (or empty)
        # window, and plain iteration over the dataset never terminates.
        if not 0 <= idx < self.total_sequences:
            raise IndexError(f"index out of range for dataset of size {self.total_sequences}")

        input_ids = torch.tensor(self.token_ids[idx:idx + self.seq_length], dtype=torch.long)
        # Labels are a copy of the inputs for autoregressive language modeling.
        return {
            "input_ids": input_ids,
            "labels": input_ids.clone()
        }

# Create the train/validation split on the TOKEN STREAM, not on window start indices.
# Splitting by window index lets the first validation windows share up to SEQ_LEN - 1
# tokens with the last training windows; cutting the stream once gives two disjoint sets.
split_index = int(0.99 * len(all_token_ids))
train_token_ids = all_token_ids[:split_index]
val_token_ids = all_token_ids[split_index:]

# Each side needs at least one full window.
assert len(train_token_ids) >= SEQ_LEN, "training split is shorter than one sequence"
assert len(val_token_ids) >= SEQ_LEN, "validation split is shorter than one sequence"

# Create datasets
train_dataset = ContinuousTokenDataset(train_token_ids, SEQ_LEN)
val_dataset = ContinuousTokenDataset(val_token_ids, SEQ_LEN)

# Window counts, kept under their previous names for any code that reads them.
train_size = len(train_dataset)
val_size = len(val_dataset)
total_sequences = train_size + val_size

# Create dataloaders
# Note: We don't need DataCollatorForLanguageModeling since our dataset already provides labels
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Inspect the first validation example (a dict holding "input_ids" and "labels").
first_val_example = val_dataset[0]
print(first_val_example)

In [None]:

# Decode the second validation window back to text as a readability check.
second_window_ids = val_dataset[1]['input_ids']
tokenizer.decode(second_window_ids, skip_special_tokens=True)


In [None]:
# Show the special-token setup of the tokenizer that was loaded from disk.
loaded_pad_id = tokenizer.pad_token_id
print("Pretrained PAD token id:", loaded_pad_id)
for special_view in (tokenizer.all_special_ids, tokenizer.all_special_tokens):
    print(special_view)

In [None]:
# This cell was an exact copy of the previous one. Instead of printing the same values twice,
# check the properties of the loaded tokenizer that the model relies on.

# The model passes pad_token_id to the autoregressive wrapper as its pad value.
assert tokenizer.pad_token_id is not None, "tokenizer defines no PAD token"

# The model sizes its embedding table as vocab_size + 1, so every special id must fit in it.
assert all(
    0 <= special_id < tokenizer.vocab_size + 1 for special_id in tokenizer.all_special_ids
), "a special token id lies outside the model's embedding range"

# Train the Model

In [None]:
# Seed every RNG (python, numpy, torch) so weight init, shuffling and sampling are repeatable.
SEED = 42
L.seed_everything(SEED, workers=True)

model = SequenceDecoder(sequence_length=SEQ_LEN,learn_rate=0.0002, tokenizer=tokenizer)

lr_monitor = LearningRateMonitor(logging_interval='step')

# Logs go to lightning_logs/SequenceDecoder/ (named after the model class).
logger = TensorBoardLogger("lightning_logs", name=type(model).__name__ )

trainer = L.Trainer(accelerator="gpu",
                    devices=-1,                 # use all available GPUs
                    precision="bf16-mixed",
                    max_epochs=2,
                    log_every_n_steps=5,
                    val_check_interval = 500,   # validate every 500 training batches
                    accumulate_grad_batches=8,  # one optimizer step per 8 batches
                    gradient_clip_val=0.7,
                    limit_train_batches=1.0,
                    limit_val_batches=0.2,      # validate on 20% of the validation batches
                    callbacks=[lr_monitor])
# NOTE(review): `logger` is created above but not passed to the Trainer — confirm whether
# logger=logger was intended.

trainer.fit(model, train_loader, val_loader)
# Pass the model explicitly so the current in-memory weights are validated; without it
# Lightning may reload a checkpoint from the fit run instead.
trainer.validate(model, dataloaders=val_loader)

checkpoint = MODEL_PATH +"/sequence_decoder_model.ckpt"
print(f"saving finished model to {checkpoint}")
trainer.save_checkpoint(checkpoint)

# Generate Some Text


In [None]:
from x_transformers.autoregressive_wrapper import top_p, top_k

model.eval()

def ask_model(prompt, max_new_tokens=5, temperature=0.7):
    """Print ``prompt`` followed by a continuation sampled from the trained model.

    Args:
        prompt: Text to continue.
        max_new_tokens: Number of tokens to generate (default 5, as before).
        temperature: Sampling temperature (default 0.7, as before).

    Raises:
        ValueError: If the prompt encodes to no tokens.
    """
    prompt_ids = tokenizer.encode(prompt)
    if not prompt_ids:
        # an empty id list would become a float tensor and fail inside the model
        raise ValueError("prompt produced no tokens")

    # Build the prompt on whatever device the model currently lives on instead of assuming CPU.
    start = torch.tensor(prompt_ids, dtype=torch.long, device=model.device)
    sample = model.model.generate(start, max_new_tokens, temperature=temperature, filter_logits_fn=top_k)
    output_str = tokenizer.decode(sample.cpu().numpy(), skip_special_tokens=False)
    print(prompt + " " + output_str)

In [None]:
# Try the trained model on a German prompt.
demo_prompt = "Was begehrt das Herz mehr als eine"
ask_model(demo_prompt)