In [None]:
import tensorflow as tf
import os
from os.path import isfile, join
import numpy as np
import shutil
from tensorflow import keras
from pathlib import Path
from IPython.display import display, Audio
import subprocess

In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): every path used later in this notebook lives under
# /content/gdrive, not /content/drive, so this mount looks redundant --
# confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Mount Google Drive at /content/gdrive; `data_directory` and the sample file
# passed to `predict` at the end of the notebook both live under this mount.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Copy the dataset folder into the current working directory.
# NOTE(review): `../input/...` looks like a Kaggle-style input path, while the
# rest of the notebook reads from the Drive mount -- confirm this cell is still
# needed in this environment.
!cp -r "../input/song-recognition-dataset" ./

In [None]:
# Dataset root on the mounted Drive and the two sub-folders it is organised
# into: one holding the song classes, one holding background noise.
data_directory = "/content/gdrive/MyDrive/Malayalam_Songs"
songs_folder, noise_folder = "songs", "noise"

# Full paths of both sub-folders.
songs_path, noise_path = (
    os.path.join(data_directory, subfolder)
    for subfolder in (songs_folder, noise_folder)
)

In [None]:
# Display the resolved songs directory as a quick sanity check.
songs_path

In [None]:
# --- Audio ---
# Sampling rate (Hz) every clip is expected to have; also the number of
# samples each clip is decoded to in `path_to_songs`.
sample_rate = 16000
# Weight applied to the noise when it is mixed into a clip by `add_noise`.
scale = 0.5

# --- Data split ---
# Fraction of the files held out for validation.
valid_split = 0.1
# Seed shared by the NumPy shuffles and the tf.data shuffles.
shuffle_seed = 43

# --- Training ---
batch_size = 128
epochs = 30



In [None]:
# Sort every top-level folder of the dataset into either the noise folder or
# the songs folder. The two destination folders themselves are left alone.
noise_like_folders = ("other", "background_noise")

for folder in os.listdir(data_directory):
    source = os.path.join(data_directory, folder)

    # Plain files at the top level are ignored.
    if not os.path.isdir(source):
        continue

    # Already one of the two destination folders: nothing to move.
    if folder == songs_folder or folder == noise_folder:
        continue

    # Known noise folders go under `noise_path`, everything else is a song class.
    destination_root = noise_path if folder in noise_like_folders else songs_path
    shutil.move(source, os.path.join(destination_root, folder))


In [None]:
# Collect the path of every .wav file found one level below `noise_path`.
noise_paths = []
for entry in os.listdir(noise_path):
    candidate_dir = Path(noise_path) / entry
    if not os.path.isdir(candidate_dir):
        continue
    for filename in os.listdir(candidate_dir):
        if filename.endswith(".wav"):
            noise_paths.append(os.path.join(candidate_dir, filename))

In [None]:
# Display the collected noise file paths.
noise_paths

In [None]:
# Shell loop that walks every sub-folder of `noise_path`, reads each .wav
# file's sampling rate with ffprobe and, when it differs from `sample_rate`,
# resamples the file in place with ffmpeg (via a temporary `temp.wav`).
#
# The target rate comes from `sample_rate` rather than a hard-coded literal:
# `load_noise_sample` rejects any file whose rate is not `sample_rate`, so the
# two must agree or every noise file is discarded.
command = (
    "for dir in `ls -1 " + noise_path + "`; do "
    "for file in `ls -1 " + noise_path + "/$dir/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne " + str(sample_rate) + " ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar " + str(sample_rate) + " temp.wav; "
    "mv temp.wav $file; "
    "fi; done; done"
)

In [None]:
# Run the resampling shell loop built in `command`; the exit status returned by
# os.system is not checked.
os.system(command)
def load_noise_sample(path):
    """Load one noise .wav file and cut it into one-second chunks.

    Args:
        path: Path of the .wav file to read.

    Returns:
        A list of tensors, each holding `sample_rate` consecutive samples of
        the file (any trailing partial second is dropped), or None when the
        file's sampling rate differs from `sample_rate` or the file is shorter
        than one second.
    """
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate == sample_rate:
        # Number of whole seconds available in the file.
        slices = sample.shape[0] // sample_rate
        if slices == 0:
            # tf.split cannot split into zero pieces; skip clips that are too
            # short instead of crashing the whole loading loop.
            print("Noise file", path, "is shorter than one second, skipping")
            return None
        sample = tf.split(sample[: slices * sample_rate], slices)
        return sample
    else:
        print("Sampling rate for",path, "is incorrect")
        return None


