In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import math
import random
import os
import json
import torch.nn.functional as F
from torch.distributions import Categorical, Normal

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True

In [3]:
class MelodyDataset(Dataset):
    def __init__(self, sequences, seq_len=32):
        self.sequences = sequences
        self.seq_len = seq_len
        
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        seq = self.sequences[idx]
        if len(seq) > self.seq_len:
            start = random.randint(0, len(seq) - self.seq_len)
            seq = seq[start:start + self.seq_len]
        elif len(seq) < self.seq_len:
            # Pad with zeros
            padding = np.zeros((self.seq_len - len(seq), 4))
            seq = np.vstack([seq, padding])
        
        x = torch.FloatTensor(seq[:-1])  # Input sequence
        y = torch.FloatTensor(seq[1:])   # Target sequence (shifted by 1)
        return x, y

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [5]:
class MelodyTransformer(nn.Module):
    def __init__(self, input_dim=4, d_model=256, nhead=8, num_layers=6, seq_len=32):
        super().__init__()
        self.d_model = d_model
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=d_model*4,
            dropout=0.1, activation='gelu', batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.output_proj = nn.Linear(d_model, input_dim)
        
        # Initialize weights
        self.apply(self._init_weights)
        
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
    
    def forward(self, x, mask=None):
        x = self.input_proj(x) * math.sqrt(self.d_model)
        x = self.pos_encoding(x)
        x = self.transformer(x, mask=mask)
        return self.output_proj(x)

In [6]:
def save_data_distribution_plots(instrument, data):
    """Save visualizations of data distribution as individual PNGs."""
    save_dir = os.path.join("training_plots", str(instrument))
    os.makedirs(save_dir, exist_ok=True)

    plots = [
        {
            "data": data[:, 0],
            "kind": "hist",
            "title": "Note Distribution",
            "xlabel": "Note Value",
            "filename": "note_distribution.png"
        },
        {
            "data": data[:, 1],
            "kind": "hist",
            "title": "Start Time Distribution",
            "xlabel": "Start Time",
            "filename": "start_time_distribution.png"
        },
        {
            "data": data[:, 2],
            "kind": "hist",
            "title": "End Time Distribution",
            "xlabel": "End Time",
            "filename": "end_time_distribution.png"
        },
        {
            "data": data[:, 3],
            "kind": "hist",
            "title": "Velocity Distribution",
            "xlabel": "Velocity",
            "filename": "velocity_distribution.png"
        },
        {
            "data": data[:, 2] - data[:, 1],
            "kind": "hist",
            "title": "Note Duration Distribution",
            "xlabel": "Duration",
            "filename": "note_duration_distribution.png"
        },
        {
            "data": (data[:, 0], data[:, 3]),
            "kind": "scatter",
            "title": "Note vs Velocity",
            "xlabel": "Note",
            "ylabel": "Velocity",
            "filename": "note_vs_velocity.png"
        },
    ]

    for plot in plots:
        plt.figure(figsize=(6, 4))
        if plot["kind"] == "hist":
            plt.hist(plot["data"], bins=50, alpha=0.7)
        elif plot["kind"] == "scatter":
            x, y = plot["data"]
            plt.scatter(x, y, alpha=0.5, s=1)
        plt.title(plot["title"])
        plt.xlabel(plot["xlabel"])
        if "ylabel" in plot:
            plt.ylabel(plot["ylabel"])
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, plot["filename"]))
        plt.close()

In [7]:
def preprocess_data(raw_data, seq_len=32):
    """Preprocess raw melody data"""
    print(f"Raw data shape: {np.array(raw_data).shape}")
    
    # Convert to numpy array
    data = np.array(raw_data, dtype=np.float32)
    
    # Normalize features
    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(data)
    
    # Create sequences from continuous melody data
    sequences = []
    step_size = seq_len // 4  # Overlap sequences for more training data
    
    for i in range(0, len(data_scaled) - seq_len + 1, step_size):
        seq = data_scaled[i:i + seq_len]
        sequences.append(seq)
    
    print(f"Created {len(sequences)} sequences of length {seq_len}")
    
    save_data_distribution_plots(instrument, data)
    
    return sequences, scaler

