<a href="https://colab.research.google.com/github/roxyrong/emotion_detection/blob/main/Data_Augmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab-specific setup: authenticate the notebook user, then build the
# Cloud Storage client used by every later cell.
# NOTE(review): presumably storage.Client() picks up the credentials created by
# auth.authenticate_user() — confirm when running outside Colab.
from google.colab import auth
from google.cloud import storage
auth.authenticate_user()
client = storage.Client()

import math
import random
import pandas as pd
import numpy as np
import librosa
import librosa.display

In [2]:
# === Open the bucket that holds the SAVEE dataset
bucket = client.get_bucket('savee')
# === Materialise every blob stored under the audio prefix into a list
blobs = [blob for blob in bucket.list_blobs(prefix='AudioData/')]

In [3]:
# === Create Empty List
paths, labels, data = [], [], []

# Emotion codes that prefix each file name, e.g. 'sa01.wav' -> 'sa'.
known_emotions = {'a', 'd', 'f', 'h', 'n', 'sa', 'su'}

# === Loop audio files
for audio in blobs:
    # blob.name is the object's path inside the bucket,
    # e.g. 'AudioData/DC/a01.wav' (no need to parse the Blob repr string).
    file_path = audio.name

    # === keep audio files only (drops the txt file and anything else)
    if not file_path.lower().endswith('.wav'):
        continue

    # === Label Processing
    # File stem is '<emotion code><take number>'; stripping the trailing
    # digits leaves the code. This keeps 'sa' and 'su' distinct (the previous
    # single-character comparison against 'sa' could never match, so every
    # sad clip was labelled 'su').
    file_stem = file_path.rsplit('/', 1)[-1].rsplit('.', 1)[0]
    emotion = file_stem.rstrip('0123456789')
    if emotion not in known_emotions:
        # Skip rather than append a path/data entry without a label, which
        # would leave the three lists with different lengths.
        print(f"Skipping {file_path}: unrecognised emotion code {emotion!r}")
        continue

    # === Data Loading
    # The listed blob can be downloaded directly; the scratch file is
    # overwritten on every iteration.
    audio.download_to_filename("audios")
    y, sr = librosa.load('audios')

    # Append to all three lists together so they always stay aligned.
    paths.append(file_path)
    labels.append(emotion)
    data.append(y)

# === Create a dataframe to store
df_savee = pd.DataFrame({'data': data, 'path': paths, 'dataset': 'SAVEE',
                         'emotion': labels})
# Paths look like 'AudioData/<speaker>/<file>.wav'; the speaker is the folder.
df_savee["speaker"] = df_savee["path"].apply(lambda x: x.split('/')[1])
df_savee['augmented'] = False

In [4]:
# === Audio Augmentation Functions
def add_white_noise(signal, min_fac=0.1, max_fac=0.5):
    """Return ``signal`` with scaled Gaussian noise added.

    The noise is drawn from ``np.random.normal`` with zero mean and the
    signal's own standard deviation, one sample per element of ``signal``.
    It is then multiplied by a factor drawn uniformly from
    ``[min_fac, max_fac]`` using the ``random`` module.
    """
    scale = random.uniform(min_fac, max_fac)
    gaussian = np.random.normal(loc=0, scale=signal.std(), size=signal.size)
    return signal + scale * gaussian

def time_stretch(signal, min_rt=0.9, max_rt=1.1):
    """Time-stretch ``signal`` by a randomly chosen rate.

    Args:
        signal: audio samples, passed straight to librosa.
        min_rt: lower bound of the stretch rate.
        max_rt: upper bound of the stretch rate.

    Returns:
        Whatever ``librosa.effects.time_stretch`` returns for the drawn rate.
    """
    time_stretch_rate = random.uniform(min_rt, max_rt)
    # `rate` is keyword-only in librosa >= 0.10; passing it by keyword also
    # works on older releases, whereas the positional form raises TypeError
    # on new ones.
    return librosa.effects.time_stretch(signal, rate=time_stretch_rate)

def pitch_scale(signal, sr=22050, min_fac=-2, max_fac=2):
    """Pitch-shift ``signal`` by a randomly chosen number of semitones.

    Args:
        signal: audio samples, passed straight to librosa.
        sr: sampling rate of ``signal``.
        min_fac: lower bound of the shift, in semitones.
        max_fac: upper bound of the shift, in semitones.

    Returns:
        Whatever ``librosa.effects.pitch_shift`` returns for the drawn shift.
    """
    num_semitones = random.uniform(min_fac, max_fac)
    # `sr` and `n_steps` are keyword-only in librosa >= 0.10; keywords are
    # accepted by older releases too, so this form works on both.
    return librosa.effects.pitch_shift(signal, sr=sr, n_steps=num_semitones)

def random_gain(signal, min_factor=0.05, max_factor=0.12):
    """Scale ``signal`` by a gain drawn uniformly from
    ``[min_factor, max_factor]`` via the ``random`` module."""
    return signal * random.uniform(min_factor, max_factor)

