# Imports

In [None]:
!pip install audiomentations cylimiter | tail

In [None]:
import librosa

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import random

import torch
import torchvision
from torch import nn, optim
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

import torch
import torchmetrics
import os

import warnings
warnings.filterwarnings('ignore')

In [None]:
from audiomentations import (
    Compose,
    OneOf,
    AddGaussianSNR,
    AddShortNoises,
    AddBackgroundNoise,
    PolarityInversion,
    AdjustDuration,
    Aliasing,
    ApplyImpulseResponse,
    BandPassFilter,
    BandStopFilter,
    Gain,
    GainTransition,
    HighPassFilter,
    Limiter,
    LowPassFilter,
    PitchShift, # https://iver56.github.io/audiomentations/waveform_transforms/pitch_shift/
    TanhDistortion,
    TimeMask,
    Shift
    
)

# Sampling rate in Hz (the same value is also stored as Config.SR).
SR = 32_000

# Waveform augmentation pipeline; the stages below run in order on every call.
transform = Compose([
        OneOf([ # Force the clip to be 5 seconds long, padding with silence at the start or at the end
            AdjustDuration(
                duration_seconds=5,
                padding_mode="silence",
                padding_position="start",
                p=1.0
            ),
            AdjustDuration(
                duration_seconds=5,
                padding_mode="silence",
                padding_position="end",
                p=1.0
            ),
        ]),
        # With probability 0.5: one of gain / gain transition / pitch shift.
        OneOf([
            Gain(p=1.0),
            GainTransition(p=1.0),
            PitchShift(
                min_semitones=-4.0,
                max_semitones=4.0,
                p=1.0
            ),
        ], p=0.5),
        # With probability 0.5: one filtering / distortion / masking / shifting transform.
        OneOf([
            Aliasing(
                min_sample_rate=8000,
                max_sample_rate=32000,
                p=1.0
            ),
            BandPassFilter(min_center_freq=200.0, max_center_freq=4000, p=1.0),
            BandStopFilter(p=1.0),
            TanhDistortion(p=1.0),
            HighPassFilter(p=1.0),
            Limiter(p=1.0),
            LowPassFilter(p=1.0),
            TimeMask(p=1.0),
            Shift(p=1.0)
        ], p=0.5),
        # With probability 0.5: additive Gaussian noise at an SNR between 5 and 40 dB.
        OneOf([
            AddGaussianSNR(
                min_snr_db=5.0,
                max_snr_db=40.0,
                p=1.0
            ),
        ], p=0.5),
    ])

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

device

# Config

In [None]:
# Input locations. Point these at the folder(s) where the data was extracted;
# every value must end with a trailing '/'.
path = './'
com_path = './'
noise_path = './noise/noise/'  # edit to match the local noise folder


class Config:
    """Constants and hyper-parameters shared by the rest of the notebook."""

    # Audio / features
    SR = 32000
    N_MEL = 128
    # Dataset
    ROOT_FOLDER = path
    # Training
    N_CLASSES = 2
    BATCH_SIZE = 128
    N_EPOCHS = 4
    LR = 3e-4
    # Others
    SEED = 42


CONFIG = Config()

