# ***Run***

In [None]:
# Download a file from Google Drive by id and unpack the project archive into data/
# (presumably the download is /content/FinalProjectMat.zip — TODO confirm).
!gdown '1TVmI7nhTLjdd_6ghEcjLp_9wW0HWi2oa'
!unzip -d data/ /content/FinalProjectMat.zip
# Two further Drive downloads; their contents are not visible from this notebook
# (presumably the NItrain.npy / NItest.npy arrays loaded later — TODO confirm).
!gdown '1-0a0eKi5awySLCo9xfqbyMaI8uRCID8q'
!gdown '1ndBcjwNS6JwyJe6JDury31ZdDAer4MK6'
# Alternative data source (disabled): mount Google Drive instead of downloading.
# from google.colab import drive
# drive.mount('/content/drive')

# ***Import Data***

In [None]:
import numpy as np
import IPython.display as ipd
import os
import random
import soundfile as sf

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import librosa
from sklearn.model_selection import train_test_split
from keras.preprocessing.text import Tokenizer


# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device  # notebook echo of the selected device

# Preprocessing

## Create dictionary from files

In [None]:
# Map integer ids to file names: `notes` for the triad recordings, `artifacts`
# for the noise clips. The ids are used as lookup keys by the helpers below and
# as class labels for the note identifier.
# NOTE(review): os.listdir returns entries in arbitrary order, so the id -> file
# mapping is not guaranteed to be stable across machines or runs — confirm that
# any saved arrays/checkpoints were produced with the same ordering.
notes = dict(enumerate(os.listdir("data/piano_triads")))
artifacts = dict(enumerate(os.listdir("data/artifacts")))

## Functions

In [None]:
def get_random_note(n):
    """Return `n` distinct entries drawn at random from `notes` (its ids)."""
    return random.sample(list(notes), n)


def get_random_noise():
    """Return one entry drawn at random from `artifacts` (its ids)."""
    (picked,) = random.sample(list(artifacts), 1)
    return picked

In [None]:
def append_notes(notes_list, sr):
    """Load the given notes and join them end to end into one waveform.

    Args:
        notes_list: iterable of ids; each is looked up in `notes` to get the
            file name under ``data/piano_triads/``.
        sr: sample rate handed to ``librosa.load``.

    Returns:
        A 1-D float64 ndarray with the notes concatenated in order; an empty
        array when `notes_list` is empty.
    """
    # Start from an empty float64 array so the result is always an ndarray
    # (also for empty input) and keeps the float64 dtype callers have been
    # getting from the previous incremental concatenation.
    chunks = [np.empty(0)]
    for note in notes_list:
        # `sr` is rebound to the rate librosa reports, as before, so every
        # following file is loaded at the same rate.
        wave, sr = librosa.load('data/piano_triads/' + notes[note], sr=sr)
        chunks.append(wave)
    # Concatenate once instead of re-copying the growing waveform per note.
    return np.concatenate(chunks)

In [None]:
def add_random_noise(random_noise_file, piece, sr):
    """Overlay one noise clip on `piece`.

    The clip ``artifacts[random_noise_file]`` is loaded at sample rate `sr`,
    cut or symmetric-padded to exactly ``len(piece)`` samples, and added to
    `piece` sample by sample.

    Args:
        random_noise_file: id of the noise clip in `artifacts`.
        piece: 1-D waveform to corrupt.
        sr: sample rate handed to ``librosa.load``.

    Returns:
        (noise, noisy_piece): the length-matched noise and ``noise + piece``.
    """
    random_noise, sr = librosa.load('data/artifacts/' + artifacts[random_noise_file], sr=sr)
    duration = len(piece)
    # `size` has to be a keyword argument: it is keyword-only in current
    # librosa releases, and the keyword form also works on older ones.
    fixed_len_noise = librosa.util.fix_length(random_noise, size=duration, mode='symmetric')
    return fixed_len_noise, fixed_len_noise + piece

## Test functions

In [None]:
# Smoke test of the helpers: build a clean piece from 30 random notes at
# `sample_rate` and mix one random noise clip into it.
sample_rate = 16000
notes_list = get_random_note(30)
initial_piece = append_notes(notes_list, sample_rate)
r_noise = get_random_noise()
noise_itself, noisy_piece = add_random_noise(r_noise, initial_piece, sample_rate)

