<a href="https://colab.research.google.com/github/koleshjr/NOISE-DATA-CLASSIFICATION/blob/main/Noise_data_simple_exploration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
!cp "/content/drive/MyDrive/Noise" -r "/content/"

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from glob import glob ## list all files 

##working with audio
import librosa
import librosa.display 
import IPython.display as ipd ## play audio in the notebook

from itertools import cycle


## pretty visualizations
sns.set_theme(style="white", palette=None)
# Colours of matplotlib's active property cycle: once as a list for indexing,
# once as an endless iterator.
color_pal = list(plt.rcParams["axes.prop_cycle"].by_key()["color"])
color_cycle = cycle(list(color_pal))



In [None]:
# Folder on the mounted Drive that holds the competition CSV files.
path = '/content/drive/MyDrive/Noise/Noise Classification/'

In [None]:
# Read the three competition tables (train, test, sample submission) from `path`.
train, test, ss = (
    pd.read_csv(f"{path}{file_name}")
    for file_name in ("Train.csv", "Test.csv", "SampleSubmission.csv")
)

In [None]:
# Preview the first rows of the training table.
train.head()

In [None]:
# !unzip '/content/drive/MyDrive/Noise/Noise Classification/Audio_clips.zip'

### FREQUENCY
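* How fast the sound wave oscillates, measured in cycles per second (Hz); shorter wavelengths mean higher frequencies
* Every audio signal contains multiple frequencies at the same time
* Frequencies are commonly grouped into low, medium and high ranges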

### INTENSITY(POWER)
* The height (amplitude) of the wave, perceived as loudness; pitch, by contrast, is determined by frequency

### Sample Rate
* The number of discrete samples of the signal that are recorded per second
* Acts as the time resolution of the audio: a higher sample rate captures higher frequencies
* Specific to how the computer reads in the audio file

In [None]:
# glob returns matches in arbitrary (filesystem-dependent) order; sort them so
# that positional lookups such as audio_files[5] pick the same clip on every run.
audio_files = sorted(glob("/content/AUDIO_CLIPS/*.wav"))

## Play Audio File

In [None]:
# Number of training rows per category.
train['category'].value_counts()

In [None]:
# Render an inline audio player for one clip.
# NOTE(review): raises IndexError if fewer than 501 files were matched by the glob.
ipd.Audio(audio_files[500])

In [None]:
# Load one clip: `y` is the waveform array and `sr` the sample rate reported by librosa.
y, sr = librosa.load(audio_files[5])
# Show the first ten samples, the array shape and the sample rate.
print(f'y:{y[:10]}')
print(f'shape y:{y.shape}')
print(f'sr:{sr}')

In [None]:
# Plot the whole waveform: sample index on the x-axis, amplitude on the y-axis.
pd.Series(y).plot(figsize=(10,5), title='Raw Audio Example',lw=1,color = color_pal[0])
plt.show()

In [None]:
# y_trimmed, _ = librosa.effects.trim(y, top_db=20)
# pd.Series(y_trimmed).plot(figsize=(10,5), title='Raw Audio Example',lw=1,color = color_pal[1])
# plt.show()

In [None]:
# Zoom in on samples 150000-154999 of the waveform.
# NOTE(review): the slice (and the plot) is empty when the clip has fewer than 150000 samples.
pd.Series(y[150000:155000]).plot(figsize=(10,5), title='Raw Audio Zoomed in Example',lw=1,color = color_pal[0])
plt.show()

## Spectrogram
* a detailed view of audio, able to represent time, frequency, and amplitude all on one graph
* Ideally suited for applications where all frequencies have equal importance

In [None]:
# Short-time Fourier transform of the waveform.
D = librosa.stft(y)
# Take the magnitude and express it in decibels relative to the largest magnitude.
s_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)
s_db.shape

In [None]:
# Draw the dB-scaled STFT as an image: time on x, log-spaced frequency on y,
# with a colour bar showing the dB values.
fig, ax = plt.subplots(figsize=(18, 5))
img = librosa.display.specshow(
    s_db,
    x_axis='time',
    y_axis='log',
    ax=ax,
)
ax.set_title('Spectogram Example', fontsize=28)
fig.colorbar(img, ax=ax, format='%0.2f')
plt.show()

## MEL SPECTROGRAM
* remaps the frequency axis from Hz to the mel scale
* better suited for applications that need to model human hearing perception

In [None]:
# Mel-scaled spectrogram with 64 mel bands.
# `y` is passed by keyword: recent librosa releases reject it as a positional argument.
S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=64)
# melspectrogram returns a power spectrogram by default, so the decibel
# conversion has to be power_to_db (amplitude_to_db would double the dB values).
s_db_mel = librosa.power_to_db(S, ref=np.max)
s_db_mel.shape

In [None]:
# Plot the mel spectrogram.
# The rows of s_db_mel are mel bands, so the frequency axis must be drawn with
# y_axis='mel' (y_axis='log' would label them as linearly spaced FFT bins), and
# `sr` is passed so the axis ticks are computed for the clip's sample rate.
fig, ax = plt.subplots(figsize=(18, 5))
img = librosa.display.specshow(s_db_mel, sr=sr, x_axis='time', y_axis='mel', ax=ax)
ax.set_title('Mel Spectogram Example', fontsize=28)
fig.colorbar(img, ax=ax, format='%0.2f')
plt.show()

## PYTORCH CUSTOM DATASET IN AUDIO CLASSIFICATION

In [None]:
## create_folds for the model
## add .wav extension to the audio_id to match the audio

In [None]:
# Preview the training table again before building the fold file.
train.head()

In [None]:
from sklearn.preprocessing import LabelEncoder
# Encode the category names as integer labels, keep every second row
# (positions 0, 2, 4, ...) and retain only the columns needed downstream.
# NOTE(review): half of the rows are discarded here - confirm this is intended.
encoder = LabelEncoder()
df = (
    train
    .assign(Label=encoder.fit_transform(train['category']))
    .iloc[::2]
    .loc[:, ['CLIP_ID', 'category', 'Label']]
    .reset_index(drop=True)
)
df

In [None]:
from sklearn.model_selection import StratifiedKFold
# Tag every row with the number of the stratified fold in which it is a
# validation sample; the column is created as float by the first assignment
# and converted to int once every row has been filled.
kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
fold_splits = kf.split(df, df['Label'])
for fold_number, (_, valid_index) in enumerate(fold_splits):
  df.loc[valid_index, 'fold'] = fold_number
df['fold'] = df['fold'].astype(int)

In [None]:
# Rows of fold 0 whose encoded label is 2.
in_fold_zero = df['fold'] == 0
has_label_two = df['Label'] == 2
df[in_fold_zero & has_label_two]

In [None]:
# Turn clip ids into file names by appending ".wav".
# Ids that already carry the extension are left untouched so that re-running
# this cell does not produce names like "clip.wav.wav".
df["CLIP_ID"] = [
    clip_id if clip_id.endswith(".wav") else clip_id + ".wav"
    for clip_id in df["CLIP_ID"]
]

In [None]:
# Check the clip file names, labels and fold numbers.
df.head()

In [None]:
# Persist the fold-annotated table (without the index column) for the dataset class to read.
df.to_csv('/content/train_10folds.csv', index=False)

In [None]:
# Reload the fold file.
# NOTE(review): this rebinds `train`, which previously held the raw Train.csv table.
train = pd.read_csv('/content/train_10folds.csv')

In [None]:
import torch
import torchaudio
import os
from torch.utils.data import Dataset
from torch import nn
from torchsummary import summary
from torch.nn.modules.activation import ReLU
from torch.utils.data import DataLoader


In [None]:
# Preview the fold-annotated table that was just reloaded.
train.head()

In [None]:
class NoiseDataset(Dataset):
  """Audio dataset yielding ``(transformed_signal, label)`` pairs.

  Each item is loaded from disk, moved to ``device``, resampled to
  ``target_sample_rate`` when needed, mixed down to a single channel,
  right-padded or cut to exactly ``num_samples`` samples and finally passed
  through ``transformation``.

  Args:
    annotations_file: CSV file read with ``pd.read_csv``. The first column
      must hold the audio file name and the third column the integer label.
    audio_dir: Directory that contains the audio files.
    transformation: Module applied to the prepared signal; it is moved to
      ``device`` in the constructor.
    target_sample_rate: Sample rate every signal is resampled to.
    num_samples: Number of samples every signal is padded/cut to.
    device: Device on which the signal processing runs.
  """

  def __init__(self, annotations_file, audio_dir, transformation, target_sample_rate, num_samples, device ):
    self.annotations = pd.read_csv(annotations_file)
    self.audio_dir = audio_dir
    self.device = device
    self.transformation = transformation.to(self.device)
    self.target_sample_rate = target_sample_rate
    self.num_samples = num_samples

  def __len__(self):
    """Return the number of annotated clips."""
    return len(self.annotations)

  def __getitem__(self, index):
    """Load, normalise and transform the clip at row ``index``."""
    audio_sample_path = self._get_audio_sample_path(index)
    label = self._get_audio_sample_label(index)
    signal, sr = torchaudio.load(audio_sample_path)

    signal = signal.to(self.device)  # run the processing on the GPU when available
    signal = self._resample_if_necessary(signal, sr)  # uniform sample rate
    signal = self._mix_down_if_necessary(signal)  # multiple channels -> one channel
    signal = self._right_pad_if_necessary(signal)
    signal = self._cut_if_necessary(signal)
    signal = self.transformation(signal)
    return signal, label

  def _cut_if_necessary(self, signal):
    """Truncate ``signal`` (channels, samples) to at most ``num_samples`` samples."""
    if signal.shape[1] > self.num_samples:
      signal = signal[:, :self.num_samples]
    return signal

  def _right_pad_if_necessary(self, signal):
    """Zero-pad ``signal`` on the right up to ``num_samples`` samples."""
    len_signal = signal.shape[1]
    if len_signal < self.num_samples:
      num_missing_samples = self.num_samples - len_signal
      # (left, right) padding of the last dimension only
      last_dim_padding = (0, num_missing_samples)
      signal = torch.nn.functional.pad(signal, last_dim_padding)
    return signal

  def _resample_if_necessary(self, signal, sr):
    """Resample ``signal`` from ``sr`` to ``target_sample_rate`` if they differ."""
    if sr != self.target_sample_rate:
      resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
      # The signal has already been moved to the processing device; the
      # resampling kernel must live on the same device or the call fails
      # with a device-mismatch error when running on CUDA.
      resampler = resampler.to(signal.device)
      signal = resampler(signal)
    return signal

  def _mix_down_if_necessary(self, signal):
    """Average all channels into one, keeping the channel dimension."""
    if signal.shape[0] > 1:  # e.g. (2, 1000) -> (1, 1000)
      signal = torch.mean(signal, dim=0, keepdim=True)
    return signal

  def _get_audio_sample_path(self, index):
    """Return the path of the audio file named in the first annotation column."""
    path = os.path.join(self.audio_dir, self.annotations.iloc[index, 0])
    return path

  def _get_audio_sample_label(self, index):
    """Return the label stored in the third annotation column."""
    return self.annotations.iloc[index, 2]



In [None]:
# Locations of the fold-annotated metadata and of the audio clips.
ANNOTATIONS_FILE = '/content/train_10folds.csv'
AUDIO_DIR = '/content/AUDIO_CLIPS'
# Every clip is resampled to SAMPLE_RATE and padded/cut to NUM_SAMPLES samples.
SAMPLE_RATE = 22050
NUM_SAMPLES = 22050

# Prefer the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else 'cpu'
print(f"running_device is {device}")

# Transformation applied by the dataset to every prepared signal.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=64
)

noise_data = NoiseDataset(
    annotations_file=ANNOTATIONS_FILE,
    audio_dir=AUDIO_DIR,
    transformation=mel_spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device=device,
)

In [None]:
# Sanity check: number of annotated clips the dataset exposes.
print(f"there are {len(noise_data)} clips in the dataset")

In [None]:
# Fetch one item to verify the full load -> preprocess -> transform path; displays the (signal, label) pair.
noise_data[1]

#### Everything works okay

## MODEL CREATION

##### TODO: make this dynamic by passing the configuration through the constructor

In [None]:

class MyModel(nn.Module):
  """Small CNN classifier: four conv blocks, flatten, one linear layer.

  ``forward`` returns raw logits. ``nn.CrossEntropyLoss`` applies
  log-softmax internally, so feeding it softmax probabilities (as the model
  did before) squashes the gradients. Use ``model.softmax(logits)`` to
  obtain class probabilities at inference time.

  Args:
    num_classes: Size of the output layer.
    input_shape: ``(channels, height, width)`` of a single input, used to
      derive the input channel count and the size of the linear layer. The
      default corresponds to a 64-band mel spectrogram with 44 time frames.
  """

  def __init__(self, num_classes=19, input_shape=(1, 64, 44)):
    super().__init__()
    # 4 conv blocks / flatten / linear
    self.conv1 = self._conv_block(input_shape[0], 16)
    self.conv2 = self._conv_block(16, 32)
    self.conv3 = self._conv_block(32, 64)
    self.conv4 = self._conv_block(64, 128)
    self.flatten = nn.Flatten()
    # The flattened size depends on the input shape, so it is measured
    # rather than hard-coded (128 * 5 * 4 for the default shape).
    self.linear = nn.Linear(self._flattened_size(input_shape), num_classes)
    # Not used in forward(); kept so callers can turn logits into probabilities.
    self.softmax = nn.Softmax(dim=1)

  @staticmethod
  def _conv_block(in_channels, out_channels):
    """Build one Conv2d(3x3, stride 1, padding 2) -> ReLU -> MaxPool2d(2) block."""
    return nn.Sequential(
        nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=1,
            padding=2
        ),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2)
    )

  def _extract_features(self, input_data):
    """Run the four conv blocks and flatten the result to (batch, features)."""
    x = self.conv1(input_data)
    x = self.conv2(x)
    x = self.conv3(x)
    x = self.conv4(x)
    return self.flatten(x)

  def _flattened_size(self, input_shape):
    """Return the feature count produced by the conv stack for ``input_shape``."""
    with torch.no_grad():
      dummy_input = torch.zeros(1, *input_shape)
      return self._extract_features(dummy_input).shape[1]

  def forward(self, input_data):
    """Return the unnormalised class scores (logits) for a batch of inputs."""
    logits = self.linear(self._extract_features(input_data))
    return logits



In [None]:
# Build the network on the selected device and print its layer summary
# for a single-channel input with 64 mel bands and 44 time frames.
cnn = MyModel().to(device)
summary(cnn, (1, 64, 44))

## TRAINING AND EVALUATION

In [None]:
# Training hyper-parameters used by the training cell below.
BATCH_SIZE = 128  # samples per batch handed to the DataLoader
EPOCHS = 10  # number of passes over the training data
LEARNING_RATE = 0.001  # learning rate given to the Adam optimiser

In [None]:
def create_data_loader(train_data, batch_size, shuffle=True):
  """Wrap a training dataset in a ``DataLoader``.

  Args:
    train_data: Dataset to iterate over.
    batch_size: Number of samples per batch.
    shuffle: Whether to reshuffle the samples every epoch. Defaults to True
      because training on batches in a fixed file order biases the gradient
      updates; pass False to keep the dataset order.

  Returns:
    The configured ``DataLoader``.
  """
  train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
  return train_dataloader

def train_single_epoch(model, data_loader, loss_fn, optimizer, device):
  """Train ``model`` for one pass over ``data_loader``.

  Args:
    model: Module being optimised.
    data_loader: Iterable of ``(inputs, targets)`` batches.
    loss_fn: Callable computing the loss from ``(predictions, targets)``.
    optimizer: Optimiser holding the model parameters.
    device: Device the batches are moved to before the forward pass.

  Returns:
    The mean batch loss of the epoch as a float, or NaN when the loader
    produced no batches.
  """
  running_loss = 0.0
  num_batches = 0
  for inputs, targets in data_loader:
    inputs, targets = inputs.to(device), targets.to(device)

    # forward pass and loss
    predictions = model(inputs)
    loss = loss_fn(predictions, targets)

    # backpropagate the error and update the weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # accumulate on-device; converted to a Python float once, after the loop
    running_loss = running_loss + loss.detach()
    num_batches += 1

  # Report the epoch average instead of only the last batch, and do not fail
  # with an undefined `loss` when the loader is empty.
  mean_loss = float(running_loss) / num_batches if num_batches else float("nan")
  print(f"loss: {mean_loss}")
  return mean_loss

def train(model, data_loader, loss_fn, optimizer, device, epochs):
  """Run ``train_single_epoch`` ``epochs`` times, printing a banner around each epoch."""
  separator = "-----------------------------------------------------"
  for epoch_number in range(1, epochs + 1):
    print(f"Epoch {epoch_number}")
    train_single_epoch(model, data_loader, loss_fn, optimizer, device)
    print(separator)
  print("Finished Training")

In [None]:
## Build the transformation and the dataset that applies it
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=64
)
noise_data = NoiseDataset(
    annotations_file=ANNOTATIONS_FILE,
    audio_dir=AUDIO_DIR,
    transformation=mel_spectrogram,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device=device,
)
train_dataloader = create_data_loader(noise_data, BATCH_SIZE)

## Construct the model on the selected device and show its layers
my_model = MyModel().to(device)
print(my_model)

## Loss function and optimiser
loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(my_model.parameters(), lr=LEARNING_RATE)

## Train the model
train(my_model, train_dataloader, loss_fn, optimiser, device, EPOCHS)

## Save the learned weights
torch.save(my_model.state_dict(), "baseline_scratch_model.pth")
print("Trained model saved at baseline_scratch_model.pth")