In [None]:
def seed_everything(seed):
    """Seed every random number generator the notebook relies on.

    Seeds Python's `random`, NumPy and PyTorch (CPU and all CUDA devices) and
    switches cuDNN to deterministic algorithm selection.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # subprocesses); the hash seed of the running process is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one; a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels from run to run, which
    # defeats the reproducibility requested above, so it has to stay off.
    torch.backends.cudnn.benchmark = False

seed_everything(CONFIG.SEED) # Fix the seed for the whole notebook

In [None]:
# Load the three metadata tables: the labelled training set, the
# "composition" training set and the unlabelled noise clips.
df = pd.read_csv(path + 'train.csv')
df_com = pd.read_csv(com_path + 'composition_train.csv')
df_noise = pd.read_csv('./no_label.csv') # edit this path to match the local file location

In [None]:
# Create (or overwrite) an 'id' column with the placeholder value 0.
df_noise['id'] = 0
# set_index + reset_index moves 'id' to the first column and replaces the
# existing index with a fresh default integer index.
df_noise = df_noise.set_index('id').reset_index()

In [None]:
# Each recorded path uses backslashes; keep the component right after the
# first backslash as the file name, then rebuild both columns in one shot.
# Whole-column assignment replaces the former `df[col][i] = ...` chained
# indexing, which pandas does not guarantee to write through (and which is a
# no-op under Copy-on-Write). It also avoids writing strings cell by cell into
# the integer placeholder 'id' column.
noise_file_names = df_noise['path'].map(lambda p: p.split('\\')[1])
df_noise['path'] = noise_path + noise_file_names
df_noise['id'] = noise_file_names

In [None]:
# One-hot encode the textual label: 'real' -> real=1, anything else -> fake=1.
# Computed on whole columns so every row is handled regardless of the table's
# length (the previous loop was pinned to exactly 55438 rows and relied on
# chained-indexing assignment, which pandas may apply to a temporary copy).
is_real = df['label'] == 'real'
df['real'] = is_real.astype(int)
df['fake'] = (~is_real).astype(int)

# Drop the first two characters of each stored path and prefix the data folder.
# presumably those two characters are a leading './' — verify against train.csv
df['path'] = path + df['path'].str[2:]

In [None]:
# Prefix every composition path with its data folder. Whole-column assignment
# covers all rows (the previous loop was pinned to exactly 55339 rows) and
# avoids chained-indexing assignment, which pandas may apply to a copy.
df_com['path'] = com_path + df_com['path']

In [None]:
# The textual label is no longer needed once the real/fake columns exist.
df = df.drop('label', axis=1)

# Stack the three tables into one training table (original row indices are
# kept, so index values repeat across the three sources).
df_all = pd.concat((df, df_com, df_noise))

# Release the per-source tables.
del df, df_com, df_noise

df_all

In [None]:
# Random 80/20 train/validation split of the combined table. The label frame is
# passed only so that it is split with the same indices; its halves are discarded.
# NOTE: no `stratify` argument is given, so the class balance of the two parts
# is left to chance.
train, val, _, _ = train_test_split(df_all, df_all[['real', 'fake']], test_size=0.2, random_state=CONFIG.SEED)

## Data Pre-processing : file paths and labels (mel-spectrograms are computed inside the Dataset)

In [None]:
def preprocess(df, train_mode=True):
    """Collect audio file paths (and, optionally, two-slot label vectors) from `df`.

    Args:
        df: DataFrame with a 'path' column; 'fake' and 'real' columns are read
            only when `train_mode` is true.
        train_mode: When true, also build one label vector per row.

    Returns:
        `(paths, labels)` when `train_mode` is true, otherwise just `paths`.
        Each label is a float array of length CONFIG.N_CLASSES whose slot 0 is
        the fake flag and slot 1 the real flag.
    """
    paths, targets = [], []
    for _, record in tqdm(df.iterrows()):
        paths.append(record['path'])
        if not train_mode:
            continue

        target = np.zeros(CONFIG.N_CLASSES, dtype=float)
        # Slot order is fixed: index 0 <- 'fake', index 1 <- 'real'.
        for slot, column in enumerate(('fake', 'real')):
            target[slot] = 1 if record[column] == 1 else 0
        targets.append(target)

    return (paths, targets) if train_mode else paths

In [None]:
# Path and label lists for the training and validation splits.
train_data, train_labels = preprocess(train, train_mode=True)
val_data, val_labels = preprocess(val, train_mode=True)

# Dataset

In [None]:
class CustomDataset(Dataset):
    """Dataset that turns audio files into 3-channel log-mel "images".

    Args:
        data: Sequence of audio file paths.
        label: Sequence of label vectors aligned with `data`, or None for
            unlabelled (inference) data.
    """

    def __init__(self, data, label):
        self.data = data
        self.label = label

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return `(image, label)` for labelled data, or just `image` otherwise."""
        labelled = self.label is not None

        # Decode the waveform at the configured sampling rate.
        waveform, _ = librosa.load(self.data[index], sr=CONFIG.SR)

        # The module-level augmentation pipeline runs whenever labels exist.
        # NOTE(review): this also applies to a labelled validation set, and
        # unlabelled data never goes through the duration adjustment — confirm
        # both are intended.
        if labelled:
            waveform = transform(samples=waveform, sample_rate=CONFIG.SR)

        # Waveform -> mel power spectrogram -> decibel scale (relative to the max).
        mel = librosa.feature.melspectrogram(y=waveform, sr=CONFIG.SR, n_mels=CONFIG.N_MEL)
        mel_db = librosa.power_to_db(mel, ref=np.max)

        # Add a channel axis and replicate it three times so the result has the
        # channel layout an image backbone expects.
        plane = torch.tensor(mel_db).unsqueeze(dim=0)
        img = torch.cat([plane] * 3, dim=0)

        return (img, self.label[index]) if labelled else img

