<a href="https://colab.research.google.com/github/sashford/Honors-Thesis-Spencer-Ashford/blob/main/HonorsThesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# SETUP

In [None]:
# Install system packages with apt: ffmpeg and extra codecs, OpenBLAS and a C
# toolchain, and the Python development headers.
# NOTE(review): none of these installs pass `-y`; confirm they do not stop at a
# confirmation prompt in a non-interactive runtime.
!sudo apt-get install ffmpeg libavcodec-extra
!sudo apt-get install libopenblas-dev build-essential
!sudo apt-get install python3 python-dev python3-dev
# Subversion is used below to check out the thesis repository.
!apt install subversion
# The checkout creates "Honors-Thesis-Spencer-Ashford.git/trunk/..." in the
# current directory; SonarDataset reads the dataset from that working copy.
# NOTE(review): GitHub has retired its Subversion bridge, so this checkout may
# no longer work — confirm, and if it is replaced by `git clone`, update the
# dataset path used by SonarDataset accordingly.
!svn checkout https://github.com/sashford/Honors-Thesis-Spencer-Ashford.git

In [90]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split, Subset, Dataset
from torch import nn
from torch.nn.modules.loss import _Loss
import torch.nn.functional as F
import torch.optim as optim
from sklearn import linear_model

In [91]:
def npy_loader(path):
    """Read the ``.npy`` file at ``path`` and return its array as a torch tensor.

    The tensor is created with ``torch.from_numpy``, so it keeps the dtype and
    shape of the stored array.
    """
    return torch.from_numpy(np.load(path))

class SonarDataset(Dataset):
  """Paired noisy / noiseless sonar scans stored as ``.npy`` files.

  Two ``DatasetFolder`` views are built, one over ``<root_folder>/noise`` and one
  over ``<root_folder>/noiseless``. Item ``i`` is the pair
  ``(noise_folder[i], noiseless_folder[i])`` with the folder class labels dropped.
  NOTE(review): pairing is purely by index, so it assumes both folders enumerate
  their files in the same order (e.g. identical file names) — verify against the
  dataset layout.

  Args:
    root_folder: directory containing the ``noise`` and ``noiseless`` folders.
      Defaults to the location of the checked-out thesis repository.
    device: device the returned tensors are moved to. ``None`` selects CUDA when
      it is available and falls back to the CPU otherwise.

  Raises:
    ValueError: if the noiseless folder holds fewer samples than the noise
      folder, since ``__len__`` is driven by the noise folder and every index
      must exist in both.
  """

  def __init__(self, root_folder="/content/Honors-Thesis-Spencer-Ashford.git/trunk/dataset/", device=None):
    import os  # local import: the notebook's import cell does not bring in os

    self.noise_folder = datasets.DatasetFolder(
        root=os.path.join(root_folder, 'noise'), loader=npy_loader, extensions=('.npy',))
    self.noiseless_folder = datasets.DatasetFolder(
        root=os.path.join(root_folder, 'noiseless'), loader=npy_loader, extensions=('.npy',))

    # Fail fast instead of hitting an IndexError part-way through an epoch.
    if len(self.noiseless_folder) < len(self.noise_folder):
      raise ValueError(
          f"noiseless folder has {len(self.noiseless_folder)} samples but the "
          f"noise folder has {len(self.noise_folder)}; every noisy sample needs a target")

    if device is None:
      device = 'cuda' if torch.cuda.is_available() else 'cpu'
    self.device = torch.device(device)

  def __getitem__(self, index):
    """Return ``(noisy, noiseless)`` tensors for ``index`` on ``self.device``."""
    # DatasetFolder yields (sample, class_index); only the sample is needed.
    noisy, _ = self.noise_folder[index]
    noiseless, _ = self.noiseless_folder[index]
    # Tensor.to is a no-op for tensors that already live on the target device.
    return noisy.to(self.device), noiseless.to(self.device)

  def __len__(self):
    """Number of pairs, taken from the noise folder."""
    return len(self.noise_folder)

In [92]:
data = SonarDataset()

# The global torch seed is only set in a later cell, so the split gets its own
# seeded generator; otherwise the train/validation membership changes on every
# run. Fractional lengths need a PyTorch version that supports them in
# random_split.
train_data, val_data = random_split(
    data, [0.85, 0.15], generator=torch.Generator().manual_seed(0))
