### Prologue
An attempt at implementing [Denoising Induction Motor Sounds](https://arxiv.org/pdf/2208.04462.pdf) for this use case.

##### Setup

In [1]:
!git clone https://github.com/juggernautjha/denoising.git denoising

Cloning into 'denoising'...
remote: Enumerating objects: 21, done.[K
remote: Counting objects: 100% (21/21), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 21 (delta 4), reused 21 (delta 4), pack-reused 0[K
Unpacking objects: 100% (21/21), 3.38 MiB | 8.42 MiB/s, done.


In [2]:
# Copy the repository's top-level Python files and its `tensorblur` directory
# into the working directory (presumably so that `noising` and `common` can be
# imported directly by the imports cell -- TODO confirm).
!cp -r denoising/*.py .
!cp -r denoising/tensorblur .

In [3]:
# Colab-only: mount Google Drive, then copy the `samples` folder from it into
# the working directory. Later cells read their .flac files from ./samples.
from google.colab import drive
drive.mount('/content/drive')
!cp -r /content/drive/MyDrive/samples .

Mounted at /content/drive


In [4]:
!pip install pydub acoustics

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Collecting acoustics
  Downloading acoustics-0.2.6.tar.gz (3.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m41.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: acoustics
  Building wheel for acoustics (pyproject.toml) ... [?25l[?25hdone
  Created wheel for acoustics: filename=acoustics-0.2.6-py3-none-any.whl size=68184 sha256=c08d6fc4a266dd4b16ad6cd149a20ef7c59ad01e6a72061016e53b5b966828fb
  Stored in directory: /root/.cache/pip/wheels/dd/9b/f0/0fa91c83f48277b7ef24800a9930694f78839531b5bab52a42
Successfully built acoustics
Installing collected packages: pydub, acoustics
Succes

##### Imports

In [None]:
from __future__ import print_function, division
%matplotlib inline

import sys, time, random, glob, os, pandas
import numpy as np
import librosa
import pydub
import json
from pydub import AudioSegment
from pydub import effects

import tensorflow as tf
import keras

from keras.utils import np_utils
from keras import optimizers
from keras.layers import Convolution2D, MaxPooling2D

from tensorflow.keras import layers, losses

import keras.models
from keras.losses import mse as kmse
import keras.backend as K
from keras.models import Model, Sequential
from keras.layers import Input, Dense, Dropout, Add, Multiply, Lambda, UpSampling2D, Dot, Permute, RepeatVector
from keras.layers import BatchNormalization
from keras.layers import Conv2D
from keras.layers import Cropping2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Input
from keras.models import Model

from tqdm.notebook import tqdm

from keras.callbacks import LambdaCallback, EarlyStopping, ModelCheckpoint
import noising
from common import *
from common import cal_midpoints, gen_mel_feature

##### Generators

In [None]:
INPUT_DIR = 'samples'
# glob returns paths in arbitrary, filesystem-dependent order; sorting makes the
# files get processed in the same order on every run.
files = sorted(glob.glob(f'{INPUT_DIR}/*.flac'))
# NOTE(review): the meaning of the positional arguments is defined in noising.py,
# which is not shown in this notebook. 3 and 15 match the `samples_per_file`
# default and the value passed to `noising.overlay_noise` by the Generator class
# further down -- TODO confirm, and confirm what 150 stands for.
noising.save_overlaid_dataset(files, 3, 15, 'dataset.json', 'overlaid', 150)

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]

In [None]:
class FastGenerator(tf.keras.utils.Sequence):
    '''Keras Sequence that turns one .flac file into one (noisy, clear) batch.

    The .flac files found in `directory` are sorted and split by position into
    train / test / val partitions; `gen_type` selects which partition this
    instance serves. Chopping the audio and overlaying the noise is delegated
    to `noising.chop_overlay_spit`.

    Args:
        directory: folder searched (non-recursively) for *.flac files.
        train_ratio: fraction of the files given to the 'train' partition; the
            remaining files are split roughly in half between 'test' and the
            fallback partition ('val').
        window_size: forwarded unchanged to `noising.chop_overlay_spit`
            (presumably the window length in samples -- TODO confirm).
        batch_size: stored on the instance for reference only; a batch is
            always built from FILES_PER_BATCH files, whatever this value is.
        gen_type: 'train', 'test', or any other value for the validation split.
        shuffle: reshuffle the file order at the end of every epoch (never
            applied to the 'test' partition).
    '''

    #! number of files consumed by one batch; shared by __len__ and __getitem__
    #! so that the two can never disagree.
    FILES_PER_BATCH = 1

    def __init__(self, directory : str = 'samples',
                train_ratio : float = 0.8, window_size : int = 49000,
                batch_size : int = 128, gen_type = 'train', shuffle : bool = True):
        # Newer Keras releases expect Sequence subclasses to initialise the base.
        super().__init__()
        self.shuffle = shuffle
        self.gen_type = gen_type
        self.train_ratio = train_ratio
        # Upper bound (as a fraction of all files) of the 'test' partition.
        self.test_ratio = self.train_ratio + (1-train_ratio)/2
        self.batch_size = batch_size
        self.epoch = 0
        self.window_size = window_size
        self.noise_types = ['white', 'pink', 'blue', 'brown', 'violet']
        # glob order is filesystem-dependent; sorting keeps the three partitions
        # identical across runs and across generator instances.
        files = sorted(glob.glob(f'{directory}/*.flac'))
        idx = int(train_ratio*len(files))
        test_idx = int(self.test_ratio*len(files))
        if gen_type == 'train':
            self.files = files[0:idx]
        elif gen_type == 'test':
            self.files = files[idx:test_idx]
        else:
            self.files = files[test_idx:]
        print(f"{gen_type} loader created with {len(self.files)} samples")

        self.len = len(self)
        if self.gen_type != 'test':
            self.on_epoch_end()

    def __len__(self):
        '''Denotes the number of batches per epoch'''
        # The previous formula, (2*len(files)) // batch_size, evaluated to 0 for
        # any partition with fewer than batch_size/2 files even though
        # __getitem__ can serve one batch per file.
        per_batch = self.FILES_PER_BATCH
        return (len(self.files) + per_batch - 1) // per_batch

    def on_epoch_end(self):
        '''Reshuffle the files (unless disabled or 'test') and bump the epoch counter.'''
        if self.shuffle and self.gen_type != 'test' :
            random.shuffle(self.files)
        self.epoch += 1

    def __getitem__(self, index):
        '''Build batch `index` from the file(s) assigned to it.

        Returns:
            (inp, out): numpy arrays built from the noisy and the clear pieces
            returned by `noising.chop_overlay_spit`, in that order.
        '''
        inp = []
        out = []
        start = index * self.FILES_PER_BATCH
        for path in self.files[start : start + self.FILES_PER_BATCH]:
            # chop_overlay_spit hands back (clear, noisy); the model input is the
            # noisy version and the target is the clear one.
            clear, noisy = noising.chop_overlay_spit(path, self.window_size, self.noise_types)
            inp += noisy
            out += clear
        return np.array(inp), np.array(out)

In [None]:
class Generator(tf.keras.utils.Sequence):
    '''Keras Sequence producing several noisy renditions of each .flac file.

    The .flac files found in `directory` are sorted and split by position into
    train / test / val partitions; `gen_type` selects which partition this
    instance serves. Every file contributes `samples_per_file` (noisy, clear)
    pairs per batch, each with a randomly chosen noise colour.

    Args:
        directory: folder searched (non-recursively) for *.flac files.
            NOTE(review): the default is a machine-specific absolute path kept
            only for backward compatibility -- callers should pass their own.
        length: forwarded to `noising.overlay_noise` as its last argument
            (presumably a padding length -- TODO confirm).
        train_ratio: fraction of the files given to the 'train' partition; the
            remaining files are split roughly in half between 'test' and the
            fallback partition ('val').
        batch_size: upper bound on the number of pairs in one batch.
        gen_type: 'train', 'test', or any other value for the validation split.
        samples_per_file: noisy renditions generated per file.
        shuffle: reshuffle the file order at the end of every epoch (never
            applied to the 'test' partition).
    '''

    def __init__(self, directory : str = '/home/juggernautjha/Desktop/to_rahul/maruti/data/true_samples',
                length : int = 200, train_ratio : float = 0.8,
                batch_size : int = 32, gen_type = 'train',
                samples_per_file : int = 3, shuffle : bool = True):
        # Newer Keras releases expect Sequence subclasses to initialise the base.
        super().__init__()
        self.shuffle = shuffle
        self.gen_type = gen_type
        self.train_ratio = train_ratio
        # Upper bound (as a fraction of all files) of the 'test' partition.
        self.test_ratio = self.train_ratio + (1-train_ratio)/2
        self.batch_size = batch_size
        self.epoch = 0
        self.pad_to = length
        self.samples_per_file = samples_per_file
        self.noise_types = ['white', 'pink', 'blue', 'brown', 'violet']
        # glob order is filesystem-dependent; sorting keeps the three partitions
        # identical across runs and across generator instances.
        files = sorted(glob.glob(f"{directory}/*.flac"))
        idx = int(train_ratio*len(files))
        test_idx = int(self.test_ratio*len(files))
        if gen_type == 'train':
            self.files = files[0:idx]
        elif gen_type == 'test':
            self.files = files[idx:test_idx]
        else:
            self.files = files[test_idx:]
        print(f"{gen_type} loader created with {len(self.files)} samples")

        self.len = len(self)
        if self.gen_type != 'test':
            self.on_epoch_end()

    def _files_per_batch(self):
        '''Number of files consumed by one batch (never less than one).'''
        # Clamped to 1 so that samples_per_file > batch_size does not produce a
        # zero-width slice (and therefore empty batches) in __getitem__.
        return max(1, self.batch_size // self.samples_per_file)

    def __len__(self):
        '''Denotes the number of batches per epoch'''
        # Derived from the same files-per-batch figure that __getitem__ uses.
        # The previous formula, (2*len(files)) // batch_size, promised batches
        # that __getitem__ could not fill when samples_per_file was 1 and
        # skipped files when it was 3. Rounded up so that a trailing partial
        # batch is still served.
        per_batch = self._files_per_batch()
        return (len(self.files) + per_batch - 1) // per_batch

    def on_epoch_end(self):
        '''Reshuffle the files (unless disabled or 'test') and bump the epoch counter.'''
        if self.shuffle and self.gen_type != 'test' :
            random.shuffle(self.files)
        self.epoch += 1

    def __getitem__(self, index):
        '''Build batch `index`.

        Generates `samples_per_file` noisy samples for each file of the batch;
        the noise colours are drawn with replacement, so they may repeat.

        Returns:
            (inp, out): numpy arrays of the noisy and the clear sample arrays.
        '''
        inp = []
        out = []
        per_batch = self._files_per_batch()
        start = index * per_batch
        for path in self.files[start : start + per_batch]:
            colors = random.choices(self.noise_types, k=self.samples_per_file)
            for color in colors:
                # NOTE(review): 15 is passed straight through to overlay_noise;
                # its meaning is defined in noising.py -- TODO confirm.
                noisy, clear = noising.overlay_noise(path, 15, color, self.pad_to)
                inp.append(noisy.get_array_of_samples())
                out.append(clear.get_array_of_samples())
        return np.array(inp), np.array(out)


In [None]:
# One loader per split: 'train' takes the leading share of the files, and any
# gen_type other than 'train'/'test' (here 'val') takes the trailing share.
train_generator = FastGenerator(gen_type='train')
val_generator = FastGenerator(gen_type='val')

train loader created with 14 samples
val loader created with 3 samples


In [None]:
# Batch 1 -> noisy inputs (element 0 of the pair) -> second sample -> its length.
len(train_generator[1][0][1])

6239232

##### MODEL (finally??)

In [None]:
class Denoise(Model):
  '''1-D convolutional autoencoder mapping a noisy waveform to a clean one.

  Args:
    input_size: number of samples per waveform; inputs have shape
      (input_size, 1). Must be at least 9 so the encoder output is non-empty.
    batch_size: fixed batch size declared on the encoder's input layer.

  Length bookkeeping (kernel_size=3, stride 1):
    * each 'valid' Conv1D shortens the sequence by 2   -> encoder: -8
    * each 'valid' Conv1DTranspose lengthens it by 2   -> first four decoder
      layers: +8, which restores the original length
    * the final projection uses padding='same' and keeps that length
  '''
  def __init__(self, input_size, batch_size):
    super(Denoise, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Input(shape = (input_size, 1), batch_size = batch_size),
      layers.Conv1D(128, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1D(32, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1D(16, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1D(8, kernel_size=3, activation='relu', kernel_initializer='he_uniform')
      ])

    self.decoder = tf.keras.Sequential([
      layers.Conv1DTranspose(8, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1DTranspose(16, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1DTranspose(32, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      layers.Conv1DTranspose(128, kernel_size=3, activation='relu', kernel_initializer='he_uniform'),
      # Output projection. With 'valid' padding this fifth transposed conv made
      # the output input_size + 2 samples long, which cannot match the target.
      # It is linear because a ReLU here could never emit the negative half of
      # a signed waveform.
      layers.Conv1DTranspose(1, kernel_size=3, padding='same', activation='linear',
                             kernel_initializer='he_uniform')
      ])

  def call(self, x):
    '''Encode then decode `x`; the result has the same length as the input.'''
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

In [None]:
# 6239232 is the per-sample length reported by the probe cell above.
denoiser = Denoise(input_size=6239232, batch_size=8)

In [None]:
# Adam optimiser with a mean-squared-error reconstruction loss.
denoiser.compile(optimizer='adam', loss=tf.keras.losses.mean_squared_error)

In [None]:
# Train for 10 epochs, validating on the 'val' loader after each one.
# NOTE(review): the recorded run of this cell ended in ResourceExhaustedError.
# The model was built for 6,239,232-sample inputs, so the convolutional
# activations presumably do not fit in device memory -- consider training on
# much shorter windows. TODO confirm the cause.
denoiser.fit(train_generator, validation_data=val_generator, epochs=10)

ResourceExhaustedError: ignored