# Initialization

In [None]:
import os
import json

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers, models

# Dataset Initialization

In [None]:
datapath = '../data/acousticbrainz-mediaeval_labels_part_a'

# The label table is split over two tab-separated files whose names end in
# 'aa' and 'ab'; read each part, then stack them under a fresh 0..n-1 index.
data_a = pd.read_csv(f'{datapath}a', delimiter='\t')
data_b = pd.read_csv(f'{datapath}b', delimiter='\t')
data = pd.concat([data_a, data_b], ignore_index=True)

# Mark a song as R&B (is_rnb = 1) when any column whose name contains 'genre'
# holds a value matching one of the R&B spellings, ignoring case.
rnb_pattern = r'R&B|rnb|r&b_soul|r\'n\'b'
genre_columns = data.filter(like='genre')
rnb_hits = genre_columns.apply(
    lambda column: column.astype(str).str.contains(rnb_pattern, case=False, na=False)
)
data['is_rnb'] = rnb_hits.any(axis=1).astype(int)

data.keys()

  data_a = pd.read_csv("acousticbrainz-mediaeval_labels_part_aa.txt", delimiter='\t')
  data_b = pd.read_csv("acousticbrainz-mediaeval_labels_part_ab.txt", delimiter='\t')


Index(['recordingmbid', 'releasegroupmbid', 'genre1', 'genre2', 'genre3',
       'genre4', 'genre5', 'genre6', 'genre7', 'genre8', 'genre9', 'genre10',
       'genre11', 'genre12', 'genre13', 'genre14', 'genre15', 'genre16',
       'genre17', 'genre18', 'genre19', 'genre20', 'genre21', 'genre22',
       'genre23', 'genre24', 'genre25', 'genre26', 'genre27', 'genre28',
       'genre29', 'genre30', 'is_rnb'],
      dtype='object')

In [None]:
# Keep just the song identifier and the binary R&B label, then show how many
# songs fall in each class.
label_columns = ['recordingmbid', 'is_rnb']
data_labeled = data[label_columns]
rnb_counts = data_labeled['is_rnb'].value_counts()
print(rnb_counts)

is_rnb
0    896896
1      8048
Name: count, dtype: int64


In [None]:
# Print the (rows, columns) size of the combined label table, then display
# its first rows as the cell output.
print(data.shape)
data.head()

# Feature Extraction
The extraction process below is taken from the Load_data notebook; see that notebook for more details.

In [None]:
def extract_features_from_json(data):
    """Flatten one parsed descriptor document into a flat list of values.

    Args:
        data: Nested mapping with 'lowlevel', 'tonal' and 'rhythm' sections
            (presumably an AcousticBrainz low-level JSON document -- confirm
            against the dataset).

    Returns:
        A list of feature values in the same order as the names produced by
        build_feature_labels, or None when a required key is missing (the
        missing key is printed).
    """

    def stat_mean(section, descriptor):
        # Most descriptors are stored as a mapping of statistics; only the
        # 'mean' entry is used.
        return data[section][descriptor]['mean']

    try:
        features = []

        # Timbre: the vector-valued means are spliced in element by element.
        features.extend(stat_mean('lowlevel', 'mfcc'))
        features.extend(stat_mean('lowlevel', 'gfcc'))
        features.append(stat_mean('lowlevel', 'hfc'))

        # Tonal - harmony
        features.append(data['tonal']['chords_changes_rate'])

        # Tonal - scale, encoded as minor=1 and anything else (or absent)=0
        is_minor = data['tonal'].get('key_scale', 'major') == 'minor'
        features.append(1 if is_minor else 0)

        # Tonal - pitch salience & dissonance
        for descriptor in ('pitch_salience', 'dissonance'):
            features.append(stat_mean('lowlevel', descriptor))

        # Rhythm (stored as plain values, no 'mean')
        for descriptor in ('bpm', 'onset_rate'):
            features.append(data['rhythm'][descriptor])

        # Spectrum
        for descriptor in (
            'spectral_centroid',
            'spectral_complexity',
            'spectral_rolloff',
            'spectral_flux',
            'zerocrossingrate',
        ):
            features.append(stat_mean('lowlevel', descriptor))

        # Spectral contrast coefficients (vector-valued mean)
        features.extend(stat_mean('lowlevel', 'spectral_contrast_coeffs'))

        # Dynamics (stored as plain values, no 'mean')
        for descriptor in ('average_loudness', 'dynamic_complexity'):
            features.append(data['lowlevel'][descriptor])

        # Rhythm extension
        features.append(stat_mean('rhythm', 'beats_loudness'))

        # Energy band shape
        for descriptor in ('spectral_energyband_low', 'spectral_energyband_high'):
            features.append(stat_mean('lowlevel', descriptor))

        # Harmonic structure
        features.append(stat_mean('tonal', 'hpcp_entropy'))
        features.append(data['tonal']['key_strength'])

        # Tonal energy balance
        for descriptor in ('spectral_entropy', 'spectral_strongpeak'):
            features.append(stat_mean('lowlevel', descriptor))

        return features
    except KeyError as missing:
        print(f"Missing key: {missing}")
        return None

