In [None]:
%pip install miditok
%pip install torchtoolkit
%pip install git+https://github.com/huggingface/transformers
%pip install git+https://github.com/huggingface/accelerate
%pip install git+https://github.com/huggingface/evaluate
%pip install torch
%pip install tqdm

In [None]:
import os
import random
import json

from miditok import REMI
from miditok.constants import CHORD_MAPS, ADDITIONAL_TOKENS
from miditok.utils import get_midi_programs
from miditoolkit import MidiFile
from pathlib import Path
from torchtoolkit.data import create_subsets
from transformers import GPT2LMHeadModel, GPT2Config, Trainer, TrainingArguments, GenerationConfig
from evaluate import load as load_metric
from typing import Any, Dict, List
from torch import Tensor, LongTensor, flip, cat, full, argmax, cuda, no_grad
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
from copy import deepcopy


# Turn off Weights & Biases reporting for the Hugging Face Trainer.
os.environ['WANDB_DISABLED'] = 'true'
# CUDA debugging aids: synchronous kernel launches give accurate stack traces.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# NOTE(review): intended to enable device-side assertions; confirm it has any
# effect when set at runtime rather than at PyTorch build time.
os.environ['TORCH_USE_CUDA_DSA'] = '1'

# A new seed is drawn on every run, so runs are not reproducible unless this
# value is recorded or replaced by a constant.
seed = random.randint(1000, 10000)
device = "cuda:0" if cuda.is_available() else "cpu"

# MIDI pitches 21..108 inclusive (the 88-key piano range).
pitch_range = range(21, 109)

# Work on a private copy: ADDITIONAL_TOKENS is a module-level constant of
# miditok, and assigning into it directly would silently change the library
# defaults for every tokenizer created afterwards in this kernel.
additional_tokens = deepcopy(ADDITIONAL_TOKENS)
additional_tokens['Chord'] = True
additional_tokens['TimeSignature'] = True
additional_tokens['Rest'] = True
tokenizer = REMI(pitch_range=pitch_range, additional_tokens=additional_tokens)

In [None]:
# Size of the tokenizer; the same value is passed as `vocab_size` to the model config.
len(tokenizer)

In [None]:
from glob import glob

# NOTE: the string below is an inert snippet kept for Google Colab. The local
# path assignments further down in this cell run after it and would override
# anything it defines, so disable them when enabling the snippet.
""" Uncomment for google colab
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import sys
sys.path.append('/content/drive/MyDrive/colab/python')

params_path = Path(f'/content/drive/MyDrive/colab/token_params.json')
data_path = Path('/content/drive/MyDrive/colab/').glob(f'ozzy*.json')
"""

# Project-local helpers; imported here (not at the top) so that a sys.path
# tweak such as the one in the Colab snippet can take effect first.
from utils.midi_dataset import MIDIDataset, DataCollatorGen

# Single root directory holding the tokenizer parameters and the token files.
base_path = '/home/nico/data/ai/models/midi'
params_path = Path(base_path) / 'token_params.json'

# glob() returns matches in arbitrary order; sort each group so the dataset
# file order is stable between runs and machines.
data_paths = sorted(glob(f'{base_path}/ozzy*.json'))
data_paths += sorted(glob(f'{base_path}/Mega*.json'))

data_paths

In [None]:
# Load the saved tokenizer parameters from `params_path`.
params = tokenizer.load_params(params_path)

# Dataset built from every token file found earlier; the sequence length
# bounds are forwarded to MIDIDataset unchanged.
midi_dataset = MIDIDataset(
    files_paths=list(map(Path, data_paths)),
    min_seq_len=24,
    max_seq_len=256,
)

# NOTE(review): presumably [0.3] holds out 30% of the samples for validation;
# confirm against torchtoolkit's create_subsets.
subset_train, subset_valid = create_subsets(midi_dataset, [0.3])

len(subset_valid)


In [None]:
cuda.empty_cache()

