## Basic data reading for data in folders.
- Returns:
  
  train_data_path: List[str]

## Setting basic training configs

In [None]:
# Supported dataset modes.
DATA_MODES = ['train', 'val', 'test']
# Every image is rescaled to 224x224 px, a size that ImageNet-pretrained networks handle well.
RESCALE_SIZE = 224
# Prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Audio Torch Dataset

- input: `List[str]` — list of train URLs

In [None]:
import io
import torch
from scipy.io import wavfile
import torchaudio
from torch.utils.data import Dataset
import whisper
from pydub import AudioSegment

class CustomAudioDataset(Dataset):
    """
    Audio dataset that loads each recording from its path when it is indexed.

    Every sample is decoded with ``whisper.audio.load_audio`` and then passed
    through the transform that matches the dataset mode, if one was supplied.

    Args:
        mode: One of ``DATA_MODES``.
        files: Sequence of audio files to load.
        labels: Per-file labels, indexed in step with ``files``; ignored in
            'test' mode.
        train_transforms: Optional callable applied to samples in 'train' mode.
        val_test_transforms: Optional callable applied in every other mode.

    Raises:
        NameError: If ``mode`` is not one of ``DATA_MODES``.
    """
    def __init__(self, mode, files, labels=None, train_transforms=None, val_test_transforms=None):
        super().__init__()
        if mode not in DATA_MODES:
            # Same exception type as before, but the explanation now travels
            # with the exception instead of being printed separately.
            raise NameError(f"{mode} is not correct; correct modes: {DATA_MODES}")

        # Files to load and the working mode.
        self.files = files
        self.mode = mode
        self.len_ = len(self.files)

        self.train_transforms = train_transforms
        self.val_test_transforms = val_test_transforms

        # Always define the attribute so test-mode instances do not lack it.
        self.labels = labels if self.mode != 'test' else None

    def __len__(self):
        """Return the number of files captured at construction time."""
        return self.len_

    def load_audio_sample(self, file):
        """Decode one audio file with Whisper's loader and return the result.

        The loader is Whisper-specific. For other models
        ``torchaudio.load(file)[0]`` may be a better fit, and MP3 input could
        first be converted to WAV with ``pydub.AudioSegment``.
        """
        return whisper.audio.load_audio(file)

    def __getitem__(self, index):
        """Return the transformed sample, paired with its label unless in 'test' mode."""
        sample = self.load_audio_sample(self.files[index])

        # Train mode uses the training augmentations; val and test share theirs.
        if self.mode == 'train':
            transform = self.train_transforms
        else:
            transform = self.val_test_transforms
        if transform:
            sample = transform(sample)

        if self.mode == 'test':
            return sample
        return sample, self.labels[index]

In [None]:
#wavfile reading:
import io
import torch

def wav_read(wavfile):
    """Decode a WAV stream into a tensor of samples.

    Args:
        wavfile: Binary file-like object, opened for reading, that holds WAV data.

    Returns:
        torch.Tensor with the decoded samples, in the dtype produced by
        ``scipy.io.wavfile.read``.
    """
    # The parameter name shadows the ``scipy.io.wavfile`` module, so the module
    # is imported under a private alias; calling ``wavfile.read`` for decoding
    # would hit the stream's own ``read`` method instead.
    from scipy.io import wavfile as _scipy_wavfile

    audio_bytes = wavfile.read()
    _rate, samples = _scipy_wavfile.read(io.BytesIO(audio_bytes))
    # Copy first: the decoded array may be read-only, which torch.from_numpy warns about.
    return torch.from_numpy(samples.copy())

In [None]:
#fleur dataset as tarfile

import tarfile
from scipy.io import wavfile
from tqdm.notebook import tqdm

