#### Student Name:
#### Student ID:

# Assignment 10

### Transformer Models for Jazz Music Generation

Instructions: 

* This notebook is an interactive assignment; please read and follow the instructions in each cell. 

* Cells that require your input (in the form of code or written response) will have 'Question #' above.

* After completing the assignment, please submit this notebook and printout as a PDF.

# Overview

This assignment guides you through building a transformer-based model for jazz music generation. You will download a dataset of MIDI files, preprocess it, build a small GPT model, train it and generate music. Some code is provided, other parts require your input.

### Downloading the Dataset

Let's run the following cells to download the Doug McKenzie Jazz Piano MIDI dataset to the specified directory.


In [None]:
import os
import requests
from tqdm import tqdm
from bs4 import BeautifulSoup

DATA_URL = "https://bushgrafts.com/midi/"


def download_dataset(dest_dir: str) -> None:
    """Download Doug McKenzie Jazz Piano MIDI dataset."""
    os.makedirs(dest_dir, exist_ok=True)

    # Parse the index page with BeautifulSoup to reliably obtain all MIDI links
    response = requests.get(DATA_URL)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    for tag in soup.find_all("a", href=True):
        href = tag["href"]
        if href.lower().endswith(".mid"):
            url = href if href.startswith("http") else DATA_URL + href
            filename = os.path.basename(href)
            out_path = os.path.join(dest_dir, filename)
            if os.path.exists(out_path):
                continue
            resp = requests.get(url, stream=True)
            resp.raise_for_status()
            total = int(resp.headers.get("content-length", 0))
            with open(out_path, "wb") as f, tqdm(
                total=total, unit="B", unit_scale=True, desc=filename
            ) as pbar:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        pbar.update(len(chunk))


In [None]:
download_dataset('jazz_data/raw')

##### Question 1 [8 points]

The dataset contains several MIDI files. Complete the following function `midi_to_pianoroll` which converts a MIDI file into a piano roll representation with shape `(128, time_steps)`.

In [None]:
import mido
import pretty_midi
import numpy as np
from io import BytesIO

DEFAULT_VELOCITY = 80
TARGET_BPM = 120

def midi_to_pianoroll(midi_path):
    """Convert a MIDI file to a piano roll.

    Args:
        midi_path (str): Path to a MIDI file.
    Returns:
        np.ndarray: Array of shape (128, T) with velocities.
    """
    # Load the MIDI file with mido, clipping invalid bytes
    mid = mido.MidiFile(midi_path, clip=True)
    buf = BytesIO()
    mid.save(file=buf)
    buf.seek(0)
    pm = pretty_midi.PrettyMIDI(buf)
    # Technically, you could also use `pretty_midi.PrettyMIDI(midi_path)` directly,
    # but using mido allows us to handle MIDI files with invalid bytes more gracefully.

    # Here we will allocate a piano roll array.
    # 128 = number of MIDI pitches (0-127)
    # We will also quantize the data, such that each column in the piano roll corresponds to a 250ms time step.
    # This is why we multiply the end time by 4 (since 1 second = 4 * 250ms).
    roll = np.zeros((128, int(pm.get_end_time() * 4)), dtype=np.int16)
    
    # TODO: iterate over instruments and notes in pm to fill the roll
    # for simplicity, you can ignore drum instruments and only consider melodic instruments
    # (you can use .is_drum attribute of Instrument to check if an instrument is a drum)

    # Hint: make sure you quantize the time steps correctly!
    # Furthermore, if a note's velocity is not defined/set in the MIDI file, you can use the default velocity specified in the DEFAULT_VELOCITY constant.

    # Your code here
    

Test out your function by running the cell below after implementing it.

In [None]:
# example usage
roll = midi_to_pianoroll('jazz_data/raw/afine-1.mid')

import matplotlib.pyplot as plt