In [None]:
# Listen to the clean piece built above.
ipd.Audio(initial_piece, rate=sample_rate)

In [None]:
# Listen to the length-matched noise on its own.
ipd.Audio(noise_itself, rate=sample_rate)

In [None]:
# Listen to the clean piece with the noise added.
ipd.Audio(noisy_piece, rate=sample_rate)

In [None]:
# Number of samples in one rendered note at `sample_rate`; break_down_piece
# uses it as the chunk size.
# NOTE(review): measured on a single random note — assumes every note file has
# the same length; confirm against the data.
LEN_OF_NOTE = len(append_notes(get_random_note(1), sample_rate))
print(LEN_OF_NOTE)

# Denoising Autoencoder

## Model functions

In [None]:
def DAEtrain(model, trainloader, criterion, optimizer, n):
    """Run one training epoch of the denoising autoencoder.

    Every batch is a dict with 'noisy' (input) and 'normal' (target) tensors.
    `n` is accepted for signature parity with the other train functions and is
    not used.

    Returns:
        The mean of the per-batch loss values.
    """
    model.train()
    batch_losses = []
    for batch in trainloader:
        noisy = batch['noisy'].to(device)
        clean = batch['normal'].to(device)

        batch_loss = criterion(model(noisy), clean)
        batch_losses.append(batch_loss.item())

        # Backpropagate, update the weights, then clear gradients for the next batch.
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    return np.mean(batch_losses)

In [None]:
def DAEvalidation(model, testloader, criterion, n):
    """Evaluate `model` on `testloader` without gradient tracking.

    Each batch is unpacked as an ``(X, y)`` pair; the function counts argmax
    matches against `y` and collects the per-batch `criterion` value.

    NOTE(review): the tuple unpacking and the argmax accuracy follow a
    classification setup. `DAEDataset` in this notebook yields dicts with
    'noisy'/'normal' keys, which would not unpack into tensors here — confirm
    the intended loader before using this for the autoencoder.

    Returns:
        (accuracy, mean_loss): ``corrects / n`` and the mean of the batch losses.
    """
    losses = []
    corrects = 0
    model.eval()
    with torch.no_grad():
        for X, y in testloader:
            X = X.to(device)
            y = y.to(device)

            outputs = model(X)
            # Index of the largest output along dim 1 is taken as the prediction.
            _, pred = torch.max(outputs, dim=1)
            corrects += torch.sum(pred == y)

            loss = criterion(outputs, y)
            losses.append(loss.item())
        
    # `corrects` is a tensor only after at least one batch; with an empty
    # loader it is still the int 0 and `.item()` is not available.
    return (corrects.item() / n), np.mean(losses)

In [None]:
def DAEfit(model, criterion, optimizer, EPOCHS):
    """Train `model` for `EPOCHS` epochs on the module-level `trainloader`.

    Args:
        model: autoencoder to train.
        criterion: loss passed through to `DAEtrain`.
        optimizer: optimizer passed through to `DAEtrain`.
        EPOCHS: number of passes over `trainloader`.

    Returns:
        (losses_train, losses_valid): the per-epoch mean training loss, and a
        validation-loss list that stays empty because no validation pass is run.
    """
    losses_train, losses_valid = [], []

    for e in range(EPOCHS):
        train_l = DAEtrain(model, trainloader, criterion, optimizer, len(trainset))
        # Terminate the line: nothing else is printed for the epoch, so leaving
        # it open ran all epochs together on a single output line.
        print(f"Epoch {e + 1} -- train losses {train_l:.3f}")
        losses_train.append(train_l)

    return losses_train, losses_valid


In [None]:
# Last per-class precision / recall / F1 computed by show_metrics.
globprec, globrec, globf1 = 0, 0, 0