In [8]:
def musical_loss(pred, target, note_weight=2.0, time_weight=1.0, velocity_weight=0.5):
    """Custom loss function emphasizing musical coherence"""
    # Separate components
    note_loss = nn.functional.mse_loss(pred[:, :, 0], target[:, :, 0])
    start_loss = nn.functional.mse_loss(pred[:, :, 1], target[:, :, 1])
    end_loss = nn.functional.mse_loss(pred[:, :, 2], target[:, :, 2])
    velocity_loss = nn.functional.mse_loss(pred[:, :, 3], target[:, :, 3])
    
    # Duration consistency loss
    pred_duration = pred[:, :, 2] - pred[:, :, 1]
    target_duration = target[:, :, 2] - target[:, :, 1]
    duration_loss = nn.functional.mse_loss(pred_duration, target_duration)
    
    return (note_weight * note_loss + 
            time_weight * (start_loss + end_loss) + 
            velocity_weight * velocity_loss + 
            duration_loss)

In [9]:
def calculate_music_metrics(pred, target):
    """Calculate music-specific metrics"""
    with torch.no_grad():
        # Note accuracy (within semitone)
        note_acc = (torch.abs(pred[:, :, 0] - target[:, :, 0]) < 0.5).float().mean()
        
        # Timing accuracy (within 0.1 time units)
        time_acc = (torch.abs(pred[:, :, 1:3] - target[:, :, 1:3]) < 0.1).float().mean()
        
        # Velocity accuracy (within 10 units)
        vel_acc = (torch.abs(pred[:, :, 3] - target[:, :, 3]) < 0.1).float().mean()
        
        # Melodic smoothness (penalize large jumps)
        pred_intervals = torch.abs(pred[:, 1:, 0] - pred[:, :-1, 0])
        smoothness = torch.exp(-pred_intervals.mean())
        
    return {
        'note_accuracy': note_acc.item(),
        'timing_accuracy': time_acc.item(), 
        'velocity_accuracy': vel_acc.item(),
        'melodic_smoothness': smoothness.item()
    }

In [10]:
def save_training_plots(instrument, losses, metrics_history):
    # Create directory if it doesn't exist
    base_dir = os.path.join("training_plots", str(instrument))
    os.makedirs(base_dir, exist_ok=True)

    # Define plot configurations
    plots = [
        {
            "data": [losses],
            "title": "Training Loss",
            "ylabel": "Loss",
            "filename": "training_loss.png"
        },
        {
            "data": [metrics_history["note_acc"]],
            "title": "Note Accuracy",
            "ylabel": "Accuracy",
            "filename": "note_accuracy.png"
        },
        {
            "data": [metrics_history["time_acc"]],
            "title": "Timing Accuracy",
            "ylabel": "Accuracy",
            "filename": "timing_accuracy.png"
        },
        {
            "data": [metrics_history["vel_acc"]],
            "title": "Velocity Accuracy",
            "ylabel": "Accuracy",
            "filename": "velocity_accuracy.png"
        },
        {
            "data": [metrics_history["smoothness"]],
            "title": "Melodic Smoothness",
            "ylabel": "Smoothness",
            "filename": "melodic_smoothness.png"
        },
        {
            "data": [losses, [x * 10 for x in metrics_history["note_acc"]]],
            "labels": ["Loss", "Note Acc (x10)"],
            "title": "Loss vs Note Accuracy",
            "ylabel": None,
            "filename": "loss_vs_note_accuracy.png"
        },
    ]

    # Plot and save each figure
    for p in plots:
        plt.figure(figsize=(6, 4))
        for idx, d in enumerate(p["data"]):
            if "labels" in p:
                plt.plot(d, label=p["labels"][idx])
            else:
                plt.plot(d)
        plt.title(p["title"])
        plt.xlabel("Epoch")
        if p["ylabel"]:
            plt.ylabel(p["ylabel"])
        if "labels" in p:
            plt.legend()
        plt.tight_layout()
        save_path = os.path.join(base_dir, p["filename"])
        plt.savefig(save_path)
        plt.close()

