In [1]:
import numpy as np
import pandas as pd
import io, os, json
import matplotlib.pyplot as plt
import librosa
import librosa.display
from pydub import AudioSegment
from pydub.utils import make_chunks
from skimage.feature import hog
import soundfile as sf

%run /data/emo/notebooks/source/pipeline/augmenter.ipynb

def append(value, obj):
    """Attach one or more features to copies of ``value``.

    ``obj`` is treated as a collection of features only when it is exactly a
    ``list``; anything else (including tuples and arrays) is wrapped as a
    single feature. One shallow copy of ``value`` is produced per feature and
    the feature is stored under the ``'feature'`` key of that copy.

    Returns:
        list: the copies, in the same order as the features. ``value`` itself
        is never modified.
    """
    features = obj if type(obj) is list else [obj]
    records = []
    for feature in features:
        record = value.copy()
        record['feature'] = feature
        records.append(record)
    return records

class Processable:
    """Base class for the audio processors defined in this notebook.

    Subclasses override ``identifier`` and whichever hooks they need; the
    defaults here are inert placeholders.
    """

    # Name of the processor; ``None`` on the base class.
    identifier = None

    @classmethod
    def process_object(cls, value):
        """Process one object and return a list of results (empty by default)."""
        return []

    @classmethod
    def _generate_json(cls, *args):
        """Internal feature-generation hook; returns ``None`` by default."""
        return None

    @classmethod
    def _process_audio(cls, value):
        """Internal audio-processing hook; returns ``None`` by default."""
        return None

# Spectrogram Processable

In [2]:
class Spectrogram(Processable):
    """Renders an audio file as a fixed-size RGB spectrogram image."""

    identifier = 'spectrogram'
    # (width, height) of the rendered figure, in pixels.
    image_size = (256, 128)

    @classmethod
    def process_object(cls, value):
        """Load the audio referenced by ``value`` and attach its spectrogram.

        Args:
            value: mapping-like record with a ``'file_path'`` entry; it must
                also support ``.copy()`` (used by ``append``).

        Returns:
            list: one copy of ``value`` with the image stored under
            ``'feature'``.
        """
        time_series, sample_rate = cls._process_audio(value)
        return append(value, cls._generate_json(time_series, sample_rate))

    @classmethod
    def _process_audio(cls, value):
        """Load, trim and zero-strip the audio at ``value['file_path']``.

        Returns:
            tuple: ``(samples, sample_rate)``.
        """
        # sr=None asks librosa not to resample.
        x, sample_rate = librosa.load(value['file_path'], sr=None)
        x, _ = librosa.effects.trim(x, top_db=20)

        # Drops every sample that is exactly zero, wherever it occurs,
        # not only at the edges.
        return x[x != 0], sample_rate

    @classmethod
    def _generate_json(cls, *args):
        """Draw the spectrogram of ``args[0]`` sampled at ``args[1]``.

        Args:
            *args: ``(time_series, sample_rate)``.

        Returns:
            numpy.ndarray: ``uint8`` array shaped ``(height, width, 3)``.
        """
        px = 1 / plt.rcParams['figure.dpi']
        fig = plt.figure(figsize=(px * cls.image_size[0], px * cls.image_size[1]))
        try:
            # One axes covering the whole canvas, with no ticks or frame.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            ax.specgram(args[0], Fs=args[1], cmap="jet")
            with io.BytesIO() as io_buf:
                # Pin the dpi so the dumped buffer always matches fig.bbox,
                # even when rcParams['savefig.dpi'] differs from the figure.
                fig.savefig(io_buf, format='raw', dpi=fig.dpi)
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height); images are row-major.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        # Keep the first three channels only (raw output is presumably RGBA).
        return np.reshape(buff, shape)[:, :, :3]

# Chunked Processable