batch_size = 5

# Reshuffle the training samples every epoch; validation order is irrelevant.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size)

# ENCODER/DECODER

In [93]:
from torch.nn.modules.batchnorm import BatchNorm2d
from torch.nn.modules.activation import ReLU

# Shape (channels, height, width) of the feature map produced by
# Encoder.encoder_cnn for a 1x512x90 input: the five stride-2 convolutions
# reduce 512 -> 15 and 90 -> 2, and the last convolution has 256 output channels.
channel_const_dimensions = [256, 15, 2]
# Flattened size of that feature map (256 * 15 * 2 = 7680). It is the width of
# the linear layers that connect the convolutional stacks to the latent code.
channel_const = np.prod(channel_const_dimensions)

def init_weights(m):
  """Initialise a convolutional layer in place; intended for ``Module.apply``.

  ``nn.Conv2d`` and ``nn.ConvTranspose2d`` modules get Xavier-uniform weights
  and a constant bias of 0.01. Every other module type is left untouched.

  Args:
    m: the module visited by ``apply``.
  """
  if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
    nn.init.xavier_uniform_(m.weight)
    # Layers created with bias=False expose bias as None; skip them rather
    # than failing with an AttributeError.
    if m.bias is not None:
      nn.init.constant_(m.bias, 0.01)

class Encoder(nn.Module):
  """Convolutional encoder that compresses a 512x90 scan into a latent vector.

  The input is viewed as ``(batch, 1, 512, 90)``, passed through five stride-2
  convolutions, flattened, and projected by two linear layers to
  ``encoded_space_dim`` values per sample.

  Args:
    encoded_space_dim: size of the latent vector.
    fc2_input_dim: accepted for interface compatibility; not used by this class.
  """

  def __init__(self, encoded_space_dim, fc2_input_dim):
    super().__init__()

    # (in_channels, out_channels, kernel_size, padding) of the stages that are
    # followed by batch normalisation. All of them use stride 2.
    normalised_stages = [
        (1, 1024, 7, 3),
        (1024, 1024, 5, 2),
        (1024, 1024, 3, 1),
        (1024, 1024, 3, 1),
    ]
    layers = []
    for in_ch, out_ch, kernel, pad in normalised_stages:
      layers += [
          nn.Conv2d(in_ch, out_ch, kernel, stride=2, padding=pad),
          nn.BatchNorm2d(out_ch),
          nn.ReLU(True),
      ]
    # Final stage: unpadded convolution down to 256 channels, no batch norm.
    layers += [
        nn.Conv2d(1024, 256, 3, stride=2, padding=0),
        nn.ReLU(True),
    ]
    self.encoder_cnn = nn.Sequential(*layers)

    self.flatten = nn.Flatten(start_dim=1)

    # channel_const is the flattened size of the convolutional feature map.
    self.encoder_lin = nn.Sequential(
        nn.Linear(channel_const, 128),
        nn.ReLU(True),
        nn.Linear(128, encoded_space_dim),
    )

    # Only the convolutional stack is re-initialised; the linear layers keep
    # PyTorch's default initialisation.
    self.encoder_cnn.apply(init_weights)

  def forward(self, x):
    """Encode ``x`` (any shape viewable as ``(batch, 1, 512, 90)``)."""
    images = x.view(x.size(0), 1, 512, 90)
    features = self.flatten(self.encoder_cnn(images))
    return self.encoder_lin(features)