def show_metrics(model, dataloader, filename=None):
    """Print a classification report and plot the confusion matrix.

    Batches of `dataloader` are unpacked as ``(X, y)``; the prediction is the
    argmax of ``model(X)`` along dim 1. The per-class precision, recall and F1
    are also stored in the module-level `globprec`, `globrec`, `globf1`.

    Args:
        model: classifier to evaluate.
        dataloader: iterable of ``(X, y)`` batches.
        filename: when given, the confusion-matrix figure is saved there.
    """
    global globprec, globrec, globf1
    # Imported here because the notebook's import cell does not bring these
    # names into scope.
    from sklearn.metrics import (
        ConfusionMatrixDisplay,
        classification_report,
        confusion_matrix,
        precision_recall_fscore_support,
    )

    y_pred = []
    y_true = []
    with torch.no_grad():
        for X, y in dataloader:
            outputs = model(X.to(device))
            _, pred = torch.max(outputs, dim=1)
            y_pred.append(pred)
            y_true.append(y)
    # Predictions were computed on `device`; bring them to the CPU for sklearn.
    y_pred = torch.cat(y_pred).cpu()
    y_true = torch.cat(y_true)
    creport = classification_report(y_true, y_pred)
    print(creport)
    globprec, globrec, globf1, _ = precision_recall_fscore_support(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred))
    disp.plot()
    if filename:
        # Save through the display's own figure rather than pyplot's implicit
        # current figure, so no matplotlib import is needed here.
        disp.figure_.savefig(filename)