In [3]:
class Chunked(Processable):
    """Splits a WAV file into equally long chunks of normalised samples."""

    identifier = 'chunked'
    # Chunk length in milliseconds; None here, so subclasses must set it.
    chunk_length_ms = None

    @classmethod
    def _process_audio(cls, value):
        """Cut the WAV at ``value['file_path']`` into ``chunk_length_ms`` pieces.

        A chunk that is too short is extended with audio taken from the start
        of the reversed clip. The extension is repeated until the chunk
        reaches ``chunk_length_ms``, so clips shorter than the missing
        duration are also filled up.

        Returns:
            list: ``(samples, frame_rate)`` tuples, one per chunk.
        """
        audio = AudioSegment.from_wav(value['file_path'])
        mirrored = None  # reversed clip, built only if padding is needed
        processed_audio = []
        for chunk in make_chunks(audio, cls.chunk_length_ms):
            missing_ms = cls.chunk_length_ms - int(chunk.duration_seconds * 1000)
            while missing_ms > 0:
                if mirrored is None:
                    mirrored = audio.reverse()
                padded = chunk + make_chunks(mirrored, missing_ms)[0]
                if padded.duration_seconds <= chunk.duration_seconds:
                    # Nothing could be added; stop instead of looping forever.
                    break
                chunk = padded
                missing_ms = cls.chunk_length_ms - int(chunk.duration_seconds * 1000)
            processed_audio.append((cls._process_segment(chunk), audio.frame_rate))
        return processed_audio

    @classmethod
    def _process_segment(cls, segment):
        """Convert a pydub segment into a flat ``float32`` sample array.

        Samples are divided by the maximum of the segment's integer sample
        type.

        Returns:
            numpy.ndarray: 1-D ``float32`` array.
        """
        samples = [s.get_array_of_samples() for s in segment.split_to_mono()]
        # Shape after transpose: (frames, channels).
        audio_samples = np.array(samples).T.astype(np.float32)
        audio_samples /= np.iinfo(samples[0].typecode).max
        # NOTE(review): for multi-channel input this flattening interleaves
        # the channels frame by frame, while callers pair the result with the
        # per-channel frame rate - confirm inputs are mono.
        return audio_samples.reshape(-1)

# Chunked Spectrogram Processable

In [4]:
class ChunkedSpectrogram(Chunked, Spectrogram):
    """Spectrogram images computed per fixed-length chunk of the audio."""

    identifier = 'chunked_spectrogram'
    chunk_length_ms = 750

    @classmethod
    def process_object(cls, value):
        """Return one copy of ``value`` per chunk, each carrying that chunk's image."""
        images = [
            cls._generate_json(samples, rate)
            for samples, rate in cls._process_audio(value)
        ]
        return append(value, images)

# Fixed Ratio Spectrogram Processable

In [5]:
class FixedSpectrogram(Spectrogram):
    """Spectrogram whose image width grows with the clip duration."""

    identifier = 'fixed_spectrogram'

    @classmethod
    def _generate_json(cls, *args):
        """Draw the spectrogram at 100 pixels per second, 128 pixels high.

        Args:
            *args: ``(time_series, sample_rate)``.

        Returns:
            numpy.ndarray: ``uint8`` array shaped ``(height, width, 3)``.
        """
        # Keyword arguments: recent librosa releases reject positional ones.
        duration = librosa.get_duration(y=args[0], sr=args[1])
        px = 1 / plt.rcParams['figure.dpi']
        fig = plt.figure(figsize=(px * int(duration * 100), px * 128))
        try:
            # One axes covering the whole canvas, with no ticks or frame.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            ax.specgram(args[0], Fs=args[1])
            with io.BytesIO() as io_buf:
                # Pin the dpi so the dumped buffer always matches fig.bbox.
                fig.savefig(io_buf, format='raw', dpi=fig.dpi)
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height); images are row-major.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        # Keep the first three channels only (raw output is presumably RGBA).
        return np.reshape(buff, shape)[:, :, :3]

# HOG Processable

