<a href="https://colab.research.google.com/github/st20080675/Agnes/blob/master/audio2geo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torchaudio
import torchaudio.transforms as transforms


def get_sample(path, resample=None):
  """Load an audio file through a sox effect chain.

  The chain always starts with ``remix 1``; when ``resample`` is truthy a
  low-pass at ``resample // 2`` followed by a ``rate`` conversion to
  ``resample`` is appended.

  Args:
    path: Path of the audio file to read.
    resample: Optional target sample rate. ``None`` (or 0) skips resampling.

  Returns:
    Whatever ``torchaudio.sox_effects.apply_effects_file`` returns; callers in
    this notebook unpack it as ``(waveform, sample_rate)``.
  """
  sox_chain = [["remix", "1"]]
  if resample:
    # Low-pass at half the target rate before changing the rate.
    sox_chain += [
      ["lowpass", str(resample // 2)],
      ["rate", str(resample)],
    ]
  return torchaudio.sox_effects.apply_effects_file(path, effects=sox_chain)

# Smoke test: load one training clip through get_sample (no resampling) and show its shape.
# Earlier variant that read a local file directly with torchaudio.load:
# waveform, sample_rate = torchaudio.load('/home/LG/input.wav', normalize=True)
waveform, sample_rate = get_sample('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train/seq_0/input.wav')
print(waveform.shape)

torch.Size([1, 155184])


In [None]:
from os import listdir
import os
from numpy import load
import torch

def load_sequence(path):
  """Load the audio and the per-frame targets of one sequence directory.

  The directory is expected to contain ``input.wav`` and a ``target``
  sub-directory with one numpy file per frame, each exposing a ``'values'``
  array.

  Target files are read in natural (numeric-aware) filename order. A plain
  ``listdir`` gives no ordering guarantee, which would misalign the target
  rows with the audio timeline.

  Args:
    path: Path of the sequence directory.

  Returns:
    ``(waveform, sample_rate, targets)`` where ``waveform`` and
    ``sample_rate`` come from ``get_sample`` and ``targets`` stacks the
    flattened ``'values'`` array of every target file along dim 0. ``targets``
    is ``None`` when the ``target`` directory is empty.
  """
  import re  # local import: only needed for the natural-sort key below

  def _natural_key(name):
    # re.split with a capturing group alternates text / digit-run tokens, so
    # odd positions are always digit runs: "f10.npz" -> ["f", 10, ".npz"].
    parts = re.split(r'(\d+)', name)
    return [int(tok) if idx % 2 else tok for idx, tok in enumerate(parts)]

  audio_path = os.path.join(path, 'input.wav')
  waveform, sample_rate = get_sample(audio_path)

  tar_path = os.path.join(path, 'target')
  frames = []
  for fname in sorted(listdir(tar_path), key=_natural_key):
    archive = load(os.path.join(tar_path, fname))
    frames.append(torch.from_numpy(archive['values'].reshape(-1)))

  # One stack instead of repeated torch.cat (which re-copies every iteration).
  targets = torch.stack(frames, 0) if frames else None
  return waveform, sample_rate, targets


# Smoke test: load the first training sequence and inspect the audio shape,
# its sample rate and the stacked target tensor.
waveform, sample_rate, targets = load_sequence('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train/seq_0')
print(waveform.shape)
print(sample_rate)
print(targets.shape)



torch.Size([1, 155184])
48000
torch.Size([97, 24])


In [None]:
def mfcc_extractor(waveform, sample_rate, desired_rate, n_mfcc):
  """Compute MFCCs with a hop chosen to yield ``desired_rate`` frames per second.

  Args:
    waveform: Audio tensor passed straight to ``torchaudio.transforms.MFCC``.
    sample_rate: Sample rate of ``waveform`` in Hz.
    desired_rate: Target number of MFCC frames per second.
    n_mfcc: Number of MFCC coefficients to keep.

  Returns:
    The tensor produced by the MFCC transform. For the ``(1, N)`` waveform
    used in this notebook the printed shape is ``(1, n_mfcc, n_frames)``.

  Raises:
    ValueError: If ``desired_rate`` is not positive or is so high that the
      hop length would be shorter than one sample.
  """
  if desired_rate <= 0:
    raise ValueError("desired_rate must be positive, got %r" % (desired_rate,))

  # A single division is correctly rounded; the previous
  # sample_rate * (1 / desired_rate) rounded twice and could land just below
  # an exact integer, truncating the hop by one sample.
  hop_length = int(sample_rate / desired_rate)
  if hop_length < 1:
    raise ValueError(
        "desired_rate %r exceeds sample_rate %r; hop length would be 0"
        % (desired_rate, sample_rate))

  mfcc_transform = transforms.MFCC(
      sample_rate=sample_rate,
      n_mfcc=n_mfcc,
      melkwargs={
        'hop_length': hop_length,
      }
  )
  return mfcc_transform(waveform)

# Smoke test: 50 MFCCs at 30 frames per second for the sequence loaded above.
x = mfcc_extractor(waveform, sample_rate, 30, 50)
print(x.shape)
print(type(x))

torch.Size([1, 50, 97])
<class 'torch.Tensor'>




In [None]:
from torch.utils.data import Dataset, DataLoader
from os import listdir
import os
import torch

class MfccDataset(Dataset):
  """Frame-wise dataset pairing one MFCC vector with one target vector.

  Every sequence directory under ``path`` is loaded with ``load_sequence``,
  converted to MFCCs at ``desired_rate`` frames per second, trimmed so that
  features and targets have the same number of frames, and concatenated
  along dim 0.

  Args:
    path: Directory containing one sub-directory per sequence.
    desired_rate: MFCC frames per second.
    n_mfcc: Number of MFCC coefficients per frame.
    max_sequences: Maximum number of sequences to load (sorted by name).
      Defaults to 3, matching the previous hard-coded limit; pass ``None``
      to load every sequence.
  """
  def __init__(self, path, desired_rate = 30, n_mfcc = 50, max_sequences = 3):
    self.n_mfcc = n_mfcc
    self.desired_rate = desired_rate

    # listdir order is arbitrary, so sort to make the selected subset
    # reproducible, and skip stray non-directory entries.
    seq_names = sorted(
        name for name in listdir(path)
        if os.path.isdir(os.path.join(path, name)))
    if max_sequences is not None:
      seq_names = seq_names[:max_sequences]

    features = []
    labels = []
    for seq in seq_names:
      seq_path = os.path.join(path, seq)
      waveform, sample_rate, targets = load_sequence(seq_path)
      # Replace NaNs so they cannot propagate into the loss.
      waveform = torch.nan_to_num(waveform)
      targets = torch.nan_to_num(targets)

      mfcc = mfcc_extractor(waveform, sample_rate, self.desired_rate, n_mfcc)
      # Drop only the leading channel dim; an axis-less squeeze would also
      # collapse the time axis of a one-frame clip.
      mfcc = mfcc.squeeze(0)
      mfcc = torch.swapaxes(mfcc, 0, 1)  # -> (frames, n_mfcc)

      # Audio and targets may differ by a frame; keep the common prefix.
      num = min(mfcc.shape[0], targets.shape[0])
      features.append(mfcc[:num])
      labels.append(targets[:num])

    # Concatenate once instead of growing the tensors inside the loop.
    self.data = torch.cat(features, 0) if features else None
    self.labels = torch.cat(labels, 0) if labels else None

  def __len__(self):
    """Number of frames across all loaded sequences (0 if none were loaded)."""
    return 0 if self.data is None else len(self.data)

  def __getitem__(self, index):
    """Return the ``(mfcc_frame, target_frame)`` pair at ``index``."""
    return self.data[index], self.labels[index]

# Smoke test: build the frame-wise dataset with default settings and check
# that features and labels have the same number of rows.
mydataset = MfccDataset('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train')
print(mydataset.data.shape)
print(mydataset.labels.shape)

torch.Size([335, 50])
torch.Size([335, 24])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

class LSTMModel(nn.Module):
  """Single-layer LSTM followed by a linear read-out applied at every step.

  Args:
    input_size: Feature dimension of each input step.
    hidden_size: Width of the LSTM hidden state.
    output_size: Dimension of the per-step prediction.
  """
  def __init__(self, input_size, hidden_size=64, output_size=24):
    super().__init__()
    self.hidden_size = hidden_size
    self.lstm = nn.LSTM(
        input_size=input_size,
        hidden_size=hidden_size,
        batch_first=True,
    )
    self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

  def forward(self, x):
    """Run the LSTM over ``x`` and project every step's hidden state."""
    # Only the per-step outputs are used; the final (h, c) state is dropped.
    step_outputs, _final_state = self.lstm(x)
    return self.fc(step_outputs)

# Shape check on random input: (batch=2, steps=4, features=40) -> (2, 4, 24).
data = torch.randn((2, 4, 40))
model = LSTMModel(40, 40, 24)
x = model(data)
print(x.shape)

torch.Size([2, 4, 24])


In [None]:
# Data pipeline for the LSTM run. shuffle=False keeps the frames in the order
# in which MfccDataset concatenated them.
batch_size = 4
shuffle = False
dataset = MfccDataset('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train')
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)



In [None]:
def train(model, train_loader, loss_fn, optimizer):
  """Run one optimisation pass over ``train_loader``.

  Args:
    model: Module to train; switched to training mode.
    train_loader: Iterable yielding ``(X_batch, y_batch)`` pairs.
    loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
    optimizer: Optimizer holding ``model``'s parameters.

  Returns:
    A list with one Python float per batch: the square root of that batch's
    loss (an RMSE when ``loss_fn`` is an MSE loss).
  """
  model.train()
  loss_buf = []
  for X_batch, y_batch in train_loader:
    y_pred = model(X_batch)
    loss = loss_fn(y_pred, y_batch)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # torch.sqrt(...).item() works on any device and does not need numpy,
    # which this notebook only imports in a later cell.
    loss_buf.append(torch.sqrt(loss.detach()).item())

  return loss_buf



In [None]:
def eval(model, eval_loader, criterion):
  """Evaluate ``model`` on ``eval_loader`` without updating it.

  Note: the name shadows Python's built-in ``eval``; it is kept so existing
  callers keep working.

  Args:
    model: Module to evaluate; switched to evaluation mode.
    eval_loader: Iterable yielding ``(X_batch, y_batch)`` pairs.
    criterion: Callable ``criterion(prediction, target)`` returning a scalar
      tensor.

  Returns:
    The mean over batches of ``sqrt(criterion(...))`` as a Python float.

  Raises:
    ValueError: If ``eval_loader`` yields no batches.
  """
  model.eval()
  loss_buf = []
  # No gradients are needed for evaluation; skipping the autograd graph saves
  # memory and time.
  with torch.no_grad():
    for X_batch, y_batch in eval_loader:
      y_pred = model(X_batch)
      loss = criterion(y_pred, y_batch)
      loss_buf.append(torch.sqrt(loss).item())

  if not loss_buf:
    raise ValueError("eval_loader yielded no batches")
  return sum(loss_buf)/len(loss_buf)

In [None]:
import numpy as np
# Train the LSTM baseline on the frame-wise MFCC dataset.
# input_size=50 matches MfccDataset's default n_mfcc.
# NOTE(review): MfccDataset yields single frames, so each batch is 2-D
# (batch_size, 50); nn.LSTM presumably treats that as one unbatched sequence
# of batch_size steps — confirm this is the intended windowing.
model = LSTMModel(50)
optimizer = optim.Adam(model.parameters())
loss_fn = nn.MSELoss()

n_epochs = 100

for epoch in range(n_epochs):
  loss_buf = train(model, dataloader, loss_fn, optimizer)

  # Report every 10th epoch: the mean of the per-batch values returned by train().
  if epoch % 10 == 0:
    train_loss = sum(loss_buf)/len(loss_buf)
    print("Epoch %d: train RMSE %.4f" % (epoch, train_loss))
    # Evaluation is currently disabled:
    # eval_loss = eval(model, dataloader, loss_fn)
    # print("Epoch %d: eval RMSE %.4f" % (epoch, eval_loss))


Epoch 0: train RMSE 31.1864
Epoch 10: train RMSE 13.0288
Epoch 20: train RMSE 8.3341
Epoch 30: train RMSE 8.0338
Epoch 40: train RMSE 7.8078
Epoch 50: train RMSE 7.7629
Epoch 60: train RMSE 7.6870
Epoch 70: train RMSE 7.5449
Epoch 80: train RMSE 7.5008
Epoch 90: train RMSE 7.4618


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DepthwiseSeparableConv(nn.Module):
    """3x3 depthwise convolution followed by a 1x1 pointwise convolution.

    Args:
        in_channels: Number of input channels (also the depthwise group count).
        out_channels: Number of channels produced by the pointwise convolution.
        stride: Stride of the depthwise convolution.
    """
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # groups == in_channels gives every input channel its own 3x3 filter.
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=in_channels,
        )
        # The 1x1 convolution mixes channels and sets the output width.
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Apply the depthwise filter, then the pointwise projection."""
        return self.pointwise(self.depthwise(x))

class MobileNet(nn.Module):
    """Small MobileNet-style regressor for single-channel 2-D inputs.

    A strided 3x3 convolution is followed by five depthwise-separable blocks,
    global average pooling and a linear layer with ``output_dim`` outputs.

    Args:
        output_dim: Size of the output vector.
    """
    def __init__(self, output_dim=24):
        super().__init__()
        # Stem: one input channel -> 32 feature maps at half resolution.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1)
        # Depthwise-separable stack; the stride-2 blocks halve the resolution.
        self.conv2 = DepthwiseSeparableConv(32, 64)
        self.conv3 = DepthwiseSeparableConv(64, 128, stride=2)
        self.conv4 = DepthwiseSeparableConv(128, 128)
        self.conv5 = DepthwiseSeparableConv(128, 256, stride=2)
        self.conv6 = DepthwiseSeparableConv(256, 256)

        # Head: pool each of the 256 maps to a single value, then project.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, output_dim)

    def forward(self, x):
        """Map an input batch to ``(batch, output_dim)`` predictions."""
        stages = (
            self.conv1, self.conv2, self.conv3,
            self.conv4, self.conv5, self.conv6,
        )
        for stage in stages:
            x = F.relu(stage(x))

        pooled = self.avgpool(x)
        flat = torch.flatten(pooled, 1)
        return self.fc(flat)

# Create an instance of the MobileNet-like model
model = MobileNet(output_dim=24)

# Print the model summary
print(model)

MobileNet(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv2): DepthwiseSeparableConv(
    (depthwise): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32)
    (pointwise): Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1))
  )
  (conv3): DepthwiseSeparableConv(
    (depthwise): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64)
    (pointwise): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1))
  )
  (conv4): DepthwiseSeparableConv(
    (depthwise): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=128)
    (pointwise): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1))
  )
  (conv5): DepthwiseSeparableConv(
    (depthwise): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=128)
    (pointwise): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1))
  )
  (conv6): DepthwiseSeparableConv(
    (depthwise): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1)

In [None]:
import torch

def Mfcc2img(mfcc, img_w):
  """Cut a ``(frames, features)`` tensor into non-overlapping images.

  Consecutive groups of ``img_w`` rows become one single-channel image; any
  trailing rows that do not fill a whole image are dropped.

  Args:
    mfcc: 2-D tensor of shape ``(frames, features)``.
    img_w: Number of consecutive rows per image; must be at least 1.

  Returns:
    Tensor of shape ``(frames // img_w, 1, img_w, features)``. When fewer
    than ``img_w`` rows are available the result has zero images, rather than
    one undersized image that could not be concatenated with full-size ones.

  Raises:
    ValueError: If ``img_w`` is smaller than 1.
  """
  if img_w < 1:
    raise ValueError("img_w must be at least 1, got %r" % (img_w,))

  num = mfcc.shape[0] // img_w
  # A single reshape replaces the loop of slices and torch.cat calls. The
  # feature size is passed explicitly because -1 cannot be inferred when
  # num == 0.
  return mfcc[:num * img_w].reshape(num, 1, img_w, mfcc.shape[1])

In [None]:
from torch.utils.data import Dataset, DataLoader
from os import listdir
import os
import torch

class MfccImageDataset(Dataset):
  """Dataset pairing one MFCC image with one target vector.

  MFCCs are extracted at ``desired_rate * img_w`` frames per second and cut
  by ``Mfcc2img`` into images of ``img_w`` consecutive frames, giving
  ``desired_rate`` images per second. Images and targets are trimmed to a
  common length per sequence and concatenated along dim 0.

  Args:
    path: Directory containing one sub-directory per sequence.
    desired_rate: Images (and target frames) per second.
    img_w: Number of MFCC frames per image.
    n_mfcc: Number of MFCC coefficients per frame.
    max_sequences: Maximum number of sequences to load (sorted by name).
      Defaults to 3, matching the previous hard-coded limit; pass ``None``
      to load every sequence.
  """
  def __init__(self, path, desired_rate = 30, img_w = 100, n_mfcc = 50, max_sequences = 3):
    self.n_mfcc = n_mfcc
    self.desired_rate = desired_rate
    self.img_w = img_w

    # listdir order is arbitrary, so sort to make the selected subset
    # reproducible, and skip stray non-directory entries.
    seq_names = sorted(
        name for name in listdir(path)
        if os.path.isdir(os.path.join(path, name)))
    if max_sequences is not None:
      seq_names = seq_names[:max_sequences]

    images = []
    labels = []
    for seq in seq_names:
      seq_path = os.path.join(path, seq)
      waveform, sample_rate, targets = load_sequence(seq_path)
      # Replace NaNs so they cannot propagate into the loss.
      waveform = torch.nan_to_num(waveform)
      targets = torch.nan_to_num(targets)

      # img_w MFCC frames are needed per target frame, hence the higher rate.
      mfcc = mfcc_extractor(waveform, sample_rate, self.desired_rate * self.img_w, n_mfcc)
      # Drop only the leading channel dim; an axis-less squeeze would also
      # collapse the time axis of a one-frame clip.
      mfcc = mfcc.squeeze(0)
      mfcc = torch.swapaxes(mfcc, 0, 1)  # -> (frames, n_mfcc)
      mfcc_img = Mfcc2img(mfcc, self.img_w)

      # Audio and targets may differ by a frame; keep the common prefix.
      num = min(mfcc_img.shape[0], targets.shape[0])
      images.append(mfcc_img[:num])
      labels.append(targets[:num])

    # Concatenate once instead of growing the tensors inside the loop.
    self.data = torch.cat(images, 0) if images else None
    self.labels = torch.cat(labels, 0) if labels else None

  def __len__(self):
    """Number of images across all loaded sequences (0 if none were loaded)."""
    return 0 if self.data is None else len(self.data)

  def __getitem__(self, index):
    """Return the ``(mfcc_image, target_frame)`` pair at ``index``."""
    return self.data[index], self.labels[index]

# Smoke test: build the image dataset with default settings and check that
# images and labels have the same leading dimension.
mydataset = MfccImageDataset('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train')
print(mydataset.data.shape)
print(mydataset.labels.shape)



torch.Size([333, 1, 100, 50])
torch.Size([333, 24])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Data pipeline for the CNN run: one MFCC image per target frame.
# img_w and n_mfcc keep their MfccImageDataset defaults (100 and 50).
batch_size = 4
shuffle = False
dataset = MfccImageDataset('/content/drive/MyDrive/audio_to_geometry_homework_v2/audio_to_geometry_homework/train', desired_rate=30)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

import numpy as np
# Fresh model with the default 24-dimensional output, Adam with default
# settings, and an MSE objective.
model = MobileNet()
optimizer = optim.Adam(model.parameters())
loss_fn = nn.MSELoss()

n_epochs = 100

for epoch in range(n_epochs):
  loss_buf = train(model, dataloader, loss_fn, optimizer)

  # Report every 10th epoch: the mean of the per-batch values returned by train().
  if epoch % 10 == 0:
    train_loss = sum(loss_buf)/len(loss_buf)
    print("Epoch %d: train RMSE %.4f" % (epoch, train_loss))
    # Evaluation is currently disabled:
    # eval_loss = eval(model, dataloader, loss_fn)
    # print("Epoch %d: eval RMSE %.4f" % (epoch, eval_loss))



Epoch 0: train RMSE 14.4361
Epoch 10: train RMSE 7.2290
Epoch 20: train RMSE 7.0743
Epoch 30: train RMSE 7.0160
Epoch 40: train RMSE 6.9231
Epoch 50: train RMSE 6.9016
Epoch 60: train RMSE 6.7321
Epoch 70: train RMSE 6.2551
Epoch 80: train RMSE 5.7540
Epoch 90: train RMSE 5.1883
