# Prelim experiments.

In [None]:
import numpy as np
import pandas as pd
from os import listdir
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [None]:
def loadDataset(datasetPath = '../ADL_DCASE_DATA/'):
    """Return the category label of every development clip, in CSV order.

    Reads ``development/labels.csv`` below ``datasetPath``. The file is parsed
    as a header-less CSV with two columns, ``file`` and ``category``.

    Args:
        datasetPath: Root directory of the dataset. A trailing path separator
            is optional.

    Returns:
        list: The ``category`` column as a plain Python list, one entry per
        CSV row (duplicates included).
    """
    # Local import: the file only brings ``listdir`` in from ``os``.
    from os.path import join

    def loadAudio(path):
        """Load every file in ``path`` with ``np.load`` as (name, array) pairs."""
        return [(file, np.load(join(path, file))) for file in listdir(path)]

    def loadCSV(path):
        """Read a header-less ``file,category`` CSV indexed by file name."""
        return pd.read_csv(path, names = ["file","category"], index_col = "file")

    def getFileCategory(InputFeat, OutputFeat):
        """Pair each loaded array with its category from the label table."""
        # Label-based lookup: ``.loc[file][0]`` relied on positional integer
        # indexing of a Series, which pandas has deprecated.
        return [(OutputFeat.loc[file, "category"], audio) for (file, audio) in InputFeat]

    # Only the labels are needed here, so loading the audio arrays stays
    # disabled; the two helpers above are kept for when it is re-enabled.
    #     trainingInput = loadAudio(join(datasetPath, "development/audio"))

    trainingGroundTruth = loadCSV(join(datasetPath, "development/labels.csv"))

    #     trainingDataset = getFileCategory(trainingInput, trainingGroundTruth)

    return trainingGroundTruth["category"].tolist()
    
    
# Labels of all development clips, one entry per CSV row.
categories = loadDataset()

# Distinct labels in first-seen order. dict keys keep insertion order, so this
# matches the manual "append if not already present" loop without the
# quadratic list scans.
ind_categories = list(dict.fromkeys(categories))

print(ind_categories)
print(len(ind_categories))