def invert_polarity(signal):
    """Flip the sign of every sample by multiplying ``signal`` by -1."""
    flipped = -1 * signal
    return flipped

In [5]:
# === Audio Augmentation Process
def aug_through_dataset(raw_df, sr, white_noise_only_prob, functions):
    """Build one augmented copy of ``raw_df``.

    For every entry of ``raw_df.data`` either white noise alone is added
    (with probability ``white_noise_only_prob``) or a random non-empty subset
    of ``functions`` is applied one after another, in random order.

    Args:
        raw_df: DataFrame with a ``data`` column holding the signals; all
            other columns are copied over unchanged.
        sr: sampling rate; forwarded as ``sr=`` to every augmentation
            function that declares an ``sr`` parameter.
        white_noise_only_prob: probability of the white-noise-only branch.
        functions: sequence of callables taking a signal and returning the
            augmented signal.

    Returns:
        A new DataFrame with the same columns as ``raw_df``, a fresh
        0..n-1 index, ``data`` replaced by the augmented signals and, if the
        column exists, ``augmented`` set to True.
    """
    augmented_dataset = []
    for signal in raw_df.data:
        if random.random() < white_noise_only_prob:
            augmented_data = add_white_noise(signal)
        else:
            num_functions = random.randint(1, len(functions))
            selected_functions = random.sample(functions, num_functions)
            augmented_data = signal
            for func in selected_functions:
                # Previously `sr` was accepted but never used, so rate-aware
                # functions silently fell back to their own default.
                if _accepts_sr(func):
                    augmented_data = func(augmented_data, sr=sr)
                else:
                    augmented_data = func(augmented_data)
        augmented_dataset.append(augmented_data)

    aug_df = pd.DataFrame(columns=raw_df.columns)
    # Assign `data` first: it gives the empty frame its index, so the scalar
    # and list assignments below work whatever the column order is.
    aug_df['data'] = pd.Series(augmented_dataset)
    for col in raw_df.columns:
        if col == 'data':
            continue
        if col == 'augmented':
            aug_df[col] = True
        else:
            aug_df[col] = list(raw_df[col])
    return aug_df


def _accepts_sr(func):
    """Return True when ``func`` declares a parameter named ``sr``."""
    import inspect
    try:
        return 'sr' in inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature.
        return False

In [7]:
# === Params for Data Augmentation
# Seed both generators: `random` drives function selection and the random
# factors, while add_white_noise draws its noise from `np.random`.
random.seed(207)
np.random.seed(207)
augmentation_num_loop = 3 # 3 augmented copies + the originals: 4x480, nearly 2000 examples.
functions = [add_white_noise, time_stretch, pitch_scale, random_gain,
             invert_polarity]
white_noise_only_prob = 0.2 # probability that an example gets white noise only.
sr = 22050 # librosa default

# === Start Augmentation
augmented_df_list = [
    aug_through_dataset(df_savee, sr, white_noise_only_prob, functions)
    for _ in range(augmentation_num_loop)
]

# Originals first, then every augmented copy; a single concat also works when
# augmentation_num_loop is 0 (concatenating an empty list raises).
final_df_savee = pd.concat([df_savee, *augmented_df_list],
                           axis=0).reset_index(drop=True)
print(final_df_savee.head(5))

                                                data                  path  \
0  [0.035011273, 0.052110124, 0.0455472, 0.049692...  AudioData/DC/a01.wav   
1  [0.028584875, 0.043024283, 0.038623992, 0.0423...  AudioData/DC/a02.wav   
2  [0.029121982, 0.043259364, 0.037821554, 0.0412...  AudioData/DC/a03.wav   
3  [0.028819187, 0.042954646, 0.037683364, 0.0410...  AudioData/DC/a04.wav   
4  [0.01032823, 0.015278679, 0.013250433, 0.01440...  AudioData/DC/a05.wav   

  dataset emotion speaker  augmented  
0   SAVEE       a      DC      False  
1   SAVEE       a      DC      False  
2   SAVEE       a      DC      False  
3   SAVEE       a      DC      False  
4   SAVEE       a      DC      False  


In [8]:
# === Upload to GCloud
# Write both local artifacts first, then push each one to the bucket under
# the same name.
# NOTE(review): np.save on a DataFrame presumably stores a pickled object
# array without column names — confirm how downstream notebooks reload it.
np.save(file="augmented_dataset.npy", arr=final_df_savee)
# NOTE(review): the `data` column holds numpy arrays; their string form is
# abbreviated for long arrays, so the CSV likely does not contain the full
# signals — confirm before relying on the CSV for audio data.
final_df_savee.to_csv("augmented_dataset.csv",index=False)
for local_file in ("augmented_dataset.npy", "augmented_dataset.csv"):
    blob = bucket.blob(local_file)
    blob.upload_from_filename(local_file)