**Model goal:**

- Input: audio of arbitrary length in which a person hums a song
- Output: the top 5 songs most similar to what the person hummed
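
The top-5 output can be read off any vector of per-song scores. A minimal sketch, assuming the model emits one score per known song; the helper `top_k_songs` and the example scores are illustrative and not part of the notebook:

```python
def top_k_songs(scores, names, k=5):
    """Return the k (name, score) pairs with the highest scores, best first."""
    if len(scores) != len(names):
        raise ValueError("scores and names must have the same length")
    ranked = sorted(zip(names, scores), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]


# Hypothetical scores for six songs
names = ["Mood", "Alejandro", "In the End", "Sweet Dreams", "Dancing Queen", "Voyage Voyage"]
scores = [0.05, 0.40, 0.10, 0.25, 0.15, 0.05]
top5 = top_k_songs(scores, names)
```

With a softmax classifier the scores would be the predicted class probabilities, so the same helper applies unchanged.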


Questions:
- Could it cause problems if I have to add a new unit to the output layer for every new song?
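
With a fixed softmax head, every new song changes the output dimension, so the classifier has to be rebuilt and retrained, and each song needs enough labelled hummings. One common alternative, which this notebook does not implement, is to store a reference embedding per song and rank songs by similarity to the query. A minimal sketch under that assumption; `cosine_similarity`, `rank_songs`, and the toy 3-dimensional vectors are hypothetical:

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_songs(query, catalog, k=5):
    """Rank songs by cosine similarity between the query and each reference embedding."""
    scored = [(name, cosine_similarity(query, ref)) for name, ref in catalog.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy reference embeddings (hypothetical values)
catalog = {"Mood": [1.0, 0.0, 0.0], "Alejandro": [0.0, 1.0, 0.0]}
catalog["In the End"] = [0.0, 0.0, 1.0]  # adding a song is a dictionary insert, no retraining
ranking = rank_songs([0.1, 0.2, 0.9], catalog)
```

Whether off-the-shelf embeddings separate melodies well enough for this to work is an open question that would need to be checked on the data.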


Simplified version:
- There are 10 audio tracks. From what I hum, I want to determine which track it is

Datasets that could be used:
https://huggingface.co/datasets/ylacombe/tiny-humming (11 audio clips, I don't know all of the songs)

https://github.com/amanteur/CHAD?tab=readme-ov-file#hummings (yielded about 300 audio clips)


Youtube API KEY:
<redacted: keep the key outside the notebook, e.g. in an environment variable>
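
An API key stored in a notebook is visible to anyone who can read the file. A minimal sketch of reading it from the environment instead; the variable name `YOUTUBE_API_KEY` and the helper `load_api_key` are assumptions, not something the notebook defines:

```python
import os


def load_api_key(env_var="YOUTUBE_API_KEY"):
    """Read the API key from the environment; fail loudly if it is missing."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before calling the API")
    return key
```

The returned value could then be assigned to `api_key` in the preprocessing cell instead of a hard-coded string.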




Tracks I want to use:
- Mood
- Hymn for the weekend
- Dancing queen
- Rolling in the deep
- Sweet dreams
- Alejandro
- Can't help falling in love
- Voyage voyage
- In the end
- Game of thrones

Now, 25.02:
- Formatting the code so that it can be published on GitHub
- Working on deploying the model

In [None]:
# Stage 1 - preprocessing
# - Filtering - getting tracks that contain both humming and original/cover
# - Getting Youtube titles for originals/covers
# - Sorting by amount of data points

import pandas as pd
import requests

dataset_path = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1"


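def get_youtube_title(video_id, api_key):
    """Return the title of a YouTube video, or "Title not found" if the lookup fails."""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {"part": "snippet", "id": video_id, "key": api_key}
    response = requests.get(url, params=params, timeout=10)
    data = response.json()

    # "items" is missing or empty when the video does not exist or the request was rejected
    if data.get("items"):
        return data["items"][0]["snippet"]["title"]
    return "Title not found"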


# NOTE: dataset_path must point to the dataset's metadata CSV file, not to a directory
df1 = pd.read_csv(dataset_path)

track_ids_with_humming = set(df1[df1["audio_type"] == "humming"]["group_id"])
track_ids_with_original_or_cover = set(df1[df1["audio_type"].isin(["original", "cover"])]["group_id"])
valid_track_ids = track_ids_with_humming.intersection(track_ids_with_original_or_cover)

# Keep only groups that have both a humming and an original/cover recording
df_filtered = df1[df1["group_id"].isin(valid_track_ids)].copy()

api_key = ""

df_filtered["youtube_url"] = df_filtered["youtube_id"].apply(
    lambda x: f"https://www.youtube.com/watch?v={x}" if pd.notna(x) and x != "" else ""
)

df_filtered["youtube_title"] = df_filtered["youtube_id"].apply(
    lambda x: get_youtube_title(x, api_key) if pd.notna(x) and x != "" else ""
)

humming_counts = df_filtered[df_filtered["audio_type"] == "humming"].groupby("group_id")["audio_type"].count()
df_filtered["humming_count"] = df_filtered["group_id"].map(humming_counts)

df_sorted = df_filtered.sort_values(by=["humming_count", "group_id"], ascending=[False, True])


In [12]:
# Stage 1 - preprocessing
# - Moving humming audios to the folder with training data
# - Composing dataframe for training

import os
import shutil

# From where to take audios
source_dir = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/chad_hummings_subset"

# Where to put final audios for the training set
target_dir = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training"

# File where final dataset will be located
training_dataset_path = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training.csv"


track_names_to_ids_mapping = {
    "Alejandro": "77542e08d182cd0c", "Can't Help Falling in Love": "ca0edcccdf0481d8",
    "Dancing Queen": "777e721176e34e09", "Game of Thrones": "f9b4b692b088bce7",
    "Hymn for the Weekend": "0f49946df4f06c48", "In the End": "386d01a6c07ecd75", "Mood": "08a58e67791f0d3a",
    "Rolling in the Deep": "8da26c7bee72d22c", "Sweet Dreams": "e734e7f38e373b27", "Voyage Voyage": "e7138b4b2fb161f6"
}

# Create reverse mapping from group_id to song name
group_id_to_name_mapping = {v: k for k, v in track_names_to_ids_mapping.items()}

# Create mapping from group_id to integer label (0 to 9)
unique_group_ids = list(track_names_to_ids_mapping.values())
group_id_to_label_mapping = {group_id: idx for idx, group_id in enumerate(sorted(unique_group_ids))}
label_to_name_mapping = {value: group_id_to_name_mapping[key] for key, value in group_id_to_label_mapping.items()}

dataset_records = list()

for _, row in df_sorted[
    (df_sorted['audio_type'] == "humming") &
    (df_sorted['group_id'].isin(track_names_to_ids_mapping.values()))
].iterrows():
    group_id = row["group_id"]
    fragment_id = str(row["fragment_id"])
    id_ = str(row["id"])

    # Construct the expected file path using fragment_id and id
    source_file = os.path.join(source_dir, group_id, fragment_id, f"{id_}.wav")

    if not os.path.exists(source_file):
        print(f"Warning: File {source_file} not found!")
        continue  # Skip if file doesn't exist

    # Create group_id folder inside target directory
    group_folder = os.path.join(target_dir, str(group_id))
    os.makedirs(group_folder, exist_ok=True)

    # Define new file path
    new_file_path = os.path.join(group_folder, f"{id_}.wav")

    # Copy the file to the new location
    try:
        shutil.copy2(source_file, new_file_path)
    except Exception as e:
        print(f"Error copying file {source_file}: {str(e)}")
        continue

    # Append record to dataset list
    dataset_records.append([new_file_path, group_id])


dataset_df = pd.DataFrame(dataset_records, columns=['file_path', 'group_id'])

# Add new columns to the DataFrame
dataset_df['label'] = dataset_df['group_id'].map(group_id_to_label_mapping)
dataset_df['name'] = dataset_df['group_id'].map(group_id_to_name_mapping)

# Reorder columns to have a more logical sequence
dataset_df = dataset_df[['file_path', 'group_id', 'label', 'name']]

# index=False avoids an extra "Unnamed: 0" column when the CSV is read back
dataset_df.to_csv(training_dataset_path, index=False)


In [None]:
# Stage 2
"""
High-level steps:
+ Finish formatting the dataset so that training can start
- Train the model. Check that it recognizes my own recordings
- Deploy the model so that it is available through an API. Build a simple frontend that shows the list of recognizable tracks
and lets the user upload audio. Connect it to the API and bundle everything with docker compose

"""

"""
Dataset formatting:
Humming fragments + cover fragments + find the original audio, split it into fragments, and add them to the dataset

Todo:
the original audio and the cover fragments have not been added yet


min_duration, max_duration
(4.789875, 17.7540625)



"""
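
Splitting the original recordings into fragments, as planned in the notes above, can be sketched as follows. This is only an illustration: `split_into_chunks` is a hypothetical helper, and the 10-second chunk with a 5-second hop is an assumption chosen to fall inside the 4.8 to 17.8 range of humming durations reported above, not a value taken from the notebook.

```python
def split_into_chunks(samples, sample_rate, chunk_seconds=10.0, hop_seconds=5.0):
    """Split a 1-D sequence of samples into fixed-length, possibly overlapping chunks.

    A trailing remainder shorter than chunk_seconds is dropped.
    """
    chunk_len = int(chunk_seconds * sample_rate)
    hop_len = int(hop_seconds * sample_rate)
    if chunk_len <= 0 or hop_len <= 0:
        raise ValueError("chunk_seconds and hop_seconds must be positive")
    chunks = []
    for start in range(0, len(samples) - chunk_len + 1, hop_len):
        chunks.append(samples[start:start + chunk_len])
    return chunks


# 25 seconds of silence at a toy sample rate of 100 Hz
chunks = split_into_chunks([0.0] * 2500, sample_rate=100)
```

The slicing also works on a NumPy array such as the one returned by `load_audio`, in which case `sample_rate` would be `SAMPLE_RATE`.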

In [1]:
# Training model stage
import tensorflow_hub as hub

# YamNet Model URL
YAMNET_MODEL_URL = "https://tfhub.dev/google/yamnet/1"

# Load YamNet Model
yamnet = hub.load(YAMNET_MODEL_URL)

print("yamnet: ", yamnet)

2025-02-25 10:01:16.931561: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-02-25 10:01:16.974561: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-02-25 10:01:16.975609: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


yamnet:  <tensorflow.python.saved_model.load.Loader._recreate_base_user_object.<locals>._UserObject object at 0x7f893c6ce1c0>


In [14]:
# Stage 1 - preprocessing
# - converting audios to YamNet embeddings
# - Applying augmentation, 2 new augmented audios per each audio

import os
import shutil
import random

import librosa
import tensorflow as tf
import numpy as np
import pandas as pd
import soundfile as sf

# File where final dataset will be located
training_dataset_path = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training.csv"

# Load the dataset
df = pd.read_csv(training_dataset_path)

# Audio parameters
SAMPLE_RATE = 16000  # Required sample rate for YamNet

random.seed(42)

augmented_audios_dir = r"/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/augmented_audios"
if os.path.exists(augmented_audios_dir):
    shutil.rmtree(augmented_audios_dir)
os.makedirs(augmented_audios_dir, exist_ok=True)

possible_augmentations = ["time_stretch", "pitch_shift", "volume_change", ]


def load_audio(file_path, target_sr=SAMPLE_RATE):
    """Loads an audio file and resamples it to the target sample rate."""
    audio, sr = librosa.load(file_path, sr=target_sr)
    return audio


def extract_embedding(audio):
    """Extracts YamNet embeddings from an audio clip."""
    waveform = tf.convert_to_tensor(audio, dtype=tf.float32)
    scores, embeddings, spectrogram = yamnet(waveform)
    return embeddings.numpy().mean(axis=0)  # Average pooling across time


def augment_audio(audio, technique: str, sr=SAMPLE_RATE):
    if technique == "time_stretch":
        rate = random.uniform(0.8, 1.2)
        audio = librosa.effects.time_stretch(y=audio, rate=rate)

    if technique == "pitch_shift":
        steps = random.randint(-2, 2)
        audio = librosa.effects.pitch_shift(y=audio, sr=sr, n_steps=steps)

    if technique == "volume_change":
        gain = random.uniform(0.7, 1.3)  # Scale volume 70%-130%
        audio = audio * gain

    return audio

# # Process dataset
# features = []
# labels = []
# initial_rows_indexes = [] # Needed to extract info about audio from initial training dataset, when testing model on CV set
# dirs_exist = set()

original_features = []
original_labels = []
original_indexes = []

augmented_features = []
augmented_labels = []

dirs_exist = set()

for idx, row in df.iterrows():
    audio = load_audio(row["file_path"])
    embedding = extract_embedding(audio)
    original_features.append(embedding)
    original_labels.append(row["label"])
    original_indexes.append(idx)

    group_id = row["group_id"]

    for augmentation in random.sample(possible_augmentations, 2):
        augmented_audio = augment_audio(audio, augmentation)
        embedding = extract_embedding(augmented_audio)

        augmented_features.append(embedding)
        augmented_labels.append(row["label"])

        # Saving audio for future analysis
        augmented_audio_dir = os.path.join(augmented_audios_dir, group_id)
        if group_id not in dirs_exist:
            os.makedirs(augmented_audio_dir, exist_ok=True)
            dirs_exist.add(group_id)

        # splitext keeps working even if the file name itself contains dots
        file_name, extension = os.path.splitext(os.path.basename(row["file_path"]))
        augmented_audio_file_name = f"{file_name}_{augmentation}{extension}"
        sf.write(
            os.path.join(augmented_audio_dir, augmented_audio_file_name), augmented_audio, samplerate=SAMPLE_RATE
        )

# Convert to NumPy arrays
X_original = np.array(original_features)
y_original = np.array(original_labels)
row_indexes = np.array(original_indexes)

X_augmented = np.array(augmented_features)
y_augmented = np.array(augmented_labels)


# Save preprocessed original data for splitting
np.save("X_original.npy", X_original)
np.save("y_original.npy", y_original)
np.save("row_indexes.npy", row_indexes)
np.save("X_augmented.npy", X_augmented)
np.save("y_augmented.npy", y_augmented)

print("Preprocessing done. Original features shape:", X_original.shape, "Original labels shape:", y_original.shape)

2025-02-25 11:05:22.754025: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 16908288 exceeds 10% of free system memory.


Preprocessing done. Original features shape: (379, 1024) Original labels shape: (379,)


In [None]:
print(df[:5])

In [7]:
# Stage 2 - training the model


import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping

# Load precomputed embeddings
X_original = np.load("X_original.npy")
y_original = np.load("y_original.npy")
X_augmented = np.load("X_augmented.npy")
y_augmented = np.load("y_augmented.npy")

# One-hot encode labels
num_classes = len(set(y_original))
y_original = to_categorical(y_original, num_classes=num_classes)
y_augmented = to_categorical(y_augmented, num_classes=num_classes)


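# Split dataset by index so each augmented clip can be traced back to its source clip
indices = np.arange(len(X_original))
idx_train, idx_temp = train_test_split(indices, test_size=0.3, random_state=42)
idx_cv, idx_test = train_test_split(idx_temp, test_size=0.5, random_state=42)
X_train, y_train = X_original[idx_train], y_original[idx_train]
X_cv, y_cv = X_original[idx_cv], y_original[idx_cv]
X_test, y_test = X_original[idx_test], y_original[idx_test]

# Add augmented data only for clips in the training split (avoids leaking CV/test clips).
# Assumes augmented rows are stored in source order, a fixed number per original clip.
aug_source = np.repeat(indices, len(X_augmented) // len(X_original))
is_train_aug = np.isin(aug_source, idx_train)
X_train = np.concatenate([X_train, X_augmented[is_train_aug]])
y_train = np.concatenate([y_train, y_augmented[is_train_aug]])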


# Define a simple classifier
model = Sequential([
    Dense(128, activation='relu', input_shape=(1024,)),  # 1024 is YamNet embedding size
    Dropout(0.3),  # regularization
    Dense(64, activation='relu'),
    Dropout(0.2),  # regularization
    Dense(num_classes, activation='softmax')  # Output layer with softmax for classification
])

learning_rate = 0.003

early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Compile the model
model.compile(
    optimizer=Adam(learning_rate=learning_rate),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Train the model
history = model.fit(
    X_train, y_train, epochs=200, batch_size=64, validation_data=(X_cv, y_cv),
    # callbacks=[early_stopping]
)

# Save model
# model.save("melody_classifier.h5")

# model.summary() prints the table itself and returns None, so it should not be wrapped in print()
model.summary()

"""
Results:

Epoch 199/200
5/5 [==============================] - 0s 59ms/step - loss: 0.3234 - accuracy: 0.9208 - val_loss: 2.4522 - val_accuracy: 0.3860
Epoch 200/200
5/5 [==============================] - 0s 72ms/step - loss: 0.2734 - accuracy: 0.9283 - val_loss: 2.5792 - val_accuracy: 0.4211

High variance (overfitting) rather than high bias: training accuracy is about 0.93, validation accuracy only about 0.42


After augmentation:
Epoch 199/200
13/13 [==============================] - 0s 6ms/step - loss: 0.2011 - accuracy: 0.9283 - val_loss: 1.6642 - val_accuracy: 0.7544
Epoch 200/200
13/13 [==============================] - 0s 5ms/step - loss: 0.1983 - accuracy: 0.9308 - val_loss: 1.3218 - val_accuracy: 0.7602

"""
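
When augmentation is combined with a train/validation split, augmented copies of a held-out clip should not end up in the training set, otherwise validation accuracy can be inflated. A minimal, dependency-free sketch of that bookkeeping; `split_with_augmentations` is a hypothetical helper, and it assumes augmented clip `j` was generated from original clip `j // n_aug_per_original`:

```python
import random


def split_with_augmentations(n_originals, n_aug_per_original, train_fraction=0.7, seed=42):
    """Split original clip indices, then keep augmented copies only for training clips.

    Augmented clip j is assumed to come from original clip j // n_aug_per_original.
    """
    indices = list(range(n_originals))
    random.Random(seed).shuffle(indices)
    n_train = round(train_fraction * n_originals)
    train_ids = set(indices[:n_train])
    held_out_ids = set(indices[n_train:])
    train_aug = [
        j for j in range(n_originals * n_aug_per_original)
        if j // n_aug_per_original in train_ids
    ]
    return train_ids, held_out_ids, train_aug


train_ids, held_out_ids, train_aug = split_with_augmentations(10, 2)
```

The returned `train_aug` indices would select rows of the augmented feature matrix; the held-out clips stay free of any augmented relatives in training.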



Epoch 1/200 ... Epoch 78 (cell output truncated; per-epoch metrics were not preserved)



In [8]:
indices = np.arange(len(X_original))

# Getting info to which set each data point went

idx_train, idx_temp = train_test_split(
    indices, test_size=0.3, random_state=42
)
idx_cv, idx_test = train_test_split(
    idx_temp, test_size=0.5, random_state=42
)

In [21]:
# Stage 3 - making predictions

initial_rows_indexes = np.load("row_indexes.npy")


def make_predictions(X):
    predictions = model.predict(X)
    return np.argmax(predictions, axis=1)

correctly_classified = 0

# iterating through indexes of CV examples
predictions = make_predictions(X_cv)

for prediction_idx, idx in enumerate(idx_cv):
    prediction_numeric = predictions[prediction_idx]
    initial_dataset_row = df.iloc[idx]

    predicted_audio_name = label_to_name_mapping[prediction_numeric]
    target_value_numeric = np.argmax(y_cv[prediction_idx])
    print(f"{initial_dataset_row['file_path']=}. Predicted name: {predicted_audio_name}. Actual name: {label_to_name_mapping[target_value_numeric]}")

    if prediction_numeric == target_value_numeric:
        correctly_classified += 1

print("\n Accuracy: ", correctly_classified / len(idx_cv))

initial_dataset_row['file_path']='/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training/77542e08d182cd0c/329dd4c5b93b9a8f.wav'. Predicted name: Alejandro. Actual name: Alejandro
initial_dataset_row['file_path']='/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training/0f49946df4f06c48/76faa950c0377083.wav'. Predicted name: Hymn for the Weekend. Actual name: Hymn for the Weekend
initial_dataset_row['file_path']='/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training/ca0edcccdf0481d8/7d19de4db71df9f9.wav'. Predicted name: Can't Help Falling in Love. Actual name: Can't Help Falling in Love
initial_dataset_row['file_path']='/home/asobolev/My files/Studying/ML Projects/Humming-Song classification/Dataset1/dataset_for_training/777e721176e34e09/b827d3139d07cb2b.wav'. Predicted name: Dancing Queen. Actual name: Dancing Queen
initial_dataset_row['file_pa

[[0. 0. 0. ... 0. 1. 0.]
 [0. 0. 1. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 1. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
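
Overall accuracy hides which songs are confused with each other. A short sketch of a per-song breakdown, assuming integer labels like those produced by `np.argmax` above; `per_class_accuracy` and the example labels are hypothetical:

```python
from collections import Counter


def per_class_accuracy(y_true, y_pred):
    """Return {label: fraction of examples with that true label predicted correctly}."""
    totals = Counter(y_true)
    correct = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {label: correct[label] / totals[label] for label in totals}


# Hypothetical labels for six validation clips
y_true = [0, 0, 1, 1, 1, 2]
y_pred = [0, 1, 1, 1, 0, 2]
accuracy_by_label = per_class_accuracy(y_true, y_pred)
```

Mapping the keys through `label_to_name_mapping` would turn the result into a per-song report.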


In [29]:
file_1 = r"/home/asobolev/Downloads/Record (online-voice-recorder.com).wav"
file_2 = r"/home/asobolev/Downloads/Record (online-voice-recorder.com) (3).wav"

for audio_file in (file_1, file_2):
    audio = load_audio(audio_file)
    embedding = extract_embedding(audio)
    prediction_numeric = make_predictions(embedding.reshape(1, -1))[0]
    predicted_audio_name = label_to_name_mapping[prediction_numeric]
    print("Audio: ", audio_file, f" predicted label: {predicted_audio_name}")



Audio:  /home/asobolev/Downloads/Record (online-voice-recorder.com).wav  predicted label: Alejandro
Audio:  /home/asobolev/Downloads/Record (online-voice-recorder.com) (3).wav  predicted label: Sweet Dreams