def plot_pianoroll(roll):
    """Plot a piano roll."""
    plt.imshow(roll, aspect='auto', origin='lower')
    plt.xlabel('Time (1/4 notes)')
    plt.ylabel('Pitch (MIDI note number)')
    plt.colorbar(label='Velocity')
    plt.title('Piano Roll')
    plt.show()

print("Piano roll shape:", roll.shape) # should be (128, T) where T is the number of time steps
plot_pianoroll(roll)

##### Question 2 [8 points]
Using `midi_to_pianoroll`, write the function `preprocess_dataset` that iterates over all MIDI files in `raw_dir` and stores piano roll `.npy` arrays in `out_dir`. 

**Naming Convention:** If there is a MIDI file called `test.mid` in `raw_dir`, the output file should be `test.npy` in `out_dir`.

In [None]:
def preprocess_dataset(raw_dir, out_dir):
    """Convert all MIDI files in raw_dir to piano rolls.

    Args:
        raw_dir (str): Directory containing MIDI files.
        out_dir (str): Directory to save numpy arrays.
    Returns:
        None
    """
    os.makedirs(out_dir, exist_ok=True)
    # TODO: loop over files in raw_dir and save piano roll numpy arrays using midi_to_pianoroll
    pass

In [None]:
preprocess_dataset('jazz_data/raw','jazz_data/processed')

##### Question 3 [8 points]

Now that we have our piano rolls ready, we will tokenize them and build a vocabulary.

A **vocabulary** is a dictionary that maps **each unique column** (consisting of all 128 pitches at a given time step) to an **integer** token. 

Each column is thus a tuple of 128 integers, where each integer represents the velocity of a pitch at that time step. For example, one such column could be as follows:

```python
(40, 0, 0, ..., 0)
```

Since the element at index 0 is 40, this means that the note with MIDI note number 0 has a velocity of 40 at that time step, while all other notes have a velocity of 0. 

So, your task is to:

* Iterate over all `.npy` files obtained above.
* Count the occurrences of each unique column (tuple of 128 integers).
* Assign a unique integer token to each unique column in the following way: the column with the highest count gets token 2, the second highest gets token 3, the third highest gets token 4, and so on.
* Token 0 is reserved for `<unk>` (unknown) columns. This is a special token for columns that do not match any of the known columns in the dataset.
* Token 1 is reserved for `<pad>` (padding) columns. This is a special token for padding columns that are used to ensure that all sequences in a batch have the same length (we will use this later in training).

In the end, the resulting dictionary might look something like this:

```python
vocab = {
    (40,0,0,...,0): m,
    ...,
    (0,0,0,...,0): n,
    '<unk>': 0,
    '<pad>': 1
}
```

where `m` and `n` are the tokens assigned to the respective columns and `m, n` are integers that are greater than or equal to 2.

Save this dictionary with the `pickle.dump` method and return it.

In [None]:
import pickle

def build_vocab(data_dir, out_file="vocab.pkl"):
    """Create a vocabulary dictionary from piano rolls.

    Args:
        data_dir (str): Directory with piano roll `.npy` files.
        out_file (str): Path to save the pickled vocabulary.
    Returns:
        dict: Mapping from column tuples to integer tokens.
    """
    # TODO: populate counter with piano roll columns
    # TODO: Save the resulting dictionary using pickle.dump, and return the vocabulary dictionary

    pass

In [None]:
vocab = build_vocab('jazz_data/processed')

##### Question 4 [4 points]

Fill in `tokenize_file` which converts a single piano roll `.npy` file to a sequence of integer tokens using a given vocabulary dictionary. This will involve iterating over each column in the piano roll, converting it to a tuple, and looking up the corresponding token in the vocabulary. Furthermore, if a column is not found in the vocabulary, it should be replaced with the `<unk>` token (0). This is important for generalizing the model to unseen data that may contain columns not present in the training set.

