In [None]:
# imports -----------------------------------------------------------------
import torchvision.transforms as T_vision
import librosa
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from PIL import Image
import numpy as np
import os

In [None]:
# organize directories ----------------------------------------------------
# single root for all data folders, so the notebook can be pointed at another
# location by editing one line
DATA_ROOT = '/content/data/'

# stores train/val fricative waveform files for all sentences and speakers
ai_wav_dir = DATA_ROOT + 'ai_wav/'
real_wav_dir = DATA_ROOT + 'real_wav/'

# stores train/val fricative spectrograms for all sentences and speakers
ai_spect_dir = DATA_ROOT + 'ai_spect/'
real_spect_dir = DATA_ROOT + 'real_spect/'

In [None]:
# TODO: probably should move all this to preproc.py
# helper functions --------------------------------------------------------
def normalize_to_img(spect):
  '''
  this function min-max normalizes all values in spectrogram to 0-255
    INPUTS
      spect   : spectrogram (array-like with .min() / .max(), e.g. ndarray)
    OUTPUTS
      normalized spect as floats in [0, 255]; a constant spectrogram (max ==
      min) maps to all zeros instead of NaN
  '''
  lo = spect.min()
  span = spect.max() - lo

  # a flat spectrogram has zero range: dividing would give 0/0 = NaN, which
  # turns into arbitrary pixel values once cast to uint8
  if span == 0:
    return (spect - lo) * 0.0

  return (spect - lo) / span * 255

def get_spectrograms(wav_dir, spect_dir, label):
  '''
  this function creates a mel spectrogram in db for all waveforms in a folder
  and saves each one as a grayscale PNG image
    INPUTS
      wav_dir   : directory name of waveforms
      spect_dir : directory name of where to store spectrograms (created if
                  it does not exist yet)
      label     : 0 - real or 1 - AI
    OUTPUTS
      none (writes one "{label}_{idx}.png" file per waveform into spect_dir)
  '''
  os.makedirs(spect_dir, exist_ok=True)

  # os.listdir order is arbitrary; sort so idx -> waveform is reproducible
  for idx, wav in enumerate(sorted(os.listdir(wav_dir))):
    y, sr = librosa.load(os.path.join(wav_dir, wav))

    # melspectrogram takes keyword arguments and returns a *power*
    # spectrogram by default, so the matching dB conversion is power_to_db
    spect = librosa.feature.melspectrogram(y=y, sr=sr)
    spect = librosa.power_to_db(spect)

    # normalize to pixel val 0-255, typecast to uint8
    # (librosa.load returns mono audio by default, so spect is already 2-D)
    spect_img = normalize_to_img(spect).astype(np.uint8)
    # flip only the frequency axis (rows) so the image reads low -> high from
    # the bottom; the time axis must keep its order
    spect_img = np.flip(spect_img, axis=0)
    # invert pixels s.t. more energy is represented by darker pixels
    spect_img = 255-spect_img

    im = Image.fromarray(spect_img)
    # TODO: should we be more specific with the naming? like sentence number
    # and speaker number?
    # PIL picks the output format from the extension, so one is required
    fname = os.path.join(spect_dir, f"{label}_{idx}.png")
    im.save(fname)
    # TODO: if this takes a super long time, should we just be saving data as
    # matrix data?

# prep data ---------------------------------------------------------------
# one pass per class: AI-generated clips get label 1, real recordings label 0
get_spectrograms(wav_dir=ai_wav_dir, spect_dir=ai_spect_dir, label=1)
get_spectrograms(wav_dir=real_wav_dir, spect_dir=real_spect_dir, label=0)

In [None]:
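# TODO: dataset class and dataloader -- this cell must define `train_loader`
# and `val_loader`, which the training cell at the end of the notebook uses
# split train/val sets ----------------------------------------------------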

In [None]:
# CUDA --------------------------------------------------------------------
# use the GPU when torch can see one, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print('**setup note** using device: ', device)

In [None]:
# CNN model ---------------------------------------------------------------
#   1. conv ReLU
#   2. maxpooling, dropout 50%
#   3. conv ReLU
#   4. maxpooling
#   5. FC ReLU, dropout 50%
#   6. FC -> class scores (no ReLU/dropout here: they would zero out scores)
#   7. softmax -- not applied inside the model; train() uses cross_entropy,
#      which applies log-softmax to the raw scores itself
class TwoLayerCNN(nn.Module):
  '''
  two conv layers followed by two fully connected layers
    INPUTS (constructor)
      c1          : input channels of conv1 (e.g. 1 for single-channel images)
      c2          : output channels of conv1 / input channels of conv2
      c3          : output channels of conv2
      num_classes : number of class scores produced (default 2)
      hidden      : width of the first fully connected layer (default 128)
    OUTPUTS (forward)
      scores      : raw class scores (logits), shape (batch, num_classes)
  '''
  def __init__(self, c1, c2, c3, num_classes=2, hidden=128):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels=c1, out_channels=c2, kernel_size=3, stride=1, padding=0)
    self.conv2 = nn.Conv2d(in_channels=c2, out_channels=c3, kernel_size=3, stride=1, padding=0)

    self.relu = nn.ReLU()

    self.dropout = nn.Dropout(0.5)

    self.maxPool1 = nn.MaxPool2d((4,3), stride=(1,3))
    self.maxPool2 = nn.MaxPool2d((1,3), stride=(1,3))

    # the flattened feature size depends on c3 and on the input height/width,
    # so let torch infer in_features on the first forward pass instead of
    # hard-coding it; run one batch through the model before inspecting or
    # loading fc1's weights
    self.fc1 = nn.LazyLinear(out_features=hidden)
    self.fc2 = nn.Linear(in_features=hidden, out_features=num_classes)

  def forward(self, x):
    # steps 1-2: conv -> ReLU -> maxpool -> dropout
    x = self.dropout(self.maxPool1(self.relu(self.conv1(x))))
    # steps 3-4: conv -> ReLU -> maxpool
    x = self.maxPool2(self.relu(self.conv2(x)))
    # keep the batch dimension, flatten channels x height x width
    x = torch.flatten(x, start_dim=1)
    # step 5: FC -> ReLU -> dropout
    x = self.dropout(self.relu(self.fc1(x)))
    # step 6: class scores
    scores = self.fc2(x)
    return scores
  
  
def validate(model, device, val_loader):
  '''
  this function computes the classification accuracy of a model on a loader
    INPUTS
      model      : nn.Module producing per-class scores of shape (batch, classes)
      device     : torch device the batches are moved to
      val_loader : iterable of (input, target) batches
    OUTPUTS
      accuracy   : fraction of correct predictions as a float; 0.0 when the
                   loader yields no samples
  '''
  correct = 0
  total = 0

  # evaluate with dropout disabled, then put the model back into whatever mode
  # the caller had it in (train() calls this in the middle of training)
  was_training = model.training
  model.eval()

  try:
    with torch.no_grad():
      for inputs, targets in val_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        scores = model(inputs)
        predictions = scores.argmax(dim=1)
        correct += (predictions == targets).sum().item()
        total += predictions.size(0)
  finally:
    model.train(was_training)

  # empty loader: avoid ZeroDivisionError
  if total == 0:
    return 0.0

  return float(correct)/total


def train(model, device, train_loader, val_loader, lr, m, wd, epochs,
           print = 100):
  '''
  this function trains a model with SGD and records loss / validation accuracy
    INPUTS
      model        : nn.Module to train (moved to device)
      device       : torch device to run on
      train_loader : iterable of (input, target) training batches
      val_loader   : iterable of (input, target) validation batches
      lr           : learning rate
      m            : momentum (Nesterov momentum is used when m > 0)
      wd           : weight decay
      epochs       : number of passes over train_loader
      print        : log every `print` iterations; 0 or None disables logging
    OUTPUTS
      loss_curve   : training loss of every iteration
      acc_curve    : validation accuracy after every iteration
  '''
  # the `print` parameter hides the builtin inside this function, so keep the
  # interval under another name and log through builtins.print
  import builtins
  print_every = print
  log = builtins.print

  loss_curve = []
  acc_curve = []

  model = model.to(device)
  # SGD rejects nesterov=True when momentum is 0, so only enable it with m > 0
  optimizer = optim.SGD(model.parameters(), lr=lr, momentum=m,
                         weight_decay=wd, nesterov=(m > 0))

  for epoch in range(epochs):
    for step, (inputs, targets) in enumerate(train_loader):
      # validate() calls model.eval(); re-enter train mode every iteration so
      # dropout stays active regardless of how validate() leaves the model
      model.train()

      inputs = inputs.to(device)
      targets = targets.to(device)

      scores = model(inputs)
      loss = nn.functional.cross_entropy(scores, targets)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      loss_curve.append(loss.item())

      # TODO: can move to print block if too slow
      accuracy = validate(model, device, val_loader)
      acc_curve.append(accuracy)

      if print_every and (step+1) % print_every == 0:
        log(f'Running Epoch {epoch} and Iteration {step}: Loss is {loss.item()}')

        log(f'Accuracy against validation set is {accuracy}')

  return loss_curve, acc_curve

In [None]:
# hyperparameters + training run --------------------------------------------
# model shape
# NOTE(review): `layers` is passed as the first TwoLayerCNN argument (c1), i.e.
# the number of input channels of conv1, not a layer count
layers = 1
channel1 = 512
channel2 = 1024
num_classes = 2

# optimizer settings
lr = 0.002
m = 0.9
wd = 0.001
epochs = 300

# NOTE(review): train_loader / val_loader are not defined in any visible cell;
# the dataset/dataloader cell is still a TODO
model = TwoLayerCNN(layers, channel1, channel2, num_classes)
loss_curve, acc_curve = train(
  model, device, train_loader, val_loader,
  lr=lr, m=m, wd=wd, epochs=epochs,
)