<a href="https://colab.research.google.com/github/yootazi/spectrogram_extractor/blob/main/spectrogram_extractor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Spectrogram Extractor**
# Extracting Spectrograms from an Audio Dataset located in Google Drive

---

 by Yalda Zamani, 2021
> Website: [www.yaldazamani.com](https://www.yaldazamani.com)

> Twitter: [@yootazi](https://twitter.com/yootazi)


> 


Currently it can extract spectrograms from any dataset of .wav audio files with a 44.1 kHz sample rate and 16-bit depth.

> Credits:
* coded following 'The Sound of AI' Youtube tutorial series by Valerio Velardo



Create a folder in your Google Drive called ai_music_projects. Create another folder within ai_music_projects called Spectrogram_Extractor. Create two empty folders, 'audio' and 'spectrograms', within Spectrogram_Extractor to store your audio files and retrieve the spectrograms.

Make sure the paths pointing to your folders look like these:

'/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio'
'/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/spectrograms'


Move your audio files into the 'audio' folder.

In [1]:
#@title **Importing Libraries**

from PIL import Image
import os                          # to load audiofiles
import librosa                     
import librosa.display             # for visualisation of spectrograms
import IPython.display as ipd
import numpy as np
import matplotlib.pyplot as plt    # plotting spectrograms

In [None]:
#@title **Mounting Google Drive**
# Colab-only helper used to attach Google Drive to this runtime.
from google.colab import drive
# Mount Drive under /content/gdrive/ so the '/content/gdrive/MyDrive/...'
# paths used by the cells below resolve.
drive.mount('/content/gdrive/')

In [3]:
#@title **Loading Audio Files from Google Drive**

# Paths of the example recordings stored in the Drive 'audio' folder.
scale_file = "/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio/scale.wav"
debussy_file = "/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio/debussy.wav"
redhot_file = "/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio/redhot.wav"
duke_file = "/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio/duke.wav"

# Listen to the scale recording. The player is displayed explicitly: a
# notebook cell only renders its *last* bare expression, so a plain
# `ipd.Audio(...)` in the middle of the cell would be created and discarded.
ipd.display(ipd.Audio(scale_file))

# Load the scale recording; `sr` is the sample rate returned by librosa and
# is reused by the plotting cells below.
scale, sr = librosa.load(scale_file)

In [None]:
#@title **Extracting Short-Time Fourier Transform**

FRAME_SIZE = 2048   # STFT window length in samples (passed as n_fft)
HOP_SIZE = 512      # number of samples between successive frames

# Complex-valued short-time Fourier transform of the scale recording.
S_scale = librosa.stft(scale, n_fft=FRAME_SIZE, hop_length=HOP_SIZE)

# Print both inspection values: a bare expression that is not the last
# statement of a cell is evaluated and silently dropped.
print(S_scale.shape)
print(type(S_scale[0][0]))


In [None]:
#@title **Calculating / Visualizing the Spectrogram**

# Power spectrogram: squared magnitude of the complex STFT.
Y_scale = np.abs(S_scale) ** 2

# Print both inspection values: a bare expression that is not the last
# statement of a cell is evaluated and silently dropped.
print(Y_scale.shape)
print(type(Y_scale[0][0]))

def plot_spectrogram(Y, sr, hop_length, y_axis="linear"):
    """Draw a spectrogram on a new 25x10 figure and attach a colorbar.

    Args:
        Y: 2-D array handed to ``librosa.display.specshow`` (this notebook
            passes dB-scaled power spectrograms).
        sr: sample rate forwarded to ``specshow``.
        hop_length: hop length forwarded to ``specshow``.
        y_axis: frequency-axis type forwarded to ``specshow``; the notebook
            uses "linear" (default) and "log".
    """
    specshow_options = {
        "sr": sr,
        "hop_length": hop_length,
        "x_axis": "time",
        "y_axis": y_axis,
    }
    plt.figure(figsize=(25, 10))
    librosa.display.specshow(Y, **specshow_options)
    plt.colorbar(format="%+2.f")

# Convert the power spectrogram to decibels, then plot it with the default
# linear frequency axis.
Y_log_scale = librosa.power_to_db(Y_scale)
plot_spectrogram(Y_log_scale, sr, HOP_SIZE)



In [None]:
#@title **Log-Frequency Spectrogram**

# Same dB spectrogram as above, drawn with a logarithmic frequency axis.
plot_spectrogram(Y_log_scale, sr, HOP_SIZE, y_axis="log")

In [None]:
#@title **Visualising Songs From Different Files**

# Load the remaining recordings. The original cell used `debussy`, `redhot`
# and `duke` without ever loading them, which raised NameError. `sr=sr` asks
# librosa for the same sample rate as the scale recording, so the single `sr`
# passed to the plots below is valid for every signal.
debussy = librosa.load(debussy_file, sr=sr)[0]
redhot = librosa.load(redhot_file, sr=sr)[0]
duke = librosa.load(duke_file, sr=sr)[0]

# Short-time Fourier transform of each recording.
S_debussy = librosa.stft(debussy, n_fft=FRAME_SIZE, hop_length=HOP_SIZE)
S_redhot = librosa.stft(redhot, n_fft=FRAME_SIZE, hop_length=HOP_SIZE)
S_duke = librosa.stft(duke, n_fft=FRAME_SIZE, hop_length=HOP_SIZE)

# Power spectrograms (squared magnitude) converted to decibels.
Y_debussy = librosa.power_to_db(np.abs(S_debussy) ** 2)
Y_redhot = librosa.power_to_db(np.abs(S_redhot) ** 2)
Y_duke = librosa.power_to_db(np.abs(S_duke) ** 2)

# One log-frequency plot per recording.
plot_spectrogram(Y_debussy, sr, HOP_SIZE, y_axis="log")
plot_spectrogram(Y_redhot, sr, HOP_SIZE, y_axis="log")
plot_spectrogram(Y_duke, sr, HOP_SIZE, y_axis="log")

In [None]:
#@title **Creating Spectrograms from All the Audio Files Located in 'audio' Folder**

"""
1- load a file
2- pad the signal (if necessary)
3- extracting log spectrogram from signal
4- normalise spectrogram
5- save the normalised spectrogram
PreprocessingPipeline
"""
import os
import pickle

import librosa
import numpy as np


class Loader:
    """Reads an audio file from disk into a signal array via librosa."""

    def __init__(self, sample_rate, duration, mono):
        # These three settings are forwarded unchanged to librosa.load as
        # `sr`, `duration` and `mono` every time load() is called.
        self.sample_rate = sample_rate
        self.duration = duration
        self.mono = mono

    def load(self, file_path):
        """Return only the signal for ``file_path``.

        librosa.load yields a (signal, sample_rate) pair; the sample rate is
        dropped because it is already held in ``self.sample_rate``.
        """
        signal, _ = librosa.load(
            file_path,
            sr=self.sample_rate,
            duration=self.duration,
            mono=self.mono,
        )
        return signal


class Padder:
    """Pads arrays with ``np.pad`` using a configurable padding mode."""

    def __init__(self, mode="constant"):
        # `mode` is handed straight to np.pad on every call.
        self.mode = mode

    def left_pad(self, array, num_missing_items):
        """Return ``array`` with ``num_missing_items`` items prepended."""
        return np.pad(array,
                      pad_width=(num_missing_items, 0),  # (before, after)
                      mode=self.mode)

    def right_pad(self, array, num_missing_items):
        """Return ``array`` with ``num_missing_items`` items appended."""
        return np.pad(array,
                      pad_width=(0, num_missing_items),  # (before, after)
                      mode=self.mode)


class LogSpectrogramExtractor:
    """Turns a time-series signal into a log-amplitude (dB) spectrogram."""

    def __init__(self, frame_size, hop_length):
        self.frame_size = frame_size    # used as n_fft for the STFT
        self.hop_length = hop_length    # used as hop_length for the STFT

    def extract(self, signal):
        """Return the dB-scaled magnitude spectrogram of ``signal``."""
        # The STFT has shape (1 + frame_size / 2, num_frames); the last
        # frequency row is dropped below so frame_size / 2 rows remain
        # (e.g. 1024 -> 513 -> 512).
        complex_stft = librosa.stft(signal,
                                    n_fft=self.frame_size,
                                    hop_length=self.hop_length)
        magnitude = np.abs(complex_stft[:-1])
        return librosa.amplitude_to_db(magnitude)


class MinMaxNormaliser:
    """MinMaxNormaliser applies min max normalisation to an array.

    The array's own minimum is mapped to ``min_val`` and its maximum to
    ``max_val`` (e.g. 0 and 1).
    """

    def __init__(self, min_val, max_val):
        self.min = min_val
        self.max = max_val

    def normalise(self, array):
        """Rescale ``array`` linearly into ``[self.min, self.max]``.

        A constant array (max == min, e.g. the spectrogram of a silent clip)
        has no range to stretch; every element is then mapped to ``self.min``
        (as a float array) instead of dividing by zero and returning NaNs.
        """
        array_min = array.min()
        value_range = array.max() - array_min
        if value_range == 0:
            # Degenerate input: avoid 0 / 0 and sit at the lower bound.
            unit_array = np.zeros_like(array, dtype=float)
        else:
            unit_array = (array - array_min) / value_range        # -> 0 / 1
        return unit_array * (self.max - self.min) + self.min      # target range

    def denormalise(self, norm_array, original_min, original_max):
        """Invert ``normalise`` given the array's original min and max.

        Note: this divides by ``self.max - self.min``, so it is only
        meaningful when the target range is not empty.
        """
        array = (norm_array - self.min) / (self.max - self.min)
        array = array * (original_max - original_min) + original_min
        return array


class Saver:
    """Saver is responsible for saving features and the min max values."""

    def __init__(self, feature_save_dir, min_max_values_save_dir):
        self.feature_save_dir = feature_save_dir
        self.min_max_values_save_dir = min_max_values_save_dir

    def save_feature(self, feature, file_path):
        """Save ``feature`` as a .npy file named after ``file_path``.

        Returns:
            The path the feature was written to.
        """
        save_path = self._generate_save_path(file_path)
        self._ensure_dir(self.feature_save_dir)
        np.save(save_path, feature)
        return save_path

    def save_min_max_values(self, min_max_values):
        """Pickle ``min_max_values`` to ``min_max_values.pkl``.

        The min max values of all log spectrograms are stored so they can be
        reused later to denormalise and reconstruct the signals.
        """
        save_path = os.path.join(self.min_max_values_save_dir,
                                 "min_max_values.pkl")
        self._ensure_dir(self.min_max_values_save_dir)
        self._save(min_max_values, save_path)

    @staticmethod
    def _ensure_dir(directory):
        """Create ``directory`` (and parents) if it does not exist yet."""
        # An empty string means "current directory"; os.makedirs("") would
        # raise, so it is skipped.
        if directory:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _save(data, save_path):
        """Pickle ``data`` to ``save_path``."""
        with open(save_path, "wb") as f:
            pickle.dump(data, f)

    def _generate_save_path(self, file_path):
        """Return ``<feature_save_dir>/<file name of file_path>.npy``."""
        file_name = os.path.split(file_path)[1]
        save_path = os.path.join(self.feature_save_dir, file_name + ".npy")
        return save_path


class PreprocessingPipeline:
    """High-level driver that preprocesses every audio file in a directory.

    For each file it:
        1- loads the file
        2- pads the signal (if necessary)
        3- extracts a log spectrogram from the signal
        4- normalises the spectrogram
        5- saves the normalised spectrogram
    The min and max of every (un-normalised) spectrogram are collected and
    saved once all files are processed.
    """

    def __init__(self):
        # Collaborators are plugged in after construction.
        self._loader = None
        self._num_expected_samples = None
        self.padder = None
        self.extractor = None       # any feature extractor, e.g. LogSpectrogramExtractor
        self.normaliser = None
        self.saver = None
        # save path -> {"min": ..., "max": ...}; needed later to undo the
        # normalisation when reconstructing signals.
        self.min_max_values = {}

    @property
    def loader(self):
        return self._loader

    @loader.setter
    def loader(self, loader):
        # Setting the loader also fixes how many samples every signal must have.
        self._loader = loader
        self._num_expected_samples = int(loader.sample_rate * loader.duration)

    def process(self, audio_files_dir):
        """Process every file found under ``audio_files_dir`` (recursively)."""
        for dir_path, _, file_names in os.walk(audio_files_dir):
            for file_name in file_names:
                file_path = os.path.join(dir_path, file_name)
                self._process_file(file_path)
                print(f"Processed file {file_path}")
        self.saver.save_min_max_values(self.min_max_values)

    def _process_file(self, file_path):
        """Run the load / pad / extract / normalise / save steps on one file."""
        signal = self.loader.load(file_path)
        if self._is_padding_necessary(signal):
            signal = self._apply_padding(signal)
        feature = self.extractor.extract(signal)
        save_path = self.saver.save_feature(
            self.normaliser.normalise(feature), file_path)
        self._store_min_max_value(save_path, feature.min(), feature.max())

    def _is_padding_necessary(self, signal):
        """Tell whether ``signal`` is shorter than the expected length."""
        return len(signal) < self._num_expected_samples

    def _apply_padding(self, signal):
        """Right-pad ``signal`` up to the expected number of samples."""
        shortfall = self._num_expected_samples - len(signal)
        return self.padder.right_pad(signal, shortfall)

    def _store_min_max_value(self, save_path, min_val, max_val):
        """Record the original min and max for the feature at ``save_path``."""
        self.min_max_values[save_path] = dict(min=min_val, max=max_val)

# Script entry point: configure the pipeline and run it over the Drive
# 'audio' folder.
if __name__ == "__main__":
    # NOTE: FRAME_SIZE was set to 2048 by the STFT cell above; this
    # assignment rebinds it to 512 from here on.
    FRAME_SIZE = 512     # STFT window length (n_fft) given to the extractor
    HOP_LENGTH = 256     # STFT hop length given to the extractor
    DURATION = 0.74  # in seconds
    SAMPLE_RATE = 22050  # sample rate the loader asks librosa for
    MONO = True          # the loader asks librosa for a mono signal

    # Where the .npy spectrograms, the pickled min/max values and the input
    # audio files live on the mounted Drive.
    SPECTROGRAMS_SAVE_DIR = '/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/spectrograms'
    MIN_MAX_VALUES_SAVE_DIR = '/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/'
    FILES_DIR = '/content/gdrive/MyDrive/ai_music_projects/Spectrogram_Extractor/audio/'

    # instantiate all objects
    loader = Loader(SAMPLE_RATE, DURATION, MONO)
    padder = Padder()
    log_spectrogram_extractor = LogSpectrogramExtractor(FRAME_SIZE, HOP_LENGTH)
    min_max_normaliser = MinMaxNormaliser(0, 1)     # normalise into [0, 1]
    saver = Saver(SPECTROGRAMS_SAVE_DIR, MIN_MAX_VALUES_SAVE_DIR)

    # Wire the components into the pipeline. Assigning `loader` also sets the
    # expected number of samples per signal (sample rate * duration).
    preprocessing_pipeline = PreprocessingPipeline()
    preprocessing_pipeline.loader = loader
    preprocessing_pipeline.padder = padder
    preprocessing_pipeline.extractor = log_spectrogram_extractor
    preprocessing_pipeline.normaliser = min_max_normaliser
    preprocessing_pipeline.saver = saver

    # Process every file under FILES_DIR, then save the min/max values.
    preprocessing_pipeline.process(FILES_DIR)



In [None]:
#@title **saving image files of spectrograms**

# saving image files of spectrograms

import matplotlib.pyplot as plt
import librosa.display

import numpy as np
import pandas as pd
import librosa


# Fetch one of librosa's example recordings. The original call,
# `librosa.util.example_audio_file()`, no longer exists in current librosa
# releases; `librosa.example(...)` is its replacement ("vibeace" is the key
# for the track the old helper returned; it may be downloaded on first use).
filename = librosa.example("vibeace")
y, sr = librosa.load(filename)
y = y[:100000] # shorten audio a bit for speed

window_size = 1024
hop_length = 512
window = np.hanning(window_size)
# Use the public `librosa.stft` entry point rather than the internal
# `librosa.core.spectrum` module path.
stft = librosa.stft(y, n_fft=window_size, hop_length=hop_length, window=window)
# Magnitude spectrum scaled by the window sum.
out = 2 * np.abs(stft) / np.sum(window)

# For plotting headlessly
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas

fig = plt.Figure()
canvas = FigureCanvas(fig)
ax = fig.add_subplot(111)
# Pass sr and hop_length explicitly so the time/frequency axes are labelled
# from the values actually used above instead of specshow's defaults.
p = librosa.display.specshow(librosa.amplitude_to_db(out, ref=np.max),
                             sr=sr, hop_length=hop_length,
                             ax=ax, y_axis='log', x_axis='time')
fig.savefig('spec.png')