In [None]:
def plotplz(filename=None):
    """Plot accuracy and loss curves side by side.

    Reads the module-level `acc_valid`, `acc_train`, `losses_valid` and
    `losses_train` histories. When `filename` is given the figure is saved there.

    NOTE(review): `plt` is not imported anywhere in the visible notebook —
    confirm that matplotlib.pyplot is brought into scope before calling this.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12,5))
    panels = (
        (axes[0], acc_valid, acc_train, "accuracy"),
        (axes[1], losses_valid, losses_train, "loss"),
    )
    for ax, val_curve, train_curve, ylabel in panels:
        # Validation first, then training, so the line colours stay the same.
        ax.plot(val_curve, label="val")
        ax.plot(train_curve, label="train")
        ax.set_xlabel("iterations")
        ax.set_ylabel(ylabel)
        ax.legend()
    if filename:
        fig.savefig(filename)

## Dataset and Dataloader

In [None]:
class DAEDataset(Dataset):
    """Dataset of (noisy, clean) waveform pairs rendered on demand.

    Each entry of `notes_list` is a dict with 'wave' (note ids handed to
    `append_notes`) and 'noise' (noise id handed to `add_random_noise`).
    """

    def __init__(self, notes_list, sample_rate):
        self.notes_list, self.sample_rate = notes_list, sample_rate

    def __len__(self):
        return len(self.notes_list)

    def __getitem__(self, item):
        spec = self.notes_list[item]
        # Render the clean piece, then overlay the noise chosen for this sample.
        clean = append_notes(spec['wave'], self.sample_rate)
        _, noisy = add_random_noise(spec['noise'], clean, self.sample_rate)
        return {'noisy': torch.Tensor(noisy), 'normal': torch.Tensor(clean)}


In [None]:
BATCH_SIZE = 16
TRAIN_SIZE = 50
TEST_SIZE = 10
DAE_SAMPLE_RATE = 16000

# A sample spec names 30 random notes plus one random noise clip; the audio is
# only rendered when the dataset item is fetched.
train_dae_notes_list = [
    {'wave': get_random_note(30), 'noise': get_random_noise()}
    for _ in range(TRAIN_SIZE)
]
test_dae_notes_list = [
    {'wave': get_random_note(30), 'noise': get_random_noise()}
    for _ in range(TEST_SIZE)
]

trainset = DAEDataset(train_dae_notes_list, DAE_SAMPLE_RATE)
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE)

testset = DAEDataset(test_dae_notes_list, DAE_SAMPLE_RATE)
testloader = DataLoader(testset, batch_size=BATCH_SIZE)

## Autoencoder Model

In [None]:
class AutoEncoder(torch.nn.Module) :
    """Fully connected autoencoder: input_dim -> 1024 -> 512 -> 256 -> 128 and back.

    ReLU (in-place) follows every linear layer except the last one of the
    encoder and of the decoder.
    """

    def __init__(self, input_dim) :
        super().__init__()
        self.encoder = self._relu_mlp([input_dim, 1024, 512, 256, 128])
        self.decoder = self._relu_mlp([128, 256, 512, 1024, input_dim])

    @staticmethod
    def _relu_mlp(widths):
        """Chain Linear layers over consecutive `widths`, with ReLU in between."""
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            if layers:
                layers.append(nn.ReLU(True))
            layers.append(nn.Linear(fan_in, fan_out))
        return nn.Sequential(*layers)

    def forward(self, x):
        return self.decoder(self.encoder(x))

In [None]:
# Size the autoencoder to the length of one rendered training sample.
DAEmodel = AutoEncoder(trainset[0]['noisy'].shape[0]).to(device)
criterion = nn.MSELoss().to(device)
optimizer = torch.optim.Adam(DAEmodel.parameters(), lr=1e-4)
# Training is disabled here, so DAEmodel keeps its initial weights.
# losses_train, losses_valid = DAEfit(DAEmodel, criterion, optimizer, 20)

## Utility functions

In [None]:
def noisy_to_normal(piece, use_model=False):
    """Return a denoised version of `piece`.

    By default the input is returned unchanged, which is what this function
    has always done (the model path used to sit behind an unconditional
    ``return``; the training call for `DAEmodel` is commented out in this
    notebook).

    Args:
        piece: 1-D waveform.
        use_model: when true, run `piece` through `DAEmodel` instead of
            passing it through.

    Returns:
        `piece` itself when `use_model` is false; otherwise a NumPy array with
        the model output, whose length is the model's input size.
    """
    if not use_model:
        return piece

    # Ask the model for its input width rather than reading a dataset global.
    input_dim = DAEmodel.encoder[0].in_features
    fixed_len_piece = librosa.util.fix_length(
        np.asarray(piece, dtype=np.float32), size=input_dim, mode='constant'
    )
    DAEmodel.eval()
    with torch.no_grad():
        denoised = DAEmodel(torch.from_numpy(fixed_len_piece).to(device))
    return denoised.cpu().numpy()

# Note Identification

## Model functions

In [None]:
def NItrain(model, trainloader, criterion, optimizer, n):
    """Run one training epoch of the note identifier.

    Every batch is a dict with 'noisy' (input waveforms) and 'target' (class
    ids, one per sample).

    Args:
        model: classifier being trained.
        trainloader: iterable of batches.
        criterion: loss taking ``(logits, targets)``.
        optimizer: optimizer stepping `model`'s parameters.
        n: number of samples used as the accuracy denominator.

    Returns:
        (accuracy_percent, mean_loss): accuracy is in percent (0-100).
    """
    losses = []
    corrects = 0
    model.train()
    for item in trainloader:
        X = item['noisy'].to(device)
        # Flatten with reshape(-1) instead of squeeze(): squeeze() also drops
        # the batch dimension when a batch holds a single sample, which then
        # no longer matches the (batch, classes) output.
        y = item['target'].reshape(-1).to(device)
        output = model(X)
        pred = output.argmax(dim=1)
        # Accumulate as a Python int so the return works for an empty loader too.
        corrects += (pred == y).sum().item()
        loss = criterion(output, y)
        losses.append(loss.item())

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    return (corrects / n)*100, np.mean(losses)

In [None]:
def NItest(model, testloader, criterion, n):
    """Evaluate the note identifier without gradient tracking.

    Every batch is a dict with 'noisy' (input waveforms) and 'target' (class
    ids, one per sample).

    Args:
        model: classifier to evaluate.
        testloader: iterable of batches.
        criterion: loss taking ``(logits, targets)``.
        n: number of samples used as the accuracy denominator.

    Returns:
        (accuracy, mean_loss): accuracy is a fraction in [0, 1].
    """
    losses = []
    corrects = 0
    model.eval()
    with torch.no_grad():
        for item in testloader:
            X = item['noisy'].to(device)
            # reshape(-1) keeps the batch dimension for single-sample batches,
            # which squeeze() would drop.
            y = item['target'].reshape(-1).to(device)
            output = model(X)
            pred = output.argmax(dim=1)
            # Accumulate as a Python int so the return works for an empty loader too.
            corrects += (pred == y).sum().item()
            loss = criterion(output, y)
            losses.append(loss.item())

    return (corrects / n), np.mean(losses)

In [None]:
def NIfit(model, criterion, optimizer, EPOCHS):
    """Train and validate the note identifier for `EPOCHS` epochs.

    Uses the module-level `trainloader`/`trainset` and `testloader`/`testset`.
    Training accuracy comes back from `NItrain` in percent and is printed as
    is; validation accuracy comes back from `NItest` as a fraction and is
    scaled by 100 for printing only.

    Returns:
        (losses_train, losses_valid, acc_train, acc_valid), one entry per epoch.
    """
    losses_train, losses_valid = [], []
    acc_train, acc_valid = [], []

    for epoch_no in range(1, EPOCHS + 1):
        train_c, train_l = NItrain(model, trainloader, criterion, optimizer, len(trainset))
        print(f"Epoch {epoch_no} -- train accuracy {train_c:.3f}     train losses {train_l:.3f}", end='')
        acc_train.append(train_c)
        losses_train.append(train_l)

        val_c, val_l = NItest(model, testloader, criterion, len(testset))
        print(f" --- valid accuracy {val_c * 100:.3f}   valid losses {val_l:.3f}")
        acc_valid.append(val_c)
        losses_valid.append(val_l)

    return losses_train, losses_valid, acc_train, acc_valid


## Dataset and Dataloader

In [None]:
BATCH_SIZE = 16
# TRAIN_SIZE / TEST_SIZE count how many noisy copies of every note are
# generated, so the sample count is len(notes) * TRAIN_SIZE (resp. TEST_SIZE).
TRAIN_SIZE = 10 # number of all notes * TRAIN_SIZE
TEST_SIZE = 2
NI_SAMPLE_RATE = 16000

In [None]:
def _render_ni_samples(specs):
    """Render each {'note', 'noise'} spec into a noisy waveform plus its target ids."""
    rendered = []
    for spec in specs:
        piece = append_notes(spec['note'], NI_SAMPLE_RATE)
        _, noisy_piece = add_random_noise(spec['noise'], piece, NI_SAMPLE_RATE)
        rendered.append({"note": noisy_piece, "target": spec['note']})
    return rendered


# One spec per (repeat, note id): a single note paired with a random noise clip.
test_ni_notes_list = [
    {'note': [j], 'noise': get_random_noise()}
    for _ in range(TEST_SIZE)
    for j in notes
]
train_ni_notes_list = [
    {'note': [j], 'noise': get_random_noise()}
    for _ in range(TRAIN_SIZE)
    for j in notes
]

train_numpy = _render_ni_samples(train_ni_notes_list)
test_numpy = _render_ni_samples(test_ni_notes_list)

# Save under the names that are loaded back later in this notebook
# ("NItrain.npy" / "NItest.npy"); the training array used to be written to
# "train.npy", which nothing reads.
np.save("NItrain.npy", train_numpy)
np.save("NItest.npy", test_numpy)

In [None]:
class NIDataset(Dataset):
    """Dataset over pre-rendered note samples.

    Each entry of `notes_list` is a mapping with 'note' (the waveform) and
    'target' (the class ids). `sample_rate` is stored but not used by the
    methods below.
    """

    def __init__(self, notes_list, sample_rate):
        self.notes_list, self.sample_rate = notes_list, sample_rate

    def __len__(self):
        return len(self.notes_list)

    def __getitem__(self, item):
        sample = self.notes_list[item]
        noisy = torch.tensor(sample['note'], dtype=torch.float32)
        target = torch.tensor(sample['target'], dtype=torch.long)
        return {'noisy': noisy, 'target': target}


In [None]:
# Alternative location (disabled): arrays shipped inside the data/ folder.
# train_ni_notes_list = np.load("data/NItrain.npy", allow_pickle=True)
# test_ni_notes_list = np.load("data/NItest.npy", allow_pickle=True)
# NOTE(review): allow_pickle=True unpickles the file contents, which can run
# arbitrary code — only load arrays from a trusted source.
train_ni_notes_list = np.load("NItrain.npy", allow_pickle=True)
test_ni_notes_list = np.load("NItest.npy", allow_pickle=True)

print("train samples:", len(train_ni_notes_list))
print("test samples:", len(test_ni_notes_list))

# From here on trainset/trainloader/testset/testloader refer to the
# note-identification data; NIfit reads these module-level names.
trainset = NIDataset(train_ni_notes_list, NI_SAMPLE_RATE)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE)

testset = NIDataset(test_ni_notes_list, NI_SAMPLE_RATE)
testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE)

## Note Identifier Model

In [None]:
class NoteIdentifier(torch.nn.Module) :
    """Fully connected classifier: input_dim -> 2048 -> 1024 -> 512 -> output_dim.

    ReLU (in-place) follows every linear layer except the last; the output is
    the raw score per class.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        widths = (input_dim, 2048, 1024, 512, output_dim)
        stack = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            if stack:
                stack.append(nn.ReLU(True))
            stack.append(nn.Linear(fan_in, fan_out))
        self.encoder = nn.Sequential(*stack)

    def forward(self, x):
        return self.encoder(x)

