In [None]:
import math
import os
import random

import torch
import torchaudio
from IPython.display import Audio
from torchaudio import transforms

In [None]:

# Loads audio files and applies preprocessing / augmentation steps to them.
# `sample_rate` is the target rate: loading keeps the file's native rate, and
# `resample` converts any signal whose rate differs to the target rate.
class Loader:
  """Load audio files and run preprocessing / augmentation steps on them.

  An audio clip is passed around as a ``(sig, sr)`` tuple, where ``sig`` is a
  2-D tensor indexed as ``[channel, sample]`` and ``sr`` is its sample rate.

  Attributes:
    sample_rate: Target sample rate used by ``resample``.
    duration: Target clip length in milliseconds used by ``pad_trunc``.
    mono: Stored as given; none of the methods in this class read it.
    channel: Target channel count used by ``rechannel`` (fixed at 2).
  """

  def __init__(self, sample_rate, duration, mono):
    self.sample_rate = sample_rate
    self.duration = duration
    self.mono = mono
    self.channel = 2

  def load(self, filepath):
    """Read an audio file and return it as a ``(sig, sr)`` tuple.

    ``filepath`` must point to an existing audio file; adjust your paths
    before calling this. The signal keeps the file's own sample rate.
    """
    sig, sr = torchaudio.load(filepath)
    return sig, sr

  def rechannel(self, aud):
    """Convert the clip to ``self.channel`` channels.

    A clip that already has the target channel count is returned unchanged.
    A clip with more channels keeps only its first ``self.channel`` channels;
    a clip with fewer gets its first channel duplicated.

    Returns:
      The ``(sig, sr)`` tuple with the adjusted signal.
    """
    sig, sr = aud
    num_channels = sig.shape[0]

    if num_channels == self.channel:
      # Nothing to do
      return aud

    if num_channels > self.channel:
      # Too many channels (e.g. stereo -> mono): keep the leading ones
      resig = sig[:self.channel, :]
    else:
      # Too few channels (e.g. mono -> stereo): duplicate the first channel
      resig = torch.cat([sig[:1, :]] * self.channel)

    return resig, sr

  def resample(self, aud):
    """Resample the clip to ``self.sample_rate``.

    Returns:
      The input tuple unchanged when it is already at the target rate,
      otherwise ``(resampled_sig, self.sample_rate)``.
    """
    sig, sr = aud

    if sr == self.sample_rate:
      # Nothing to do
      return aud

    # Resample operates along the last (time) dimension, so every channel is
    # converted in a single call, for mono and multi-channel clips alike.
    resig = torchaudio.transforms.Resample(sr, self.sample_rate)(sig)
    return resig, self.sample_rate

  def pad_trunc(self, aud):
    """Pad or truncate the clip to ``self.duration`` milliseconds.

    Longer signals are cut at the end. Shorter signals are zero-padded, with
    the padding split at a random point between the beginning and the end.

    Returns:
      ``(sig, sr)`` where ``sig`` has exactly the target number of samples.
    """
    sig, sr = aud
    num_rows, sig_len = sig.shape
    # Samples per millisecond (integer division) times the length in ms.
    max_len = int(sr // 1000 * self.duration)

    if sig_len > max_len:
      # Truncate the signal to the given length
      sig = sig[:, :max_len]
    elif sig_len < max_len:
      # Length of padding to add at the beginning and end of the signal
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len

      # Pad with 0s that match the signal's dtype and device
      pad_begin = torch.zeros((num_rows, pad_begin_len), dtype=sig.dtype, device=sig.device)
      pad_end = torch.zeros((num_rows, pad_end_len), dtype=sig.dtype, device=sig.device)

      sig = torch.cat((pad_begin, sig, pad_end), 1)

    return sig, sr

  @staticmethod
  def time_shift(aud, shift_limit):
    """Shift the clip in time by a random amount.

    The shift is a random fraction (between 0 and ``shift_limit``) of the
    signal length. Samples pushed past the end of a channel wrap around to
    the start of that same channel.

    Returns:
      ``(shifted_sig, sr)``.
    """
    sig, sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    # Roll along the time axis only; rolling without `dims` would flatten the
    # tensor and leak samples from one channel into the next.
    return sig.roll(shift_amt, dims=1), sr

  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None, top_db=80):
    """Generate a Mel spectrogram of the clip, in decibels.

    Args:
      aud: ``(sig, sr)`` tuple.
      n_mels: Number of Mel bins.
      n_fft: FFT size.
      hop_len: Hop length passed to ``MelSpectrogram``.
      top_db: Value passed to ``AmplitudeToDB`` as ``top_db``.

    Returns:
      The spectrogram tensor.
    """
    sig, sr = aud

    # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

    # Convert to decibels
    return transforms.AmplitudeToDB(top_db=top_db)(spec)

  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    """Augment a spectrogram with frequency and time masking.

    Sections are masked out along the frequency dimension (horizontal bars)
    and the time dimension (vertical bars) to prevent overfitting and help the
    model generalise. Masked sections are replaced with the spectrogram mean.

    Args:
      spec: Spectrogram whose last two dimensions are ``[n_mels, time]``.
      max_mask_pct: Upper bound of each mask's size, as a fraction of the
        masked dimension.
      n_freq_masks: Number of frequency masks to apply.
      n_time_masks: Number of time masks to apply.

    Returns:
      The masked spectrogram.
    """
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec


In [None]:
class PreprocessingPipeline:
  """Process every audio file found under a directory.

  For each file the pipeline:
    1. Loads the audio, converts its channel count and resamples it.
    2. Pads or truncates the audio to a fixed length.
    3. Applies a random time shift.

  Assign a loader object to ``loader`` before calling ``process``. The loader
  must provide ``load``, ``rechannel``, ``resample``, ``pad_trunc`` and
  ``time_shift``.

  Attributes:
    padder: Initialised to ``None``; not read by the methods below.
    shift_limit: Maximum fraction of the clip length used for the time shift.
  """

  def __init__(self, shift_limit=0.4):
    self.padder = None
    self._loader = None
    self.shift_limit = shift_limit

  @property
  def loader(self):
    """The loader object used to read and transform audio files."""
    return self._loader

  @loader.setter
  def loader(self, loader):
    self._loader = loader

  def process(self, audio_files_directory):
    """Walk ``audio_files_directory`` recursively and process every file."""
    for root, _, files in os.walk(audio_files_directory):
      for filename in files:
        filepath = os.path.join(root, filename)
        self._process_file(filepath)
        print(f"Processed file {filepath}")
        # NOTE(review): _convert_mfcc is not defined in the visible part of
        # this class - confirm it exists before running the pipeline.
        self._convert_mfcc(filepath)

  def _process_file(self, filepath):
    """Run the loading and preprocessing steps on one file.

    Returns:
      The processed clip as returned by the loader's ``time_shift``.

    Raises:
      ValueError: If no loader has been assigned yet.
    """
    loader = self.loader
    if loader is None:
      raise ValueError("No loader set: assign one to `loader` before processing files")

    signal = loader.load(filepath)
    signal = loader.rechannel(signal)
    signal = loader.resample(signal)
    signal = loader.pad_trunc(signal)
    signal = loader.time_shift(signal, self.shift_limit)
    return signal
