In [None]:
# Base
import openl3
import librosa # alternativa pyAudioAnalysis ali audioFlux
import numpy as np
import os
import h5py
import time
import datetime
from scipy import signal
import matplotlib.pyplot as plt
# Preprocessing, Metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
# Keras, Classification
import keras
from keras import models
from keras import layers
from sklearn.svm import SVC
import tensorflow as tf
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import confusion_matrix
from keras.utils import to_categorical
# Parameters
# Genre names double as the sub-directory names under `data_dir`; the position
# of a genre in this array is its integer class label.
genres = np.array('pop rock classical blues country disco metal jazz reggae hiphop'.split())
n_genres = len(genres)
# number of audio files expected per genre directory
n_genres_files = 100
# n_genres_files = 1
# size of feature vector extracted by model for each segment of audio
embedding_size = 512
# number of time windows that model outputs for each segment of audio
# 5 sec segment - 46 time windows
n_windows = 46
# number of 5 sec segments per audio file
n_parts_sig = 6
data_dir = './archive/Data/genres_original/'

In [None]:
# Upper bound on the number of audio segments: every file of every genre
# contributes `n_parts_sig` segments.
total_chunks = n_parts_sig * n_genres_files * n_genres

# Embeddings, one row per segment.
# Layout: [NumberOfSignalParts, NumberOfWindows, NumberOfFeatures]
data = np.zeros(shape=(total_chunks, n_windows, embedding_size))

# Genre index of the segment stored in the same row of `data`.
# Layout: [NumberOfSignalParts, 1]
data_labels = np.zeros(shape=(total_chunks, 1))

In [None]:

# Load and preprocess data
# Dataset
#
# Walks data_dir/<genre>/<file>, cuts each recording into 5-second segments,
# extracts an OpenL3 embedding per segment and stores it together with the
# genre index. Segments shorter than 5 seconds are skipped and files that fail
# to load are reported and skipped, so fewer than `total_chunks` rows may end
# up being filled.
segment_seconds = 5
data_index = 0  # next free row in `data` / `data_labels`
for i_genre, genre in enumerate(genres):
    genre_path = os.path.join(data_dir, genre)
    # os.listdir order is arbitrary; sort so the saved row order is reproducible
    for file in sorted(os.listdir(genre_path)):
        fn = os.path.join(genre_path, file)
        try:
            # Load at most the first 30 seconds as mono
            # (sig - signal; sr - sampling rate chosen by librosa)
            sig, sr = librosa.load(fn, mono=True, duration=30)
            segment_len = sr * segment_seconds
            # Divide the signal into consecutive 5-second segments
            for start in range(0, len(sig), segment_len):
                segment = sig[start:start + segment_len]
                if len(segment) < segment_len:
                    # trailing remainder shorter than a full segment
                    continue
                # NOTE(review): the original call was cut off after
                # `content_type="music",`. embedding_size is passed explicitly
                # because `data` is allocated with `embedding_size` columns;
                # confirm no further arguments (e.g. input_repr, hop_size)
                # were intended.
                emb, _ = openl3.get_audio_embedding(segment, sr, content_type="music",
                                                    embedding_size=embedding_size)
                # Never write past the preallocated arrays
                if data_index < len(data):
                    # Features - Data
                    data[data_index, :, :] = emb
                    # Genre - Label
                    data_labels[data_index] = i_genre
                    data_index = data_index + 1
        except Exception as e:
            # Best effort: report the failing file and continue with the next one
            print(f"Error processing {fn}: {e}")

# Rows that were never filled would otherwise stay in the dataset as all-zero
# embeddings labelled 0 (the first genre), so drop them before saving.
if data_index < len(data):
    print(f"Filled {data_index} of {len(data)} preallocated rows; dropping the unused ones.")
    data = data[:data_index]
    data_labels = data_labels[:data_index]

# Save to h5 file (the context manager closes the file even if a write fails)
with h5py.File('dataset_openl3.h5', 'w') as hf:
    hf.create_dataset('data', data=data)
    hf.create_dataset('data_labels', data=data_labels)

In [None]:
# Normalize
scaler = StandardScaler()  # works with 2D array
# Flatten to 2D so that each row is one time window of one embedding;
# every embedding dimension is then standardised across all windows.
n_chunks, n_win, n_feat = data.shape
x = np.reshape(data, (n_chunks * n_win, n_feat))
# After scaling, the data is reshaped back into its original 3D shape.
# NOTE(review): the scaler is fitted on all data before the split, so test and
# validation statistics leak into the normalisation; consider fitting on the
# training part only.
X = np.reshape(scaler.fit_transform(np.array(x, dtype=float)), data.shape)

