In [1]:
!pip install pretty_midi
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
import requests
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import pretty_midi
import glob
import requests
import zipfile
import os
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from math import ceil
import random
import glob
import numpy as np
import pretty_midi


velo_inc = 5
dim = 128*2 + 100 + int(ceil(126/velo_inc))  
class Event:
    def __init__(self, s, t, v):
        self.time = s
        self.type = t
        self.val = v
    def encode(self):
        if self.type == 'down':
            return self.val
        elif self.type == 'up':
            return 128 + self.val
        elif self.type == 'shift':
            return 128*2 + self.val
        else:
            return 128*2 + 100 + self.val
    @staticmethod
    def decode(code):
        if code < 128:
            return 'down', code
        elif code < 128*2:
            return 'up', code - 128
        elif code < 128*2 + 100:
            return 'shift', (code - 128*2)/100 + 0.01
        else:
            return 'velo', (code - 128*2 - 100)*velo_inc + int(velo_inc/2)
        
def piano2seq(midi):
    '''
    Convert a midi object to a sequence of events
    :param midi: midi object or the file name of the midi file
    :return: numpy array that contains the sequence of events
    '''
    if type(midi) is str:
        midi = pretty_midi.PrettyMIDI(midi)
    piano = midi.instruments[0]
    velo = 0
    q = []
    for note in piano.notes:
        if note.velocity != velo:
            q.append(Event(note.start, 'velo', int(min(note.velocity, 125)/velo_inc)))
            velo = note.velocity
        q.append(Event(note.start, 'down', note.pitch))
        q.append(Event(note.end, 'up', note.pitch))
    t = 0
    qfull = []
    for e in sorted(q, key=lambda x: x.time):
        d = e.time - t
        while d > 0.01:
            dd = min(d, 1) - 0.01
            qfull.append(Event(t, 'shift', int(dd*100)))
            d = d - dd
        t = e.time
        qfull.append(e)
    seq = np.zeros((len(qfull),), dtype=np.int32)
    for i, e in enumerate(qfull):
        seq[i] = e.encode()
    assert np.max(seq) < dim
    return seq

def seq2piano(seq):
    '''
    Convert a sequence of events to midi
    :param seq: numpy array that contains the sequence
    :return: midi object
    '''
    midi = pretty_midi.PrettyMIDI()
    piano = pretty_midi.Instrument(program=0, is_drum=False, name='piano')
    midi.instruments.append(piano)

    if seq.ndim > 1:
        seq = np.argmax(seq, axis=-1)
    inote = {}
    velo = 40
    time = 0.
    for e in seq:
        t, v = Event.decode(e)
        if t == 'shift':
            time += v
        elif t == 'velo':
            velo = v
            for n in inote.values():
                if n[2] == time:
                    n[0] = v
        elif t == 'down':
            n = inote.get(v, None)
            if n is not None:
                logging.debug('consecutive downs for pitch %d at time %d and %d' % (v, n[2], time))
            else:
                inote[v]  = [velo, v, time, -1]
        else:
            n = inote.get(v, None)
            if n is not None:
                n[-1] = time
                if n[-1] > n[-2]:
                    piano.notes.append(pretty_midi.Note(*n))
                else:
                    logging.debug('note with non-positive duration for pitch %d at time %d' % (n[1], n[2]))
                del inote[v]
            else:
                logging.debug('up without down for pitch %d at time %d' % (v, time))
    # clean out the incomplete note buffer, assuming these note end at last
    for n in inote.values():
        n[-1] = time
        if n[-1] > n[-2]:
            piano.notes.append(pretty_midi.Note(*n))
    return midi

def segment(seq, maxlen=150):
    assert len(seq) > maxlen
    inc = int(maxlen/2)
    i = inc
    t = np.ones((maxlen+1,), dtype=np.int32)
    t[0] = (128*2+1)
    t[1:] = seq[:maxlen]
    s = [t]
    while i+maxlen+1 < len(seq):
        s.append(seq[i:i+maxlen+1])
        i += inc
    return np.stack(s, axis=0)

def process_midi_seq(all_midis=None, datadir='data', n=10000, maxlen=150):
    '''
    Process a list of midis, convert them to sequences and segment sequences into segments of length max_len
    :param all_midis: the list of midis. If None, midis will be loaded from files
    :param datadir: data directory, assume under this directory, we have the "maestro-v1.0.0" midi directory
    :param n: # of segments to return
    :param maxlen: the length of the segments
    :return: numpy array of shape [n', max_len] for the segments. n' tries to be close to n but may not be exactly n.
    '''
    if all_midis is None:
        all_midis = glob.glob(datadir+'/maestro-v1.0.0/**/*.midi')
        random.seed()    # for debug purpose, you can pass a fix number when calling seed()
        random.shuffle(all_midis)
    data = []
    k = 0
    for m in all_midis:
        seq = segment(piano2seq(m), maxlen)
        data.append(seq)
        k += len(seq)
        if k > n:
            break
    return np.vstack(data)