In [6]:
class HoG(Spectrogram):
    """Histogram-of-oriented-gradients features of the spectrogram image."""

    identifier = 'hog'

    @classmethod
    def _generate_json(cls, *args):
        """Render the parent spectrogram and return its HOG descriptor.

        Args:
            *args: forwarded unchanged to ``Spectrogram._generate_json``.

        Returns:
            The value returned by ``skimage.feature.hog`` for the image.
        """
        import inspect

        image = super()._generate_json(*args)
        # scikit-image replaced ``multichannel=True`` with ``channel_axis``;
        # pick whichever keyword the installed version accepts.
        if 'channel_axis' in inspect.signature(hog).parameters:
            channel_kwarg = {'channel_axis': -1}
        else:
            channel_kwarg = {'multichannel': True}
        features = hog(image, orientations=8, pixels_per_cell=(4, 4),
                       cells_per_block=(2, 2), **channel_kwarg)
        return features

# Clean Spec

In [7]:
class CleanSpec(Spectrogram):
    """Spectrograms of the trimmed clip plus optional augmented variants.

    Unlike ``Spectrogram._process_audio``, the audio step here does not drop
    zero-valued samples and returns a list of ``(samples, sample_rate)``
    pairs. Every augmentation is currently commented out, so the list holds
    only the trimmed original.
    """

    identifier = 'Augmented Ravdess'
    # Misspelled name kept as an alias for any code that still reads it.
    indentifier = identifier

    @classmethod
    def process_object(cls, value):
        """Return one copy of ``value`` per audio variant, each with its image."""
        processed_objects = [
            cls._generate_json(time_series, sample_rate)
            for time_series, sample_rate in cls._process_audio(value)
        ]
        return append(value, processed_objects)

    @classmethod
    def _process_audio(cls, value):
        """Load and trim ``value['file_path']``; return the list of variants.

        Returns:
            list: ``(samples, sample_rate)`` tuples.
        """
        x, sample_rate = librosa.load(value['file_path'], sr=None)
        x, _ = librosa.effects.trim(x, top_db=20)
        augmented_audios = [(x, sample_rate)]

#         # Change pitch down
#         augmented_audios.append((Augmenter.change_pitch(audio=x, sr=sample_rate), sample_rate))

#         # Change pitch up
#         augmented_audios.append((Augmenter.change_pitch(audio=x, sr=sample_rate, pitch_type="up"), sample_rate))

#         # Change speed slow
#         augmented_audios.append((Augmenter.change_speed(audio=x), sample_rate))

#         # Change speed fast
#         augmented_audios.append((Augmenter.change_speed(audio=x, speed_change="high"), sample_rate))

#         # Change speed & pitch down
#         augmented_audios.append((Augmenter.change_speed_and_pitch(audio=x, sr=sample_rate), sample_rate))

#         # Change speed & pitch up
#         augmented_audios.append((Augmenter.change_speed_and_pitch(audio=x, sr=sample_rate, pitch_type="up"), sample_rate))

#         # Add distribution noise
#         augmented_audios.append((Augmenter.add_distribution_noise(audio=x), sample_rate))

        return augmented_audios

# Household Spec

In [8]:
class Household(Processable):
    """Audio step that mixes background noise into the trimmed clip.

    ``Augmenter`` is not defined in this file; presumably it comes from the
    augmenter notebook executed at the top - confirm.
    """

    identifier = 'household'

#     @classmethod
#     def process_object(cls, value):
#         processed_objects = []
#         for time_series, sample_rate in cls._process_audio(value):
#             processed_objects.append(cls._generate_json(time_series, sample_rate))
#         return append(value, processed_objects)

    @classmethod
    def _process_audio(cls, value):
        """Load the clip at 44.1 kHz, trim it and add background noise.

        The trimmed audio is written to an in-memory WAV buffer, which is
        handed to ``Augmenter.add_background_noise`` exactly once.

        Returns:
            Whatever ``Augmenter.add_background_noise`` returns; callers in
            this file unpack it as ``(samples, sample_rate)``.
        """
        x, sample_rate = librosa.load(value['file_path'], sr=44100)
        x, _ = librosa.effects.trim(x, top_db=20)
        f = io.BytesIO()
        sf.write(f, x, sample_rate, format='wav')
        # Rewind so a reader that starts at the current position sees the data.
        f.seek(0)
        return Augmenter.add_background_noise(f)