In [None]:
# Wrap the path/label lists of each split in a dataset.
train_dataset = CustomDataset(data=train_data, label=train_labels)
val_dataset = CustomDataset(data=val_data, label=val_labels)

In [None]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

# Define Model

In [None]:
from torchvision import models
# model = models.wide_resnet50_2(weights="IMAGENET1K_V2")
# model = models.resnet50(pretrained=True)
#num_ftrs = model.fc.in_features
#model.fc = nn.Linear(num_ftrs, 2)

class MODEL(nn.Module):
    """ConvNeXt-Large backbone followed by a small classification head.

    The backbone's 1000-dimensional output is layer-normalised, passed through
    SiLU and dropout, projected to CONFIG.N_CLASSES values and squashed with a
    sigmoid, so `forward` returns per-class scores in (0, 1).
    """

    def __init__(self):
        super().__init__()

        self.backbone = models.convnext_large(weights=models.ConvNeXt_Large_Weights.DEFAULT)
        self.norm = nn.LayerNorm(1000)
        self.act = nn.SiLU()
        # Element-wise dropout. The head operates on 2-D (batch, features)
        # tensors; nn.Dropout1d would interpret such a tensor as an unbatched
        # (channels, length) input and zero out entire rows, i.e. the whole
        # feature vector of randomly chosen samples. Dropout has no parameters,
        # so existing checkpoints still load.
        self.drop = nn.Dropout()
        self.classifier = nn.Linear(1000, CONFIG.N_CLASSES)
        self.s = torch.sigmoid

    def forward(self, x):
        """Map a batch of 3-channel images to sigmoid scores of shape (batch, N_CLASSES)."""
        x = self.backbone(x)
        x = self.norm(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.classifier(x)
        # NOTE(review): the output is already sigmoid-activated; any loss or
        # calibration step that expects raw logits should be checked against this.
        x = self.s(x)
        return x

# Train & Validation

In [None]:
from sklearn.metrics import roc_auc_score

def train(model, optimizer, train_loader, val_loader, device, scheduler):
    """Train `model` for CONFIG.N_EPOCHS epochs and return it with its best weights.

    After every epoch the model is validated; whenever the validation AUC
    improves, the weights are written to
    './ckpt_convnext_modified_best_model.pt' and a snapshot is kept in memory.

    Args:
        model: Network to optimise (moved to `device` in place).
        optimizer: Optimizer that must have been built over `model`'s parameters.
        train_loader: Loader yielding `(features, labels)` training batches.
        val_loader: Loader yielding `(features, labels)` validation batches.
        device: Device on which to run.
        scheduler: Learning-rate scheduler, stepped once per epoch.

    Returns:
        `model` with the weights of the best-scoring epoch restored, or None if
        no epoch achieved a validation AUC above 0.
    """
    model.to(device)
#     criterion = nn.BCELoss().to(device)
    # NOTE(review): CrossEntropyLoss applies log-softmax itself; confirm that
    # feeding it already-activated model outputs is intended.
    criterion = nn.CrossEntropyLoss().to(device)

    best_val_score = 0
    best_state = None

    for epoch in range(1, CONFIG.N_EPOCHS+1):
        model.train()
        train_loss = []
        for features, labels in tqdm(iter(train_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)

            optimizer.zero_grad()

            output = model(features)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
        scheduler.step()

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val AUC : [{_val_score:.5f}]')

        if best_val_score < _val_score:
            best_val_score = _val_score
            # Take a real copy of the weights (on the CPU, to spare GPU memory).
            # Keeping a reference to `model` would just alias the live network,
            # whose weights keep changing in later epochs.
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            torch.save(model.state_dict(), './ckpt_convnext_modified_best_model.pt')

    if best_state is None:
        return None

    # Roll the live network back to the best-scoring epoch before returning it.
    model.load_state_dict(best_state)
    return model

def multiLabel_AUC(y_true, y_scores):
    """Return the mean of the per-column ROC AUC scores.

    Args:
        y_true: 2-D array of ground-truth values, one column per label.
        y_scores: 2-D array of predicted scores with the same layout.

    Returns:
        The average of `roc_auc_score` computed independently on each column.
    """
    n_labels = y_true.shape[1]
    per_label_auc = [
        roc_auc_score(y_true[:, col], y_scores[:, col])
        for col in range(n_labels)
    ]
    return np.mean(per_label_auc)

def validation(model, criterion, val_loader, device):
    """Evaluate `model` on `val_loader`.

    Args:
        model: Network to evaluate (switched to eval mode).
        criterion: Loss applied to the model outputs and the labels.
        val_loader: Loader yielding `(features, labels)` batches.
        device: Device on which to run.

    Returns:
        `(mean_loss, auc)`: the mean of the per-batch losses and the mean
        per-column ROC AUC over the whole loader.
    """
    model.eval()
    batch_losses = []
    label_chunks = []
    output_chunks = []

    with torch.no_grad():
        for features, labels in tqdm(iter(val_loader)):
            features = features.float().to(device)
            labels = labels.float().to(device)

            probs = model(features)
            batch_losses.append(criterion(probs, labels).item())

            # Keep everything on the host for the metric computation below.
            label_chunks.append(labels.cpu().numpy())
            output_chunks.append(probs.cpu().numpy())

    mean_loss = np.mean(batch_losses)
    stacked_labels = np.concatenate(label_chunks, axis=0)
    stacked_outputs = np.concatenate(output_chunks, axis=0)

    return mean_loss, multiLabel_AUC(stacked_labels, stacked_outputs)

In [None]:
import gc
# Run a full garbage-collection pass to release unreachable Python objects
# before the model is built.
gc.collect()

## Run

In [None]:
# Build the network together with its optimizer and a scheduler that multiplies
# the base learning rate by 0.90 ** epoch.
model = MODEL()
optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG.LR)
scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer,
    lr_lambda=lambda epoch: 0.90 ** epoch,
)