def random_piano(n=100):
    '''
    Generate random piano note
    :param n: # of notes to be generated
    :return: midi object with the notes
    '''
    midi = pretty_midi.PrettyMIDI()
    piano = pretty_midi.Instrument(program=0, is_drum=False, name='piano')
    midi.instruments.append(piano)

    pitchs = np.random.choice(128, size=n)
    velos = np.random.choice(np.arange(10, 80), size=n)
    durations = np.abs(np.random.randn(n) + 1)
    intervs = np.abs(0.2*np.random.randn(n) + 0.3)
    time = 0.5
    for i in range(n):
        piano.notes.append(pretty_midi.Note(velos[i], pitchs[i], time, time+durations[i]))
        time += intervs[i]
    return midi





# Task1
This code segment demonstrates the application of a deep learning model, Critic, to solve the binary classification problem of musical sequences. The goal is to differentiate between "good music" and "bad music", essentially judging the quality of music based on its sequential data.

## Model Design: 

Implemented a LSTM (Long Short-Term Memory) neural network model named Critic for processing sequence data. The model includes an LSTM layer and a fully connected layer, along with a BCEWithLogitsLoss loss function for binary classification. 

Data Preparation: Transformed good and bad music data into tensors and assigned corresponding labels (1 for good music, 0 for bad music). The dataset was then split into training, validation, and test sets. 

Training Process: The model was trained on the training set, including forward propagation, loss calculation, backpropagation, and parameter updates. 

Validation Process: Evaluated the model's performance on the validation set, calculating validation loss and accuracy. 

Testing Process: Conducted a final performance assessment on the test set, using the model to predict each sequence and calculating the overall accuracy. Results Obtained:

On the test set, the model successfully made correct predictions for 1535 out of 2029 samples, achieving a classification accuracy of 75.65%.

