In [1]:
# Python packages used by the notebook. They are installed without version
# pins, so the resolved versions may differ from one run to the next.
!pip install pretty_midi
!pip install gdown
!pip install miditok
!pip install midi-clip

# FluidSynth system package and its Python bindings; presumably required by
# the `pm.fluidsynth(...)` call in the audio preview cell.
!sudo apt install -y fluidsynth
!pip install --upgrade pyfluidsynth

# Helper module fetched from the NesGen repository.
# NOTE(review): no visible cell imports `utility` - confirm it is still needed.
!wget https://raw.githubusercontent.com/roostico/NesGen/refs/heads/main/utility.py

Collecting pretty_midi
  Downloading pretty_midi-0.2.10.tar.gz (5.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m66.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mido>=1.1.16 (from pretty_midi)
  Downloading mido-1.3.3-py3-none-any.whl.metadata (6.4 kB)
Downloading mido-1.3.3-py3-none-any.whl (54 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pretty_midi
  Building wheel for pretty_midi (setup.py) ... [?25l[?25hdone
  Created wheel for pretty_midi: filename=pretty_midi-0.2.10-py3-none-any.whl size=5592287 sha256=101448c36054ace4e8c6406172a133ad4c9d3cb0d60b2d07a0c469a606e835f5
  Stored in directory: /root/.cache/pip/wheels/cd/a5/30/7b8b7f58709f5150f67f98fde4b891ebf0be9ef07a8af49f25
Successfully built pretty_midi
Installing collected packages: mido, p

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import glob

import os
import random
import shutil
from pathlib import Path
import pretty_midi
import numpy as np
from miditok import REMI, TokenizerConfig
import json
from miditok.utils import split_files_for_training
from miditok.data_augmentation import augment_dataset
from random import shuffle
from tqdm import tqdm

import sys
import pickle
     

In [None]:
# Get the MAESTRO v3.0.0 MIDI archive: download it, extract it into the
# current working directory, then delete the archive.
!wget https://storage.googleapis.com/magentadata/datasets/maestro/v3.0.0/maestro-v3.0.0-midi.zip
!unzip 'maestro-v3.0.0-midi.zip'
!rm 'maestro-v3.0.0-midi.zip'
# Absolute location of the extracted dataset.
# NOTE(review): assumes the archive was extracted with /kaggle/working as the
# current directory - confirm when running outside Kaggle.
dataset_path = "/kaggle/working/maestro-v3.0.0"

In [None]:
# Paths to the files of the dataset (both ".mid" and ".midi" extensions).
dataset_root = Path(dataset_path).resolve()
midi_paths = list(dataset_root.glob("**/*.mid")) + list(dataset_root.glob("**/*.midi"))

# Flat directory that receives every file under a numeric name.
midis_dir = "midis"
os.makedirs(midis_dir, exist_ok=True)


for i, midi_path in enumerate(midi_paths):
  new_midi_path = os.path.join(midis_dir, f"{i}.midi")
  shutil.move(str(midi_path), new_midi_path)


# List the moved files from the directory they were actually moved into, so
# the listing does not depend on the notebook running from /kaggle/working.
midis_root = Path(midis_dir).resolve()
midis = list(midis_root.glob("**/*.mid")) + list(midis_root.glob("**/*.midi"))

def sample():
  """Return, as a string, the path of one randomly chosen entry of `midis`."""
  chosen_path = random.choice(midis)
  return str(chosen_path)

In [None]:
# Beat-resolution table handed to the tokenizer configuration.
BEAT_RES = {(0, 1): 12, (1, 2): 4, (2, 4): 2, (4, 8): 1}

# Keyword arguments forwarded unchanged to TokenizerConfig.
TOKENIZER_PARAMS = dict(
    pitch_range=(21, 109),
    beat_res=BEAT_RES,
    num_velocities=6,
    special_tokens=["BOS", "EOS"],
    use_chords=True,
    use_rests=True,
    use_tempos=True,
    num_tempos=8,
    tempo_range=(50, 200),  # (min_tempo, max_tempo)
)

# Build the configuration object, then the REMI tokenizer that uses it.
config = TokenizerConfig(**TOKENIZER_PARAMS)

tokenizer = REMI(config)

In [None]:
# Train the tokenizer vocabulary on every collected MIDI file.
vocab_size = 1000
tokenizer.train(vocab_size=vocab_size, files_paths=midis)
processed = [Path(f"{s}") for s in midis]
print(len(processed))

# Fraction of the files held out for validation, and augmentation switch.
valid_perc = 0.05
augment = False

# Random train/validation split at file level (no seed is set here, so the
# split differs between runs).
total_num_files = len(processed)
num_files_valid = round(total_num_files * valid_perc)
shuffle(processed)
midi_paths_valid = processed[:num_files_valid]
midi_paths_train = processed[num_files_valid:]

# Chunk MIDIs and perform data augmentation on each subset independently

for files_paths, subset_name in (
    (midi_paths_train, "train"),
    (midi_paths_valid, "valid"),
):
    # With few input files the validation subset can be empty
    # (round(n * valid_perc) == 0); indexing it below would raise IndexError.
    if not files_paths:
        print(f"No files in the '{subset_name}' subset, skipping it")
        continue

    print(files_paths[0])

    # Split the MIDIs into chunks of approximately 1024 tokens

    subset_chunks_dir = Path(f"Maestro_{subset_name}")

    split_files_for_training(
        files_paths=files_paths,
        tokenizer=tokenizer,
        save_dir=subset_chunks_dir,
        max_seq_len=1024,
        num_overlap_bars=2,
    )

    # Perform data augmentation
    if augment:
        augment_dataset(
            subset_chunks_dir,
            pitch_offsets=[-12, 12],
            velocity_offsets=[-3, 3],
            duration_offsets=[-0.5, 0.5],
        )

# From here on the path lists refer to the chunked files, not the originals.
midi_paths_train = list(Path("Maestro_train").glob("**/*.mid")) + list(Path("Maestro_train").glob("**/*.midi"))
midi_paths_valid = list(Path("Maestro_valid").glob("**/*.mid")) + list(Path("Maestro_valid").glob("**/*.midi"))

In [None]:
def midi_valid(midi) -> bool:
    """Tell whether every time signature of `midi` has a numerator of 4.

    Args:
        midi: Object exposing `time_signature_changes`, an iterable whose
            items carry a `numerator` attribute.

    Returns:
        False as soon as one time signature has a numerator other than 4,
        True otherwise (including when there are no time signatures).
    """
    for time_signature in midi.time_signature_changes:
        if time_signature.numerator != 4:
            return False  # time signature different from 4/*, 4 beats per bar
    return True



# Remove output left over from a previous run so stale token files cannot be
# mixed with fresh ones. The tokenizer writes to "tokenized_train" and
# "tokenized_valid" (see the loop below); the legacy "tokenized" directory is
# still cleaned for backward compatibility.
for stale_dir in (
    "tokenized",
    "/kaggle/working/tokenized_train",
    "/kaggle/working/tokenized_valid",
):
    if os.path.exists(stale_dir):
        shutil.rmtree(stale_dir)


# Tokenize the chunked files of each subset into its own output directory.
# The loop variable is named `subset` so the builtin `dir` is not shadowed.
for subset in ("train", "valid"):
    # NOTE(review): `midi_valid` is passed positionally as the third argument;
    # confirm against the installed miditok version that this slot of
    # `tokenize_dataset` is the validation callback and not another option.
    tokenizer.tokenize_dataset(
        Path(f"/kaggle/working/Maestro_{subset}"),
        Path(f"/kaggle/working/tokenized_{subset}"),
        midi_valid,
    )

In [None]:
def read_json(path: str) -> dict:
  """Parse the JSON document stored at `path` and return the decoded object."""
  with open(path, "r") as json_file:
    raw_text = json_file.read()
  return json.loads(raw_text)

def read_json_files(json_file_paths):
    """Load several JSON files, in order, showing a progress bar.

    Args:
        json_file_paths: Iterable of paths to JSON files.

    Returns:
        A list with the decoded content of each file, in input order.
        If any file is missing or is not valid JSON, a message is printed and
        an empty list is returned instead (everything read so far is dropped).
    """
    loaded = []

    for json_path in tqdm(json_file_paths):
        try:
            parsed = read_json(json_path)
        except FileNotFoundError:
            print(f"Error: File not found - {json_path}")
            return []  # abort on the first missing file
        except json.JSONDecodeError:
            print(f"Error decoding JSON in file: {json_path}")
            return []  # abort on the first malformed file
        loaded.append(parsed)

    return loaded

In [None]:
# Token files produced for the training subset.
tokenized_train = list(Path("tokenized_train").resolve().glob("**/*.json"))
data_objects_train = read_json_files(tokenized_train)

# Token files produced for the validation subset.
tokenized_valid = list(Path("tokenized_valid").resolve().glob("**/*.json"))
data_objects_valid = read_json_files(tokenized_valid)

# Only the training list is checked: an empty list means reading failed
# (or that there was nothing to read).
if not data_objects_train:
    print("Error reading JSON files.")
else:
    print(f"\nSuccessfully read {len(data_objects_train)} training JSON files.")

In [None]:
# For every loaded song keep only the first entry stored under its "ids" key,
# converted to a NumPy array.
encoded_train = list(map(lambda song: np.array(song["ids"][0]), data_objects_train))
encoded_valid = list(map(lambda song: np.array(song["ids"][0]), data_objects_valid))

In [None]:
# tokenizer.decode([encoded_train[0][:1024]]).dump_midi("sample.mid")
import datetime

# Flatten all songs into one id stream per subset. Casting to int32 before
# saving guarantees an integer dtype for the "%d" format used below.
all_ids_train = np.concatenate(encoded_train).astype(dtype=np.int32)
all_ids_valid = np.concatenate(encoded_valid).astype(dtype=np.int32)

# File names carry the vocabulary size and the current month/day.
today = datetime.datetime.today()
day = today.day
month = today.month
name = "tokenizer{:d}_{:02d}{:02d}.json".format(vocab_size, month, day)
tokenizer.save(name)

# Write the ids as plain integers. The default savetxt format ("%.18e")
# stores each id in scientific notation, which makes the files many times
# larger; np.loadtxt reads both representations, so the shortcut cell that
# reloads these files keeps working.
np.savetxt("ids_train_{:02d}{:02d}.txt".format(month, day), all_ids_train, fmt="%d")
np.savetxt("ids_valid_{:02d}{:02d}.txt".format(month, day), all_ids_valid, fmt="%d")

In [3]:
# if you need to skip all
!gdown 1FqWFCW5TjWTI8rrbBy5uwYjS3GYl9MWF # tokenizer1000_1219.json
!gdown 1Xs-5FenAaUJE_WipUDIFfQ4lFrK8VHve # ids_train_1219.txt
tokenizer = REMI(params="tokenizer1000_1219.json")
all_ids_train = np.loadtxt("ids_train_1219.txt").astype(dtype=np.int32)
#all_ids_valid = np.loadtxt("ids_valid_1217.txt").astype(dtype=np.int32)

Downloading...
From: https://drive.google.com/uc?id=1FqWFCW5TjWTI8rrbBy5uwYjS3GYl9MWF
To: /kaggle/working/tokenizer1000_1219.json
100%|██████████████████████████████████████| 59.5k/59.5k [00:00<00:00, 81.2MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1Xs-5FenAaUJE_WipUDIFfQ4lFrK8VHve
From (redirected): https://drive.google.com/uc?id=1Xs-5FenAaUJE_WipUDIFfQ4lFrK8VHve&confirm=t&uuid=4f7535ba-efc6-4e0b-8c76-c4cd5f2d96d2
To: /kaggle/working/ids_train_1219.txt
100%|█████████████████████████████████████████| 395M/395M [00:02<00:00, 147MB/s]


# Hyper-parameters

In [4]:
# Parameters
vocab_size = len(tokenizer)
seq_length = 512 # for the current setup, must be equal to 'noise_dim'

generator_dim = 512
discriminator_dim = 384
num_layers = 2
batch_size = 128
noise_dim = 512


# Centre and scale the token ids around vocab_size / 2; ids in
# [0, vocab_size) end up in [-1, 1) (assumes ids lie in that range).
normalized_seq = (all_ids_train - vocab_size / 2) / (vocab_size / 2)

# Split into non-overlapping sequences of exactly seq_length tokens.
# The "+ 1" keeps the final chunk when the stream length is an exact multiple
# of seq_length; a trailing partial chunk is still dropped.
all_ids_train_seq = [normalized_seq[i:i + seq_length]
                 for i in range(0, len(normalized_seq) - seq_length + 1, seq_length)]

# Model 

## Generator (choose one)

In [5]:
class Generator(nn.Module):
    def __init__(self, noise_dim, seq_length, n_features = 1, drop_rate = 0.3,
                 hidden_dim = 512, num_layers = None):
        """
        Fully connected generator: noise vector -> whole output sequence.

        The constructor accepts the keyword arguments the GAN class passes to
        its generator builder (noise_dim, hidden_dim, seq_length, num_layers),
        so this class can be selected like the LSTM-based generators.

        Args:
            noise_dim (int): Size of the input noise vector.
            seq_length (int): Length of the output sequence.
            n_features (int): Number of features per timestep in the output.
            drop_rate (float): Dropout rate.
            hidden_dim (int): Width of the first hidden layer; the second
                hidden layer is twice as wide. The default (512) reproduces
                the previously hard-coded 512/1024 layout.
            num_layers (int | None): Accepted only for constructor
                compatibility with the other generators; unused.
        """
        super(Generator, self).__init__()
        self.noise_dim = noise_dim
        self.seq_length = seq_length
        self.n_features = n_features
        self.hidden_dim = hidden_dim

        self.fc1 = nn.Linear(noise_dim, hidden_dim)
        self.lrelu1 = nn.LeakyReLU(0.3)
        self.bn1 = nn.BatchNorm1d(hidden_dim)

        self.fc2 = nn.Linear(hidden_dim, 2 * hidden_dim)
        self.lrelu2 = nn.LeakyReLU(0.2)
        self.bn2 = nn.BatchNorm1d(2 * hidden_dim, momentum=0.8)
        self.dropout = nn.Dropout(drop_rate)

        self.fc3 = nn.Linear(2 * hidden_dim, seq_length * n_features)
        # Tanh rather than Sigmoid: the training data is normalised around
        # zero and GAN.generate decodes values as x * boundary + boundary,
        # i.e. it expects outputs in [-1, 1] like the other generators.
        self.tanh = nn.Tanh()

    def forward(self, x):
        """
        Forward pass of the Generator.

        Args:
            x (torch.Tensor): Input noise of shape (batch_size, noise_dim) or
                (batch_size, noise_dim, 1), the latter being what GAN.noise
                produces.

        Returns:
            torch.Tensor: Output sequence of shape (batch_size, seq_length, n_features),
            with values in [-1, 1].
        """
        # Flatten a trailing singleton axis so both noise layouts are accepted.
        x = x.reshape(x.size(0), -1)

        x = self.fc1(x)
        x = self.lrelu1(x)
        x = self.bn1(x)

        x = self.fc2(x)
        x = self.lrelu2(x)
        x = self.bn2(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.tanh(x)
        x = x.view(-1, self.seq_length, self.n_features)  # Reshape to (batch_size, seq_length, n_features)
        return x

In [5]:
class GeneratorLSTM(nn.Module):
    def __init__(self, noise_dim, hidden_dim, seq_length, num_layers, drop_rate=0.3):
        """
        LSTM generator that treats the noise vector as a sequence of scalars.

        Args:
            noise_dim (int): Number of noise steps fed to the LSTM; this is
                also the length of the generated sequence.
            hidden_dim (int): Hidden size of the LSTM.
            seq_length (int): Width of the intermediate per-step dense layer.
            num_layers (int): Number of stacked LSTM layers.
            drop_rate (float): Dropout rate applied to the LSTM outputs.
        """
        super(GeneratorLSTM, self).__init__()

        # LSTM layer
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)

        # Dropout layer
        self.dropout = nn.Dropout(drop_rate)

        # Time-distributed layers
        self.fc1 = nn.Linear(hidden_dim, seq_length)
        self.lrelu1 = nn.LeakyReLU(0.2)
        # The normalised axis is the fc1 output (size seq_length), see the
        # transpose in forward(). Sizing it with noise_dim only worked while
        # noise_dim == seq_length.
        self.bn1 = nn.BatchNorm1d(seq_length, momentum=0.8)

        self.fc2 = nn.Linear(seq_length, 1)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """
        Forward pass of the Generator.

        Args:
            x (torch.Tensor): Input noise of shape (batch_size, noise_dim, 1).

        Returns:
            torch.Tensor: Output sequence of shape (batch_size, noise_dim, 1),
            with values in [-1, 1].
        """
        # Pass through LSTM -> (batch_size, noise_dim, hidden_dim)
        x, _ = self.lstm(x)

        # Apply dropout
        x = self.dropout(x)

        # Time-distributed dense layers -> (batch_size, noise_dim, seq_length)
        x = self.fc1(x)
        x = self.lrelu1(x)
        # BatchNorm1d normalises dimension 1, so the feature axis is moved
        # there and back.
        x = self.bn1(x.transpose(1, 2)).transpose(1, 2)  # BatchNorm applied along the feature axis

        x = self.fc2(x)  # Final output layer
        return self.tanh(x)

In [None]:
class LSTMGenerator(nn.Module):
    """Generator that expands the noise with a dense layer, then runs an LSTM.

    The noise vector is projected to `seq_length * hidden_dim` values,
    reshaped into a sequence, passed through the LSTM and mapped to one
    tanh-squashed value per time step.
    """

    def __init__(self, noise_dim, hidden_dim, seq_length, num_layers):
        super().__init__()
        self.seq_length = seq_length
        self.hidden_dim = hidden_dim
        self.noise_dim = noise_dim

        # Dense projection: (batch_size, noise_dim) -> (batch_size, hidden_dim * seq_length)
        self.fc = nn.Linear(in_features=noise_dim, out_features=hidden_dim * seq_length)
        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        # One scalar per time step
        self.output_fc = nn.Linear(in_features=hidden_dim, out_features=1)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Map noise whose last axis is squeezed away to (batch_size, seq_length, 1)."""
        flat_noise = x.squeeze(-1)                       # (batch_size, noise_dim)
        projected = self.fc(flat_noise)                  # (batch_size, hidden_dim * seq_length)
        lstm_input = projected.view(-1, self.seq_length, self.hidden_dim)
        hidden_states, _ = self.lstm(lstm_input)         # (batch_size, seq_length, hidden_dim)
        per_step = self.output_fc(hidden_states)         # (batch_size, seq_length, 1)
        return self.tanh(per_step)

## Discriminator and GAN class 

In [6]:
class LSTMDiscriminator(nn.Module):
    """LSTM critic that scores a whole sequence with a single value.

    `forward` returns the raw output of the final linear layer: the
    `sigmoid` attribute is created but not applied.
    """

    def __init__(self, input_dim, hidden_dim, num_layers):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)  # single real/fake score
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one unnormalised score per sequence, shape (batch_size, 1)."""
        hidden_states, _ = self.lstm(x)
        last_state = hidden_states[:, -1, :]  # hidden state of the last time step: (batch_size, hidden_dim)
        score = self.fc(last_state)
        return score

class GAN:
    """Bundles a generator, a discriminator, their optimizers and the training loop.

    Args:
        generator_builder: Callable invoked with the keyword arguments
            noise_dim, hidden_dim, seq_length and num_layers; returns the generator.
        discriminator_builder: Callable invoked with the keyword arguments
            input_dim (always 1), hidden_dim and num_layers; returns the discriminator.
        noise_dim: Length of the noise sequence fed to the generator.
        seq_length: Forwarded to the generator builder.
        generator_layers / discriminator_layers: Forwarded as num_layers.
        generator_hidden_dim / discriminator_hidden_dim: Forwarded as hidden_dim.
        loss: Criterion applied to the discriminator outputs and the 0/1 labels.
    """

    def __init__(self, 
                 generator_builder, 
                 discriminator_builder, 
                 noise_dim,
                 seq_length,
                 generator_layers,
                 discriminator_layers,
                 generator_hidden_dim,
                 discriminator_hidden_dim,
                 loss = nn.BCEWithLogitsLoss(),
                ):
        self.G = generator_builder(
            noise_dim  = noise_dim,
            hidden_dim = generator_hidden_dim,
            seq_length = seq_length,
            num_layers = generator_layers
        )
        self.D = discriminator_builder(
            input_dim  = 1,
            hidden_dim = discriminator_hidden_dim,
            num_layers = discriminator_layers,
        )
        self.noise_dim = noise_dim
        self.seq_length = seq_length
        self.loss = loss
        self.optimizer_G = optim.Adam(self.G.parameters(), lr=0.0002)
        self.optimizer_D = optim.Adam(self.D.parameters(), lr=0.0002)

    def _device(self):
        """Return the device that currently holds the generator's parameters."""
        return next(self.G.parameters()).device

    def noise(self, size):
        """Return standard-normal noise of shape (size, noise_dim, 1) on the CPU."""
        return torch.randn(size, self.noise_dim, 1) 

    def _G_train_step(self, batch_size, real_labels):
        """Run one generator update and return its loss tensor.

        The generator is rewarded when the discriminator scores its samples
        as real, hence the use of `real_labels` as targets.
        """
        self.G.zero_grad()
        # Follow the model's own device instead of a notebook-level global.
        z = self.noise(batch_size).to(self._device())
        fake_data = self.G(z)

        outputs = self.D(fake_data)
        loss_G = self.loss(outputs, real_labels)  # Generator wants to fool the Discriminator
        loss_G.backward()
        self.optimizer_G.step()
        return loss_G

    def _D_train_step(self, batch_size, real_labels, fake_labels, real_data):
        """Run one discriminator update on a real batch and a generated batch.

        Returns the summed real + fake loss tensor.
        """
        self.D.zero_grad()
        z = self.noise(batch_size).to(self._device())
        # Detach: this step only updates D, so gradients must not be
        # propagated (and accumulated) through the generator.
        fake_data = self.G(z).detach()

        outputs_real = self.D(real_data)  # Pass real_data through Discriminator
        loss_real = self.loss(outputs_real, real_labels)

        outputs_fake = self.D(fake_data)  # Pass fake_data through Discriminator
        loss_fake = self.loss(outputs_fake, fake_labels)

        loss_D = loss_real + loss_fake
        loss_D.backward()
        self.optimizer_D.step()

        return loss_D

    def train(
        self, 
        dataloader, 
        epochs, 
        device, 
        warmup_epochs=0, 
        g_steps=0, 
        steps_each_print=5
    ):
        """Train both networks.

        Args:
            dataloader: Yields batches of shape (batch_size, seq_length).
            epochs: Number of passes over `dataloader`.
            device: Device both models and all batches are moved to.
            warmup_epochs: During the first `warmup_epochs` epochs, `g_steps`
                extra generator updates are run before each regular update.
            g_steps: Number of those extra generator updates (0 disables them).
            steps_each_print: Progress-bar refresh period, in batches.
        """
        print(f"Starting training with {epochs} epochs")
        self.G = self.G.to(device)
        self.D = self.D.to(device)
        self.G.train(True)
        self.D.train(True)
        
        iteration_count = len(dataloader)  # Number of batches per epoch

        for epoch in range(epochs):  
            # Reset so the epoch summary can tell whether any batch was seen.
            loss_D = loss_G = None
            pbar = tqdm(total=iteration_count, position=0, leave=True)
            for step, real_data in enumerate(dataloader):
                batch_size = real_data.size(0)
                real_data = real_data.to(device).unsqueeze(-1)  # Shape: (batch_size, seq_length, 1)

                # Labels are sized from the actual batch so a smaller final
                # batch (drop_last=False) does not cause a shape mismatch.
                real_labels = torch.ones(batch_size, 1, device=device)
                fake_labels = torch.zeros(batch_size, 1, device=device)
                
                if epoch < warmup_epochs:  # Train Generator more during initial epochs
                    for _ in range(g_steps):
                        self._G_train_step(batch_size, real_labels)
                
                loss_D = self._D_train_step(batch_size, real_labels, fake_labels, real_data)

                loss_G = self._G_train_step(batch_size, real_labels)
                
                if (step + 1) % steps_each_print == 0:
                    pbar.set_description(
                        f"D Loss: {loss_D.item():.4f}, " +
                        f"G Loss: {loss_G.item():.4f}"
                    )
                    pbar.update(steps_each_print)
            # Force the bar to 100% (it only advances every steps_each_print batches).
            pbar.n = pbar.total  
            pbar.refresh()    
            pbar.close()
        
            if loss_D is not None and loss_G is not None:
                print(f'Epoch [{epoch+1}/{epochs}] Loss D: {loss_D.item():.4f}, Loss G: {loss_G.item():.4f}')

    def predict(self, n, device):
        """Generate `n` samples and return them as a NumPy array.

        The generator is moved to `device`, switched to evaluation mode and
        run without gradient tracking; its previous train/eval mode is
        restored afterwards.
        """
        was_training = self.G.training
        self.G = self.G.to(device)
        self.G.eval()
        try:
            with torch.no_grad():
                z = self.noise(n).to(device)  # Random noise
                return self.G(z).cpu().detach().numpy()
        finally:
            self.G.train(was_training)
    
    def generate(self, n, tokenizer, base_output_name, device):
        """Generate `n` samples, decode them and write one MIDI file each.

        Each generator value x is mapped to x * boundary + boundary with
        boundary = int(len(tokenizer) / 2), cast to int32, and decoded with
        `tokenizer`. Files are written as "{base_output_name}_{i}.mid".
        """
        normalized_predictions = self.predict(n, device)
        boundary = int(len(tokenizer) / 2)
        predictions = [x * boundary + boundary for x in normalized_predictions]
        for i in range(len(predictions)):
            pred = predictions[i]                                # [[5.0], [6.0], [7.0]]
            pred_tokens = np.concatenate(pred).astype(np.int32)  # [5, 6, 7]
            pred_tokens[pred_tokens == len(tokenizer)] = 0       # if some value equals vocab_size, replace with empty tokens 
            decoded = tokenizer.decode([pred_tokens])
            decoded.dump_midi(f"{base_output_name}_{i}.mid")


# Assemble the GAN from the LSTM generator/discriminator pair, using the
# hyper-parameters defined earlier in the notebook.
gan = GAN(
    generator_builder        = GeneratorLSTM,
    discriminator_builder    = LSTMDiscriminator,
    noise_dim                = noise_dim,
    seq_length               = seq_length,
    generator_layers         = num_layers,
    discriminator_layers     = num_layers,
    generator_hidden_dim     = generator_dim,
    discriminator_hidden_dim = discriminator_dim,
)

# Training

## Prepare data 

In [14]:
# Dataset personalizzato
class TokenDataset(Dataset):
    """Custom dataset serving the items of `data` as float32 tensors."""

    def __init__(self, data):
        # Indexable collection of sequences; stored as given.
        self.data = data

    def __len__(self):
        n_items = len(self.data)
        return n_items

    def __getitem__(self, idx):
        item = self.data[idx]
        return torch.tensor(item, dtype=torch.float32)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Shuffled batches of training sequences; an incomplete final batch is dropped.
dataset = TokenDataset(all_ids_train_seq)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, drop_last=True)

## Start the training

In [None]:
# Training loop
epochs = 10
warmup_epochs = 8
g_steps = 0
steps_each_print = 5

gan.train(
    dataloader       = dataloader, 
    epochs           = epochs, 
    device           = device, 
    warmup_epochs    = warmup_epochs, 
    g_steps          = g_steps, 
    steps_each_print = steps_each_print
)

Starting training with 10 epochs


D Loss: 0.3583, G Loss: 2.2247: 100%|██████████| 241/241 [04:40<00:00,  1.16s/it]


Epoch [1/10] Loss D: 0.2537, Loss G: 2.2565


D Loss: 0.4795, G Loss: 2.1923:  66%|██████▋   | 160/241 [03:06<01:34,  1.17s/it]

In [9]:
# Number of samples to generate and the prefix of the output file names;
# `generate` writes one file per sample, named "{base_name}_{i}.mid" with
# i running from 0 to n_generations - 1.
n_generations = 5
base_name = "generated"

gan.generate(
    n                 = n_generations,
    tokenizer         = tokenizer, 
    base_output_name  = base_name, 
    device            = device,
)

In [11]:
from IPython import display

_SAMPLING_RATE = 16000

def display_audio(file, seconds=30):
  """Synthesize the MIDI file `file` and return an audio player for its start.

  Args:
    file: Path of the MIDI file to load.
    seconds: Number of seconds of audio to keep, counted from the beginning.

  Returns:
    An IPython `Audio` object at `_SAMPLING_RATE`.
  """
  midi_data = pretty_midi.PrettyMIDI(file)
  full_waveform = midi_data.fluidsynth(fs=_SAMPLING_RATE)
  # Keep only a short excerpt of the waveform to mitigate kernel resets
  n_samples = seconds*_SAMPLING_RATE
  excerpt = full_waveform[:n_samples]
  return display.Audio(excerpt, rate=_SAMPLING_RATE)

# Preview the generated file with index 1 (default excerpt length: 30 seconds).
display_audio(f"{base_name}_1.mid")

In [None]:
from IPython.display import FileLink
# `gan.generate` writes "{base_name}_{i}.mid" files; there is no plain
# "generated.mid". Link to the same sample the audio preview cell plays.
FileLink(f"{base_name}_1.mid")