#     @classmethod
#     def _process_audio(cls, value):
#         x, sample_rate = librosa.load(value['file_path'], sr=44100)
#         x, index = librosa.effects.trim(x, top_db=20)
#         augmented_audios = [(x, sample_rate)]
#         f = io.BytesIO()
#         sf.write(f, x, sample_rate, format='wav')
#         x, sample_rate = Augmenter.add_background_noise(f)
#         augmented_audios.append((x, sample_rate))
#         return augmented_audios

# Logarithmic Spectrogram

In [9]:
class LogSpectrogram(Household):
    """Log-frequency dB spectrogram image of the noise-augmented clip.

    The audio step is inherited from ``Household``.
    """

    identifier = 'log_spectrogram'
    hop_length = 1024
    # (width, height) of the rendered figure, in pixels.
    image_size = (256, 128)

    @classmethod
    def process_object(cls, value):
        """Return one copy of ``value`` carrying the rendered image."""
        time_series, sample_rate = cls._process_audio(value)
        return append(value, cls._generate_json(time_series, sample_rate))

#     @classmethod
#     def _process_audio(cls, value):
#         x, sample_rate = librosa.load(value['file_path'], sr=44100)
#         x, index = librosa.effects.trim(x, top_db=20)
#         return x, sample_rate

    @classmethod
    def _generate_json(cls, *args):
        """Draw the dB-scaled STFT of ``args[0]`` on a log frequency axis.

        Args:
            *args: ``(time_series, sample_rate)``.

        Returns:
            numpy.ndarray: ``uint8`` array shaped ``(height, width, 3)``.
        """
        px = 1 / plt.rcParams['figure.dpi']
        fig = plt.figure(figsize=(px * cls.image_size[0], px * cls.image_size[1]))
        try:
            # One axes covering the whole canvas, with no ticks or frame.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            magnitude = np.abs(librosa.stft(args[0], hop_length=cls.hop_length))
            x = librosa.amplitude_to_db(magnitude, ref=np.max)
            librosa.display.specshow(x, sr=args[1], y_axis='log',
                                     hop_length=cls.hop_length, x_axis='time', ax=ax)
            with io.BytesIO() as io_buf:
                # Pin the dpi so the dumped buffer always matches fig.bbox.
                fig.savefig(io_buf, format='raw', dpi=fig.dpi)
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height); images are row-major.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        # Keep the first three channels only (raw output is presumably RGBA).
        return np.reshape(buff, shape)[:, :, :3]

# Fixed Ratio Log Spectrogram

In [10]:
class FixedLogSpectrogram(LogSpectrogram):
    """Log spectrogram whose image width grows with the clip duration."""

    identifier = 'fixed_log_spectrogram'

    @classmethod
    def _generate_json(cls, *args):
        """Draw the log spectrogram at 100 pixels per second, 128 pixels high.

        Args:
            *args: ``(data, sample_rate)``. A 1-D ``data`` is treated as a
                waveform (this is what ``process_object`` passes) and is
                converted to a dB-scaled STFT first. Any other input is used
                as an already-computed spectrogram.

        Returns:
            numpy.ndarray: ``uint8`` array shaped ``(height, width, 3)``.
        """
        spec = np.asarray(args[0])
        if spec.ndim == 1:
            # Same transform as LogSpectrogram._generate_json.
            magnitude = np.abs(librosa.stft(spec, hop_length=cls.hop_length))
            spec = librosa.amplitude_to_db(magnitude, ref=np.max)
        duration = librosa.get_duration(S=spec, sr=args[1], hop_length=cls.hop_length)
        px = 1 / plt.rcParams['figure.dpi']
        fig = plt.figure(figsize=(px * int(duration * 100), px * 128))
        try:
            # One axes covering the whole canvas, with no ticks or frame.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            librosa.display.specshow(spec, sr=args[1], y_axis='log',
                                     hop_length=cls.hop_length, x_axis='time', ax=ax)
            with io.BytesIO() as io_buf:
                # Pin the dpi so the dumped buffer always matches fig.bbox.
                fig.savefig(io_buf, format='raw', dpi=fig.dpi)
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height); images are row-major.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        # Keep the first three channels only (raw output is presumably RGBA).
        return np.reshape(buff, shape)[:, :, :3]