In [None]:
def tokenize_file(fname, vocab):
    """Convert a piano roll array to a list of token ids.

    Args:
        fname (str): Path to `.npy` array.
        vocab (dict): Mapping from column tuples to ints.
    Returns:
        np.ndarray: Sequence of integers (1D array).
    """
    # TODO: load array and map each column to a token index
    pass

In [None]:
tokens = tokenize_file('jazz_data/processed/align-1.npy', vocab)
print(f"Tokenized sequence length: {len(tokens)}")
print(f"First 10 tokens: {tokens[:10]}")

##### Question 5 [4 points]
Write `tokenize_dataset` to convert every piano roll array in `src_dir` into a sequence of integers.
Use the vocabulary built earlier. Columns not found in the vocabulary should map to the `<unk>` token (0).
Pad sequences shorter than the desired length using the `<pad>` token (1) when needed. Save each token sequence as a `.npy` file in `out_dir`.

**Naming Convention**: When saving the tokenized files, use the same base filename as the original `.npy` file, but with a `.npy` extension. For example, if the original file in `src_dir` is `align-1.npy`, the tokenized file written to `out_dir` should be `align-1.npy`.


In [None]:
def tokenize_dataset(src_dir, vocab_file, out_dir):
    """Tokenize all piano rolls in a directory.

    Args:
        src_dir (str): Directory with piano roll arrays.
        vocab_file (str): Path to saved vocabulary.
        out_dir (str): Where to save token arrays.
    """
    os.makedirs(out_dir, exist_ok=True)
    # TODO: load vocab and iterate over files saving token arrays
    pass

In [None]:
tokenize_dataset('jazz_data/processed', 'vocab.pkl', 'jazz_data/tokens')

##### Question 6 [6 points]

Now you will implement a simple Layer Normalization module, which is an essential building block in transformer models.  
Write this **from scratch** using TensorFlow tensor operations.

Do **not** use `tf.keras.layers.LayerNormalization`. The point is for you to understand each part of the computation.

Layer Normalization adjusts each feature vector so that its elements have mean zero and variance one, then applies a learned scale and shift. This helps stabilize training and gives the model flexibility to adapt the normalized outputs.

Suppose your input `x` has shape `(B, T, C)`:
- `B` is the batch size.
- `T` is the sequence length.
- `C` is the number of features.

For each feature vector of length `C`:
1. Compute the mean $\mu$ along the feature dimension. This produces a tensor of shape `(B, T, 1)`.
2. Compute the variance $\sigma^2$ along the feature dimension. This also has shape `(B, T, 1)`.
3. Normalize by subtracting the mean and dividing by the standard deviation, adding a small $\epsilon$ to avoid division by zero.
4. Apply a learned scale and shift:
   - `weight` ($\gamma$): shape `(C,)`, initialized to ones.
   - `bias` ($\beta$): shape `(C,)`, initialized to zeros.

The formula is:

$$
\text{LN}(x)_i = \gamma_i \times \Bigg( \frac{x_i - \mu}{\sqrt{\sigma^2 + \epsilon}} \Bigg) + \beta_i.
$$

This means:
- For each `(B, T)` position, the mean and variance are scalar values, not full matrices. 
- The subtraction and division happen element-wise along the feature dimension.
- The learned scale and shift also apply element-wise to each feature.

So you are not dividing or multiplying full tensors of the same shape. You normalize each feature vector using its own scalar statistics, then scale and shift each feature independently.

Use `unbiased=False` when computing the variance.  
Your final output should have the same shape `(B, T, C)` as the input.


In [None]:
import tensorflow as tf

class LayerNorm(tf.keras.layers.Layer):
    """Layer normalization over the last dimension.

    Args:
        dim (int): Feature dimension.
        eps (float): Stability epsilon.
    """
    def __init__(self, dim, eps=1e-5):
        super().__init__()
        # TODO: create learnable gamma and beta parameters
        pass
    def call(self, x):
        # TODO: normalize over the last dimension
        pass


##### Question 7 [8 points] 

