In [2]:
import numpy as np
import io
import matplotlib.pyplot as plt
import librosa


def append(value, obj):
    """Pair every feature in ``obj`` with its own shallow copy of ``value``.

    Args:
        value: Mapping-like object exposing ``.copy()`` and item assignment
            (the callers in this file pass a dict describing one audio file).
        obj: Either a ``list`` of features or a single feature. Anything whose
            exact type is not ``list`` (tuples and arrays included) is treated
            as one feature.

    Returns:
        A list with one copy of ``value`` per feature, each carrying that
        feature under the ``'feature'`` key. ``value`` itself is not modified.
    """
    features = [obj] if type(obj) is not list else obj
    records = []
    for feature in features:
        record = value.copy()  # shallow copy: nested objects stay shared
        record['feature'] = feature
        records.append(record)
    return records

class Processable:
    """Base interface for feature extractors; every hook is a no-op default.

    Subclasses override the class attribute and the three classmethods below.
    """

    # Name of the feature type; ``None`` on the base class.
    identifier = None

    @classmethod
    def process_object(cls, value):
        """Process one object and return a list of results (empty here)."""
        return []

    @classmethod
    def _generate_json(cls, *args):
        """Internal JSON generation hook; the base implementation returns None."""
        return None

    @classmethod
    def _process_audio(cls, value):
        """Internal audio processing hook; the base implementation returns None."""
        return None

# Spectrogram Processable

In [3]:
class Spectrogram(Processable):
    """Processable that turns an audio file into a spectrogram image array."""

    identifier = 'spectrogram'
    # Figure size in pixels as (width, height); converted to inches below
    # using the current ``figure.dpi`` rcParam.
    image_size = (256, 128)

    @classmethod
    def process_object(cls, value):
        """Load the audio referenced by ``value`` and attach its spectrogram.

        Args:
            value: Mapping with a ``'file_path'`` entry pointing at an audio file.

        Returns:
            The list produced by ``append``: copies of ``value`` with the
            rendered image stored under ``'feature'``.
        """
        time_series, sample_rate = cls._process_audio(value)
        return append(value, cls._generate_json(time_series, sample_rate))

    @classmethod
    def _process_audio(cls, value):
        """Read the audio file and strip leading/trailing zero samples.

        Returns:
            ``(samples, sample_rate)``; ``sr=None`` asks librosa to keep the
            file's own sample rate instead of resampling.
        """
        x, sample_rate = librosa.load(value['file_path'], sr=None)
        return np.trim_zeros(x), sample_rate

    @classmethod
    def _generate_json(cls, *args):
        """Render a spectrogram and return it as a ``uint8`` image array.

        Args:
            *args: ``args[0]`` is the sample array, ``args[1]`` the sample rate.

        Returns:
            Array of shape ``(height, width, 3)``: the raw canvas dump with
            everything past the third channel dropped (matplotlib documents
            the ``raw`` format as RGBA, so this removes alpha).
        """
        time_series, sample_rate = args[0], args[1]
        px = 1 / plt.rcParams['figure.dpi']  # inches per pixel
        fig = plt.figure(figsize=(px * cls.image_size[0], px * cls.image_size[1]))
        try:
            # Axes covering the whole canvas with no ticks or frame, so the
            # image contains only spectrogram pixels.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            # Draw and save through the explicit axes/figure rather than the
            # pyplot "current figure", which other code may have changed.
            ax.specgram(time_series, Fs=sample_rate, cmap="jet")
            with io.BytesIO() as io_buf:
                fig.savefig(io_buf, format='raw')
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height) in pixels.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even when rendering fails; otherwise
            # figures accumulate across a batch run.
            plt.close(fig)
        # ndarray.reshape avoids np.reshape's ``newshape`` keyword, which is
        # deprecated in recent NumPy releases.
        return buff.reshape(shape)[:, :, :3]

# Augmented Spectrogram

In [7]:
# Execute the augmentation notebook inside this kernel. `Augmenter`, used by
# AugmentedSpectogram below, is not defined in this file and presumably comes
# from that notebook — verify.
# NOTE(review): absolute path; this cell only runs where /datc/emo exists.
%run /datc/emo/notebooks/source/pipeline/augmentation.ipynb

