In [1]:
import os
import pickle
import shutil
from tqdm import tqdm

# import annoy
import pandas
import transformers
import numpy
import cv2
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torch.utils.data import Dataset, DataLoader
from PIL import Image
import random

import albumentations as A
import cv2

from urllib.request import urlopen
from PIL import Image
import timm

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Declare the augmentation pipeline used for training images.
# Every step carries its own probability ``p`` of being applied to a given image.
# The resulting ``transform`` object is handed to the training dataset in a later cell.
transform = A.Compose([
    A.HorizontalFlip(p=0.3),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(p=0.3, blur_limit=(1, 3)),
    A.GaussNoise(p=0.3, var_limit=(200.0, 300.0))
])

In [2]:
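# One-off preprocessing, kept for reference: builds the id <-> index mappings from the
# spreadsheet and stores them as id2idx.pkl / idx2id.pkl. The training cell below loads id2idx.pkl.
# data = pd.read_excel("data/output/Book1.xlsx")
# id2idx = {id: i for i, id in enumerate(data["id"].unique())}
# idx2id = {i: id for i, id in enumerate(data["id"].unique())}

# pickle.dump(id2idx, open("id2idx.pkl", 'wb'))
# pickle.dump(idx2id, open("idx2id.pkl", 'wb'))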

In [3]:
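# One-off cleanup, kept for reference: deletes every label folder under "photos"
# whose "1" subfolder contains no files, counting the removed folders in bad_nums.
# label_folders = os.listdir("photos")
# bad_nums = 0
# for lf in label_folders:
#     path_ = "photos/" + lf + "/1"
#     len_ = len(os.listdir(path_))
#     if not len_:
#         bad_nums += 1
#         shutil.rmtree("photos/" + lf)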
               

In [4]:
def concat_two_images(img_1, img_2, dim=(175, 175)):
    """Resize two images to the same size and join them side by side.

    Args:
        img_1: Image placed on the left; any array accepted by ``cv2.resize``.
        img_2: Image placed on the right; any array accepted by ``cv2.resize``.
        dim: Size passed to ``cv2.resize`` for each image before joining.
            Defaults to ``(175, 175)``, the value that used to be hard-coded.

    Returns:
        The two resized images stacked horizontally with ``np.hstack``.

    Raises:
        ValueError: If either image is ``None``, which is what an unreadable
            file looks like after ``cv2.imread``.
    """
    if img_1 is None or img_2 is None:
        # Fail with a clear message instead of a cryptic error from cv2.resize.
        raise ValueError("concat_two_images received None instead of an image")

    target_size = tuple(dim)
    resized = [
        cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
        for img in (img_1, img_2)
    ]
    return np.hstack(resized)

def get_folders(path_to_label_folders):
    """Collect every sample folder located one level below each label folder.

    The expected layout is ``<root>/<label>/<sample>/``. Entries are visited in
    sorted order because ``os.listdir`` returns names in arbitrary order, which
    would otherwise make any slice of the result and any seeded split of it
    differ between machines. Non-directory entries (stray files such as
    ``.DS_Store``) are skipped at both levels instead of crashing or being
    returned as sample folders.

    Args:
        path_to_label_folders: Root directory that contains the label folders.

    Returns:
        A list of ``<root>/<label>/<sample>`` paths, sorted by label and then
        by sample name. Empty when the root contains no label folders.
    """
    res_folders = []
    for label_folder in sorted(os.listdir(path_to_label_folders)):
        label_path = os.path.join(path_to_label_folders, label_folder)
        if not os.path.isdir(label_path):
            continue
        for sample_folder in sorted(os.listdir(label_path)):
            sample_path = os.path.join(label_path, sample_folder)
            if os.path.isdir(sample_path):
                res_folders.append(sample_path)

    return res_folders

def get_model_n_processor(base_model_name, device="cpu", n_classes=None, pretrained=True):
    """Create a timm model together with its matching inference transform.

    Args:
        base_model_name: Model name passed to ``timm.create_model``.
        device: Device the model is moved to.
        n_classes: Value passed to timm as ``num_classes``. When ``None``, the
            notebook-level ``num_classes`` variable is used, which matches the
            previous behaviour of this function.
        pretrained: Forwarded to ``timm.create_model``. Pass ``False`` when the
            weights are about to be overwritten by a saved state dict.

    Returns:
        ``(model, processor)`` where ``processor`` is the transform built by
        ``timm.data.create_transform`` with ``is_training=False``.
    """
    if n_classes is None:
        # Backward-compatible fallback to the global defined in the training cell.
        n_classes = num_classes

    model = timm.create_model(
        base_model_name,
        pretrained=pretrained,
        num_classes=n_classes,  # requested number of output classes
    )
    model.to(device)

    # get model specific transforms (normalization, resize)
    data_config = timm.data.resolve_model_data_config(model)
    processor = timm.data.create_transform(**data_config, is_training=False)

    print(f"model: {base_model_name}, loaded")

    return model, processor

class CPCDataset(Dataset):
    """Dataset that yields one side-by-side image pair per sample folder.

    Each item is built from the first two images of a sample folder (the single
    image is duplicated when only one exists). The class index is looked up from
    the name of the folder's parent directory.

    Args:
        label_folders: Sample folder paths of the form ``<root>/<label>/<sample>``.
        processor: Callable applied to the concatenated PIL image.
        train: When true, the two images are swapped at random.
        transform: Optional augmentation called as ``transform(image=img)`` and
            expected to return a mapping with an ``"image"`` entry.
        label_map: Mapping from integer label id to class index. When ``None``,
            the notebook-level ``id2idx`` variable is used (previous behaviour).
    """

    def __init__(self, label_folders, processor, train=True, transform=None, label_map=None):
        self.label_folders = label_folders
        self.processor = processor
        self.train = train
        self.transform = transform
        # Backward-compatible fallback to the global loaded in the training cell.
        self.id2idx = id2idx if label_map is None else label_map

    def __len__(self):
        return len(self.label_folders)

    def __getitem__(self, idx):
        """Return ``(processed_image, target_tensor)`` for sample ``idx``.

        If anything fails while reading or preprocessing, the error is printed
        and a randomly chosen other sample is returned instead.
        """
        label_folder_path = self.label_folders[idx]

        try:
            img_1, img_2 = self.get_tow_imgs(label_folder_path)
            target = self.id2idx[self._label_id_from_path(label_folder_path)]

            # randint's upper bound is exclusive, so (0, 2) yields 0 or 1.
            # The previous (1, 2) always returned 1 and therefore always swapped.
            if self.train and np.random.randint(0, 2) == 1:
                img_1, img_2 = img_2, img_1

            if self.transform is not None:
                img_1 = self._augment(img_1)
                img_2 = self._augment(img_2)

            # NOTE(review): cv2.imread yields BGR channel order while PIL treats the
            # array as RGB; confirm whether a colour conversion is wanted here.
            image = Image.fromarray(concat_two_images(img_1, img_2))

            return self.processor(image), torch.tensor(target)
        except Exception as e:
            print(f"Something goes wrong during reading data/preproc: {e}")
            return self.__getitem__(np.random.randint(0, len(self.label_folders)))

    def _augment(self, img):
        """Apply ``self.transform`` to one image, keeping the original on failure."""
        try:
            return self.transform(image=img)["image"]
        except Exception as e:
            # Best effort: augmentation is optional, but the failure should be visible.
            print(f"Augmentation failed, using the unaugmented image: {e}")
            return img

    @staticmethod
    def _label_id_from_path(label_folder_path):
        """Return the integer name of the parent directory of a sample folder."""
        label_dir = os.path.dirname(os.path.normpath(label_folder_path))
        return int(os.path.basename(label_dir))

    @staticmethod
    def get_tow_imgs(label_folder_path):
        """Read the first two images (by sorted file name) of a sample folder.

        Raises:
            IndexError: If the folder contains no files.
            OSError: If an image file cannot be decoded.
        """
        # Sorted so that the choice of images does not depend on listdir order.
        img_names = sorted(os.listdir(label_folder_path))

        if len(img_names) == 0:
            raise IndexError(f"No image in {label_folder_path}!")

        first_path = os.path.join(label_folder_path, img_names[0])
        img_1 = cv2.imread(first_path)
        if img_1 is None:
            raise OSError(f"Cannot read image {first_path}")

        if len(img_names) > 1:
            second_path = os.path.join(label_folder_path, img_names[1])
            img_2 = cv2.imread(second_path)
            if img_2 is None:
                raise OSError(f"Cannot read image {second_path}")
        else:
            # Only one image available: use an independent copy as the second one.
            img_2 = img_1.copy()

        return img_1, img_2

def train_epoch(model, loader, criterion, optimizer, device=None):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Module to train; switched to training mode.
        loader: Sized iterable of ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.
        optimizer: Optimizer stepping the model parameters.
        device: Device the batches are moved to. When ``None`` it is taken from
            the model's first parameter (CPU if the model has no parameters),
            so the function no longer relies on a notebook-level ``device``.

    Returns:
        The mean batch loss of the epoch as a float, or ``nan`` when the loader
        yielded no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    loss_sum = 0.0
    n_batches = 0
    for inputs, targets in tqdm(loader, total=len(loader)):
        optimizer.zero_grad()

        inputs = inputs.to(device)
        targets = targets.to(device)

        pred_logits = model(inputs)
        ce_loss = criterion(pred_logits, targets)

        ce_loss.backward()
        optimizer.step()

        loss_sum += ce_loss.item()
        n_batches += 1

    return loss_sum / n_batches if n_batches else float("nan")

def eval_epoch(model, loader, device=None):
    """Predict class labels for every batch in ``loader``.

    The forward passes run under ``torch.no_grad()`` so no autograd graph is
    built during evaluation.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        loader: Sized iterable of ``(inputs, targets)`` batches.
        device: Device the inputs are moved to. When ``None`` it is taken from
            the model's first parameter (CPU if the model has no parameters),
            so the function no longer relies on a notebook-level ``device``.

    Returns:
        ``(trues, preds)``: two flat lists with the target labels and the
        arg-max predictions, in loader order.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    preds = []
    trues = []
    with torch.no_grad():
        for inputs, targets in tqdm(loader, total=len(loader)):
            inputs = inputs.to(device)

            pred_logits = model(inputs)

            preds += list(pred_logits.argmax(dim=1).detach().cpu().numpy())
            trues += list(targets.detach().cpu().numpy())

    return trues, preds

def train_n_eval(model, train_loader, val_loader, criterion, optimizer, n_epochs):
    """Alternate training and validation for ``n_epochs`` epochs.

    After each training pass the model is evaluated on ``val_loader`` and the
    validation accuracy is printed. Nothing is returned.
    """
    for epoch_idx in range(n_epochs):
        # One optimisation pass over the training data.
        train_epoch(model, train_loader, criterion, optimizer)

        # Score the current weights on the validation data.
        val_true, val_pred = eval_epoch(model, val_loader)
        print(f"epoch {epoch_idx}: val accuracy: {accuracy_score(val_true, val_pred)}")

In [5]:
workflow_test = True  # set to True to smoke-test the workflow on a small subset of the data
path_to_label_folders = "photos/"  # root directory that holds the image folders
base_model_name = 'mobilenetv3_large_100.ra_in1k'  # name of the pretrained model (Hugging Face hub)
model_save_path = "model_state_dict.pt"  # where the trained model weights are saved
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
n_epochs = 1
lr = 1e-4
batch_size = 4

In [6]:
# Gather the sample folders; in smoke-test mode only the first 1000 are kept.
all_label_folders = get_folders(path_to_label_folders)
if workflow_test:
    all_label_folders = all_label_folders[:1000]

# Load the label-id -> class-index mapping.
# NOTE: pickle can execute arbitrary code while loading; only load mapping files
# that were produced by this project.
with open("id2idx.pkl", 'rb') as id2idx_file:
    id2idx = pickle.load(id2idx_file)
num_classes = len(id2idx)

# The image count assumes two images per sample folder.
print(f"num imgs: {len(all_label_folders)*2}, num classes: {num_classes}")

model, processor = get_model_n_processor(base_model_name, device=device)
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()

# 70/30 split of the sample folders with a fixed seed.
train_label_folders, val_label_folders = train_test_split(all_label_folders, test_size=0.3, random_state=42)
train_dataset = CPCDataset(train_label_folders, processor, train=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, drop_last=True)

# Validation uses no augmentation and keeps the last partial batch.
val_dataset = CPCDataset(val_label_folders, processor, train=False, transform=None)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size, drop_last=False)

print("start training")
train_n_eval(model, train_loader, val_loader, criterion, optimizer, n_epochs)
print("training completed!")

print("Saving model!")
torch.save(model.state_dict(), model_save_path)

num imgs: 2000, num classes: 61656
model: mobilenetv3_large_100.ra_in1k, loaded
start training


100%|██████████| 175/175 [00:17<00:00, 10.02it/s]
100%|██████████| 75/75 [00:03<00:00, 24.74it/s]


epoch 0: val accuracy: 0.0
training completed!
Saving model!


In [7]:
class Identity(nn.Module):
    """Pass-through module whose ``forward`` hands back its input unchanged."""

    def forward(self, x):
        return x

# Rebuild the network, restore the trained weights and replace the classifier
# with a pass-through so the model returns the features that feed it.
model, processor = get_model_n_processor(base_model_name, device=device)
# map_location keeps the checkpoint loadable when it was saved on another device.
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.classifier = Identity()
# Evaluation mode: this is an inference check, so layers that behave differently
# during training must not update their state from the sample batch.
model.eval()

# Sanity check on one validation batch; no gradients are needed here.
with torch.no_grad():
    print(model(next(iter(val_loader))[0].to(device)).shape)
print("Check completed!")

model: mobilenetv3_large_100.ra_in1k, loaded
torch.Size([4, 1280])
Check completed!