# Split into test and train (80% / 20%)
# stratify keeps the genre proportions of each split equal to those of the
# original data. Check the histograms, try removing stratify.
X_train, X_test, y_train, y_test = train_test_split(X, data_labels, test_size=0.2, stratify=data_labels)

# Split the remaining train part into train and valid (75% / 25%),
# again stratified, giving 60% / 20% / 20% overall.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, stratify=y_train)


def _print_split_shapes():
    """Print the current shapes of the train, test and validation arrays."""
    for split_name, X_part, y_part in (('Train', X_train, y_train),
                                       ('Test', X_test, y_test),
                                       ('Val', X_val, y_val)):
        print(f'{split_name} X:', np.shape(X_part))
        print(f'{split_name} Y:', np.shape(y_part))


# Sizes
print('Original dims')
_print_split_shapes()

# Why correction?
# Adds a trailing axis of size 1 (a single "channel"); presumably needed by
# convolutional Keras layers that expect (samples, rows, cols, channels).
print('Corrected dims')
X_train = np.expand_dims(X_train, 3)
X_test = np.expand_dims(X_test, 3)
X_val = np.expand_dims(X_val, 3)
_print_split_shapes()

# Label distribution of each split (train, test, validation)
for y_part in (y_train, y_test, y_val):
    plt.hist(y_part, bins=n_genres, rwidth=0.7)
    plt.show()

In [None]:
# Normalize
# StandardScaler only accepts 2D input [samples, features]. The OpenL3 dataset
# is 3D [segments, windows, features], so each segment is first summarised by
# the mean of its embeddings over the window axis. 2D input is used unchanged.
scaler = StandardScaler()
features = np.array(data, dtype=float)
if features.ndim == 3:
    features = features.mean(axis=1)
X = scaler.fit_transform(features)

# Split into test and train
# Why stratify=data_labels?
# Check the histograms, try removing stratify
X_train, X_test, y_train, y_test = train_test_split(X, data_labels, test_size=0.2, stratify=data_labels)

# Split into train and valid
# Why stratify=y_train?
# Check the histograms, try removing stratify
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, stratify=y_train)

# Sizes
print('Train:', np.shape(y_train))
print('Test:', np.shape(y_test))
print('Val:', np.shape(y_val))

# The truth is — there is no optimal split percentage
# train 80%; valid 10%; test 10%
# train 70%; valid 15%; test 15%
# train 60%; valid 20%; test 20%

# Label distribution of each split (train, test, validation)
for y_part in (y_train, y_test, y_val):
    plt.hist(y_part, bins=n_genres, rwidth=0.7)
    plt.show()

In [None]:
# NN
# Small fully connected classifier on 2D input [samples, features].
model = models.Sequential()
# Input layer - number of features (declared with an explicit Input layer
# instead of the deprecated `input_shape=` argument on the first Dense layer)
model.add(layers.Input(shape=(X_train.shape[1],)))
model.add(layers.Dense(32, activation='relu'))
# Fix the model - add extra layers, change the number of neurons, etc...
model.add(layers.Dense(n_genres, activation='softmax'))  # Output layer - one unit per genre

opt = keras.optimizers.Adam(learning_rate=0.001)  # Maybe a bit too high?
# Loss and metric both take integer class labels (no one-hot encoding needed).
# Both come from `keras`, the same package that provides the model and optimizer.
loss = keras.losses.SparseCategoricalCrossentropy()  # Crossentropy between the labels and predictions
metr = keras.metrics.SparseCategoricalAccuracy()  # How often predictions match integer labels
# `metrics` must be a list (Keras 3 rejects a bare metric object)
model.compile(optimizer=opt, loss=loss, metrics=[metr])

In [None]:
# Stopping criterion to avoid overfitting
# patience: Number of epochs with no improvement after which training will be stopped.
early_stopping = EarlyStopping(monitor='val_loss', patience=5)

# Save best weights (lowest validation loss seen so far).
# Keras 3 requires weights-only checkpoints to end in ".weights.h5"; older
# Keras versions accept this name as well.
weights_path = "best.weights.h5"
model_checkpoint = ModelCheckpoint(weights_path, monitor='val_loss',
                                   save_best_only=True, save_weights_only=True)

# Train
t_epochs = 50  # Needs to be tuned
b_size = 8  # Needs to be tuned as well - What is batch_size?
history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=t_epochs, batch_size=b_size,
                    callbacks=[early_stopping, model_checkpoint])

# Load best weights - training may have continued for up to `patience`
# epochs past the best one
model.load_weights(weights_path)

In [None]:
# Let's observe the loss on both the training and validation set
# What do we notice?
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='validation')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Accuracy per epoch on the training and validation set
plt.plot(history.history['sparse_categorical_accuracy'], label='train')
plt.plot(history.history['val_sparse_categorical_accuracy'], label='validation')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# Now to evaluate our model on train and test data
# A large gap between the two accuracies indicates overfitting.