def download(url: str, target_path: str):
    """Stream ``url`` into ``target_path`` while showing a tqdm progress bar.

    Args:
        url: Address fetched with ``urllib.request.urlopen``.
        target_path: Local file path, opened in binary write mode (existing
            content is overwritten).
    """
    import urllib.request  # local import: this cell does not import urllib

    chunk_size = 8192
    with urllib.request.urlopen(url) as source, open(target_path, "wb") as output:
        # Servers may omit Content-Length; tqdm accepts total=None and then
        # shows only the running byte count instead of failing on int(None).
        content_length = source.info().get("Content-Length")
        total = int(content_length) if content_length is not None else None
        with tqdm(total=total, ncols=80, unit='iB', unit_scale=True, unit_divisor=1024) as loop:
            # read() returns b"" at end of stream, which ends the iteration.
            for buffer in iter(lambda: source.read(chunk_size), b""):
                output.write(buffer)
                loop.update(len(buffer))


class Fleurs(torch.utils.data.Dataset):
    """
    Wraps one language of the FLEURS archive: downloads the tarball once,
    decodes its WAV members into memory and pairs them with ``labels`` by position.

    Args:
        files: Accepted for interface compatibility; currently unused.
        labels: Sequence of labels; it defines the dataset length.
        lang: Language code of the archive to fetch. When omitted, the
            module-level ``lang`` variable is used.

    Raises:
        NameError: If ``lang`` is neither passed nor defined at module level.
    """
    def __init__(self, files, labels, lang=None):
        import os  # local import: this cell does not import os

        if lang is None:
            # Backward compatibility: earlier code relied on a global ``lang``.
            try:
                lang = globals()["lang"]
            except KeyError:
                raise NameError(
                    "name 'lang' is not defined; pass lang=... to Fleurs()"
                ) from None

        url = f"https://storage.googleapis.com/xtreme_translations/FLEURS102/{lang}.tar.gz"
        tar_path = os.path.expanduser(f"~/.cache/fleurs/{lang}.tgz")
        os.makedirs(os.path.dirname(tar_path), exist_ok=True)
        # The archive is cached on disk, so it is downloaded only once.
        if not os.path.exists(tar_path):
            download(url, tar_path)

        all_audio = []
        with tarfile.open(tar_path, "r:gz") as tar:
            for member in tar.getmembers():
                # extractfile() returns None for directories, and non-WAV
                # members cannot be decoded, so both are skipped.
                if not (member.isfile() and member.name.endswith(".wav")):
                    continue
                audio_bytes = tar.extractfile(member).read()
                all_audio.append(wavfile.read(io.BytesIO(audio_bytes))[1])

        # NOTE(review): audio is matched to labels purely by index, in archive
        # order — confirm that ``labels`` follows the same order.
        self.labels = labels
        self.all_audio = all_audio

    def __len__(self):
        """Return the number of labels."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return ``(audio_tensor, label)`` for position ``index``."""
        record_label = self.labels[index]
        # Copy so the tensor does not share memory with the cached array.
        audio = torch.from_numpy(self.all_audio[index].copy())
        return (audio, record_label)

## Image Classification Torch DataSet configuration