In [2]:
# Import midi data
data_url = "https://storage.googleapis.com/magentadata/datasets/maestro/v1.0.0/maestro-v1.0.0-midi.zip"
data_path = "maestro-v1.0.0-midi.zip"
response = requests.get(data_url, stream=True)
with open(data_path, 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
        
with zipfile.ZipFile(data_path, 'r') as zip_ref:
    zip_ref.extractall()
    
os.remove(data_path)
good_sequences = glob.glob('maestro-v1.0.0/**/*.midi', recursive=True)

class Critic(nn.Module):
    """
    This code defines a class named Critic, which is a neural network model for binary classification tasks. 
    """
    def __init__(self, class_weights=None):
        super(Critic, self).__init__()
        self.lstm = nn.LSTM(input_size=151, hidden_size=100, num_layers=3,
                    batch_first=True, dropout=0.5)
        self.fc = nn.Linear(100, 1)  
        self.criterion = nn.BCEWithLogitsLoss(pos_weight=class_weights)
        self.optimizer = optim.Adam(self.parameters(), lr=0.0001)
    def forward(self, x):
        """
        This method defines how the input data x passes through the network.
        The input is first fed through the LSTM layer. The output of the LSTM layer is then processed to only keep 
        the last time step's output.
        This output is then passed through the fully connected layer to produce the final output.
        """
        out, _ = self.lstm(x)
        if len(out.shape) == 3:
            out = out[:, -1, :]
        out = self.fc(out)
        return out 
    def train_model1(self, x, label):
        self.optimizer.zero_grad()
        outputs = self.forward(x)
        loss = self.criterion(outputs, label)
        loss.backward()
        self.optimizer.step()
        return loss.item()
    def score(self, x):
        with torch.no_grad():
            return torch.sigmoid(self.forward(x)).cpu().numpy() 


DEBUG:Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG:https://storage.googleapis.com:443 "GET /magentadata/datasets/maestro/v1.0.0/maestro-v1.0.0-midi.zip HTTP/1.1" 200 46579421


In [4]:
good_music = process_midi_seq(good_sequences)
# Generate bad music and convert them to sequences
bad_sequences = []
midi = random_piano(40000)  
bad_sequences = process_midi_seq([midi])  # This function should convert midi to a sequence of events of shape (51,)

In [9]:
"""
This code segment is preparing and splitting a dataset for training, validation, and testing.
Labels for the sequences: 0 for bad music (bad_labels) and 1 for good music (good_labels).
Allocates 70% of the dataset for training (train_size), and the remaining 30% is evenly divided between validation and testing.
"""
bad_labels = torch.zeros(len(bad_sequences), 1)
good_labels = torch.ones(len(good_music), 1)
bad_music_tensor = torch.tensor(bad_sequences)
good_music_tensor = torch.tensor(good_music)
all_data = torch.cat([bad_music_tensor, good_music_tensor], 0)
all_labels = torch.cat([bad_labels, good_labels], dim=0)
dataset = TensorDataset(all_data, all_labels)

from torch.utils.data import random_split
train_size = int(0.7 * len(dataset))  # 70% of the dataset for training
remaining_size = len(dataset) - train_size
valid_size = int(0.5 * remaining_size)  # Split the remaining 30% equally for validation and test
test_size = remaining_size - valid_size
# Split the dataset
train_dataset, remaining_dataset = random_split(dataset, [train_size, remaining_size])
valid_dataset, test_dataset = random_split(remaining_dataset, [valid_size, test_size])
# Create dataloaders for each set
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [20]:
"""
The training and validation process of Critic 
"""
model1 = Critic()
num_epochs = 100

for epoch in range(num_epochs):
    model1.train()  # Set the model to training mode
    total_train_loss = 0.0
    # Training
    for batch_data, batch_labels in train_loader:
        batch_data = batch_data.float()
        batch_labels = batch_labels.float()
        loss = model1.train_model1(batch_data, batch_labels)
        total_train_loss += loss
    average_train_loss = total_train_loss / len(train_loader)
    # Validation
    model1.eval()  # Set the model to evaluation mode
    total_val_loss = 0.0
    total_val_correct = 0
    with torch.no_grad():
        for batch_data, batch_labels in valid_loader:
            batch_data = batch_data.float()
            batch_labels = batch_labels.float()
            outputs = model1(batch_data)
            val_loss = model1.criterion(outputs, batch_labels)
            total_val_loss += val_loss.item()
            # Calculate accuracy
            predicted = (outputs > 0.5).float()
            total_val_correct += (predicted == batch_labels).sum().item()
    average_val_loss = total_val_loss / len(valid_loader)
    val_accuracy = total_val_correct / (len(valid_loader.dataset))
    if epoch == num_epochs - 1:
        print(f"Epoch {epoch+1}, Training Loss: {average_train_loss:.4f}, Validation Loss: {average_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

Epoch 100, Training Loss: 0.5504, Validation Loss: 0.5547, Validation Accuracy: 0.7520


In [21]:
correct = 0
total = 0
# Iterate over batches in the test_loader
for batch_data, batch_labels in test_loader:
    batch_data = batch_data.float()
    scores = model1.forward(batch_data)
    # Convert scores to binary predictions (good or bad)
    predicted = (scores > 0.8).float().squeeze()
    # Convert labels to float for comparison (if they are not already)
    batch_labels = batch_labels.float().squeeze()
    # Count the number of correct predictions
    correct += (predicted == batch_labels).sum().item()
    total += batch_labels.size(0)

# Print values of correct and total
print("Correct predictions:", correct)
print("Total predictions:", total)
# Compute accuracy
accuracy = correct / total
print(f"The classification accuracy on the test set is: {accuracy*100:.2f}%")


Correct predictions: 1535
Total predictions: 2029
The classification accuracy on the test set is: 75.65%


# Task2:

The code for the Composer model aims to solve the problem of automatic music generation. 
The model is designed to generate musical sequences based on input data, representing a creative application of deep learning in the field of music.

## Model Architecture: 

Implemented a Composer class, an LSTM-based neural network model, which is well-suited for sequential data like music. 

The model includes: A multi-layer LSTM network for processing sequences. A fully connected layer for output generation. The forward pass method for defining the data flow through the model. A method for initializing the LSTM's hidden and cell states. A compose method for generating music sequences based on a starting note.

In [23]:
import torch.nn as nn
import torch.nn.functional as F
"""
Creating and processing a batch of data from the good_music_tensor using PyTorch's DataLoader.
"""
good_music_tensor = torch.tensor(good_music, dtype=torch.long)
# Create Dataset and DataLoader
data_loader = DataLoader(good_music_tensor, batch_size=64, shuffle=True)
# Iterate over the DataLoader to get a single batch
for batch in data_loader:
    x= batch  
    print(x.shape)  
    break 

torch.Size([64, 151])


In [31]:
"""
This code defines a neural network model named Composer, which is designed for generating musical sequences. 
The model is built using PyTorch.
"""
def one_hot_encode(note, num_notes):
    """
     converts a musical note into a one-hot encoded tensor. 
     This is used for representing discrete elements (like notes) in a format suitable for neural network processing.
    """
    tensor = torch.zeros(num_notes, dtype=torch.float)
    tensor[note] = 1.0
    return tensor

class Composer(nn.Module):
    """
    this Composer model is a neural network designed to generate music sequences. 
    It uses LSTM layers to capture the temporal dependencies in music data and predicts one note at a time, 
    building a sequence iteratively.
    """
    def __init__(self, input_dim, lstm_units, num_layers=2):
        super(Composer, self).__init__()
        self.input_dim = input_dim
        self.lstm_units = lstm_units
        self.num_layers = num_layers 
        self.lstm = nn.LSTM(input_dim, lstm_units, num_layers, batch_first=True)
        self.fc = nn.Linear(lstm_units, input_dim)

    def forward(self, x, prev_state):
      state_h, state_c = prev_state
      output, state = self.lstm(x, prev_state)
      logits = self.fc(output)
      return logits, state
    
    def init_state(self, batch_size):
        # Initialize the hidden and cell state to zeros
        return (torch.zeros(self.num_layers, batch_size, self.lstm_units),
                torch.zeros(self.num_layers, batch_size, self.lstm_units))
    
    def compose(self, start_sequence, length, temperature=1.0):
        # Ensure that start_sequence is a torch tensor
        if not isinstance(start_sequence, torch.Tensor):
            start_sequence = torch.tensor(start_sequence, dtype=torch.long)
        # Initialize the hidden state
        state_h, state_c = self.init_state(1)  # Batch size of 1 for generation
        # Initialize the sequence with the start_sequence
        current_input = one_hot_encode(start_sequence[0], self.input_dim).unsqueeze(0).unsqueeze(0)
        generated_sequence = start_sequence.tolist()
        # Generate the sequence
        
        for _ in range(length):
            # Forward pass through LSTM
            output, (state_h, state_c) = self.forward(current_input, (state_h, state_c))
            # Get the last output (next note prediction)
            last_output = output[:, -1, :]
            # Apply temperature scaling and softmax to generate probabilities
            probabilities = F.softmax(last_output / temperature, dim=1).squeeze()
            # Sample from the probability distribution to get the next note
            next_note = torch.multinomial(probabilities, 1).item()
            # Append the predicted note to the sequence
            generated_sequence.append(next_note)
            next_input = one_hot_encode(next_note, self.input_dim).unsqueeze(0).unsqueeze(0)
            current_input = torch.cat((current_input[:, 1:], next_input), 1)
        return generated_sequence[1:]

In [32]:
# Hyperparameters
batch_size = 64
num_epochs = 5
input_dim = good_music.max() + 1  

# Convert good_music to a tensor and create the dataset
good_music_tensor = torch.from_numpy(good_music).long()
sequences = good_music_tensor[:, :-1]  
targets = good_music_tensor[:, 1:]  
dataset = TensorDataset(sequences, targets)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define the Composer model
composer = Composer(input_dim=good_music_tensor.max() + 1, lstm_units=512, num_layers=2)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(composer.parameters(), lr=0.001)

In [38]:
# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    state_h, state_c = composer.init_state(batch_size)
    total_loss = 0
    num_batches = 0

    for batch, (x, y) in enumerate(data_loader):
        optimizer.zero_grad()
        current_batch_size = x.size(0)

        # Initialize the hidden state if it's None or if the batch size has changed (last batch case)
        if state_h is None or state_h.size(1) != current_batch_size:
            state_h, state_c = composer.init_state(current_batch_size)

        x_one_hot = nn.functional.one_hot(x, num_classes=composer.input_dim).float()
        y_pred, (state_h, state_c) = composer(x_one_hot, (state_h, state_c))

        # Flatten the output and targets
        loss = criterion(y_pred.transpose(1, 2), y)

        state_h = state_h.detach()
        state_c = state_c.detach()

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if epoch == num_epochs - 1:
        average_loss = total_loss / num_batches
        print(f'Epoch {epoch}, Average Loss: {average_loss:.4f}')

Epoch 49, Average Loss: 5.9480


In [None]:
# Task3

The code generates a sequence of music starting with a random note. 
This note is selected within the range of the maximum value in good_music_tensor. 
Each generated sequence by the Composer is transformed into a tensor format suitable for the Critic model. 
The Critic model then evaluates each generated sequence, providing a score that reflects the quality of the sequence as perceived by the model.

Calculated the average score of all generated sequences, which was 0.5244389.

In [39]:
composer = Composer(input_dim=good_music_tensor.max() + 1, lstm_units=512, num_layers=2)
composer.eval()  
model1 = Critic()
model1.eval()  
generated_sequences = []
scores = []
total_score = 0
for _ in range(50):
    start_note = random.randint(1, good_music_tensor.max())  
    start_sequence = [start_note]
    start_sequence_tensor = torch.tensor(start_sequence, dtype=torch.long)
    generated_sequence = composer.compose(start_sequence_tensor, length=151)
    generated_sequence_tensor = torch.tensor([generated_sequence], dtype=torch.float)
    score = model1.score(generated_sequence_tensor)
    total_score += score
    generated_sequences.append(generated_sequence)
    scores.append(score)
average_score = total_score / 50
print(f"Average Score: {average_score}")

Average Score: [[0.5244389]]