In [None]:
# Input width is the length of one rendered sample; one output per entry of `notes`.
NImodel = NoteIdentifier(trainset[0]['noisy'].shape[0], len(notes)).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(NImodel.parameters(), lr=1e-4)
# Train for 20 epochs and keep the histories in the module-level names that plotplz reads.
losses_train, losses_valid, acc_train, acc_valid = NIfit(NImodel, criterion, optimizer, 20)

In [None]:
# Persist the trained weights (state dict only) next to the notebook.
# torch.save(NImodel.state_dict(), "checkpoints/NImodel.pth")
torch.save(NImodel.state_dict(), "NImodel.pth")

## Utility functions

In [None]:
def break_down_piece(piece, note_len=None):
    """Split `piece` into consecutive chunks of `note_len` samples.

    Args:
        piece: any sliceable sequence (waveform array, list, ...).
        note_len: chunk length; defaults to the module-level `LEN_OF_NOTE`.

    Returns:
        A list of slices in order. The last one is shorter when ``len(piece)``
        is not a multiple of `note_len`; an empty `piece` gives an empty list.
    """
    if note_len is None:
        note_len = LEN_OF_NOTE
    return [piece[start:start + note_len] for start in range(0, len(piece), note_len)]

In [None]:
# Quick check: a 5-note piece is split back into LEN_OF_NOTE-sized chunks.
a = break_down_piece(append_notes(get_random_note(5), sample_rate))
a