In [None]:
class CustomDataset(Dataset):
    """
    Image dataset that loads pictures from disk when indexed and applies the
    transform pipeline matching the dataset mode.

    Args:
        files: Sequence of image files to load.
        labels: Per-file labels, indexed in step with ``files``; ignored in
            'test' mode.
        label_encoder_path: Where the fitted ``LabelEncoder`` is pickled
            (written only outside 'test' mode).
        train_transforms: Callable applied to images in 'train' mode.
        val_test_transforms: Callable applied in every other mode.
        mode: One of ``DATA_MODES``.

    Raises:
        NameError: If ``mode`` is not one of ``DATA_MODES``.
    """
    def __init__(self, files, labels, label_encoder_path, train_transforms,
                 val_test_transforms, mode):
        super().__init__()
        if mode not in DATA_MODES:
            # Same exception type as before, but the explanation now travels
            # with the exception instead of being printed separately.
            raise NameError(f"{mode} is not correct; correct modes: {DATA_MODES}")

        # Files to load and the working mode.
        self.files = files
        self.mode = mode
        self.train_transforms = train_transforms
        self.val_test_transforms = val_test_transforms
        self.len_ = len(self.files)

        self.label_encoder = LabelEncoder()
        # Always define the attribute so test-mode instances do not lack it.
        self.labels = None

        if self.mode != 'test':
            self.labels = labels
            # NOTE(review): every non-test instance fits its own encoder on its
            # own labels and overwrites the same pickle; if train and val see
            # different label sets their integer ids may disagree — confirm.
            self.label_encoder.fit(self.labels)

            with open(label_encoder_path, 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

    def __len__(self):
        """Return the number of files captured at construction time."""
        return self.len_

    def load_sample(self, file):
        """Open an image and force its pixel data to be read."""
        image = Image.open(file)
        image.load()
        return image

    def __getitem__(self, index):
        """Return the transformed image, paired with its encoded label unless in 'test' mode."""
        # Train mode uses the training augmentations; val and test share theirs.
        if self.mode == 'train':
            transform = self.train_transforms
        else:
            transform = self.val_test_transforms

        x = self.load_sample(self.files[index])
        # A missing transform is skipped, matching CustomAudioDataset.
        if transform is not None:
            x = transform(x)

        if self.mode == 'test':
            return x
        label_id = self.label_encoder.transform([self.labels[index]])
        return x, label_id.item()

In [None]:
# Training pipeline: resize, random flips as augmentation, tensor conversion, normalization.
train_transforms = transforms.Compose([
    # Bring every image to RESCALE_SIZE x RESCALE_SIZE.
    transforms.Resize(size=(RESCALE_SIZE, RESCALE_SIZE)),
    # Augmentation: random horizontal flip.
    transforms.RandomHorizontalFlip(p=0.5),
    # Augmentation: random vertical flip.
    transforms.RandomVerticalFlip(p=0.5),
    # Optional padding step, currently disabled:
    # transforms.Pad(padding=15, padding_mode='constant'),
    transforms.ToTensor(),
    # Per-channel mean / std (presumably the ImageNet statistics — confirm).
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Validation/test pipeline: resize, tensor conversion and normalization, with no augmentation.
val_test_transforms = transforms.Compose([
    transforms.Resize(size=(RESCALE_SIZE, RESCALE_SIZE)),
    transforms.ToTensor(),
    # Per-channel mean / std (presumably the ImageNet statistics — confirm).
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

## Image classifier model loader

In [None]:
import torch.nn.functional as F
# Load the pretrained EfficientNetV2-M backbone through torch.hub.
efnet_v2_model = torch.hub.load(
    repo_or_dir='hankyul2/EfficientNetV2-pytorch',
    model='efficientnet_v2_m',
    pretrained=True,
)
# Custom classifier head on top of a backbone model.
class Custom_Classifier_efnet_v2(nn.Module):
    """Backbone followed by SELU and a linear classification layer.

    Args:
        model: Backbone module; its output must have ``in_features`` values
            per sample.
        in_features: Size of the backbone output (default 1000).
        num_classes: Number of target classes (default 20).
    """
    def __init__(self, model, in_features=1000, num_classes=20):
        super().__init__()
        self.model = model
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return class logits for the batch ``x``."""
        features = F.selu(self.model(x))
        return self.classifier(features)

# Wrap the backbone with the classification head, keep both names bound to the
# same module, and move it to DEVICE.
final_model = Custom_Classifier_efnet_v2_model = Custom_Classifier_efnet_v2(efnet_v2_model)
final_model.to(DEVICE)

## Training Loops in native Torch

In [None]:
#back propagation fit step
def fit_epoch(model, train_loader, criterion, optimizer):
    """Run one training epoch and return its mean loss and accuracy.

    Args:
        model: Module to train; each batch is moved to ``DEVICE`` before the forward pass.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.

    Returns:
        Tuple ``(train_loss, train_acc)``, both averaged over the processed samples.
    """
    # eval_epoch() switches the model to eval mode, so training mode has to be
    # restored here; otherwise every epoch after the first validation would
    # train with eval-mode layer behavior.
    model.train()

    running_loss = 0.0
    running_corrects = 0
    processed_data = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        preds = torch.argmax(outputs, 1)
        batch_size = inputs.size(0)
        # loss.item() is the batch loss; weight it by batch size so the final
        # division yields a per-sample average.
        running_loss += loss.item() * batch_size
        running_corrects += torch.sum(preds == labels)
        processed_data += batch_size

    train_loss = running_loss / processed_data
    train_acc = running_corrects.cpu().numpy() / processed_data
    return train_loss, train_acc

In [None]:
# Forward-pass (evaluation) step
from scipy.stats import mode
def eval_epoch(model, val_loader, criterion, min_loss, eps, model_name):
    """Evaluate the model on the validation loader.

    Prints the weighted F1 score and saves ``model.state_dict()`` to
    ``model_name`` when the validation loss passes the check against ``min_loss``.

    Args:
        model: Module to evaluate; switched to eval mode.
        val_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        min_loss: Best validation loss seen so far.
        eps: Offset used in the save condition.
        model_name: Path the state dict is written to.

    Returns:
        Tuple ``(val_loss, val_acc, most_frequent)``. ``most_frequent`` is the
        true label that was misclassified most often (the smallest label wins
        ties), or ``None`` when every sample was classified correctly.
    """
    from collections import Counter

    model.eval()
    running_loss = 0.0
    running_corrects = 0
    processed_size = 0
    misclassified = []
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, 1)

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            running_corrects += torch.sum(preds == labels)
            # tolist() yields plain Python ints for the metric and the counter.
            all_preds.extend(preds.tolist())
            all_labels.extend(labels.tolist())
            # True labels of the wrongly predicted samples, selected in one masked read.
            misclassified.extend(labels[preds != labels].tolist())
            processed_size += batch_size

    f1 = f1_score(all_labels, all_preds, average='weighted')
    print(f'f1 weighted = {f1}')

    if misclassified:
        counts = Counter(misclassified)
        most_frequent = min(counts, key=lambda label: (-counts[label], label))
    else:
        # A perfect epoch has no "most frequent error".
        most_frequent = None

    # Save only when the validation loss passes the check against min_loss.
    val_loss = running_loss / processed_size
    # NOTE(review): the second clause is an exact float equality against
    # min_loss + eps and will almost never be true; confirm whether a
    # tolerance such as ``val_loss <= min_loss + eps`` was intended.
    if val_loss < min_loss or val_loss == min_loss+eps:
        torch.save(model.state_dict(), model_name)
    val_acc = running_corrects.double() / processed_size
    return val_loss, val_acc, most_frequent

In [None]:
# Ties the training pieces together and keeps a per-epoch log.
model_weights_path = 'yolov5/classification_5_col_marked/model_classifier_mono_corrected_weights/'

def train(train_files, val_files, model, epochs, batch_size, weights_for_class, model_name):
    """Train ``model`` for ``epochs`` epochs, validating and checkpointing after each one.

    Args:
        train_files: Training dataset; passed straight to ``DataLoader``
            despite the name.
        val_files: Validation dataset; passed straight to ``DataLoader``.
        model: Module to train.
        epochs: Number of epochs to run.
        batch_size: Batch size for both loaders.
        weights_for_class: Currently unused (class-weighted loss is disabled).
        model_name: Base name for the saved weights.

    Returns:
        List of ``(train_loss, train_acc, val_loss, val_acc)`` tuples, one per epoch.
    """
    import os  # local import: this cell does not import os

    # Per-epoch checkpoints are written here, so the directory must exist.
    os.makedirs(model_weights_path, exist_ok=True)

    # Build the loaders from the arguments, not from same-named globals.
    train_loader = DataLoader(train_files, batch_size=batch_size, num_workers=2, shuffle=True)
    val_loader = DataLoader(val_files, batch_size=batch_size, num_workers=2, shuffle=False)

    history = []
    log_template = (
        "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} "
        "    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"
    )

    opt = torch.optim.AdamW(model.parameters(), lr=0.0001)
    # The scheduler halves the learning rate after every epoch.
    scheduler = torch.optim.lr_scheduler.StepLR(opt, 1, 0.5)
    # To enable class weighting, pass weight=weights_for_class.to(DEVICE).
    criterion = nn.CrossEntropyLoss()
    min_loss = np.inf
    eps = 0.001

    with tqdm(desc="epoch", total=epochs) as pbar_outer:
        for epoch in range(epochs):
            train_loss, train_acc = fit_epoch(model, train_loader, criterion, opt)
            print("loss", train_loss)

            val_loss, val_acc, most_frequent = eval_epoch(model, val_loader, criterion, min_loss, eps, model_name)
            # Track the best loss so eval_epoch keeps only the best weights
            # instead of saving after every epoch.
            min_loss = min(min_loss, val_loss)
            print(f'самая частая ошибка в классе - {most_frequent}')
            history.append((train_loss, train_acc, val_loss, val_acc))
            scheduler.step()
            pbar_outer.update(1)
            # Weights are also saved after every epoch regardless of the loss.
            torch.save(model.state_dict(), model_weights_path + model_name + f'__epoch - {epoch+1}'+'.pt')
            tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,
                                           v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
    return history

In [None]:
# Prediction helper.
def predict(model, test_loader):
    """Return softmax class probabilities for every sample in ``test_loader``.

    Args:
        model: Module to run; switched to eval mode.
        test_loader: Iterable yielding input batches (no labels).

    Returns:
        NumPy array of probabilities, softmax taken over the last dimension.
    """
    # Loop-invariant, so eval mode is set once instead of on every batch.
    model.eval()
    logits = []
    with torch.no_grad():
        for inputs in test_loader:
            outputs = model(inputs.to(DEVICE)).cpu()
            logits.append(outputs)

    return nn.functional.softmax(torch.cat(logits), dim=-1).numpy()

In [None]:
# Run configuration.
model_name = 'custom_efnet_V2_model_weighted_20_class_corrected_monoculture'
weights_for_class = []  # empty: no class-weighted cross entropy
num_epochs = 15
batch_size = 32

# Launch training and keep the per-epoch metrics.
history = train(
    train_files=train_dataset,
    val_files=val_dataset,
    model=final_model,
    epochs=num_epochs,
    batch_size=batch_size,
    weights_for_class=weights_for_class,
    model_name=model_name,
)

In [None]:
# Restore the epoch-6 checkpoint; map_location lets weights saved on another
# device load onto DEVICE.
final_model.load_state_dict(
    torch.load(model_weights_path+f"{model_name}__epoch - 6.pt", map_location=DEVICE)
)

In [None]:
# Restore the fitted label encoder; the context manager closes the file handle.
# NOTE: unpickling executes arbitrary code — only load files you trust.
with open(label_encoder_class, 'rb') as le_dump_file:
    label_encoder = pickle.load(le_dump_file)

In [None]:
# NOTE(review): GermsDataset is not defined in the visible part of this
# notebook — confirm it exists, or whether CustomDataset was intended.
test_dataset = GermsDataset(test_files, labels=None, mode="test")
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Class probabilities for the test set, then the top class mapped back to its original label.
probs_1 = predict(final_model, test_loader)
preds_1 = label_encoder.inverse_transform(probs_1.argmax(axis=1))

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
# scikit-learn metrics take (y_true, y_pred). Weighted F1 weights each class by
# its support in y_true, so the ground truth must come first.
print(f'test accuracy = {accuracy_score(test_labels, preds_1)}')
print(f'test F1_macro = {f1_score(test_labels, preds_1, average="macro")}')
print(f'test F1_weighted = {f1_score(test_labels, preds_1, average="weighted")}')

In [None]:
from sklearn.metrics import classification_report
# Per-class precision / recall / F1 for the test predictions.
print(classification_report(y_true=test_labels, y_pred=preds_1))