In [11]:
def train_model(instrument, model, train_loader, epochs=100, lr=1e-4):
    """Train the transformer model"""
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
    
    model.train()
    losses = []
    metrics_history = {'note_acc': [], 'time_acc': [], 'vel_acc': [], 'smoothness': []}
    
    for epoch in range(epochs):
        epoch_loss = 0
        epoch_metrics = {'note_accuracy': 0, 'timing_accuracy': 0, 
                        'velocity_accuracy': 0, 'melodic_smoothness': 0}
        
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            pred = model(batch_x)
            loss = musical_loss(pred, batch_y)
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            
            # Calculate metrics
            batch_metrics = calculate_music_metrics(pred, batch_y)
            for key in epoch_metrics:
                epoch_metrics[key] += batch_metrics[key]
        
        scheduler.step()
        
        # Average metrics
        num_batches = len(train_loader)
        epoch_loss /= num_batches
        for key in epoch_metrics:
            epoch_metrics[key] /= num_batches
            
        losses.append(epoch_loss)
        metrics_history['note_acc'].append(epoch_metrics['note_accuracy'])
        metrics_history['time_acc'].append(epoch_metrics['timing_accuracy'])
        metrics_history['vel_acc'].append(epoch_metrics['velocity_accuracy'])
        metrics_history['smoothness'].append(epoch_metrics['melodic_smoothness'])
        
        if epoch % 10 == 0:
            print(f'Epoch {epoch}: Loss={epoch_loss:.4f}, '
                  f'Note Acc={epoch_metrics["note_accuracy"]:.3f}, '
                  f'Smoothness={epoch_metrics["melodic_smoothness"]:.3f}')
    
    save_training_plots(instrument, losses, metrics_history)
    
    return model

In [26]:
def top_k_top_p_filtering(logits, top_k=0, top_p=1.0):
    """Filter logits using top-k and/or nucleus (top-p) sampling"""
    top_k = min(top_k, logits.size(-1)) if top_k > 0 else logits.size(-1)
    
    # Top-k filtering
    if top_k > 0:
        values, _ = torch.topk(logits, top_k)
        min_values = values[..., -1, None]
        logits = torch.where(logits < min_values, torch.full_like(logits, float('-inf')), logits)

    # Top-p filtering
    if top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)

        sorted_mask = cumulative_probs > top_p
        # Shift mask right to keep at least one
        sorted_mask[..., 1:] = sorted_mask[..., :-1].clone()
        sorted_mask[..., 0] = 0

        indices_to_remove = sorted_indices[sorted_mask]
        logits[indices_to_remove] = float('-inf')

    return logits

In [27]:
def generate_melody(model, scaler, seed_sequence=None, length=64, temperature=1.0, top_k=10, top_p=0.9):
    model.eval()

    if seed_sequence is None:
        generated = torch.randn(1, 1, 4).to(device)
    else:
        seed_normalized = scaler.transform(np.array(seed_sequence))
        generated = torch.FloatTensor(seed_normalized).unsqueeze(0).to(device)

    with torch.no_grad():
        for _ in range(length - generated.size(1)):
            pred = model(generated)  # shape: (1, T, 4)
            next_pred = pred[:, -1, :] / temperature  # shape: (1, 4)

            # ---- Discrete Sampling: pitch and velocity ----
            pitch_logits = next_pred[0, 0].unsqueeze(0).repeat(128)  # expand to 0-127 range
            velocity_logits = next_pred[0, 3].unsqueeze(0).repeat(128)

            pitch_logits = top_k_top_p_filtering(pitch_logits, top_k=top_k, top_p=top_p)
            velocity_logits = top_k_top_p_filtering(velocity_logits, top_k=top_k, top_p=top_p)

            pitch_probs = torch.softmax(pitch_logits, dim=-1)
            velocity_probs = torch.softmax(velocity_logits, dim=-1)

            pitch_sampled = torch.multinomial(pitch_probs, 1).item()
            velocity_sampled = torch.multinomial(velocity_probs, 1).item()

            # ---- Continuous Sampling: start and end ----
            start = next_pred[0, 1].item()
            end = next_pred[0, 2].item()

            # ---- Build and append ----
            next_note = torch.tensor([[[
                pitch_sampled,
                start,
                end,
                velocity_sampled
            ]]], dtype=torch.float32).to(device)

            generated = torch.cat([generated, next_note], dim=1)

    # Convert back to original scale
    generated_np = generated.squeeze(0).cpu().numpy()
    generated_original = scaler.inverse_transform(generated_np).tolist()

    formatted_melodies = []
    for melody in generated_original:
        formatted_melody = [
            int(round(melody[0])),
            round(melody[1], 2),
            round(melody[2], 2),
            int(round(melody[3]))
        ]
        formatted_melodies.append(formatted_melody)

    return formatted_melodies