# Train NN (kept in train_* so it is not confused with the test results)
train_loss, train_acc = model.evaluate(X_train, y_train, verbose=0)
print('Acc train NN: %.3f' % train_acc)

# Test NN
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print('Acc test NN: %.3f' % test_acc)

In [None]:
# Test NN
# Predictions for additional analysis (one row of class probabilities per sample)
predictions = model.predict(X_test)

# Confusion matrix
predicted_labels = np.argmax(predictions, axis=1)
# Labels are stored as a float column vector; flatten and cast so both inputs
# are 1D integer arrays.
true_labels = np.asarray(y_test).ravel().astype(int)
# Passing `labels` fixes the matrix to n_genres x n_genres even when a genre is
# missing from y_test or is never predicted, so it always lines up with the
# tick labels below.
# normalize="pred" scales every column (predicted genre) to sum to 1.
# Normalize pred! Explain why?
conf = confusion_matrix(true_labels, predicted_labels, labels=np.arange(n_genres), normalize="pred")

# Visualise confusion matrix
plt.imshow(conf)
plt.ylabel("Actual")
plt.xlabel("Predicted")
plt.yticks(np.arange(n_genres), genres)
plt.xticks(np.arange(n_genres), genres, rotation='vertical')
plt.colorbar()
plt.show()

In [None]:
# Internet radio
import miniaudio
from IPython.display import clear_output, display
# Most recent stream title; written by the `title` callback below and read by
# `stream_processing` when it prints its status.
current_title = ""

def title(client: miniaudio.IceCastClient, title: str):
    """Store the given stream title in the module-level `current_title`.

    Passed as `update_stream_title` when the IceCastClient is created.
    `client` is accepted but not used.
    """
    global current_title 
    current_title = title

def stream_processing(source):
    """Generator that classifies audio pulled from `source` and passes it on.

    On every iteration it requests samples from `source`, extracts features,
    normalises them with the module-level `scaler`, predicts a genre with the
    module-level `model`, prints the result, and yields the samples.

    Any value sent into this generator is ignored (the result of `yield` is
    not used); the request size sent to `source` is always 8192.

    NOTE(review): `extract_features`, `n_features` and `n_mfcc_coef` are not
    defined anywhere in the visible notebook - confirm where they come from,
    and that the features match what `scaler` and `model` were fitted on.
    """
    while True:
        # Get frame - Only one channel - The chunk of signal is too small!
        # Keeps every second sample; presumably the source delivers
        # interleaved stereo - TODO confirm.
        sample_data = np.array(source.send(8192))[0::2]

        # Features
        # NOTE(review): the sample rate is hard-coded to 44100 here.
        feat = extract_features(sample_data, 44100, n_features, n_mfcc_coef)

        # Normalization (reshaped to a single-row 2D array for the scaler)
        feat_norm = scaler.transform(feat.reshape(1, -1))

        # Guess
        pred_nn = model.predict(feat_norm, verbose=0)

        # Output (replaces the previous cell output instead of appending)
        clear_output(wait=True)
        print("Title: " + current_title)
        print(datetime.datetime.now())
        print("NN: " + genres[np.argmax(pred_nn[0])])

        yield sample_data

# Internet radio source - Radio 1
# `title` is called by the client with the stream's title updates.
source = miniaudio.IceCastClient("http://live1.radio1.si/Radio1", update_stream_title=title)

try:
    print("Connected")
    print("Station: ", source.station_name)

    # Stream - decodes the radio stream into 32-bit float samples
    stream_in = miniaudio.stream_any(source, source.audio_format, output_format=miniaudio.SampleFormat.FLOAT32)

    # Device
    device = miniaudio.PlaybackDevice(output_format=miniaudio.SampleFormat.FLOAT32, nchannels=1, sample_rate=44100)

    try:
        stream = stream_processing(stream_in)
        next(stream)  # the generator must be started before it is handed to the device
        device.start(stream)

        # Playback runs in the background; keep the cell alive until the
        # kernel is interrupted.
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        # "Interrupt kernel" is the intended way to stop listening
        print("Stopped")
    finally:
        # Release the audio device even when stopping or on an error
        device.close()
finally:
    # Close the HTTP connection to the radio station
    source.close()

In [None]:
import miniaudio

# Play a local audio file on the default playback device.
stream = miniaudio.stream_any("samples/music.mp3")
# The context manager releases the device when the block exits.
with miniaudio.PlaybackDevice() as device:
    device.start(stream)
    # Playback happens in the background; block here until the user presses Enter.
    input("Audio file playing in the background. Enter to stop playback: ")