infer_model = train(
    model=model,
    optimizer=optimizer,
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    scheduler=scheduler,
)

In [None]:
# Persist the weights of the model object returned by train().
torch.save(infer_model.state_dict(), './convnext_modified_model.pt')

In [None]:
# Resume from a saved checkpoint in a fresh network.
ckpt_model = MODEL()
# map_location='cpu' lets a checkpoint written on a GPU be loaded on any
# machine; train() moves the network to `device` afterwards.
# NOTE(review): this file name matches neither checkpoint written earlier in
# this notebook ('./ckpt_convnext_modified_best_model.pt' and
# './convnext_modified_model.pt') — confirm which file is meant.
ckpt_model.load_state_dict(torch.load('./convnext_modified_best_model.pt', map_location='cpu'))

# The optimizer must own the parameters of the network being trained. Reusing
# the earlier `optimizer` (built over `model.parameters()`) would leave
# `ckpt_model` completely untouched by `optimizer.step()`.
ckpt_optimizer = torch.optim.AdamW(params=ckpt_model.parameters(), lr=CONFIG.LR)
ckpt_scheduler = torch.optim.lr_scheduler.LambdaLR(ckpt_optimizer, lr_lambda=lambda epoch: 0.90 ** epoch)

ckpt_infer_model = train(ckpt_model, ckpt_optimizer, train_loader, val_loader, device, ckpt_scheduler)


In [None]:
def replace_backslash_with_slash(input_string):
    """Return `input_string` with every backslash turned into a forward slash."""
    return '/'.join(input_string.split('\\'))

In [None]:
def preprocess_soft_label(df, train_mode=True):
    """Collect normalised file paths and (optionally) smoothed label vectors.

    Each path drops its first four characters, has backslashes converted to
    forward slashes and is prefixed with './'.

    Args:
        df: DataFrame with a 'path' column; 'fake' and 'real' score columns are
            read only when `train_mode` is true.
        train_mode: When true, also build one label vector per row.

    Returns:
        `(paths, labels)` when `train_mode` is true, otherwise just `paths`.
        In each label, slot 0 (fake) and slot 1 (real) hold 0.9 when the
        corresponding score is at least 0.5 and 0.1 otherwise.
    """
    paths, targets = [], []
    for _, record in tqdm(df.iterrows()):
        paths.append('./'+replace_backslash_with_slash(record['path'][4:]))
        if not train_mode:
            continue

        target = np.zeros(CONFIG.N_CLASSES, dtype=float)
        # Slot order is fixed: index 0 <- 'fake', index 1 <- 'real'.
        for slot, column in enumerate(('fake', 'real')):
            target[slot] = 0.9 if record[column] >= 0.5 else 0.1
        targets.append(target)

    return (paths, targets) if train_mode else paths

