# In [3]:
import os

import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_addons as tfa
import torch
import torchaudio
from PIL import Image
from tensorflow import keras

# In [5]:
class Functions:
    """Stateless helpers: inference-table construction, scoring, post-processing.

    None of the helpers use instance state. They are static methods, so both
    ``Functions.name(...)`` and ``Functions().name(...)`` work.
    """

    @staticmethod
    def make_df(path, sample_rate, df_coord_sites):
        """Build one row per 5-second window for every ``.ogg`` file in ``path``.

        File names are parsed as ``<audio_id>_<site>_<YYYYMMDD>.ogg``. Each file
        is decoded once with ``librosa.load`` to measure its own duration.

        Args:
            path: Directory containing the audio files.
            sample_rate: Sample rate passed to ``librosa.load``.
            df_coord_sites: DataFrame with a ``site`` column plus ``longitude``
                and ``latitude`` columns; it is left-joined on ``site``.

        Returns:
            DataFrame with columns ``row_id``, ``end_sec``, ``filename``
            followed by the per-file columns (``audio_id``, ``site``, ``date``,
            ``duration``, ``month``, ``year``, the coordinate columns and the
            derived ``sin_*`` / ``cos_*`` / ``norm_latitude`` features).
            ``row_id`` is ``<audio_id>_<site>_<end_sec>`` and ``end_sec`` runs
            5, 10, ... up to the whole-second duration of the file.
        """
        # Measure every file individually; a single shared ``duration``
        # variable would give all files the length of the last one listed.
        durations = {}
        for filename in os.listdir(path):
            if filename.split(".")[-1] != "ogg":
                continue
            wave, _ = librosa.load(os.path.join(path, filename), sr=sample_rate)
            durations[filename] = len(wave) / sample_rate

        records = []
        for filename, duration in durations.items():
            name_parts = filename.split("_")
            records.append(
                {
                    "filename": filename,
                    "audio_id": name_parts[0],
                    "site": name_parts[1],
                    "date": name_parts[2].split(".")[0],
                    "duration": duration,
                }
            )
        # Explicit columns keep the frame well-formed when no file was found.
        df = pd.DataFrame(
            records,
            columns=["filename", "audio_id", "site", "date", "duration"],
        )

        df["date"] = pd.to_datetime(df["date"], format="%Y%m%d")
        df["month"] = df["date"].dt.month
        df["year"] = df["date"].dt.year
        df = df.merge(df_coord_sites, on="site", how="left")

        # Cyclic encodings for month and longitude; latitude mapped to [0, 1].
        df["sin_month"] = np.sin(2 * np.pi * df["month"] / 12)
        df["cos_month"] = np.cos(2 * np.pi * df["month"] / 12)
        df["sin_longitude"] = np.sin(2 * np.pi * (df["longitude"]) / 360)
        df["cos_longitude"] = np.cos(2 * np.pi * (df["longitude"]) / 360)
        df["norm_latitude"] = (df["latitude"] + 90) / 180
        df = df.reset_index(drop=True)

        # Keyed by row_id so that a repeated row_id keeps a single entry
        # (the last one written), at the position of its first occurrence.
        windows = {}
        for audio_id, site, duration, filename in zip(
            df["audio_id"], df["site"], df["duration"], df["filename"]
        ):
            for end_sec in range(5, int(duration) + 1, 5):
                row_id = "_".join([str(audio_id), site, str(end_sec)])
                windows[row_id] = {
                    "row_id": row_id,
                    "end_sec": end_sec,
                    "filename": filename,
                }
        long_df = pd.DataFrame(
            list(windows.values()),
            columns=["row_id", "end_sec", "filename"],
        )
        return long_df.merge(df, on="filename", how="left")

    @staticmethod
    def row_wise_f1_score_micro(y_true, y_pred):
        """Return the mean of the per-row F1 scores.

        Args:
            y_true: Iterable of whitespace-separated label strings (truth).
            y_pred: Iterable of whitespace-separated label strings
                (predictions), paired with ``y_true`` by position.

        Returns:
            ``np.mean`` of ``2*TP / (2*TP + FN + FP)`` over all rows. A row in
            which both strings contain no labels scores 1.0 (nothing to find
            and nothing predicted) instead of dividing by zero.
        """
        scores = []
        for preds, trues in zip(y_pred, y_true):
            preds = preds.split()
            trues = trues.split()
            pred_set = set(preds)
            true_set = set(trues)
            # Counts run over the lists, so repeated labels count repeatedly.
            tp = sum(1 for label in trues if label in pred_set)
            fn = len(trues) - tp
            fp = sum(1 for label in preds if label not in true_set)
            denominator = 2 * tp + fn + fp
            scores.append(2 * tp / denominator if denominator else 1.0)
        return np.mean(scores)

    @staticmethod
    def boost_multiple_occurences(
        df,
        labels,
        pred_col,
        out_col="y_pred",
        boost_coef=1.1,
        max_boost_coef=12,
        threshold=0.5,
    ):
        """
        Boost predictions in file:
            - if something occured once, multiply that class by boost_coef
            - if something occured more than once - keep multiplying until
                boost_coef reaches max_boost_coef

        Args:
            df: DataFrame with ``filename`` and ``row_id`` columns plus the
                prediction column(s) selected by ``pred_col``.
            labels: List of class names. The position of ``"nocall"``, when
                present, is never boosted.
            pred_col: Selector passed to ``df[...]``; presumably a list with
                one column per entry of ``labels`` - verify against caller.
            out_col: Unused; kept so existing callers keep working.
            boost_coef: Factor applied once per row whose score exceeds
                ``threshold``.
            max_boost_coef: Upper bound of the accumulated factor.
            threshold: Score above which a class counts as having occurred.

        Returns:
            Dict mapping each ``row_id`` to its boosted prediction row.
        """
        # A label list without "nocall" simply has no exempt class.
        nocall_ix = labels.index("nocall") if "nocall" in labels else None

        def _compute_boost_matrix(y_preds):
            boost = np.ones((len(labels)), dtype=np.float64)
            for p in y_preds:
                boost = boost * np.where(p > threshold, boost_coef, 1.0)
                boost = np.clip(boost, 1.0, max_boost_coef)
            if nocall_ix is not None:
                boost[nocall_ix] = 1.0
            return boost

        dict_pred = {}
        # One pass over the groups instead of one full-frame scan per file.
        for _, file_df in df.groupby("filename", sort=False):
            file_y_preds = file_df[pred_col].values
            boosted = _compute_boost_matrix(file_y_preds) * file_y_preds
            dict_pred.update(zip(file_df["row_id"].values, boosted))
        return dict_pred