# Gather the one-second chunks of every usable noise file and stack them into
# a single tensor. Files rejected by `load_noise_sample` are skipped.
noises = []
for noise_file in noise_paths:
    chunks = load_noise_sample(noise_file)
    if not chunks:
        continue
    noises.extend(chunks)
noises = tf.stack(noises)

In [None]:
def paths_and_labels_to_dataset(songs_paths, labels):
    """Build a dataset of (decoded audio, label) pairs.

    Args:
        songs_paths: Sequence of .wav file paths.
        labels: Sequence of labels, one per path.

    Returns:
        A `tf.data.Dataset` zipping the audio decoded by `path_to_songs`
        with the matching label.
    """
    audio_dataset = tf.data.Dataset.from_tensor_slices(songs_paths).map(path_to_songs)
    label_dataset = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_dataset, label_dataset))

In [None]:
def path_to_songs(path):
    """Read a .wav file and decode it to a mono tensor of `sample_rate` samples.

    Args:
        path: Path of the .wav file.

    Returns:
        The decoded audio tensor; the sampling rate reported by the decoder is
        discarded.
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.audio.decode_wav(
        raw_bytes, desired_channels=1, desired_samples=sample_rate
    )
    return decoded[0]

In [None]:
def add_noise(songs, noises=None, scale=0.5):
    """Mix a randomly chosen noise clip into every item of a batch.

    Args:
        songs: Batch of audio clips (first axis is the batch).
        noises: Tensor of noise clips to draw from, indexed along axis 0.
            When None the batch is returned untouched.
        scale: Extra factor applied to the noise before it is added.

    Returns:
        `songs` with noise added, or `songs` itself when `noises` is None.
    """
    if noises is None:
        return songs

    # One random noise index per item of the batch.
    batch_len = tf.shape(songs)[0]
    picks = tf.random.uniform((batch_len,), 0, noises.shape[0], dtype=tf.int32)
    noise = tf.gather(noises, picks, axis=0)

    # Ratio between the peak of each clip and the peak of its noise, repeated
    # along axis 1 so it can be multiplied with the noise sample by sample.
    ratio = tf.math.reduce_max(songs, axis=1) / tf.math.reduce_max(noise, axis=1)
    ratio = tf.repeat(tf.expand_dims(ratio, axis=1), tf.shape(songs)[1], axis=1)

    return songs + noise * ratio * scale

In [None]:
def songs_to_fft(songs):
    """Turn a batch of waveforms into FFT magnitudes.

    Args:
        songs: Batch of audio clips whose last axis has size 1.

    Returns:
        The absolute value of the first half of the FFT along axis 1, with a
        trailing axis of size 1 restored.
    """
    waveform = tf.squeeze(songs, axis=-1)

    # tf.signal.fft works on complex input, so pair the signal with a zero
    # imaginary part first.
    as_complex = tf.cast(
        tf.complex(real=waveform, imag=tf.zeros_like(waveform)), tf.complex64
    )
    spectrum = tf.expand_dims(tf.signal.fft(as_complex), axis=-1)

    # Keep only the first half of the frequency bins.
    half = waveform.shape[1] // 2
    return tf.math.abs(spectrum[:, :half, :])

In [None]:
# Every entry of `songs_path` is one class; its position in the listing is the
# integer label.
# NOTE(review): os.listdir returns entries in arbitrary order, so the label
# ids are not guaranteed to be the same from one run to the next.
class_names = os.listdir(songs_path)
print(class_names)

# Parallel lists: path of each .wav file and the label of the class it is in.
songs_paths, labels = [], []
for label, name in enumerate(class_names):
    print("Songs:", name)
    class_dir = Path(songs_path) / name
    wav_names = [entry for entry in os.listdir(class_dir) if entry.endswith(".wav")]
    songs_paths.extend(os.path.join(class_dir, wav_name) for wav_name in wav_names)
    labels.extend([label] * len(wav_names))

In [None]:
# Shuffle paths and labels in place. Each list gets its own generator created
# from the same seed, so both receive the same permutation and stay aligned.
for sequence in (songs_paths, labels):
    rng = np.random.RandomState(shuffle_seed)
    rng.shuffle(sequence)

In [None]:
# Split into training and validation: the last `num_val_samples` items are
# held out for validation, everything before them is used for training.
num_val_samples = int(valid_split * len(songs_paths))

# Slice from an explicit index rather than with `-num_val_samples`: when the
# validation count rounds down to 0, `[:-0]` is empty and `[-0:]` is the whole
# list, which would swap the two sets.
split_index = len(songs_paths) - num_val_samples

train_songs_paths = songs_paths[:split_index]
train_labels = labels[:split_index]


valid_songs_paths = songs_paths[split_index:]
valid_labels = labels[split_index:]

In [None]:
# Training dataset: decoded audio + label, shuffled, then batched.
train_ds = paths_and_labels_to_dataset(train_songs_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed)
train_ds = train_ds.batch(batch_size)

# Validation dataset: same pipeline with a fixed batch size of 32.
valid_ds = paths_and_labels_to_dataset(valid_songs_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=shuffle_seed)
valid_ds = valid_ds.batch(32)


In [None]:
# Noise augmentation of the training set (currently disabled)
# train_ds = train_ds.map(
#      lambda x, y: (add_noise(x, noises, scale=scale), y),
#      num_parallel_calls=tf.data.experimental.AUTOTUNE,
#  )

def _fft_features(songs, label):
    """Replace a batch of waveforms by its FFT magnitudes; labels pass through."""
    return songs_to_fft(songs), label


autotune = tf.data.experimental.AUTOTUNE

# Move both datasets to the frequency domain with `songs_to_fft` and prefetch.
train_ds = train_ds.map(_fft_features, num_parallel_calls=autotune).prefetch(autotune)
valid_ds = valid_ds.map(_fft_features, num_parallel_calls=autotune).prefetch(autotune)

In [None]:
from tensorflow.keras.layers import Conv1D
# import tensorflow as tf
# from tensorflow import keras
# from tensorflow.keras.optimizers.legacy import Adam

In [None]:
from keras.utils import plot_model


In [None]:
import keras
from keras.utils import plot_model

In [None]:
def residual_block(x, filters, conv_num = 3, activation = "relu"):
    """Stack `conv_num` 1-D convolutions with a shortcut, then halve the length.

    Args:
        x: Input tensor of the block.
        filters: Number of filters of every convolution in the block.
        conv_num: Total number of kernel-size-3 convolutions on the main path.
        activation: Activation applied after each convolution and after the
            shortcut is added.

    Returns:
        The block output after max pooling with pool size 2 and stride 2.
    """
    layers = keras.layers

    # Shortcut branch: a kernel-size-1 convolution bringing the input to
    # `filters` channels so it can be added to the main path.
    shortcut = layers.Conv1D(filters, 1, padding="same")(x)

    # Main path: all but the last convolution are followed by the activation.
    main = x
    for _ in range(conv_num - 1):
        main = layers.Conv1D(filters, 3, padding="same")(main)
        main = layers.Activation(activation)(main)

    # Last convolution is activated only after the shortcut has been added.
    main = layers.Conv1D(filters, 3, padding="same")(main)
    merged = layers.Add()([main, shortcut])
    merged = layers.Activation(activation)(merged)

    return layers.MaxPool1D(pool_size=2, strides=2)(merged)

def build_model(input_shape, num_classes):
    """Build the 1-D residual CNN classifier.

    Five residual blocks are applied one after another, followed by average
    pooling, two dense layers and a softmax output.

    Args:
        input_shape: Shape of one input example, without the batch axis.
        num_classes: Number of output classes.

    Returns:
        An uncompiled `keras.models.Model` mapping the input to class
        probabilities.
    """
    inputs = keras.layers.Input(shape = input_shape, name = "input")

    # Each block must consume the output of the previous one; feeding `inputs`
    # to every block would discard all blocks but the last.
    x = residual_block(inputs, 16, 2)
    x = residual_block(x, 32, 2)
    x = residual_block(x, 64, 3)
    x = residual_block(x, 128, 3)
    x = residual_block(x, 128, 3)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)

    outputs = keras.layers.Dense(num_classes, activation = "softmax", name = "output")(x)

    return keras.models.Model(inputs = inputs, outputs = outputs)

# The network consumes the first half of the FFT of a `sample_rate`-sample
# clip, hence `sample_rate // 2` frequency bins with one channel each.
model = build_model(
    input_shape=(sample_rate // 2, 1),
    num_classes=len(class_names),
)

# Save a diagram of the architecture, then print the layer summary.
plot_model(
    model,
    to_file='model_architecture.png',
    show_shapes=True,
    show_layer_names=True,
)
model.summary()

# Labels are plain integers, so the sparse variant of cross-entropy is used.
model.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Callbacks: stop after 10 epochs without improvement (restoring the best
# weights), and keep on disk only the model with the best validation accuracy.
model_save_filename = "model.h5"
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=model_save_filename,
    monitor="val_accuracy",
    save_best_only=True,
)

In [None]:
# Download the architecture diagram written by `plot_model` to the local machine.
from google.colab import files
files.download('model_architecture.png')

In [None]:
# Train with early stopping and best-model checkpointing.
training_callbacks = [earlystopping_cb, mdlcheckpoint_cb]
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=epochs,
    callbacks=training_callbacks,
)

# Persist the model as it stands after training.
# NOTE(review): this is the same file name the checkpoint callback writes to,
# so the best-val_accuracy checkpoint is overwritten here -- confirm that is
# intended.
model.save("model.h5")

In [None]:
# Download the saved model file to the local machine.
from google.colab import files
files.download("model.h5")

In [None]:
# The model is compiled with one metric, so evaluate() returns the loss
# followed by the accuracy; report each under its own label instead of
# printing the whole list as "accuracy".
eval_loss, eval_accuracy = model.evaluate(valid_ds)
print("Loss of model:", eval_loss)
print("Accuracy of model:", eval_accuracy)

In [None]:
from sklearn.metrics import classification_report, roc_curve, auc
import matplotlib.pyplot as plt


In [None]:
# Evaluate the model on the validation dataset
loss, accuracy = model.evaluate(valid_ds)

# Collect predictions and true labels in ONE pass over the dataset.
# `valid_ds` is shuffled, and a shuffled dataset yields a different order each
# time it is iterated, so predicting in one pass and reading labels in another
# would pair each prediction with the wrong label.
y_pred_batches = []
y_true_batches = []
for batch_inputs, batch_labels in valid_ds:
    y_pred_batches.append(model.predict_on_batch(batch_inputs))
    y_true_batches.append(batch_labels.numpy())
y_pred = np.concatenate(y_pred_batches, axis=0)
y_true = np.concatenate(y_true_batches, axis=0)

# Compute precision, recall, and F1-score for every class
print("Classification Report:")
print(classification_report(y_true, np.argmax(y_pred, axis=1), labels=np.arange(len(class_names)), target_names=class_names, zero_division="warn"))



In [None]:
# One-vs-rest ROC curve and AUC for every class: the true labels are reduced
# to "is this class / is not", and the class's predicted probability is the score.
fpr, tpr, roc_auc = {}, {}, {}
for class_id, _class_name in enumerate(class_names):
    is_this_class = (y_true == class_id).astype(int)
    class_scores = y_pred[:, class_id]
    fpr[class_id], tpr[class_id], _ = roc_curve(is_this_class, class_scores)
    roc_auc[class_id] = auc(fpr[class_id], tpr[class_id])

# Draw all curves on one figure, with the AUC in each legend entry.
plt.figure(figsize=(8, 6))
for class_id, class_name in enumerate(class_names):
    legend_entry = f'{class_name} (AUC = {roc_auc[class_id]:.2f})'
    plt.plot(fpr[class_id], tpr[class_id], label=legend_entry)

# Dashed diagonal for reference.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()


In [None]:
# Number of samples intended for display.
SAMPLES_TO_DISPLAY = 10

# Dataset over the validation files: decoded audio + label, shuffled and batched.
test_ds = paths_and_labels_to_dataset(valid_songs_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed)
test_ds = test_ds.batch(batch_size)


In [None]:
def predict(path, labels):
    """Predict the class of the first clip of the given files and print it.

    Args:
        path: List of .wav file paths.
        labels: List of labels, one per path; they are only used to build the
            dataset and do not affect what is printed.

    Returns:
        None. The predicted class name and index are printed.
    """
    dataset = paths_and_labels_to_dataset(path, labels)
    dataset = dataset.shuffle(buffer_size=batch_size * 8, seed=shuffle_seed)
    dataset = dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    # dataset = dataset.map(lambda x, y: (add_noise(x, noises, scale=scale), y))

    green = "[92m"
    line_template = "Song:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m"

    # Only the first batch is looked at.
    for batch_songs, batch_labels in dataset.take(1):
        probabilities = model.predict(songs_to_fft(batch_songs))

        # randint's upper bound is exclusive, so this always selects index 0.
        picked = np.random.randint(0, 1, 1)
        batch_songs = batch_songs.numpy()[picked, :]
        batch_labels = batch_labels.numpy()[picked]
        predicted_ids = np.argmax(probabilities, axis=-1)[picked]

        for position in range(1):
            predicted_id = predicted_ids[position]
            predicted_name = class_names[predicted_id]
            print(line_template.format(green, predicted_name, green, predicted_id))

            print("Song Predicted:", predicted_name)


In [None]:
# Run a prediction on a single file. The label is a placeholder: `predict`
# needs one label per path to build its dataset.
# NOTE(review): this rebinds the global `labels`, which earlier cells use for
# the integer training labels -- re-running those cells after this one will
# see the placeholder list instead.
path = ["/content/gdrive/MyDrive/Malayalam_denoised/songs/Karimizhi2.wav"]
labels = ["unknown"]
predict(path, labels)