In [None]:
# Load the pseudo-labelled table and wrap it in a dataset with smoothed labels.
df_psuedo = pd.read_csv('./pesudo_label.csv') # edit this path to match the local file location
psuedo_data, psuedo_labels = preprocess_soft_label(df_psuedo, True)
psuedo_dataset = CustomDataset(psuedo_data, psuedo_labels)


In [None]:
# Shuffled batches over the pseudo-labelled dataset.
psuedo_loader = DataLoader(psuedo_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=True)

In [None]:
# Fine-tune on the pseudo-labelled data. The optimizer has to be built over the
# parameters of the network being trained; the notebook-level `optimizer` was
# created from `model.parameters()`, so stepping it would never update
# `ckpt_infer_model`. A fresh scheduler restarts the 0.90 ** epoch decay from
# the base learning rate.
psuedo_optimizer = torch.optim.AdamW(params=ckpt_infer_model.parameters(), lr=CONFIG.LR)
psuedo_scheduler = torch.optim.lr_scheduler.LambdaLR(psuedo_optimizer, lr_lambda=lambda epoch: 0.90 ** epoch)

psuedo_model = train(ckpt_infer_model, psuedo_optimizer, psuedo_loader, val_loader, device, psuedo_scheduler)

In [None]:
# Persist the weights of the model returned by the pseudo-label training run.
torch.save(psuedo_model.state_dict(), './ckpt_convnext_modified_psuedo.pt')

In [None]:
class ModelWithTemperature(nn.Module):
    """
    A thin decorator, which wraps a model with temperature scaling
    model (nn.Module):
        A classification neural network
        NB: Output of the neural network should be the classification logits,
            NOT the softmax (or log softmax)!
    """
    def __init__(self, model):
        super().__init__()
        self.model = model
        # Single learnable scalar, initialised to 1.5.
        self.temperature = nn.Parameter(torch.ones(1) * 1.5)

    def forward(self, input):
        """Return the wrapped model's output divided by the temperature."""
        logits = self.model(input)
        return self.temperature_scale(logits)

    def temperature_scale(self, logits):
        """
        Perform temperature scaling on logits
        """
        # Expand temperature to match the size of logits
        temperature = self.temperature.unsqueeze(1).expand(logits.size(0), logits.size(1))
        return logits / temperature

    # This function probably should live outside of this class, but whatever
    def set_temperature(self, valid_loader):
        """
        Tune the temperature of the model (using the validation set).
        We're going to set it to optimize NLL.
        valid_loader (DataLoader): validation set loader, yielding
            (input, label) batches; labels may be class indices or
            per-class target vectors.

        Returns self, moved to the GPU when one is available (CPU otherwise).
        """
        # Same fallback as the rest of the notebook instead of requiring CUDA.
        run_device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.to(run_device)
        # Dropout must be inactive while the reference outputs are collected.
        self.model.eval()
        nll_criterion = nn.CrossEntropyLoss().to(run_device)

        # First: collect all the logits and labels for the validation set
        logits_list = []
        labels_list = []
        with torch.no_grad():
            for input, label in tqdm(iter(valid_loader)):
                # Cast like the train/validation/inference loops do.
                input = input.float().to(run_device)
                logits_list.append(self.model(input))
                labels_list.append(label)
            logits = torch.cat(logits_list).to(run_device)
            labels = torch.cat(labels_list).to(run_device)

        # Per-class target vectors must share the logits' dtype for the loss;
        # integer class indices are left untouched.
        if labels.is_floating_point():
            labels = labels.to(logits.dtype)

        # Next: optimize the temperature w.r.t. NLL
        optimizer = optim.LBFGS([self.temperature], lr=0.01, max_iter=50)

        def closure():
            optimizer.zero_grad()
            loss = nll_criterion(self.temperature_scale(logits), labels)
            loss.backward()
            return loss
        optimizer.step(closure)

        return self