In [None]:
def identify_note(note):
    """Classify one note waveform with `NImodel`.

    Args:
        note: 1-D waveform whose length matches the model's input size.

    Returns:
        The index (Python int) of the highest-scoring class.
    """
    inp = torch.tensor(note, dtype=torch.float32).to(device)
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        output = NImodel(inp)
    # The input is unbatched, so the class scores lie along dim 0.
    return output.argmax(dim=0).item()

In [None]:
# Optional (disabled): rebuild the model and restore weights from a checkpoint.
# NImodel = NoteIdentifier(trainset[0]['noisy'].shape[0], len(notes)).to(device)
# NImodel.load_state_dict(torch.load("checkpoints/ck.pth"))
NImodel.eval()
# Spot check: render one random note with random noise and compare the printed
# input id with the model's prediction.
note = get_random_note(1)
noisee = get_random_noise()
print("input note with noise:", note)
piece = append_notes(note, sample_rate)
noise_itself, noisy_piece = add_random_noise(noisee, piece, sample_rate)
print("prediction of model:", identify_note(noisy_piece))

# Next note prediction

## Preprocessing

In [None]:
text = os.listdir("data/piano_pieces")
list_notes = ''
for i in range(len(text)):
    records = open('data/piano_pieces/' + text[i], "r")
    if i != len(text) - 1:
        list_notes += ' <START> / ' + records.readline() + ' / <END> /'
    else:
        list_notes += ' <START> / ' + records.readline() + ' / <END>'
notes = [token.strip() for token in list_notes.split('/')]

## create ngrams

In [None]:
ngram_n = 5
text_sequences = []
for i in range(ngram_n,len(notes)+1):
  seq = notes[i-ngram_n:i]
  text_sequences.append(seq)

## Tokenizing the input

In [None]:
#converting the texts into integer sequence
# Convert the token n-grams into integer sequences.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(text_sequences)
sequences = tokenizer.texts_to_sequences(text_sequences)
sequences=np.asarray(sequences)

In [None]:
# Vocabulary size: number of distinct tokens plus one (presumably because the
# tokenizer's indices start at 1 — TODO confirm).
vocabulary_size = len(tokenizer.word_counts)+1
vocabulary_size