In [13]:
def save_model(model, instrument):
    """Save the trained model to disk."""
    save_dir = "saved_models"
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"{instrument}.pth")
    torch.save(model.state_dict(), save_path)
    print(f"Model saved to {save_path}")

In [14]:
def train_melody_generator(instrument, raw_data, seq_len=32, epochs=100):
    """Complete training pipeline"""
    print("Starting melody generation training...")
    
    # Preprocess data
    sequences, scaler = preprocess_data(raw_data, seq_len)
    
    # Create dataset and dataloader
    dataset = MelodyDataset(sequences, seq_len)
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True, 
                              num_workers=4, pin_memory=True)
    
    # Create model
    model = MelodyTransformer(seq_len=seq_len).to(device)
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
    
    # Train model
    model = train_model(instrument, model, train_loader, epochs)

    # Save model
    save_model(model, instrument)
    
    return model, scaler

In [15]:
def load_data(instrument):
    with open(f'raw_data/{instrument}.json', 'r') as file:
        data = json.load(file)
        if isinstance(data, list):
            return data
        raise ValueError("JSON does not contain a top-level array.")

In [16]:
def save_data(array, filename=None):
    if not isinstance(array, list):
        raise ValueError("Input must be a list (array)")

    os.makedirs('generated_data', exist_ok=True)

    if filename is None:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"data_{timestamp}.json"

    path = os.path.join('generated_data', f'{filename}.json')

    with open(path, 'w') as file:
        json.dump(array, file, indent=2)

    print(f"Data saved to {path}")

In [17]:
instruments = [24, 16, 33, 61, 2, 38, 81, 89, 0, 90]
for instrument in instruments:
    raw_data = load_data(instrument)
    model, scaler = train_melody_generator(instrument, raw_data, seq_len=1024, epochs=100)

Starting melody generation training...
Raw data shape: (17111, 4)
Created 63 sequences of length 1024
Model parameters: 4,740,868
Epoch 0: Loss=4.8894, Note Acc=0.492, Smoothness=0.849
Epoch 10: Loss=2.2033, Note Acc=0.521, Smoothness=0.917
Epoch 20: Loss=2.0717, Note Acc=0.548, Smoothness=0.880
Epoch 30: Loss=2.0061, Note Acc=0.546, Smoothness=0.853
Epoch 40: Loss=1.9529, Note Acc=0.552, Smoothness=0.840
Epoch 50: Loss=1.9087, Note Acc=0.560, Smoothness=0.827
Epoch 60: Loss=1.8762, Note Acc=0.564, Smoothness=0.822
Epoch 70: Loss=1.8605, Note Acc=0.566, Smoothness=0.813
Epoch 80: Loss=1.8478, Note Acc=0.571, Smoothness=0.811
Epoch 90: Loss=1.8471, Note Acc=0.569, Smoothness=0.809
Model saved to saved_models/24.pth
Starting melody generation training...
Raw data shape: (5362, 4)
Created 17 sequences of length 1024
Model parameters: 4,740,868
Epoch 0: Loss=3.7959, Note Acc=0.451, Smoothness=0.920
Epoch 10: Loss=0.6335, Note Acc=0.737, Smoothness=0.777
Epoch 20: Loss=0.4063, Note Acc=0.86

In [28]:
instruments = [24, 16, 33, 61, 2, 38, 81, 89, 0, 90]
for instrument in instruments:
    model = MelodyTransformer(seq_len=32)
    model.load_state_dict(torch.load(f"saved_models/{instrument}.pth"))
    model.to(device)
    generated_melody = generate_melody(model, scaler)
    save_data(generated_melody, str(instrument))

Data saved to generated_data/24.json
Data saved to generated_data/16.json
Data saved to generated_data/33.json
Data saved to generated_data/61.json
Data saved to generated_data/2.json
Data saved to generated_data/38.json
Data saved to generated_data/81.json
Data saved to generated_data/89.json
Data saved to generated_data/0.json
Data saved to generated_data/90.json