In this part, you will write the multi-head self-attention computation from scratch using TensorFlow tensor operations. Do **not** use any built-in multi-head attention layers.

Let the input be a batch of sequences $ X \in \mathbb{R}^{B \times T \times d} $, where $ B $ is the batch size, $ T $ is the sequence length, and $ d $ is the embedding dimension. In multi-head attention, we first project the input linearly to obtain the queries $ Q $, keys $ K $, and values $ V $. These are computed by applying a shared linear transformation:
$$
[Q, K, V] = X W, \quad \text{where } W = [W_q, W_k, W_v] \in \mathbb{R}^{d \times 3d}
$$
Each of the resulting tensors $ Q, K, V \in \mathbb{R}^{B \times T \times d} $ is then split into $ h $ separate heads. Each head operates on a reduced dimension $ d_h = d / h $, reshaping the tensors into
$$
Q, K, V \in \mathbb{R}^{B \times h \times T \times d_h}.
$$

Within each head, the attention scores are computed by scaled dot-product:
$$
A = \frac{Q K^\top}{\sqrt{d_h}} \in \mathbb{R}^{B \times h \times T \times T}.
$$
Then, we apply a lower-triangular mask $ M \in \mathbb{R}^{T \times T} $, where
$$
M_{ij} =
\begin{cases}
0 & \text{if } j \le i, \\
-\infty & \text{if } j > i.
\end{cases}
$$
This mask is added to the attention scores:
$$
A' = A + M,
$$
so that positions can only attend to themselves and prior tokens in the sequence.