class Mel_Provider:
    """Compute dB-scaled mel spectrograms with torchaudio's ``MelSpectrogram``."""

    def __init__(
        self,
        n_fft,
        win_length,
        n_mels,
        sample_rate,
        mel_image_size,
        min_frequency,
        max_frequency,
        signal_lenght,
        hop_length=None,
        norm_mel_long=False,
        device="cpu",
    ):
        """Create the transform and move it to ``device``.

        Args:
            n_fft, win_length, n_mels, sample_rate: Forwarded unchanged to
                ``torchaudio.transforms.MelSpectrogram``.
            mel_image_size: Used only to derive the default hop length.
            min_frequency, max_frequency: Forwarded as ``f_min`` / ``f_max``.
            signal_lenght: Used together with ``sample_rate`` to derive the
                default hop length (presumably the clip length in seconds -
                verify against caller).
            hop_length: Explicit hop length. When ``None`` it is set to
                ``int(signal_lenght * sample_rate / (mel_image_size - 1))``.
            norm_mel_long: When true, ``msg`` min-max scales its output.
            device: Torch device the transform and the input wave are moved to.
        """
        self.norm_mel_long = norm_mel_long
        self._device = device
        self.signal_lenght = signal_lenght
        self.sample_rate = sample_rate
        self.mel_image_size = mel_image_size
        if hop_length is None:
            self.hop_length = int(
                self.signal_lenght * self.sample_rate / (self.mel_image_size - 1)
            )
        else:
            self.hop_length = hop_length
        self._melspectrogram = torchaudio.transforms.MelSpectrogram(
            power=2.0,
            win_length=win_length,
            n_fft=n_fft,
            n_mels=n_mels,
            sample_rate=sample_rate,
            hop_length=self.hop_length,
            f_min=min_frequency,
            f_max=max_frequency,
        ).to(self._device)

    def msg(self, wave):
        """Return the mel spectrogram of ``wave`` in dB as a float32 array.

        Args:
            wave: NumPy array of audio samples; it is flattened into a single
                channel before the transform is applied.

        Returns:
            2-D float32 array produced by ``librosa.power_to_db`` with
            ``ref=np.max``. When ``norm_mel_long`` is set the values are
            min-max scaled; a spectrogram with no dynamic range (for example
            digital silence) becomes all zeros rather than NaN.
        """
        wave = torch.tensor(wave.reshape([1, -1]).astype(np.float32)).to(self._device)
        mel_spec = self._melspectrogram(wave)[0].cpu().numpy()
        mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
        if self.norm_mel_long:
            lowest = mel_spec.min()
            value_range = mel_spec.max() - lowest
            if value_range > 0:
                mel_spec = (mel_spec - lowest) / value_range
            else:
                # Constant input: avoid 0/0.
                mel_spec = np.zeros_like(mel_spec)
        # ``astype`` returns a new array, so its result has to be the one returned.
        return mel_spec.astype(np.float32)