def build_feature_labels(data_sample):
    """Return the column names for one extracted feature row.

    The names follow the order in which extract_features_from_json emits
    values. Vector-valued descriptors get one numbered name per element,
    sized from the vectors found in ``data_sample``.

    Args:
        data_sample: A parsed descriptor document containing
            ``['lowlevel'][<name>]['mean']`` lists for 'mfcc', 'gfcc' and
            'spectral_contrast_coeffs'.

    Returns:
        A list of label strings.
    """
    lowlevel = data_sample['lowlevel']

    def numbered(prefix, descriptor):
        # One "<prefix>_<i>" label per element of the descriptor's mean vector.
        return [f"{prefix}_{i}" for i in range(len(lowlevel[descriptor]['mean']))]

    return [
        *numbered("mfcc", 'mfcc'),
        *numbered("gfcc", 'gfcc'),
        "hfc",
        "chords_changes_rate",
        "key_scale",
        "pitch_salience",
        "dissonance",
        "bpm",
        "onset_rate",
        "spectral_centroid",
        "spectral_complexity",
        "spectral_rolloff",
        "spectral_flux",
        "zerocrossingrate",
        *numbered("spectral_contrast", 'spectral_contrast_coeffs'),
        "average_loudness",
        "dynamic_complexity",
        "beats_loudness",
        "spectral_energyband_low",
        "spectral_energyband_high",
        "hpcp_entropy",
        "key_strength",
        "spectral_entropy",
        "spectral_strongpeak",
    ]

def process_dataset(root_folder):
    """Build a feature table from every ``.json`` file under ``root_folder``.

    Each file is parsed and passed to extract_features_from_json. Column names
    come from build_feature_labels, called on the first file that yields
    features. Processing is best-effort: a file that cannot be read, parsed or
    converted is reported with ``print`` and skipped.

    Args:
        root_folder: Directory that is walked recursively with ``os.walk``.

    Returns:
        A DataFrame with one row per successfully processed file: the feature
        columns followed by 'recordingmbid', which holds the file name without
        its ``.json`` extension. With no usable files the frame is empty and
        has only the 'recordingmbid' column.
    """
    json_suffix = ".json"
    all_features = []
    file_ids = []
    feature_labels = []

    for subdir, _, files in os.walk(root_folder):
        for file in files:
            if not file.endswith(json_suffix):
                continue

            file_path = os.path.join(subdir, file)
            try:
                # Explicit encoding so parsing does not depend on the
                # platform's default locale.
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)

                features = extract_features_from_json(data)
                if features is None:
                    continue

                # The first usable file fixes the column layout.
                if not feature_labels:
                    feature_labels = build_feature_labels(data)

                # A row of a different length would be misaligned with the
                # column names (or make the DataFrame construction fail after
                # the whole walk), so skip it instead.
                if len(features) != len(feature_labels):
                    print(
                        f"Failed on {file}: expected {len(feature_labels)} "
                        f"features, got {len(features)}"
                    )
                    continue

                all_features.append(features)
                # Strip only the trailing extension; str.replace would also
                # remove any '.json' occurring inside the name.
                file_ids.append(file[:-len(json_suffix)])

            except Exception as e:
                # Deliberately broad: one bad file must not abort the scan.
                print(f"Failed on {file}: {e}")

    df = pd.DataFrame(all_features, columns=feature_labels)
    df['recordingmbid'] = file_ids
    return df

In [None]:
# Extract JSON features: walk the training folder, build one feature row per
# JSON file, and preview the first rows of the resulting table.
# NOTE(review): this path climbs two directories ('../../data/...') while the
# label files are read from '../data/...' -- confirm both locations are right.
folder_path = '../../data/acousticbrainz-mediaeval-train'
data_features = process_dataset(folder_path)
data_features.head()

NameError: name 'os' is not defined

In [None]:
# Inner-join the extracted features with the labels on the recording id, so
# only songs present in both tables are kept.
merged_df = data_features.merge(data_labeled, how="inner", on="recordingmbid")

# Check results
print(merged_df.head())
print("Shape:", merged_df.shape)
rnb_label_counts = merged_df['is_rnb'].value_counts()
print("Label value counts:\n", rnb_label_counts)

# Save the labelled feature table for the training cells to reload.
merged_df.to_csv("rnb_features_labeled.csv", index=False)

In [None]:
# Reload the labelled feature table written by the merge cell.
df = pd.read_csv("rnb_features_labeled.csv")