# NOTE(review): this re-declares the Spectrogram class already defined in an
# earlier cell of this notebook; whichever definition runs last is the one in
# effect. Consider keeping a single definition.
class Spectrogram(Processable):
    """Processable that turns an audio file into a spectrogram image array."""

    identifier = 'spectrogram'
    # Figure size in pixels as (width, height); converted to inches below
    # using the current ``figure.dpi`` rcParam.
    image_size = (256, 128)

    @classmethod
    def process_object(cls, value):
        """Load the audio referenced by ``value`` and attach its spectrogram.

        Args:
            value: Mapping with a ``'file_path'`` entry pointing at an audio file.

        Returns:
            The list produced by ``append``: copies of ``value`` with the
            rendered image stored under ``'feature'``.
        """
        time_series, sample_rate = cls._process_audio(value)
        return append(value, cls._generate_json(time_series, sample_rate))

    @classmethod
    def _process_audio(cls, value):
        """Read the audio file and strip leading/trailing zero samples.

        Returns:
            ``(samples, sample_rate)``; ``sr=None`` asks librosa to keep the
            file's own sample rate instead of resampling.
        """
        x, sample_rate = librosa.load(value['file_path'], sr=None)
        return np.trim_zeros(x), sample_rate

    @classmethod
    def _generate_json(cls, *args):
        """Render a spectrogram and return it as a ``uint8`` image array.

        Args:
            *args: ``args[0]`` is the sample array, ``args[1]`` the sample rate.

        Returns:
            Array of shape ``(height, width, 3)``: the raw canvas dump with
            everything past the third channel dropped (matplotlib documents
            the ``raw`` format as RGBA, so this removes alpha).
        """
        time_series, sample_rate = args[0], args[1]
        px = 1 / plt.rcParams['figure.dpi']  # inches per pixel
        fig = plt.figure(figsize=(px * cls.image_size[0], px * cls.image_size[1]))
        try:
            # Axes covering the whole canvas with no ticks or frame, so the
            # image contains only spectrogram pixels.
            ax = plt.Axes(fig, [0., 0., 1., 1.])
            ax.set_axis_off()
            fig.add_axes(ax)
            # Draw and save through the explicit axes/figure rather than the
            # pyplot "current figure", which other code may have changed.
            ax.specgram(time_series, Fs=sample_rate, cmap="jet")
            with io.BytesIO() as io_buf:
                fig.savefig(io_buf, format='raw')
                buff = np.frombuffer(io_buf.getvalue(), dtype=np.uint8)
            # bbox.bounds is (x0, y0, width, height) in pixels.
            shape = (int(fig.bbox.bounds[3]), int(fig.bbox.bounds[2]), -1)
        finally:
            # Always release the figure, even when rendering fails; otherwise
            # figures accumulate across a batch run.
            plt.close(fig)
        # ndarray.reshape avoids np.reshape's ``newshape`` keyword, which is
        # deprecated in recent NumPy releases.
        return buff.reshape(shape)[:, :, :3]

class AugmentedSpectogram(Spectrogram):
    """Spectrogram variant yielding one image per audio variant.

    The variants are the trimmed original signal followed by seven outputs of
    ``Augmenter`` (defined outside this file).
    """

    identifier = 'Augmented spectrogram'

    @classmethod
    def process_object(cls, value):
        """Render every audio variant of ``value`` and attach each image.

        Returns:
            The list produced by ``append``: one copy of ``value`` per variant,
            each with its image under ``'feature'``.
        """
        images = [
            cls._generate_json(series, rate)
            for series, rate in cls._process_audio(value)
        ]
        return append(value, images)

    @classmethod
    def _process_audio(cls, value):
        """Load the audio file and build the list of augmented variants.

        Returns:
            List of ``(samples, sample_rate)`` tuples, original first. Every
            variant is paired with the file's original sample rate.
        """
        signal, sample_rate = librosa.load(value['file_path'], sr=None)
        signal = np.trim_zeros(signal)

        # Direction/speed labels below follow the original cell comments; the
        # Augmenter defaults themselves are not visible here.
        return [
            # Unmodified (trimmed) signal
            (signal, sample_rate),
            # Pitch down (Augmenter default)
            (Augmenter.change_pitch(audio=signal, sr=sample_rate), sample_rate),
            # Pitch up
            (Augmenter.change_pitch(audio=signal, sr=sample_rate, pitch_type="up"), sample_rate),
            # Speed slow (Augmenter default)
            (Augmenter.change_speed(audio=signal), sample_rate),
            # Speed fast
            (Augmenter.change_speed(audio=signal, speed_change="high"), sample_rate),
            # Speed & pitch down (Augmenter default)
            (Augmenter.change_speed_and_pitch(audio=signal, sr=sample_rate), sample_rate),
            # Speed & pitch up
            (Augmenter.change_speed_and_pitch(audio=signal, sr=sample_rate, pitch_type="up"), sample_rate),
            # Distribution noise
            (Augmenter.add_distribution_noise(audio=signal), sample_rate),
        ]