class _ECELoss(nn.Module):
    """
    Calculates the Expected Calibration Error of a model.
    (This isn't necessary for temperature scaling, just a cool metric).

    The input to this loss is the logits of a model, NOT the softmax scores.

    This divides the confidence outputs into equally-sized interval bins.
    In each bin, we compute the confidence gap:

    bin_gap = | avg_confidence_in_bin - accuracy_in_bin |

    We then return a weighted average of the gaps, based on the number
    of samples in each bin

    See: Naeini, Mahdi Pakdaman, Gregory F. Cooper, and Milos Hauskrecht.
    "Obtaining Well Calibrated Probabilities Using Bayesian Binning." AAAI.
    2015.
    """
    def __init__(self, n_bins=15):
        """
        n_bins (int): number of confidence interval bins
        """
        super(_ECELoss, self).__init__()
        bin_boundaries = torch.linspace(0, 1, n_bins + 1)
        self.bin_lowers = bin_boundaries[:-1]
        self.bin_uppers = bin_boundaries[1:]

    def forward(self, logits, labels):
        softmaxes = F.softmax(logits, dim=1)
        confidences, predictions = torch.max(softmaxes, 1)
        accuracies = predictions.eq(labels)

        ece = torch.zeros(1, device=logits.device)
        for bin_lower, bin_upper in zip(self.bin_lowers, self.bin_uppers):
            # Calculated |confidence - accuracy| in each bin
            in_bin = confidences.gt(bin_lower.item()) * confidences.le(bin_upper.item())
            prop_in_bin = in_bin.float().mean()
            if prop_in_bin.item() > 0:
                accuracy_in_bin = accuracies[in_bin].float().mean()
                avg_confidence_in_bin = confidences[in_bin].mean()
                ece += torch.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin

        return ece

In [None]:
# Wrap the pseudo-label model with a learnable temperature and fit that
# temperature on the validation loader.
scaled_model = ModelWithTemperature(psuedo_model)
scaled_model.set_temperature(val_loader)

# Save the wrapper's state dict (wrapped model weights plus the temperature).
torch.save(scaled_model.state_dict(), './ckpt_convnext_modified_psuedo_scaled.pt')

In [None]:
test = pd.read_csv(path + 'test.csv')
# Drop the first two characters of each stored path and prefix the data folder.
# Whole-column assignment covers every row (the previous loop was pinned to
# exactly 50000 rows) and avoids chained-indexing assignment, which pandas may
# apply to a temporary copy.
# presumably those two characters are a leading './' — verify against test.csv
test['path'] = path + test['path'].str[2:]

In [None]:
# Unlabelled test pipeline: paths only, no labels, original row order kept.
test_data = preprocess(test, train_mode=False)
test_dataset = CustomDataset(test_data, None)
test_loader = DataLoader(test_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

# Drop the notebook-level names; the loader keeps its own reference to the dataset.
del test_data, test_dataset

def inference(model, test_loader, device):
    """Run `model` over `test_loader` and return its outputs as nested lists.

    Args:
        model: Network to run (moved to `device` and switched to eval mode).
        test_loader: Loader yielding feature batches without labels.
        device: Device on which to run.

    Returns:
        A list with one entry per sample, each entry being the list of values
        the model produced for that sample.
    """
    model.to(device)
    model.eval()

    predictions = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            batch_output = model(batch.float().to(device))
            # Back to the host as plain Python lists, one row per sample.
            predictions.extend(batch_output.cpu().detach().numpy().tolist())
    return predictions

# Alternative: predict with the un-calibrated model instead.
# preds = inference(infer_model, test_loader, device)
preds = inference(scaled_model, test_loader, device)

# Fill every column after the first with the predictions.
# presumably the first column is the sample id and the remaining columns follow
# the model's output order (slot 0 = fake, slot 1 = real) — TODO confirm
# against sample_submission.csv
submit = pd.read_csv(path + 'sample_submission.csv')
submit.iloc[:, 1:] = preds
submit.head()

In [None]:
# Write the submission file without the DataFrame index column.
submit.to_csv('./ckpt_convnext_modified_psuedo_scaled.csv', index=False)