# Model architecture; the special-token ids come from the tokenizer vocabulary.
config = GPT2Config(
    vocab_size=len(tokenizer),
    n_positions=2048,
    n_embd=1024,
    n_layer=16,
    n_head=32,
    n_inner=2048,
    resid_pdrop=.1,
    embd_pdrop=.1,
    attn_pdrop=.1,
    # `pad_token_id` is the attribute name the transformers library reads.
    pad_token_id=tokenizer['PAD_None'],
    # Same id under the original, non-standard name, kept so that any code
    # still reading `config.padding_token_id` keeps working.
    padding_token_id=tokenizer['PAD_None'],
    bos_token_id=tokenizer['BOS_None'],
    eos_token_id=tokenizer['EOS_None']
)

# Instantiate a freshly initialised model so the training cell has a `model`
# to work with after a kernel restart. The Trainer switches between train and
# eval mode itself, so no explicit mode is set here.
model = GPT2LMHeadModel(config)


In [None]:
metrics = {"accuracy": load_metric("accuracy")}


def compute_metrics(eval_pred):
    """Compute token-level accuracy for evaluation.

    Expects predictions that are already token ids, i.e. the Trainer must be
    given a logits pre-processing function (argmax or sampling).
    Positions whose label equals -100 are excluded from the score.

    NOTE(review): predictions and labels are compared position by position
    with no shift applied here; confirm this matches how the collator builds
    the labels.

    :param eval_pred: EvalPrediction containing predictions and labels
    :return: the result of the accuracy metric
    """
    predictions, labels = eval_pred
    keep = labels != -100
    kept_predictions = predictions[keep]
    kept_labels = labels[keep]

    return metrics["accuracy"].compute(
        predictions=kept_predictions.flatten(),
        references=kept_labels.flatten(),
    )


def preprocess_logits(logits: Tensor, _: Tensor) -> Tensor:
    """Reduce logits to predicted ids before they are accumulated for evaluation.

    Keeping only the argmax over the last dimension is much smaller than the
    full logits, which keeps evaluation memory usage manageable.

    :param logits: model logits; the last dimension is reduced
    :param _: unused second argument
    :return: indices of the largest logit along the last dimension (long dtype)
    """
    return logits.argmax(dim=-1)


# Positional arguments, in TrainingArguments field order (verify against the
# installed transformers version): output_dir, overwrite_output_dir, do_train,
# do_eval, do_predict, evaluation strategy.
training_config = TrainingArguments(
    "runs", False, True, True, False, "steps",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    eval_steps=100,
    learning_rate=1e-4,
    weight_decay=0.01,
    max_grad_norm=3.0,
    max_steps=1000,
    lr_scheduler_type="cosine_with_restarts",
    warmup_ratio=0.3,
    log_level="debug",
    logging_strategy="steps",
    logging_steps=20,
    save_strategy="steps",
    # Best checkpoint is the one with the lowest loss.
    metric_for_best_model='loss',
    greater_is_better=False,
    # Same interval as eval_steps, so every saved checkpoint has been evaluated.
    save_steps=100,
    save_total_limit=5,
    no_cuda=False,
    seed=seed,
    # Mixed precision only when a GPU is present, so the CPU fallback used
    # elsewhere in this notebook does not fail at argument validation.
    fp16=cuda.is_available(),
    load_best_model_at_end=True,
    label_smoothing_factor=0.,
    optim="adamw_torch",
)

# Collator built with the tokenizer's padding token id.
gen_collator = DataCollatorGen(tokenizer["PAD_None"])

# Wire model, data and metric hooks together.
trainer = Trainer(
    model=model,
    args=training_config,
    train_dataset=subset_train,
    eval_dataset=subset_valid,
    data_collator=gen_collator,
    preprocess_logits_for_metrics=preprocess_logits,
    compute_metrics=compute_metrics,
    callbacks=None,
)

# Run the training loop, then persist the model, its metrics and the trainer state.
train_result = trainer.train()
# NOTE(review): no tokenizer is passed to this Trainer, so only the model is
# expected to be written here; confirm if the tokenizer must be saved as well.
trainer.save_model()
train_metrics = train_result.metrics
trainer.log_metrics("train", train_metrics)
trainer.save_metrics("train", train_metrics)
trainer.save_state()

