In [12]:
import torchvision
import torchvision.transforms as transforms
import torch
import numpy as np
import pickle as pkl
import ipdb
import skimage
from skimage.measure import block_reduce
import torch.optim as optim

In [3]:
import librosa
def cqt_specgram(audio, n_bins, bins_per_octave, hop_length, sr, fmin, filter_scale):
    '''
    Constant-Q magnitude spectrogram of ``audio`` in decibels, rescaled by 120 dB.

    The complex transform returned by ``librosa.cqt`` is reduced to its
    magnitude, converted to dB relative to the largest magnitude
    (``ref=np.max``) with ``top_db=120``, and finally mapped through
    ``dB / 120 + 1``.

    :param audio: signal passed to ``librosa.cqt``
    :param n_bins: forwarded to ``librosa.cqt``
    :param bins_per_octave: forwarded to ``librosa.cqt``
    :param hop_length: forwarded to ``librosa.cqt``
    :param sr: sampling rate forwarded to ``librosa.cqt``
    :param fmin: lowest analysis frequency forwarded to ``librosa.cqt``
    :param filter_scale: forwarded to ``librosa.cqt``
    :return: shape = (n_bins, t)
    '''
    # The same figure is used as the dB floor and as the rescaling divisor.
    db_range = 120.0
    transform = librosa.cqt(
        audio,
        sr=sr,
        n_bins=n_bins,
        bins_per_octave=bins_per_octave,
        hop_length=hop_length,
        fmin=fmin,
        filter_scale=filter_scale,
    )
    # Only the magnitude half of (magnitude, phase) is needed.
    magnitude = librosa.core.magphase(transform)[0]
    decibels = librosa.amplitude_to_db(magnitude, amin=1e-13, top_db=db_range, ref=np.max)
    return decibels / db_range + 1.0


def compute_cqt_spec(audio, n_bins = 70, bins_per_octave=10, hop_length = 512, sr = 16000, fmin = librosa.note_to_hz('C1'),
             filter_scale = 0.8):
    """Call ``cqt_specgram`` with this notebook's default analysis settings.

    Every argument is forwarded unchanged; the defaults in the signature are
    the only thing this wrapper adds. Note that the ``fmin`` default is
    evaluated once, when the function is defined.
    """
    return cqt_specgram(
        audio,
        n_bins=n_bins,
        bins_per_octave=bins_per_octave,
        hop_length=hop_length,
        sr=sr,
        fmin=fmin,
        filter_scale=filter_scale,
    )

In [4]:
class karplus_strong:
    """Wavetable synthesiser that repeatedly averages a circular buffer.

    A buffer of ``int(sampling_freq) // int(pitch)`` values is read round-robin.
    Each read may first overwrite the current slot with a (possibly
    sign-flipped) average of that slot and the previously emitted sample;
    which rule applies depends on ``flag`` (see ``get_samples``).
    """

    def __init__(self, pitch, sampling_freq, stretch_factor, flag):
        """Inits the string.

        :param pitch: divisor of the wavetable length (truncated with ``int``)
        :param sampling_freq: numerator of the wavetable length, and the number
            of samples ``get_samples`` produces
        :param stretch_factor: only read when ``flag == 1``; a slot is averaged
            with probability ``1 / stretch_factor``
        :param flag: ``0`` starts from an all-ones wavetable, anything else
            from random +/-1 values; ``1`` selects the stretch-factor update
            rule, any other value is used as the probability of a positive sign
        :raises ValueError: if the resulting wavetable would be empty
        """
        self.pitch = pitch
        self.sampling_freq = sampling_freq
        self.stretch_factor = stretch_factor
        self.flag = flag
        self.wavetable = self.init_wavetable()
        # Read position inside the circular wavetable.
        self.current_sample = 0
        # Last value emitted by get_samples (0 before the first sample).
        self.previous_value = 0

    def init_wavetable(self):
        """Generates a new wavetable for the string.

        :return: the freshly created float array, also stored on ``self.wavetable``
        :raises ValueError: if ``int(sampling_freq) // int(pitch)`` is below 1
        """
        wavetable_size = int(self.sampling_freq) // int(self.pitch)
        if wavetable_size < 1:
            # Fail here with a clear message instead of an IndexError later
            # inside get_samples.
            raise ValueError(
                "wavetable would be empty: int(sampling_freq) // int(pitch) = %d "
                "(sampling_freq=%r, pitch=%r)" % (wavetable_size, self.sampling_freq, self.pitch)
            )
        if self.flag == 0:
            self.wavetable = np.ones(wavetable_size)
        else:
            # Random +/-1 values. The builtin ``float`` replaces the ``np.float``
            # alias, which no longer exists in current NumPy releases.
            self.wavetable = (2 * np.random.randint(0, 2, wavetable_size) - 1).astype(float)
        return self.wavetable

    def get_samples(self):
        """Returns samples from string.

        Emits samples until at least ``sampling_freq`` have been produced.

        * ``flag != 1``: every slot is replaced by ``sign * 0.5 * (slot + previous)``
          where ``sign`` is +1 with probability ``flag`` and -1 otherwise.
        * ``flag == 1``: with probability ``1 / stretch_factor`` the slot is
          replaced by ``0.5 * (slot + previous)``; otherwise it is left as is.

        The wavetable, read position and previous value are updated in place,
        so consecutive calls continue from where the last one stopped.

        :return: 1-D ``numpy`` array of the emitted samples
        """
        samples = []
        while len(samples) < self.sampling_freq:
            position = self.current_sample
            if self.flag != 1:
                r = np.random.binomial(1, self.flag)
                sign = float(r == 1) * 2 - 1
                self.wavetable[position] = sign * 0.5 * (
                    self.wavetable[position] + self.previous_value)
            else:
                # d == 0 happens with probability 1 / stretch_factor.
                d = np.random.binomial(1, 1 - 1 / self.stretch_factor)
                if d == 0:
                    self.wavetable[position] = 0.5 * (
                        self.wavetable[position] + self.previous_value)
            samples.append(self.wavetable[position])
            self.previous_value = samples[-1]
            # Advance and wrap around the circular buffer.
            self.current_sample = (position + 1) % self.wavetable.size
        return np.array(samples)