## Dataset and Dataloader

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the n-grams for validation.
# NOTE(review): no random_state is passed, so the split differs between runs — confirm that is intended.
seq_train, seq_test = train_test_split(sequences, test_size=0.2)

# All but the last token of each n-gram are the input; the last token is the target.
X_train = seq_train[:,:-1]
y_train = seq_train[:,-1]

X_test = seq_test[:,:-1]
y_test = seq_test[:,-1]

In [None]:
class dataset(Dataset):
    """Pairs of (context token ids, next token id) served as long tensors.

    `vocab_size` is stored on the instance but not used by the methods below.
    """

    def __init__(self, x, y, vocab_size):
        self.x, self.y, self.vocab_size = x, y, vocab_size

    def __len__(self):
        return len(self.x)

    def __getitem__(self, item):
        context = torch.tensor(self.x[item], dtype=torch.long)
        # The target stays a class index (not one-hot).
        target = torch.tensor(self.y[item], dtype=torch.long)
        return context, target

In [None]:
BATCH_SIZE = 32

# trainset/trainloader/testset/testloader are rebound to the next-note data;
# S2Sfit reads these module-level names.
trainset = dataset(X_train, y_train, vocabulary_size)
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE)

testset = dataset(X_test, y_test, vocabulary_size)
testloader = DataLoader(testset, batch_size=BATCH_SIZE)

## Model functions

In [None]:
def S2Strain(model, trainloader, criterion, optimizer, n):
    """Run one training epoch of the next-note model.

    Batches are ``(X, y)`` pairs; the prediction is the argmax of the model
    output along dim 1.

    Returns:
        (accuracy, mean_loss): accuracy is the hit count divided by `n`.
    """
    model.train()
    hits = 0
    batch_losses = []
    for X, y in trainloader:
        X, y = X.to(device), y.to(device)
        scores = model(X)
        guess = torch.max(scores, dim=1)[1]
        hits += (guess == y).sum()

        batch_loss = criterion(scores, y)
        batch_losses.append(batch_loss.item())

        # Backpropagate, update the weights, then clear gradients for the next batch.
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    return (hits.item() / n), np.mean(batch_losses)

In [None]:
def S2Stest(model, testloader, criterion, n):
    """Evaluate the next-note model without gradient tracking.

    Batches are ``(X, y)`` pairs; the prediction is the argmax of the model
    output along dim 1.

    Args:
        model: model to evaluate.
        testloader: iterable of ``(X, y)`` batches.
        criterion: loss taking ``(logits, targets)``.
        n: number of samples used as the accuracy denominator.

    Returns:
        (accuracy, mean_loss): accuracy is a Python float in [0, 1].
    """
    losses = []
    corrects = 0
    model.eval()
    with torch.no_grad():
        for X, y in testloader:
            X = X.to(device)
            y = y.to(device)

            outputs = model(X)
            predict = outputs.argmax(dim=1)
            # Keep the count as a Python int: the accuracy is then a plain
            # float like the one S2Strain returns, not a tensor that may live
            # on the GPU and end up in the history lists.
            corrects += (predict == y).sum().item()

            loss = criterion(outputs, y)
            losses.append(loss.item())

    return (corrects / n), np.mean(losses)

In [None]:
def S2Sfit(model, criterion, optimizer, EPOCHS):
    """Train and validate the next-note model for `EPOCHS` epochs.

    Uses the module-level `trainloader`/`trainset` and `testloader`/`testset`.
    Both accuracies are scaled by 100 for printing only.

    Returns:
        (losses_train, losses_valid, acc_train, acc_valid), one entry per epoch.
    """
    losses_train, losses_valid = [], []
    acc_train, acc_valid = [], []

    for epoch_no in range(1, EPOCHS + 1):
        train_c, train_l = S2Strain(model, trainloader, criterion, optimizer, len(trainset))
        print(f"Epoch {epoch_no} --- train accuracy {train_c * 100:.3f}    train losses {train_l:.3f}", end='')
        acc_train.append(train_c)
        losses_train.append(train_l)

        val_c, val_l = S2Stest(model, testloader, criterion, len(testset))
        print(f" --- valid accuracy {val_c * 100:.3f}   valid losses {val_l:.3f}")
        acc_valid.append(val_c)
        losses_valid.append(val_l)

    return losses_train, losses_valid, acc_train, acc_valid