class Decoder(nn.Module):
  """Transposed-convolution decoder that expands a latent vector into a scan.

  Two linear layers widen the latent vector to ``channel_const`` values, which
  are unflattened to ``channel_const_dimensions`` and upsampled by five stride-2
  transposed convolutions; the last one has a single output channel.

  Args:
    encoded_space_dim: size of the latent vector.
    fc2_input_dim: accepted for interface compatibility; not used by this class.
  """

  def __init__(self, encoded_space_dim, fc2_input_dim):
    super().__init__()

    self.decoder_lin = nn.Sequential(
        nn.Linear(encoded_space_dim, 128),
        nn.ReLU(True),
        nn.Linear(128, channel_const),
        nn.ReLU(True),
    )

    self.unflatten = nn.Unflatten(dim=1, unflattened_size=tuple(channel_const_dimensions))

    # (in_channels, out_channels, kernel_size, padding, output_padding) of the
    # stages that are followed by batch normalisation. All of them use stride 2.
    normalised_stages = [
        (256, 1024, 3, 0, (1, 1)),
        (1024, 1024, 3, 1, (1, 1)),
        (1024, 1024, 3, 1, (1, 0)),
        (1024, 1024, 5, 2, (1, 0)),
    ]
    layers = []
    for in_ch, out_ch, kernel, pad, out_pad in normalised_stages:
      layers += [
          nn.ConvTranspose2d(in_ch, out_ch, kernel, stride=2, padding=pad, output_padding=out_pad),
          nn.BatchNorm2d(out_ch),
          nn.ReLU(True),
      ]
    # Output stage: single channel, no normalisation or activation.
    layers.append(nn.ConvTranspose2d(1024, 1, 7, stride=2, padding=3, output_padding=(1, 1)))
    self.decoder_conv = nn.Sequential(*layers)

    # Only the transposed-convolution stack is re-initialised.
    self.decoder_conv.apply(init_weights)

  def forward(self, x):
    """Decode latent vectors ``x`` of shape ``(batch, encoded_space_dim)``."""
    feature_map = self.unflatten(self.decoder_lin(x))
    return self.decoder_conv(feature_map)

# Loss, Models, and Optimizer

In [94]:
# Seed first so the weights created below are reproducible.
torch.manual_seed(0)

# Hyperparameters.
d = 2        # size of the latent vector
lr = 1e-7    # Adam learning rate

loss_fn = nn.L1Loss()

encoder = Encoder(encoded_space_dim=d, fc2_input_dim=128)
decoder = Decoder(encoded_space_dim=d, fc2_input_dim=128)

# One parameter group per network, optimised jointly.
params_to_optimize = [
    {'params': encoder.parameters()},
    {'params': decoder.parameters()},
]

# NOTE(review): this name shadows the `torch.optim` module imported as `optim`
# in the import cell; later cells pass it as the optimizer, so it is kept.
optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-05)

# Move both networks to the GPU (parameters are moved in place).
encoder.cuda()
decoder.cuda();