In [None]:
class AudioEnviromentDataset(Dataset):
    """Map-style dataset of ``.npy`` arrays labelled by a ``labels.csv`` file.

    Expected layout under ``root_dir``::

        labels.csv      header-less CSV with columns: file, category
        audio/<file>    one array per CSV row, loadable with ``np.load``

    Args:
        root_dir: Directory holding ``labels.csv`` and ``audio/``.
        categories: Optional ordered sequence of category names (anything
            with an ``index`` method, e.g. a list). A sample's integer label
            is the position of its category in this sequence. When omitted,
            the module-level ``ind_categories`` list is looked up each time a
            sample is fetched, as before.
    """

    def __init__(self, root_dir, categories=None):
        self.root_dir = root_dir
        self.categories = categories
        self.fileCategory = pd.read_csv(self.root_dir + "/labels.csv", names = ["file","category"])

    def __len__(self):
        """Return the number of rows in ``labels.csv``."""
        return len(self.fileCategory)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx``.

        ``features`` is the stored array reshaped to ``(-1, 60, 1501)``;
        ``label`` is the integer index of the row's category.

        Raises:
            ValueError: If the row's category is not in the category list.
        """
        # Fetch the row once instead of indexing the frame per column.
        row = self.fileCategory.iloc[idx]
        names = ind_categories if self.categories is None else self.categories
        label = names.index(row["category"])
        features = np.load(self.root_dir + "/audio/" + row["file"])
        return features.reshape(-1, 60, 1501), label
    
training_data = AudioEnviromentDataset("../ADL_DCASE_DATA/development")
# Keep the evaluation split under its own name. It used to be assigned to
# `test_dataloader` and then overwritten, so the test loader wrapped the
# training data.
test_data = AudioEnviromentDataset("../ADL_DCASE_DATA/evaluation")

train_dataloader = DataLoader(training_data, batch_size = 32)
test_dataloader = DataLoader(test_data, batch_size = 32)

# Sanity check on the first training sample: print the feature shape and the
# integer label, then show the first slice of the features as an image.
first_features, first_label = training_data[0]
print(first_features.shape)
print(first_label)
plt.figure(figsize=(15,15))
plt.imshow(first_features[0], aspect=10)


In [None]:
# # 60, 1501
# y = 60
# x = 1501


# y = y
# x = x

# x = x / 2
# y = y / 2

# x = x / 2
# y = y / 2

# print(64 * x * y)
# print(x, y)

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

class AudioCNN(nn.Module):
    """Small CNN that maps a single-channel 2-D input to 15 logits.

    Two conv stages (5x5 convolution, batch norm, ReLU, 2x2 max-pool) are
    followed by a flatten and two dropout/batch-norm/linear stages. The first
    linear layer expects 11250 features, which is what the conv stages yield
    for a ``(N, 1, 60, 1501)`` input (2 channels x 15 x 375).
    """

    def __init__(self):
        super().__init__()

        def conv_stage(n_in, n_out):
            # Padding 2 with a 5x5 kernel keeps the spatial size; the pool
            # then halves both dimensions.
            return [
                nn.Conv2d(
                    in_channels=n_in,
                    out_channels=n_out,
                    kernel_size=(5, 5),
                    padding=(2, 2),
                ),
                nn.BatchNorm2d(n_out),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
            ]

        def dense_stage(n_in, n_out):
            # Dropout, then batch norm over the incoming features, then the
            # affine projection.
            return [
                nn.Dropout(p=0.2),
                nn.BatchNorm1d(n_in),
                nn.Linear(n_in, n_out),
            ]

        # Layers are created in the same order as they are applied, so their
        # positions inside the Sequential (and the state-dict keys) are fixed.
        layers = [
            *conv_stage(1, 16),
            *conv_stage(16, 2),
            nn.Flatten(),
            *dense_stage(11250, 1000),
            nn.ReLU(),
            *dense_stage(1000, 15),
        ]
        self.cnn_neuro_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for ``x``."""
        return self.cnn_neuro_stack(x)
    

# Build the network, move its parameters to the selected device and show the
# layer summary.
model = AudioCNN()
model = model.to(device)
print(model)

In [None]:
# loss = nn.CrossEntropyLoss()
# a = torch.randn(3, 5, requires_grad=True)
# b = torch.empty(3, dtype=torch.long).random_(5)
# print(a.shape)
# print(b.shape)
# loss(a,b)

In [None]:
# Plain SGD over every model parameter, paired with cross-entropy on the
# model's raw logits.
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss()