## Language model

In [None]:
class NextNote(nn.Module):
    """Next-token model: embedding -> 2-layer LSTM -> linear over all time steps.

    Args:
        vocab_size: number of token ids (embedding rows and output classes).
        embed_size: embedding width.
        hidden_size: LSTM hidden width.
        context_len: number of input tokens per sample; defaults to
            ``ngram_n - 1`` read from the module-level `ngram_n`.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, context_len=None):
        super().__init__()
        if context_len is None:
            context_len = ngram_n - 1
        self.embed = nn.Embedding(vocab_size, embed_size)
        # batch_first=True: inputs arrive as (batch, time, features). With the
        # default layout the LSTM treats the batch axis as time and carries
        # state from one sample of the batch into the next.
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=2,
                            bidirectional=False, batch_first=True)
        self.linear = nn.Linear(hidden_size * context_len, vocab_size)

    def forward(self, x):
        """Map token ids of shape (batch, context_len) to scores of shape (batch, vocab_size)."""
        embedded = self.embed(x)
        output, _ = self.lstm(embedded)
        # Flatten the hidden states of all time steps into one vector per sample.
        output = output.reshape(output.size(0), -1)
        return self.linear(output)

In [None]:
# Build the next-note model and train it for 20 epochs; the histories go into
# the module-level names that plotplz reads.
NNmodel = NextNote(vocab_size=vocabulary_size, embed_size=128, hidden_size=256).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(NNmodel.parameters(), lr=1e-3, weight_decay=1e-4)
losses_train, losses_valid, acc_train, acc_valid = S2Sfit(NNmodel, criterion, optimizer, 20)

## Utility functions

In [None]:
def get_next_note(notes_list_n):
    """Predict the id that follows the given sequence of ids.

    The last ``ngram_n - 1`` entries of `notes_list_n` form the model input;
    shorter sequences are left-padded with the id 1. The argument itself is
    not modified.

    Args:
        notes_list_n: sequence of integer ids.

    Returns:
        The index (Python int) with the highest score from `NNmodel`.
    """
    NNmodel.eval()
    context_len = ngram_n - 1
    # Build the context on a copy so the caller's list is not padded in place.
    context = list(notes_list_n)[-context_len:]
    context = [1] * (context_len - len(context)) + context
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = NNmodel(torch.tensor(context).unsqueeze(0).to(device))
    return outputs.argmax(dim=1).item()

# Main Loop

In [None]:
# Seed the generation with a noisy 3-note piece.
sample_rate = 16000
notes_list = get_random_note(3)
initial_piece = append_notes(notes_list, sample_rate)
random_noise = get_random_noise()
noise_itself, noisy_piece = add_random_noise(random_noise, initial_piece, sample_rate)
final_piece = None
# Stop after at most `piece_limit` appended notes.
piece_limit, counter = 30, 0
while True:
    # Pipeline per round: denoise -> split into notes -> classify each note ->
    # predict the next id -> re-render the piece with that id appended.
    denoised_piece = noisy_to_normal(noisy_piece)
    sub_notes = break_down_piece(denoised_piece)
    print(len(sub_notes))
    sub_notes_classes = [identify_note(note) for note in sub_notes]
    # A copy is passed so the class list is not changed by the callee.
    # NOTE(review): identify_note returns NImodel class indices while
    # get_next_note feeds and returns NNmodel (tokenizer) indices — confirm
    # that the two id spaces agree.
    next_note = get_next_note(sub_notes_classes[:])
    notes_list = sub_notes_classes + [next_note]
    new_piece = append_notes(notes_list, sample_rate)
    # Id 1 is treated as the stop signal (presumably the end-of-piece token —
    # TODO confirm against the tokenizer's word index).
    if next_note == 1:
        final_piece = new_piece
        break
    else:
        # Corrupt the extended piece again before the next round.
        noise_itself, noisy_piece = add_random_noise(get_random_noise(), new_piece, sample_rate)
        print(noisy_piece.shape)
        counter += 1
        if counter == piece_limit:
            final_piece = new_piece
            break

In [None]:
# Write the generated piece to a WAV file at `sample_rate`.
sf.write("out.wav", final_piece, sample_rate)