class Test_Kaggle(keras.utils.Sequence):
    """Keras ``Sequence`` serving mel-spectrogram inputs for inference.

    ``df`` must be labelled ``0 .. len(df) - 1`` (rows are addressed with
    ``df.loc[position]``) and provide the columns ``filename``, ``end_sec``,
    ``sin_month``, ``cos_month``, ``year``, ``sin_longitude``,
    ``cos_longitude`` and ``norm_latitude``. Labels are dummies (always 1).
    """

    def __init__(
        self,
        df,
        path,
        n_fft,
        sample_rate,
        mel_image_size,
        n_mels,
        signal_lenght,
        mel_provider,
        batch_size=1,
        hop_length=None,
        img_dtype="uint8",
        sin_cos_img=True,
    ):
        """Store the configuration.

        Args:
            df: Table with one row per audio window (see class docstring).
            path: Directory containing the audio files named in ``filename``.
            n_fft, n_mels: Stored on the instance; not used by this class.
            sample_rate: Sample rate passed to ``librosa.load``.
            mel_image_size: Side length of the square image returned per row.
            signal_lenght: Window length in seconds cut before ``end_sec``.
            mel_provider: Object whose ``msg(wave)`` returns a 2-D spectrogram.
            batch_size: Number of rows per batch.
            hop_length: Stored on the instance; when ``None`` it is derived
                as ``int(signal_lenght * sample_rate / (mel_image_size - 1))``.
            img_dtype: ``"uint8"`` for images scaled to 0..255, anything else
                for float32 images scaled to 0..1.
            sin_cos_img: When true the metadata is painted into the image;
                otherwise it is returned as a separate ``"data"`` input.
        """
        super().__init__()
        self.df = df
        self.path = path
        self.signal_lenght = signal_lenght
        self.sample_rate = sample_rate
        self.mel_image_size = mel_image_size
        if hop_length is None:
            self.hop_length = int(
                self.signal_lenght * self.sample_rate / (self.mel_image_size - 1)
            )
        else:
            self.hop_length = hop_length
        self.mel_provider = mel_provider
        self.n_fft = n_fft
        self.img_dtype = img_dtype
        self.n_mels = n_mels
        self.sin_cos_img = sin_cos_img
        self.batch_size = batch_size
        # (file_path, sample_rate, wave) of the most recently decoded file.
        self._wave_cache = None

    def __len__(self):
        """Return the number of batches, counting a trailing partial batch."""
        return (self.df.shape[0] + self.batch_size - 1) // self.batch_size

    def get_audio(self, file_path, end_sec):
        """Return the samples of the ``signal_lenght`` seconds ending at ``end_sec``.

        The decoded file is kept until a different file is requested, so
        consecutive windows of one recording decode it only once.
        """
        cached = self._wave_cache
        if (
            cached is not None
            and cached[0] == file_path
            and cached[1] == self.sample_rate
        ):
            wave = cached[2]
        else:
            wave, _ = librosa.load(file_path, sr=self.sample_rate)
            self._wave_cache = (file_path, self.sample_rate, wave)
        end = int(end_sec * self.sample_rate)
        # Clamp so a window longer than ``end_sec`` does not wrap around
        # to the end of the recording through a negative index.
        start = max(0, int(end - (self.signal_lenght * self.sample_rate)))
        return wave[start:end]

    def _paint_metadata(self, image, ix, max_value):
        """Write the metadata colour patches into the bottom 10 rows of ``image``.

        Each patch is 20 pixels wide; one channel carries the value and the
        other two are set to ``max_value``. Three patches start at the left
        edge (month sin/cos, year) and three end at the right edge (longitude
        sin/cos, latitude).
        """
        size = self.mel_image_size
        meta = self.df.loc
        # NOTE(review): the sin/cos values lie in [-1, 1]; in uint8 images the
        # negative products go through NumPy's float-to-uint8 cast instead of
        # being rescaled. Kept as is because a trained model presumably expects
        # this exact encoding - confirm against the training pipeline.
        # NOTE(review): the year reference here is 2021 while the non-image
        # branch of ``_get_one`` uses 2022 - confirm which one is intended.
        patches = (
            (slice(0, 20), 0, max_value * meta[ix, "sin_month"]),
            (slice(20, 40), 1, max_value * meta[ix, "cos_month"]),
            (slice(40, 60), 2, max_value * (2021 - meta[ix, "year"]) / 50),
            (slice(size - 60, size - 40), 0, max_value * meta[ix, "sin_longitude"]),
            (slice(size - 40, size - 20), 1, max_value * meta[ix, "cos_longitude"]),
            (slice(size - 20, None), 2, max_value * meta[ix, "norm_latitude"]),
        )
        for columns, channel, value in patches:
            image[size - 10 :, columns, :] = max_value
            image[size - 10 :, columns, channel] = value

    def _get_one(self, ix):
        """Build the model input and dummy label for row ``ix`` of ``self.df``.

        Returns:
            ``(x, 1)`` where ``x`` is the 3-channel image when ``sin_cos_img``
            is set, otherwise ``{"mel": image, "data": features}``.
        """
        wave_name = self.df.loc[ix, "filename"]
        end_sec = self.df.loc[ix, "end_sec"]

        wave = self.get_audio(os.path.join(self.path, wave_name), end_sec)
        mel_short = self.mel_provider.msg(wave)

        # Min-max scale to [0, 1]; a constant spectrogram becomes zeros
        # instead of NaN.
        lowest = np.min(mel_short)
        value_range = np.max(mel_short) - lowest
        if value_range > 0:
            mel_short = (mel_short - lowest) / value_range
        else:
            mel_short = np.zeros_like(mel_short)

        if mel_short.shape != (self.mel_image_size, self.mel_image_size):
            mel_short = Image.fromarray(mel_short)
            mel_short = mel_short.resize(
                (self.mel_image_size, self.mel_image_size),
                Image.BICUBIC,
            )
            mel_short = np.array(mel_short)

        # Replicate the single channel three times to get an RGB-shaped image.
        if self.img_dtype == "uint8":
            max_value = 255
            mel_short = np.round(mel_short * max_value)
            mel_short = np.repeat(np.expand_dims(mel_short.astype(np.uint8), 2), 3, 2)
        else:
            max_value = 1
            mel_short = np.repeat(np.expand_dims(mel_short.astype(np.float32), 2), 3, 2)

        if self.sin_cos_img:
            self._paint_metadata(mel_short, ix, max_value)
            x = mel_short
        else:
            features = np.array(
                [
                    self.df.loc[ix, "sin_longitude"],
                    self.df.loc[ix, "cos_longitude"],
                    self.df.loc[ix, "norm_latitude"],
                    self.df.loc[ix, "sin_month"],
                    self.df.loc[ix, "cos_month"],
                    # NOTE(review): 2022 here versus 2021 in the image
                    # patches - confirm which reference year is intended.
                    (2022 - self.df.loc[ix, "year"]) / 50,
                ]
            )
            x = {"mel": mel_short, "data": features}
        y = 1
        return x, y

    def __getitem__(self, batch_ix):
        """Return batch ``batch_ix`` as ``(x, y)``.

        With ``sin_cos_img`` set, ``x`` is an array of shape
        ``(rows, mel_image_size, mel_image_size, 3)`` and ``y`` a float16
        array of shape ``(rows, 1)``. Otherwise ``x`` is a dict mapping each
        input name to the stacked values and ``y`` a 1-D array. ``rows`` is
        ``batch_size`` except for a shorter final batch.
        """
        first = self.batch_size * batch_ix
        last = min(first + self.batch_size, self.df.shape[0])
        row_positions = range(first, last)

        if self.sin_cos_img:
            pixel_dtype = np.uint8 if self.img_dtype == "uint8" else np.float32
            # ``_get_one`` always yields mel_image_size x mel_image_size images.
            x = np.zeros(
                (len(row_positions), self.mel_image_size, self.mel_image_size, 3),
                dtype=pixel_dtype,
            )
            y = np.zeros((len(row_positions), 1), dtype=np.float16)
            for slot, ix in enumerate(row_positions):
                x[slot], y[slot] = self._get_one(ix)
            return x, y

        # A single sample is a dict of <input>: <value>; the batch has to be
        # a dict of <input>: np.array(<values>).
        collected, labels = {}, []
        for ix in row_positions:
            sample, label = self._get_one(ix)
            for key, value in sample.items():
                collected.setdefault(key, []).append(value)
            labels.append(label)
        x = {key: np.array(values) for key, values in collected.items()}
        y = np.array(labels)
        return x, y