In [95]:
def train_epoch_den(encoder, decoder, dataloader, loss_fn, optimizer):
  """Run one training epoch of the denoising autoencoder.

  Each batch is encoded, decoded, and compared against its noiseless target;
  the optimizer is stepped once per batch and the batch loss is printed.

  Args:
    encoder: module mapping a noisy batch to latent vectors.
    decoder: module mapping latent vectors to reconstructions.
    dataloader: iterable of ``(noisy_image, noiseless_image)`` batches.
    loss_fn: callable ``loss_fn(prediction, target)`` returning a scalar tensor.
    optimizer: optimizer holding the parameters of both networks.

  Returns:
    Mean of the per-batch losses as a numpy scalar. An empty ``dataloader``
    yields ``nan``.

  Raises:
    RuntimeError: if a target batch cannot be reshaped to the prediction shape.
  """
  encoder.train()
  decoder.train()
  train_loss = []
  for i, (noisy_image, noiseless_image) in enumerate(dataloader):
    decoded_data = decoder(encoder(noisy_image))

    # Give the target exactly the prediction's shape. If the targets carry no
    # channel axis (presumably (batch, 512, 90) — TODO confirm) while the
    # prediction is (batch, 1, 512, 90), the loss would otherwise broadcast to
    # (batch, batch, 512, 90) and compare every prediction with every target.
    target = noiseless_image.reshape(decoded_data.shape)
    loss = loss_fn(decoded_data, target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print(f'\t partial train loss {i} (single batch): {loss.item()}| sizes: {decoded_data.size()}. {target.size()}')
    train_loss.append(loss.detach().cpu().numpy())

    torch.cuda.empty_cache()

  return np.mean(train_loss)

In [96]:
from torch.serialization import normalize_storage_type
def test_epoch_den(encoder, decoder, dataloader, loss_fn):
  encoder.eval()
  decoder.eval()

  with torch.no_grad():
    conc_out = []
    conc_label = []
    for noisy_image, noiseless_image in dataloader:
      if len(noisy_image) == batch_size:
        encoded_data = encoder(noisy_image)
        decoded_data = decoder(encoded_data)

        output = decoded_data.cpu()
        label = noiseless_image.cpu()

        conc_out.append(output)
        conc_label.append(label)


    conc_out = torch.cat(conc_out)
    conc_label = torch.cat(conc_label)

    val_loss = loss_fn(conc_out, conc_label)

    torch.cuda.empty_cache()

  return val_loss.data

# Plotting

In [97]:
# Values come from the file used to create the simulation
# Azimuth field of view; used in degrees for the polar axis limits (+/- azi/2).
azi = 120
# Number of azimuth samples in the plotting grid.
binsA = 90
# Smallest and largest radius of the plotting grid (units are not shown here).
minR = 1
maxR = 30
# Number of range samples in the plotting grid.
binsR = 512

def plot_outputs(encoder, decoder):
  """Show one random validation sample as three polar plots.

  The panels are the noisy input, the autoencoder's reconstruction, and the
  noiseless ground truth, all drawn on the same azimuth/range grid with a gray
  colormap clipped to [0, 1]. Both networks are switched to eval mode.

  Args:
    encoder: trained encoder module.
    decoder: trained decoder module.
  """
  index = np.random.randint(0, len(val_data))
  # Fetch the pair once; each indexing of the dataset runs its loaders again.
  noisy, noiseless = val_data[index]
  img = noisy.unsqueeze(0)

  encoder.eval()
  decoder.eval()

  with torch.no_grad():
    rec_img = decoder(encoder(img))

  # Polar grid: azimuth converted from degrees to radians, range from minR to maxR.
  theta = np.linspace(-azi/2, azi/2, binsA) * np.pi / 180
  r = np.linspace(minR, maxR, binsR)
  T, R = np.meshgrid(theta, r)

  panels = (
      ("Noisy", img),
      ("Denoised", rec_img),
      ("Noiseless", noiseless),  # Ground truth values
  )

  fig, axes = plt.subplots(1, 3, subplot_kw=dict(projection='polar'))
  for ax, (title, image) in zip(axes, panels):
    ax.set_title(title)
    ax.set_theta_zero_location("N")
    ax.set_thetamin(-azi/2)
    ax.set_thetamax(azi/2)
    ax.grid(False)

    # Lay the values out row-major on the (binsR, binsA) grid.
    values = image.cpu().numpy().reshape(T.shape)
    ax.pcolormesh(T, R, values, cmap='gray', shading='auto', vmin=0, vmax=1)

  plt.tight_layout()
  plt.show()

# Train Loop

In [None]:
num_epochs = 90
# Per-epoch training and validation losses, in epoch order.
history_da = {'train_loss': [], 'val_loss': []}

for epoch in range(num_epochs):
  print(f'EPOCH {epoch + 1}/{num_epochs}')

  # One optimisation pass over the training set.
  train_loss = train_epoch_den(encoder, decoder, train_loader, loss_fn, optim)

  print("beginning validation")
  val_loss = test_epoch_den(encoder, decoder, val_loader, loss_fn)

  print("begin plotting")
  history_da['train_loss'].append(train_loss)
  history_da['val_loss'].append(val_loss)

  summary = '\n EPOCH {}/{} \t train loss {:.3f} \t val loss {:.3f}'
  print(summary.format(epoch + 1, num_epochs, train_loss, val_loss))

  # Visual check on a random validation sample after every epoch.
  plot_outputs(encoder, decoder)

In [None]:
# Optional cell: plays a sound to indicate when training has finished.
import os

import librosa
from IPython.display import display, Audio

# Presumably requires Google Drive to be mounted at /content/drive — confirm.
sound_path = "/content/drive/MyDrive/EndTrainingLoop.wav"

# Skip playback when the file is missing so a full notebook run does not end
# in an exception because of an optional notification.
if os.path.exists(sound_path):
  sound, rate = librosa.load(sound_path)
  print('playing sound using IPython.display.Audio')
  display(Audio(sound, rate=rate, autoplay=True))
else:
  print(f'notification sound not found at {sound_path}; skipping playback')