In [5]:
def pad_zeros(image, shape):
    """Embed a 2-D array in the top-left corner of a zero array of ``shape``.

    :param image: array indexed along two axes
    :param shape: shape of the zero-filled array to create
    :return: new array of ``shape`` whose leading ``image.shape[0]`` rows and
        ``image.shape[1]`` columns hold ``image``; everything else is 0
    """
    canvas = np.zeros(shape)
    n_rows, n_cols = image.shape[0], image.shape[1]
    canvas[:n_rows, :n_cols] = image
    return canvas

In [6]:
def sample_params(size):
    """Draw ``size`` random synth parameter sets and render a spectrogram for each.

    For every parameter set a ``karplus_strong`` string is synthesised, its
    samples are analysed with ``compute_cqt_spec`` (default settings), the
    result is transposed and zero-padded to a square whose side is the number
    of CQT bins.

    :param size: number of examples to generate
    :return: ``(pitch, sampling_freq, stretch_factor, flag, cqt_specs)``; the
        first four are 1-D arrays of length ``size`` and ``cqt_specs`` stacks
        the padded spectrograms along a new leading axis
    """
    pitch = np.random.uniform(20, 2000, size)
    sampling_freq = np.random.uniform(5, 10, size) * 1000
    stretch_factor = np.random.uniform(1, 10, size)
    flag = np.random.uniform(0, 1, size)

    cqt_specs = []
    for i in range(size):
        # NOTE(review): the string is always built with flag=1, so the random
        # `flag` values returned below never influence the audio - confirm
        # whether that is intended. The synth also runs at twice the
        # sampling_freq value that is returned as the label.
        string = karplus_strong(pitch[i], 2 * sampling_freq[i], stretch_factor[i], 1)
        # Neither the synth object nor the raw waveform is kept after this
        # iteration; only the padded spectrogram is retained.
        cqt_spec = compute_cqt_spec(string.get_samples()).T
        n_bins = cqt_spec.shape[1]
        cqt_specs.append(pad_zeros(cqt_spec, (n_bins, n_bins)))
    cqt_specs = np.array(cqt_specs)
    print(cqt_specs.shape)
    return pitch, sampling_freq, stretch_factor, flag, cqt_specs
        

def generate_data(file, size):
    """Generate ``size`` examples with ``sample_params`` and pickle them to ``file``.

    The pickled object is a dict with two entries:
    ``'parameters'`` - array with one row per example and the columns
    (pitch, sampling_freq, stretch_factor, flag); ``'cqt_spec'`` - the stacked
    spectrograms returned by ``sample_params``.

    :param file: path of the pickle file to (over)write
    :param size: number of examples to generate
    """
    pitch, sampling_freq, stretch_factor, flag, cqt_specs = sample_params(size)
    # Build the payload before opening the file so the file is only touched
    # once everything to be written exists.
    data_dict = {
        'parameters': np.array([pitch, sampling_freq, stretch_factor, flag]).T,
        'cqt_spec': cqt_specs,
    }
    # The context manager closes the handle; no explicit close() is needed.
    with open(file, 'wb') as fh:
        pkl.dump(data_dict, fh)
    print(file)
    
    
def read_data(file):
    """Load and return the object pickled in ``file``.

    :param file: path of a pickle file
    :return: the unpickled object
    """
    # NOTE: unpickling can execute arbitrary code - only load files you
    # produced yourself or otherwise trust.
    with open(file, 'rb') as fh:
        # Stream from the handle instead of reading the whole file into a
        # bytes object first, which would hold a second full copy in memory.
        return pkl.load(fh)