We apply the softmax function over the last dimension of $ A' $ to obtain normalized attention weights:
$$
\alpha = \text{softmax}(A') \in \mathbb{R}^{B \times h \times T \times T}.
$$
These weights are used to form a weighted sum of the values:
$$
Z = \alpha V \in \mathbb{R}^{B \times h \times T \times d_h}.
$$

Finally, the outputs from all heads are concatenated and projected back to the original dimension $ d $ using a linear transformation:
$$
\text{Output} = \text{ConcatHeads}(Z)\, W_o + b, \quad W_o \in \mathbb{R}^{d \times d}.
$$

This completes the masked multi-head attention computation. Your implementation should follow this structure exactly: compute $ Q, K, V $, reshape for multiple heads, apply scaled dot-product attention with a causal mask, compute the weighted sum, concatenate the heads, and project back to the original feature dimension.

In [None]:
class SelfAttention(tf.keras.layers.Layer):
    def __init__(self, dim, heads):
        super().__init__()
        self.heads = heads
        self.scale = (dim // heads) ** -0.5
        # TODO: define qkv projection and output projection layers
        pass
    def call(self, x):
        B = tf.shape(x)[0]
        T = tf.shape(x)[1]
        C = x.shape[-1]
        # TODO: compute query, key, value tensors
        # TODO: apply causal mask before softmax
        # TODO: return attention output
        pass


##### Question 8 [6 points]
Complete the `TransformerBlock` module using LayerNorm, SelfAttention and a feed-forward network. The FeedForward network is already given to you below, and is a basic two-layer MLP with GELU (Gaussian Error Linear Unit) activation.

To understand the connections between these components, read the paper [Improving Language Understanding by Generative Pre-Training](https://cdn.openai.com/research-covers/language-unsupervised/language_understanding_paper.pdf) from OpenAI.

The input `x` and output of the block have shape `(B, T, C)`. Remember to create two LayerNorm layers, a SelfAttention module, a FeedForward network and dropout layers, then apply them with residual connections.

In [None]:
class FeedForward(tf.keras.layers.Layer):
    def __init__(self, dim, hidden):
        super().__init__()
        self.net = tf.keras.Sequential([
            tf.keras.layers.Dense(hidden),
            tf.keras.layers.Activation(tf.nn.gelu),
            tf.keras.layers.Dense(dim)
        ])
    def call(self, x):
        return self.net(x)

class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, dim, heads, hidden, dropout=0.1):
        super().__init__()
        # TODO: define LayerNorm, SelfAttention, FeedForward and dropout
        pass
    def call(self, x, training=False):
        # TODO: apply attention and feed-forward with residual connections
        pass


##### Question 9 [10 points]

Finish implementing the `GPT` model class so that it correctly follows the transformer architecture for autoregressive language modeling.

The input is a tensor of token indices $x \in \mathbb{N}^{B \times T}$, where $B$ is the batch size and $T$ is the sequence length. The model first embeds tokens and positions. The token embedding is a learned matrix $E \in \mathbb{R}^{V \times d}$, where $V$ is the vocabulary size and $d$ is the embedding dimension. The lookup $E[x]$ produces a tensor in $\mathbb{R}^{B \times T \times d}$. 

A positional embedding $P \in \mathbb{R}^{1 \times L \times d}$ provides position-dependent information, where $L$ is the maximum sequence length. For a given input length $T$, the slice $P[:, :T]$ has shape $(1, T, d)$ and broadcasts over the batch. The input to the transformer blocks is the sum of token and positional embeddings:
$$
X = E[x] + P[:, :T].
$$

**Important Note**: If you read the original [Attention Is All You Need](https://arxiv.org/pdf/1706.03762) paper, you might notice that the authors use something called "positional encoding" instead of positional embeddings. A positional encoding is typically a fixed function (like sine and cosine functions) that generates position-dependent vectors. However, in this assignment, we are using learned positional embeddings, which are more flexible and can adapt to the data during training. You can initialize the positional embedding matrix $P$ randomly.

The model applies a stack of $N$ identical transformer blocks in sequence:
$$
H_0 = X,\quad H_{i} = \text{Block}_i(H_{i-1}),\quad i = 1,\dots,N.
$$
Each block preserves the shape, so $H_i \in \mathbb{R}^{B \times T \times d}$.

After the last block, the output is normalized:
$$
H = \text{LayerNorm}(H_N).
$$

Finally, the model projects this output to the vocabulary logits using a learned weight matrix $W_o \in \mathbb{R}^{d \times V}$ and bias $b \in \mathbb{R}^{V}$:
$$
\text{logits} = HW_o + b.
$$

The result $\text{logits} \in \mathbb{R}^{B \times T \times V}$ gives the unnormalized scores for each token in the vocabulary at every position in the sequence. Make sure your implementation matches this sequence: embedding lookup, positional addition, stacked blocks, layer normalization, and final linear projection.

In [None]:
class GPT(tf.keras.Model):
    def __init__(self, vocab_size, seq_len, dim=512, depth=8, heads=8, hidden=2048, dropout=0.1):
        super().__init__()
        # TODO: initialize token embedding, positional embedding, blocks, layer norm and output head
        pass
    def call(self, x, training=False):
        T = tf.shape(x)[1]
        # TODO: implement forward pass following the architecture description
        pass


##### Question 10 [10 points]
Complete the training loop below. For each mini-batch $(x,y)$ compute logits $L = \text{GPT}(x)$ and minimize
the cross-entropy loss
$$\mathcal{L} = -\frac{1}{N}\sum_i y_i \log \text{softmax}(L_i).$$
Save model weights to `model_epoch{epoch}.pt` after every epoch.


In [None]:
import tensorflow as tf

class TokenDataset(tf.data.Dataset):
    def __new__(cls, root, seq_len=128):
        files = [os.path.join(root, f) for f in os.listdir(root) if f.endswith('.npy')]
        def gen():
            # TODO: yield (x, y) pairs of length seq_len from token arrays
            pass
        return tf.data.Dataset.from_generator(gen, output_signature=(tf.TensorSpec((seq_len,), tf.int32), tf.TensorSpec((seq_len,), tf.int32)))

def train(data_dir, vocab_file, seq_len=128, batch_size=16, epochs=10, lr=3e-4, device='cuda'):
    """Train the GPT model on tokenized data."""
    # TODO: implement training loop using Keras optimizers and gradients
    pass


Feel free to adjust the hyperparameters such as learning rate, batch size, and number of epochs to achieve better performance.

In [None]:
train('jazz_data/tokens','vocab.pkl')

##### Question 11 [8 points]
Implement the `generate` function below which autoregressively samples from the model.
Use the trained weights and convert the predicted tokens back to a piano roll.

Your code should:
1. Load the trained model checkpoint from the file path `model_path`.
2. Feed the provided `seed` tokens to the model and iteratively sample one new token at a time.
3. Append each sampled token to the context and continue for `steps` iterations.
4. Convert the resulting token sequence back to a piano roll using `tokens_to_pianoroll` and `pianoroll_to_pretty_midi`.

Note that the model outputs logits for the next token at each step, which you can sample from using `tf.random.categorical`.

In [None]:
from scipy.io import wavfile

SECONDS_PER_STEP = 0.25

def tokens_to_pianoroll(tokens):
    cols = []
    for t in tokens:
        if isinstance(t, str):
            cols.append(np.zeros(128, dtype=np.int16))
        else:
            cols.append(np.array(t, dtype=np.int16))
    if not cols:
        return np.zeros((128,0), dtype=np.int16)
    return np.stack(cols, axis=1)

def pianoroll_to_pretty_midi(roll):
    pm = pretty_midi.PrettyMIDI(initial_tempo=120)
    instrument = pretty_midi.Instrument(program=0)
    num_steps = roll.shape[1]
    for pitch in range(roll.shape[0]):
        velocity_series = roll[pitch]
        prev_vel = 0
        start = 0
        for t in range(num_steps):
            vel = int(velocity_series[t])
            if vel > 0 and prev_vel == 0:
                start = t
                prev_vel = vel
            elif vel != prev_vel:
                if prev_vel > 0:
                    note = pretty_midi.Note(velocity=int(prev_vel), pitch=pitch, start=start*SECONDS_PER_STEP, end=t*SECONDS_PER_STEP)
                    instrument.notes.append(note)
                if vel > 0:
                    start = t
                prev_vel = vel
        if prev_vel > 0:
            note = pretty_midi.Note(velocity=int(prev_vel), pitch=pitch, start=start*SECONDS_PER_STEP, end=num_steps*SECONDS_PER_STEP)
            instrument.notes.append(note)
    pm.instruments.append(instrument)
    return pm

def generate(model_path, vocab_file, seed, steps=100, seq_len=128, device='cuda'):
    # TODO: load vocab and model, then autoregressively generate new tokens
    pass


In [None]:
pm = generate('model_epoch10.pt','vocab.pkl',[1],steps=50)

In [None]:
def save_audio(pm: pretty_midi.PrettyMIDI, path: str, sample_rate: int = 44100) -> None:
    """Save a :class:`pretty_midi.PrettyMIDI` object as a WAV file."""
    audio = pm.fluidsynth(fs=sample_rate)
    wavfile.write(path, sample_rate, audio.astype(np.float32))

pm.write('generated.mid') # Save the MIDI file
save_audio(pm, 'generated.wav') # Save the audio file

In [None]:
from IPython.display import Audio
Audio('generated.wav')  # Play the generated audio

##### Question 12 [4 points]
After you have trained your model and generated some samples, briefly describe the quality of the generated music. What worked well and what could be improved? You can change the seed tokens to generate different samples and see how the model performs with different starting points.

`Your answer here`

##### Question 13 [4 points]
Explain why masking in self-attention is necessary for autoregressive generation.

`Your answer here`

##### Question 14 [4 points]
Why do we tokenize the piano roll instead of feeding the raw continuous values directly into the transformer?

`Your answer here`

##### Question 15 [8 points]
Suggest two hyperparameters you might tune to improve generation quality and briefly justify your choices.

`Your answer here`