# Split into the target column and the model inputs (everything except the
# song id and the label).
train_labels = df['is_rnb']
train_data = df.drop(['recordingmbid', 'is_rnb'], axis=1)

In [None]:
# Show the current contents of train_data.
print(train_data)

In [None]:
# Take 'genre1' as a label column and rebind train_data to the label table
# without it.
# NOTE(review): `data` is the label table (ids, genre columns, is_rnb), so this
# replaces the feature matrix previously stored in train_data, and
# train_genre_labels is not used by any later cell shown here -- confirm
# whether this cell is still wanted.
train_genre_labels = data['genre1']
train_data = data.drop(['genre1'], axis=1)

# Histogram Creation

In [None]:
# One list of float values per row of train_data (row i -> histogram_data[i]).
# The row count comes from the data itself rather than a hard-coded size.
# The previous loop used len(column), which is the length of the column *name*,
# so it only filled the first few rows.
# Like the float() conversion it replaces, this raises if train_data holds
# non-numeric columns.
histogram_data = train_data.to_numpy(dtype=float).tolist()

In [None]:
# Show the values collected for the first row.
print(histogram_data[0])

In [None]:
# Plot a histogram of the first row's values and display it.
plt.hist(histogram_data[0])
plt.show()

In [None]:
# Draw a histogram for every row and keep what plt.hist returns for each one.
histograms = []

for row_values in histogram_data:
    histograms.append(plt.hist(row_values))

In [None]:
print(histograms)

# plt.show() does not accept plot data, so the first histogram is redrawn on a
# fresh figure from its stored result. plt.hist returns
# (counts, bin_edges, patches); the bars are rebuilt from the first two.
first_counts, first_bin_edges = histograms[0][0], histograms[0][1]
plt.figure()
plt.bar(first_bin_edges[:-1], first_counts, width=np.diff(first_bin_edges), align='edge')
plt.show()

# CNN Model

In [None]:
# model definition: three Conv2D + MaxPooling2D stages, then a dense head that
# outputs 30 raw scores (the loss is expected to handle the logits).
# NOTE(review): Conv2D layers need image-like inputs, and no input shape is
# declared here -- confirm what is fed to the model and add an explicit input.
# NOTE(review): the head has 30 outputs while the labels used for training are
# the binary is_rnb column -- confirm the intended number of classes.
CNN_model = models.Sequential([
    layers.Conv2D(32, (3,3), activation='relu'),
    layers.MaxPooling2D((2,2)),
    layers.Conv2D(64, (3,3), activation='relu'),
    layers.MaxPooling2D((2,2)),
    layers.Conv2D(128, (3,3), activation='relu'),
    layers.MaxPooling2D((2,2)),

    layers.Flatten(),
    # Non-linearity added: without it this layer and the output layer compose
    # into a single linear map, so the hidden layer adds no capacity.
    layers.Dense(128, activation='relu'),
    layers.Dense(30)
])

In [None]:
# model compilation: Adam optimizer, accuracy metric, and sparse categorical
# cross-entropy over integer class labels.
# from_logits=True tells the loss it receives raw scores; the model's final
# Dense(30) layer is defined without an activation.
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
CNN_model.compile(optimizer='adam', metrics=['accuracy'], loss=loss)

In [None]:
# model summary: print the layer-by-layer overview of the network.
# NOTE(review): the model is defined without an input shape, so it may not be
# built yet at this point -- confirm summary() works before the first fit.
CNN_model.summary()

# CNN Training

In [None]:
# Train for 5 epochs and keep the returned training history.
# NOTE(review): the model starts with Conv2D layers while train_data is built
# by dropping columns from a DataFrame -- confirm the inputs are reshaped to
# what the convolutional layers expect.
history = CNN_model.fit(train_data, train_labels, epochs=5)

# Model Evaluation

In [None]:
# Evaluate the trained model (fixed the `train__labels` typo, which raised
# NameError).
# NOTE(review): this scores the model on the same data it was trained on, so
# the printed figure is not a held-out test accuracy -- confirm whether a
# separate test split should be used.
test_loss, test_acc = CNN_model.evaluate(train_data, train_labels)
print(f'Test accuracy: {test_acc:.2f}')

# Song Prediction

In [None]:
# Predict the class of the first sample.
# np.asarray(...)[0] selects the first *row* whether train_data is a DataFrame
# or an array (train_data[0] on a DataFrame would look up a column named 0).
# axis=0 belongs to expand_dims, which adds the batch dimension predict expects.
first_sample = np.expand_dims(np.asarray(train_data)[0], axis=0)
prediction = CNN_model.predict(first_sample)
# Index of the highest score = predicted class.
prediction = np.argmax(prediction)
print(f'Predicted genre: {prediction}')