def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Puts ``model`` in training mode, and for every batch computes the loss,
    back-propagates and applies one optimizer step. Batches are moved to the
    module-level ``device``. Every fifth batch the current loss is printed
    together with the number of samples consumed before that batch.

    Args:
        dataloader: Iterable of ``(X, y)`` batches; must expose ``.dataset``
            with a length (used for the progress total).
        model: Module being trained.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer bound to ``model``'s parameters.
    """
    size = len(dataloader.dataset)
    model.train()
    # Running count of samples from earlier batches. `batch * len(X)` gave the
    # same number for full batches but under-reported when the logged batch
    # was a short final one.
    seen = 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 5 == 0:
            # Read the scalar without rebinding `loss` from tensor to float.
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")
        seen += len(X)
        
def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    The model is switched to eval mode and gradients are disabled. Accuracy is
    the number of samples whose arg-max prediction equals the label divided by
    ``len(dataloader.dataset)``; the reported loss is the sum of the per-batch
    losses divided by the number of batches. Batches are moved to the
    module-level ``device``.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()

    test_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            test_loss += loss_fn(logits, targets).item()
            # Boolean hit mask -> float -> number of hits in this batch.
            hits = logits.argmax(1) == targets
            correct += hits.type(torch.float).sum().item()

    test_loss = test_loss / n_batches
    correct = correct / n_samples
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# Alternate one training pass and one evaluation pass per epoch.
epochs = 5
for t in range(epochs):
    banner = f"Epoch {t+1}\n-------------------------------"
    print(banner)
    train(dataloader=train_dataloader, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=test_dataloader, model=model, loss_fn=loss_fn)
print("Done!")

# Writing training file.

In [None]:
%load_ext autoreload
%autoreload 2

import dcase as dc

In [None]:
# Rebind `training_data` to the dataset class from the project-local `dcase`
# module.
# NOTE(review): the second argument (15) presumably controls how each clip is
# split into segments — confirm against dcase.DCASE.
training_data = dc.DCASE("../ADL_DCASE_DATA/development/" , 15)
print("datapoints:", len(training_data))
# Length of the first element of the first sample, reported as the number of
# segments.
print("segments per point:", len(training_data[0][0]))
# Show the first entry of that element as an image.
plt.imshow(training_data[0][0][0], aspect=5)

In [None]:
# Same inspection as with 15, now with 30 as the second argument and a larger
# figure.
# NOTE(review): the meaning of the second argument is not visible here —
# confirm against dcase.DCASE.
training_data = dc.DCASE("../ADL_DCASE_DATA/development/" , 30)
plt.figure(figsize=(15,15))
print("datapoints:", len(training_data))
# Length of the first element of the first sample, reported as the number of
# segments.
print("segments per point:", len(training_data[0][0]))
# Show the first entry of that element as an image.
plt.imshow(training_data[0][0][0], aspect=5)

In [None]:
# Rebuild the dataset with 3 as the second argument and report the length of
# the first sample's first element.
# NOTE(review): the meaning of the second argument is not visible here —
# confirm against dcase.DCASE.
training_data = dc.DCASE("../ADL_DCASE_DATA/development/" , 3)
print("segments per point:", len(training_data[0][0]))

In [None]:
# Mean over the first element of the first sample (displayed as the cell's
# result).
# NOTE(review): presumably a quick check of the feature scale — confirm.
training_data[0][0].mean()

In [None]:
# Experiment: draw a small annotated matrix, rasterise the figure with the Agg
# canvas and turn the pixels into a channel-first float array.
a = np.arange(6).reshape(2,3)
# Cells above the midpoint of the value range get white text so the number
# stays readable on the darker end of the colour map.
midpoint = (a.max() + a.min())/2

fig, ax = plt.subplots()
ax.matshow(a, cmap='Greens')
ax.set_xticks(range(3))
ax.set_yticks(range(2))
ax.set_xticklabels(["pee","poo","fart"])
ax.set_xlabel("What happened")
ax.set_yticklabels(["pee","poo"])
ax.set_ylabel("What happens next")

# ndenumerate yields (row, column); text coordinates are (x, y) = (column, row).
for (i, j), z in np.ndenumerate(a):
    text_style = {'color': 'white'} if z > midpoint else {}
    ax.text(j, i, '{:1}'.format(z), **text_style)

# fig.canvas.draw()
# img = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
# img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
# img = img / 255.0

# print(fig.canvas.tostring_rgb())

# Render the figure off-screen and read the pixels back as an RGBA array.
canvas = FigureCanvas(fig)
canvas.draw()

image = np.array(canvas.renderer.buffer_rgba())
# Scale the 8-bit channel values to [0, 1].
image = image/255
fig, ax = plt.subplots()
print(image.shape)
plt.imshow(image)
image_y, image_x = image.shape[0], image.shape[1]
# Drop alpha and move the channel axis to the front. A plain
# reshape(3, H, W) only reinterprets the flat buffer and scrambles the
# pixels, so the axes are transposed instead; ascontiguousarray keeps the
# result a contiguous copy, as the reshape produced.
image = np.ascontiguousarray(image[:,:,0:3].transpose(2, 0, 1))
print(image.shape)