In [None]:
cuda.empty_cache()

# Reload the model saved under ./runs/ and move it to the selected device.
with no_grad():
    model = GPT2LMHeadModel.from_pretrained('./runs/').to(device)


def collate_gen_left(batch: List[Dict[str, LongTensor]]) -> LongTensor:
    """Collate samples into a left-padded batch, each prefixed with a BOS token.

    pad_sequence pads on the right, so every sequence is reversed, padded, and
    the padded batch is reversed back along the time dimension. The last
    position of every row is then the last real token of that sequence, which
    allows generating by batch.

    :param batch: samples, each holding its token ids under "input_ids"
    :return: padded batch of token ids, (N,T)
    """
    bos_id, pad_id = tokenizer["BOS_None"], tokenizer["PAD_None"]

    reversed_seqs = []
    for sample in batch:
        with_bos = cat([full((1,), bos_id), sample["input_ids"]], dim=0)
        reversed_seqs.append(flip(with_bos, dims=(0,)))

    # (N,T) or (N,T,Z), still time-reversed at this point
    right_padded = pad_sequence(reversed_seqs, batch_first=True,
                                padding_value=pad_id)
    return flip(right_padded, dims=(1,)).long()  # (N,T)


# Sampling setup used for every generation round.
generation_config = GenerationConfig(
    max_new_tokens=512,  # extends samples by 512 tokens
    num_beams=1,        # no beam search
    do_sample=True,     # but sample instead
    temperature=0.9,
    top_k=15,
    top_p=0.95,
    epsilon_cutoff=3e-4,
    eta_cutoff=1e-3,
    # Read the padding id straight from the tokenizer: it is the same value the
    # model config was given, and this way the cell does not depend on a
    # `config` object created in another cell.
    pad_token_id=tokenizer["PAD_None"],
)

# Output directory for generated files; created if missing.
(gen_results_path := Path('gen_res')).mkdir(parents=True, exist_ok=True)


def rec_gen(tokens):
    """Generate a continuation of `tokens` with the module-level `model`.

    Uses the module-level `generation_config` for the generation settings.

    :param tokens: list of token ids forming the prompt (a single sequence)
    :return: list of the newly generated token ids only, prompt excluded
    """
    # Batch of one sequence, placed on the same device as the model.
    prompt = LongTensor([tokens]).to(model.device)
    res = model.generate(prompt, generation_config=generation_config)

    out = res[0].tolist()
    # The returned sequence starts with the prompt itself, so the continuation
    # begins at index len(tokens). Starting one position earlier would return
    # the last prompt token again and duplicate it in the caller's list.
    new_tokens = out[len(tokens):]

    print(f'Generated {len(new_tokens)} new tokens.')

    return new_tokens


max_iter = 2      # number of generation rounds
init_size = 256   # number of seed tokens taken from the file

# Read the seed tokens and close the file before the slow generation starts.
with open(f'{base_path}/ozzy_osbourne-dreamer.json') as tokens_file:
    ids = json.load(tokens_file)['ids']

tokens = ids[0][:init_size]  # 1 channel only

for iter_count in range(max_iter):
    # The first round is prompted with the whole seed; later rounds only with
    # the last init_size // 128 tokens.
    # NOTE(review): that is 2 tokens for init_size = 256; confirm such a short
    # context is intended.
    block_size = init_size if iter_count == 0 else init_size // 128
    tokens += rec_gen(tokens[-block_size:])


In [None]:
print('Generating the midi...')

# Convert the seed plus generated tokens, as a batch of one sequence, to MIDI.
token_batch = LongTensor([tokens]).cpu()
midi = tokenizer.tokens_to_midi(token_batch, time_division=384)
# midi.instruments[0].name = f'Continuation of original sample ({len(generated)} tokens)'
# midi.instruments[1].name = f'Original sample ({len(prompt)} tokens)'
# midi.instruments[2].name = f'Original sample and continuation'

# Write the result into the generation output directory.
midi.dump(gen_results_path / 'full.mid')
# tokenizer.save_tokens(tokens, gen_results_path / f'{count}.json')

type(midi)