def create_datasets():
    """Generate and pickle the eval, test and train splits, in that order."""
    splits = (
        ('eval.pkl', 100),
        ('test.pkl', 5000),
        ('train.pkl', 50000),
    )
    for path, n_examples in splits:
        generate_data(path, n_examples)

    
def read_dataset():
    """Load the pickled splits and return them as ``(train, test, eval)``."""
    train_split = read_data('train.pkl')
    test_split = read_data('test.pkl')
    eval_split = read_data('eval.pkl')
    return train_split, test_split, eval_split

In [None]:
# Regenerates eval.pkl, test.pkl and train.pkl from scratch (55,100 synthesised
# examples in total) and overwrites any existing files of those names.
create_datasets()

In [7]:
# Load the three pickled splits; each is the dict written by generate_data,
# with a 'parameters' entry and a 'cqt_spec' entry.
train_data, test_data, eval_data = read_dataset()

In [8]:
class MyDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each spectrogram with its parameter row."""

    def __init__(self, parameters, cqt_spectrograms):
        """Store the two parallel, index-aligned collections.

        :param parameters: indexable collection of target rows; its length
            defines the dataset length
        :param cqt_spectrograms: indexable collection of spectrograms
        """
        super(MyDataset, self).__init__()
        self.cqt_spec = cqt_spectrograms
        self.parameters = parameters

    def __len__(self):
        """Number of examples, taken from ``parameters``."""
        n_examples = len(self.parameters)
        return n_examples

    def __getitem__(self, i):
        """Return ``(transposed spectrogram, parameter row)`` for index ``i``."""
        spectrogram = self.cqt_spec[i].T
        target = self.parameters[i]
        return spectrogram, target

In [9]:
# NOTE(review): `transform` is not passed to any dataset or loader in this
# cell, and its three-value mean/std do not match the single-channel
# spectrograms used below - confirm whether it is still needed.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])


def _make_split(split, batch_size, shuffle):
    """Wrap one loaded split dict in a MyDataset and a DataLoader over it."""
    dataset = MyDataset(parameters=split['parameters'], cqt_spectrograms=split['cqt_spec'])
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                         shuffle=shuffle, num_workers=2)
    return dataset, loader


# Only the training loader shuffles; the eval loader uses a smaller batch size.
trainset, trainloader = _make_split(train_data, batch_size=32, shuffle=True)
testset, testloader = _make_split(test_data, batch_size=32, shuffle=False)
evalset, evalloader = _make_split(eval_data, batch_size=4, shuffle=False)

In [10]:
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    """Small CNN regressor: two conv+pool stages followed by three linear layers.

    Takes single-channel images and produces four outputs per example. The
    first linear layer expects 16 feature maps of 14x14, which is what the two
    5x5 convolutions and 2x2 poolings leave of a 70x70 input.
    """

    def __init__(self):
        super(Net, self).__init__()
        # Convolutional feature extractor (the pooling layer is shared by both stages).
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Fully connected regression head.
        self.fc1 = nn.Linear(16 * 14 * 14, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 4)

    def forward(self, x):
        """Map a ``(batch, 1, H, W)`` tensor to a ``(batch, 4)`` tensor."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        # Flatten to the width fc1 was built for (16 * 14 * 14).
        flat = features.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


# Instantiate the model with double-precision parameters
# (presumably to match float64 NumPy arrays coming from the dataset - confirm).
net = Net().double()

In [13]:
# Mean-squared error between the four network outputs and the four target
# parameters. NOTE(review): the targets are used as generated by sample_params
# (pitch 20-2000, sampling_freq 5000-10000, stretch_factor 1-10, flag 0-1), so
# the loss mixes very different scales - confirm this is intended.
criterion = nn.MSELoss()
# Alternative optimizer kept for reference:
#optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)
optimizer = optim.Adam(net.parameters(), lr=0.001)

In [None]:
# Number of mini-batches covered by each progress report.
log_every = 200

for epoch in range(100):  # loop over the dataset multiple times

    running_loss = 0.0  # summed batch losses since the last progress report
    epoch_loss = 0.0    # summed batch losses over the whole epoch
    n_batches = 0
    for i, data in enumerate(trainloader, 0):
        # get the inputs and add the channel axis expected by Conv2d
        inputs, labels = data
        inputs = inputs.unsqueeze(1)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # accumulate statistics
        batch_loss = loss.item()
        running_loss += batch_loss
        epoch_loss += batch_loss
        n_batches += 1

        # Report after every full window of `log_every` batches (the first
        # report comes after batch 200, not batch 2) and print the window mean
        # so the figure does not depend on the window length.
        if i % log_every == log_every - 1:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / log_every))
            running_loss = 0.0

    # Log the mean loss over the whole epoch. The running window total would
    # only cover the batches seen since the last progress report.
    mean_epoch_loss = epoch_loss / max(n_batches, 1)
    print(mean_epoch_loss)
    with open("losses.txt", "a") as text_file:
        text_file.write(str("%.10f" % mean_epoch_loss))
        text_file.write("\n")

print('Finished Training')