In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        #print(os.path.join(dirname, filename))
        pass

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import json
import librosa
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_addons as tfa
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import soundfile as sf
from joblib import Parallel, delayed
from pathlib import Path
from sklearn.model_selection import StratifiedKFold

In [None]:
SEED = 42
DATA_PATH = "../input/birdclef-2022/"
AUDIO_PATH = '../input/birdclef-2022/train_audio'
MEAN = np.array([0.485, 0.456, 0.406])
STD = np.array([0.229, 0.224, 0.225])
NUM_WORKERS = 4
CLASSES = sorted(os.listdir(AUDIO_PATH))
NUM_CLASSES = len(CLASSES)

class AudioParams:
    """
    Parameters used for the audio data
    """
    sr = 32000
    duration = 5
    # Melspectrogram
    n_mels = 224
    fmin = 20
    fmax = 16000

train = pd.read_csv('../input/birdclef-2022/train_metadata.csv')
train["file_path"] = AUDIO_PATH + '/' + train['filename']
paths = train["file_path"].values

Fold = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
for n, (trn_index, val_index) in enumerate(Fold.split(train, train['primary_label'])):
    train.loc[val_index, 'kfold'] = int(n)
train['kfold'] = train['kfold'].astype(int)

train.to_csv('train_folds.csv', index=False)

print(train.shape)
train.head()

In [None]:
def compute_melspec(y, params):
    """
    Computes a mel-spectrogram and puts it at decibel scale
    Arguments:
        y {np array} -- signal
        params {AudioParams} -- Parameters to use for the spectrogram. Expected to have the attributes sr, n_mels, f_min, f_max
    Returns:
        np array -- Mel-spectrogram
    """
    melspec = librosa.feature.melspectrogram(
        y=y, sr=params.sr, n_mels=params.n_mels, fmin=params.fmin, fmax=params.fmax,
    )

    melspec = librosa.power_to_db(melspec).astype(np.float32)
    return melspec

In [None]:
def crop_or_pad(y, length, sr, train=True, probs=None):
    """
    Crops an array to a chosen length
    Arguments:
        y {1D np array} -- Array to crop
        length {int} -- Length of the crop
        sr {int} -- Sampling rate
    Keyword Arguments:
        train {bool} -- Whether we are at train time. If so, crop randomly, else return the beginning of y (default: {True})
        probs {None or numpy array} -- Probabilities to use to chose where to crop (default: {None})
    Returns:
        1D np array -- Cropped array
    """
    if len(y) <= length:
        y = np.concatenate([y, np.zeros(length - len(y))])
    else:
        if not train:
            start = 0
        elif probs is None:
            start = np.random.randint(len(y) - length)
        else:
            start = (
                    np.random.choice(np.arange(len(probs)), p=probs) + np.random.random()
            )
            start = int(sr * (start))

        y = y[start: start + length]

    return y.astype(np.float32)

In [None]:
def mono_to_color(X, eps=1e-6, mean=None, std=None):
    """
    Converts a one channel array to a 3 channel one in [0, 255]
    Arguments:
        X {numpy array [H x W]} -- 2D array to convert
    Keyword Arguments:
        eps {float} -- To avoid dividing by 0 (default: {1e-6})
        mean {None or np array} -- Mean for normalization (default: {None})
        std {None or np array} -- Std for normalization (default: {None})
    Returns:
        numpy array [3 x H x W] -- RGB numpy array
    """
    X = np.stack([X, X, X], axis=-1)

    # Standardize
    mean = mean or X.mean()
    std = std or X.std()
    X = (X - mean) / (std + eps)

    # Normalize to [0, 255]
    _min, _max = X.min(), X.max()

    if (_max - _min) > eps:
        V = np.clip(X, _min, _max)
        V = 255 * (V - _min) / (_max - _min)
        V = V.astype(np.uint8)
    else:
        V = np.zeros_like(X, dtype=np.uint8)

    return V

In [None]:
# original

path = train['file_path'][0]
y, sr = sf.read(path, always_2d=True)
y = np.mean(y, 1)

X = compute_melspec(y, AudioParams)
X = mono_to_color(X)
X = X.astype(np.uint8)

plt.imshow(X)

In [None]:
# 5 sec cropped

path = train['file_path'][0]
y, sr = sf.read(path, always_2d=True)
y = np.mean(y, 1)
y = crop_or_pad(y, AudioParams.duration * AudioParams.sr, sr=AudioParams.sr, train=True, probs=None)

X = compute_melspec(y, AudioParams)
X = mono_to_color(X)
X = X.astype(np.uint8)

plt.imshow(X)

In [None]:
def Audio_to_Image(path, params):
    y, sr = sf.read(path, always_2d=True)
    y = np.mean(y, 1) # there is (X, 2) array
    y = crop_or_pad(y, params.duration * params.sr, sr=params.sr, train=True, probs=None)
    image = compute_melspec(y, params)
    image = mono_to_color(image)
    image = image.astype(np.uint8)
    return image

In [None]:
def save_(path):
    save_path = "../working/" + "/".join(path.split('/')[-2:])
    np.save(save_path, Audio_to_Image(path, AudioParams))

In [None]:
# Parallel Execution
NUM_WORKERS = 4
for dir_ in CLASSES:
    _ = os.makedirs(dir_, exist_ok=True)
_ = Parallel(n_jobs=NUM_WORKERS)(delayed(save_)(AUDIO_PATH) for AUDIO_PATH in tqdm(paths))

In [None]:
"""
mydict = {
    "label": [],
    "segment": []
}

for folder in CLASSES:
    for file in os.listdir(folder):
        mydict["label"].append(folder)
        segment = np.load(os.path.join(folder, file))
        mydict["segment"].append(segment)
"""

In [None]:
data = []
labels = []

for folder in CLASSES:
    for file in os.listdir(folder):
        labels.append(folder)
        segment = np.load(os.path.join(folder, file))
        data.append(segment)

In [None]:
X = np.array(data)
targets = np.array(labels)
y = pd.get_dummies(targets).values

In [None]:
X_train,X_val,y_train,y_val = train_test_split(X,y,random_state=1)

In [None]:
def design_model(input_shape):

    # load the InceptionResNetV2 architecture with imagenet weights as base
    base_model = tf.keras.applications.InceptionResNetV2(
                         include_top=False,
                         weights='imagenet',
                         input_shape=input_shape
                         )

    base_model.trainable=False
    # For freezing the layer we make use of layer.trainable = False
    # means that its internal state will not change during training.
    # model's trainable weights will not be updated during fit(),
    # and also its state updates will not run.

    model = tf.keras.Sequential([ 
            base_model,   
            tf.keras.layers.BatchNormalization(renorm=True),
            tf.keras.layers.GlobalAveragePooling2D(),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(len(CLASSES), activation='softmax')
            ])
    return model

In [None]:
"""

input_shape = (X_train.shape[1], X_train.shape[2], 3)
model = design_model(input_shape)

# Selection of the optimizer, loss type and metrics for performance evaluation.

model.compile(loss='categorical_crossentropy', 
              optimizer='adam', 
              metrics=['accuracy', tfa.metrics.F1Score(num_classes=len(train.primary_label.unique()))]
             )


model.summary()

# Training the model.
history = model.fit(X_train, y_train,
                    validation_data=(X_val, y_val),
                    epochs=3,
                    batch_size=8
                    )

#plot_performance(history)

# Testing the model on never seen before data.
#make_prediction(model, Xtest, ytest, 24)