#### 경로 설정 & 라이브러리 import

In [1]:
# Directory and file name of the trained checkpoint that is loaded for inference.
model_save_path = 'data/'
model_save_name = 'model_v17.1_0202_best_f1(0.95324).pt'
# File name of the prediction CSV written by the last cell of the notebook.
submission_file_name = 'submission_file_name.csv'

In [15]:
import warnings; warnings.filterwarnings('ignore')
import random
import json
import cv2
import time
from glob import glob
import pandas as pd
import numpy as np
from tqdm import tqdm
import sys

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms

from sklearn.preprocessing import RobustScaler
from sklearn.impute import SimpleImputer

from skimage.transform import warp, rotate, AffineTransform, ProjectiveTransform
from skimage.util import random_noise
from scipy import ndimage

import timm
from timm.data.auto_augment import auto_augment_transform
from timm.data.auto_augment import rand_augment_transform
from timm.data.transforms import RandomResizedCropAndInterpolation
from PIL import Image
sys.getdefaultencoding()

'utf-8'

In [None]:
# Gather the training files (one sub-directory level below data/train) in sorted order.
train_csv = glob('data/train/*/*.csv')
train_csv.sort()
train_jpg = glob('data/train/*/*.jpg')
train_jpg.sort()
train_json = glob('data/train/*/*.json')
train_json.sort()

In [None]:
# Gather the test files in sorted order; the csv and jpg lists are later consumed
# index-by-index, presumably relying on matching file stems — verify against the data layout.
test_csv = glob('data/test/*/*.csv')
test_csv.sort()
test_jpg = glob('data/test/*/*.jpg')
test_jpg.sort()

#### 데이터 전처리 & 로드

- 환경 데이터 전처리

In [None]:
# Number of time steps kept per environment series (passed to prepro_csv below).
max_len = 600
# Columns of the environment CSV that are used as features.
csv_features = ['내부 온도 1 평균', '내부 습도 1 평균', '내부 이슬점 평균']
# Median imputer used while collecting statistics for the scaler.
imputer = SimpleImputer(missing_values=np.nan, strategy='median')

In [None]:
# Collect first differences of the selected environment features from every
# training CSV. '-' marks a missing reading and is median-imputed per file.
# The chunks are gathered in a list and stacked once at the end; stacking
# inside the loop would re-copy the growing array on every iteration.
first_env = pd.read_csv(train_csv[0])[csv_features]
first_env = first_env.replace('-', np.nan)
first_env = pd.DataFrame(imputer.fit_transform(first_env))
env_chunks = [first_env.astype(float).diff().iloc[1:].values]
for csv_path in tqdm(train_csv[1:]):
    tmp = pd.read_csv(csv_path)[csv_features]
    tmp = tmp.replace('-', np.nan)
    tmp = pd.DataFrame(imputer.fit_transform(tmp))
    tmp = tmp.astype(float).diff().replace(np.nan, 0.).values
    if len(tmp) <= 1:
        # A single-row file has no usable difference.
        continue
    # The first row of a diff is undefined, so it is dropped.
    env_chunks.append(tmp[1:])
env_concat = np.vstack(env_chunks)
print('\n(num_row, num_feat): ', env_concat.shape, sep='')

100%|██████████| 5766/5766 [01:17<00:00, 74.17it/s]


(num_row, num_feat): (1772360, 3)





In [None]:
# Fit the scaler on the stacked feature differences gathered above.
scaler = RobustScaler()
scaler.fit(env_concat)

In [None]:
def prepro_csv(csv_file, max_len=200):
    """Turn one environment CSV into a fixed-size, scaled difference series.

    Rows containing a '-' placeholder (or NaN) in any selected feature are
    dropped, the remaining values are first-differenced, scaled with the
    module-level ``scaler`` and right-aligned into a zero-padded buffer.

    Args:
        csv_file: Path of the CSV file to read.
        max_len: Number of time steps in the output; longer series keep only
            their last ``max_len`` steps, shorter ones are left-padded with zeros.

    Returns:
        ``numpy.ndarray`` of shape ``(len(csv_features), max_len)`` rounded to
        6 decimals. An all-zero array is returned when no usable row remains.
    """
    df = pd.read_csv(csv_file)[csv_features]
    df = df.replace('-', np.nan).dropna()
    pad = np.zeros((max_len, len(csv_features)))
    if len(df) == 0:
        # Nothing to scale; the scaler cannot transform zero samples.
        return np.round(pad.T, 6)
    diffs = df.astype(float).diff().replace(np.nan, 0.)
    scaled = np.asarray(scaler.transform(diffs))
    length = min(max_len, len(scaled))
    if length > 0:
        # Guard needed because pad[-0:] would select the whole buffer.
        pad[-length:] = scaled[-length:]
    return np.round(pad.T, 6)

- 레이블 데이터 전처리

In [None]:
# Read crop / disease / risk from every annotation file and derive an integer
# class id for each distinct "crop_disease_risk" combination.
crops, diseases, risks, labels_str = [], [], [], []
for json_path in train_json:
    with open(json_path, 'r') as f:
        annotations = json.load(f)['annotations']
    crop = annotations['crop']
    disease = annotations['disease']
    risk = annotations['risk']
    crops.append(crop)
    diseases.append(disease)
    risks.append(risk)
    labels_str.append(f"{crop}_{disease}_{risk}")
# Class ids follow the sorted order of the label strings.
label2int = {name: index for index, name in enumerate(sorted(np.unique(labels_str)))}
labels = [label2int[name] for name in labels_str]
# Reverse lookup used to turn predictions back into label strings.
int2label = {index: name for name, index in label2int.items()}
len(labels)

5767

- 데이터 로드

In [None]:
def img_load(path):
    """Read an image file and return it with the channel axis reversed.

    Args:
        path: Path of the image file.

    Returns:
        The array returned by ``cv2.imread`` with its last axis reversed
        (OpenCV loads BGR, so the result is in RGB order).

    Raises:
        FileNotFoundError: If OpenCV could not read the file. ``cv2.imread``
            signals failure by returning ``None`` instead of raising.
    """
    img = cv2.imread(path)
    if img is None:
        raise FileNotFoundError(f'cv2.imread could not read image: {path}')
    return img[:, :, ::-1]

In [None]:
# Load every test image into memory, in the order of test_jpg.
test_imgs = [img_load(jpg_path) for jpg_path in tqdm(test_jpg)]

100%|██████████| 51906/51906 [03:56<00:00, 219.07it/s]


In [None]:
# Pre-process every test environment CSV, in the order of test_csv.
test_envs = [prepro_csv(csv_path, max_len) for csv_path in tqdm(test_csv)]

100%|██████████| 51906/51906 [06:52<00:00, 125.73it/s]


In [None]:
# Sanity check: the image and environment lists should have the same length.
print(len(test_imgs), len(test_envs))

51906 51906


#### 데이터 증강 함수

In [None]:
# Target size handed to cv2.resize by every augmentation / resize helper below.
img_size = (512, 512)

In [None]:
def randRange(a, b):
    """Draw a uniform random number between ``a`` and ``b``, rounded to 2 decimals.

    Uses NumPy's global random state.
    """
    sample = np.random.rand()
    return np.round(a + (b - a) * sample, 2)
def rotation(img):
    """Rotate the image by a randomly chosen angle and return it as uint8.

    The rotated result is min-max rescaled to the 0..255 range before the
    cast, so its intensity range is stretched as a side effect.
    """
    angle = random.choice([-15, 15, 30, -30, 90, -90])
    turned = rotate(img, angle=angle)
    stretched = cv2.normalize(turned, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return stretched.astype(np.uint8)
def up_down(img):
    """Flip the image vertically (reverse the order of its rows)."""
    return np.flip(img, axis=0)
def left_right(img):
    """Flip the image horizontally (reverse the order of its columns)."""
    return np.flip(img, axis=1)
def noise(img):
    """Add random noise of a randomly chosen variance and return a uint8 image.

    The noisy result is min-max rescaled to 0..255 before the cast.
    """
    variance = random.choice([3e-2, 3e-3])
    noisy = random_noise(img, var=variance)
    stretched = cv2.normalize(noisy, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return stretched.astype(np.uint8)
def brightness(img):
    """Brighten the image by a random offset of 20, 30 or 40.

    For ``uint8`` arrays the sum is computed in a wider integer type and
    clipped to 0..255. Plain ``uint8`` addition wraps around, which would
    turn the brightest pixels dark. Other inputs get a plain addition.

    Args:
        img: Image array (or any object supporting ``+ int``).

    Returns:
        The brightened image; ``uint8`` inputs stay ``uint8``. The input is
        not modified.
    """
    delta = random.choice([20, 30, 40])
    if isinstance(img, np.ndarray) and img.dtype == np.uint8:
        return np.clip(img.astype(np.int16) + delta, 0, 255).astype(np.uint8)
    return img + delta
def contrast(img):
    """Scale the image by a small random factor, then stretch it to 0..255 as uint8.

    NOTE(review): the min-max normalisation that follows appears to cancel
    the constant factor, so the net effect looks like a plain intensity
    stretch — confirm that this is the intent.
    """
    factor = random.choice([5e-3, 6e-3])
    scaled = img * factor
    stretched = cv2.normalize(scaled, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return stretched.astype(np.uint8)
def blur(img):
    """Apply a uniform (box) filter with randomly chosen window sizes.

    Three window sizes are drawn, one per axis of ``img``, in axis order.
    """
    size_axis0 = random.choice([4, 6])
    size_axis1 = random.choice([4, 6])
    size_axis2 = random.choice([1, 1.3])
    window = (size_axis0, size_axis1, size_axis2)
    return ndimage.uniform_filter(img, size=window)
def random_affine(img):
    """Apply a random affine warp (scale, rotation, shear, shift) and return uint8.

    The translation is ``(tx, ty)``: the horizontal shift is bounded by a
    tenth of the image width (``img.shape[1]``) and the vertical shift by a
    tenth of the height (``img.shape[0]``). The previous code had the two
    axes swapped, which only mattered for non-square images. The bounds are
    now symmetric, ``-(n // 10) .. n // 10``.

    Pixels outside the source image are filled by reflection, and the result
    is min-max rescaled to 0..255 before the cast.

    Args:
        img: Image array whose first two axes are (rows, columns).

    Returns:
        ``uint8`` array holding the warped image.
    """
    height, width = img.shape[0], img.shape[1]
    max_dx = width // 10
    max_dy = height // 10
    # The random draws keep their original order: scale x/y, rotation, shear, tx, ty.
    tform = AffineTransform(scale=(randRange(0.75, 1.3), randRange(0.75, 1.3)),
                            rotation=randRange(-0.25, 0.25),
                            shear=randRange(-0.2, 0.2),
                            translation=(randRange(-max_dx, max_dx),
                                         randRange(-max_dy, max_dy)))
    affined_img = np.float32(warp(img, tform.inverse, mode='reflect'))
    norm_image = cv2.normalize(affined_img, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return norm_image.astype(np.uint8)
def random_perspective(img):
    """Apply a random projective warp and return the result as uint8.

    The four image corners are paired with random points, each drawn from the
    quarter-sized region nearest its corner. The estimated transform is given
    to ``warp`` as the coordinate map, and the output is min-max rescaled to
    0..255 before the cast.
    """
    region = 1/4
    height, width = img.shape[0], img.shape[1]
    corners = np.array([[0, 0], [0, height], [width, height], [width, 0]])
    # One (x, y) pair per corner; the draw order matches the corner order above.
    near_origin = [int(randRange(0, width * region)), int(randRange(0, height * region))]
    near_max_y = [int(randRange(0, width * region)), int(randRange(height * (1-region), height))]
    near_max_xy = [int(randRange(width * (1-region), width)), int(randRange(height * (1-region), height))]
    near_max_x = [int(randRange(width * (1-region), width)), int(randRange(0, height * region))]
    targets = np.array([near_origin, near_max_y, near_max_xy, near_max_x])
    pt = ProjectiveTransform()
    pt.estimate(corners, targets)

    warped = np.float32(warp(img, pt, output_shape=img.shape[:2]))
    stretched = cv2.normalize(warped, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return stretched.astype(np.uint8)

In [None]:
# Pool of single-image augmentations that series_aug samples from.
aug_func_list = [
    rotation,
    up_down,
    left_right,
    noise,
    brightness,
    contrast,
    blur,
    random_affine,
    random_perspective,
]
def series_aug(img, aug_func_list):
    """Chain between 2 and 6 distinct, randomly chosen augmentations.

    Args:
        img: Input image.
        aug_func_list: Candidate callables, each mapping an image to an image.

    Returns:
        The image after the selected callables were applied in random order.
    """
    how_many = random.choice(range(2, 7))
    pipeline = random.sample(aug_func_list, how_many)
    random.shuffle(pipeline)
    result = img
    for step in pipeline:
        result = step(result)
    return result

In [None]:
def skimg_aug(img):
    """Run a random chain from aug_func_list, then resize to img_size."""
    return cv2.resize(series_aug(img, aug_func_list), img_size)

In [None]:
# timm augmentation policies. The crop size of `resizecrop` is drawn once,
# when this cell runs, not once per image.
autoaug = auto_augment_transform(
    config_str='original',
    hparams={'translate_const': 100, 'img_mean': (124, 116, 104)},
)
randaug = rand_augment_transform(
    config_str='rand-m9-mstd0.5',
    hparams={'translate_const': 117, 'img_mean': (124, 116, 104)},
)
resizecrop = RandomResizedCropAndInterpolation(size=random.choice([300, 400]))
timm_func_list = [autoaug, randaug, resizecrop]


def timm_aug(img):
    """Apply 1 to 3 distinct timm policies in random order, then resize to img_size.

    The input is cast to uint8 and converted to a PIL image before the
    policies run; the result is returned as an array.
    """
    pil_img = Image.fromarray(np.uint8(img))
    how_many = random.choice([1, 2, 3])
    pipeline = random.sample(timm_func_list, how_many)
    random.shuffle(pipeline)
    for step in pipeline:
        pil_img = step(pil_img)
    return cv2.resize(np.array(pil_img), img_size)

In [None]:
def mixup_aug(idx, beta):
    """Blend sample ``idx`` with a randomly drawn training sample (mixup).

    Reads the notebook-level ``train_idx``, ``imgs_oversampled`` and
    ``labels_oversampled``. The mixing weight is drawn from
    ``Beta(beta, beta)``.

    Returns:
        Tuple of (blended uint8 image, min-max rescaled to 0..255; blended
        label rounded to 4 decimals). Labels are presumably numeric vectors —
        verify against the code that builds ``labels_oversampled``.
    """
    partner_idx = random.sample(list(train_idx), 1)[0]
    first_img = cv2.resize(imgs_oversampled[idx], img_size)
    second_img = cv2.resize(imgs_oversampled[partner_idx], img_size)
    first_label = labels_oversampled[idx]
    second_label = labels_oversampled[partner_idx]
    weight = np.random.beta(beta, beta)
    blended_img = weight * first_img / 255 + (1 - weight) * second_img / 255
    blended_label = weight * first_label + (1 - weight) * second_label
    stretched = cv2.normalize(blended_img, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
    return stretched.astype(np.uint8), np.round(blended_label, 4)

In [None]:
# Colour-space pipeline applied before the bounding-box mixing helpers.
# Jitter strengths and the sharpness factor are sampled once, when this cell runs.
pre_torch_transformer = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomPosterize(bits=3, p=0.25),
    # brightness, contrast, saturation, hue
    transforms.ColorJitter(
        brightness=randRange(0.15, 0.3),
        contrast=randRange(0.15, 0.3),
        saturation=randRange(0.15, 0.3),
        hue=randRange(0.15, 0.3),
    ),
    transforms.RandomInvert(p=0.1),
    transforms.RandomAdjustSharpness(random.choice([2, 4]), p=0.75),
])
# Optional second stage; the RandAugment settings are also drawn once, at definition time.
post_torch_transformer = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandAugment(num_ops=random.randint(1, 4), magnitude=random.randint(1, 4)),
])

In [None]:
def torch_aug_with_bbx(img, idx):
    """Mix an image with its colour-augmented copy along its bounding box.

    With ``mix_type == 0`` the augmented copy keeps the original pixels
    inside the box; with ``mix_type == 1`` the original keeps everything but
    the box, which is taken from the augmented copy. ``post_torch_transformer``
    is then applied with probability 1/2 and the result is resized to
    ``img_size``.

    The caller's ``img`` is never modified. The previous code wrote the
    augmented box into ``img`` in place, which permanently altered the image
    stored in the dataset.

    Args:
        img: Image array indexed as ``[row, column, ...]``.
        idx: Key into the notebook-level ``bbxs_oversampled``, whose entries
            unpack as ``(x1, x2, y1, y2)``.

    Returns:
        The mixed image resized to ``img_size``.
    """
    torch_augmented = np.array(pre_torch_transformer(img))
    x1, x2, y1, y2 = bbxs_oversampled[idx]
    mix_type = random.randint(0, 1)
    post_aug = random.randint(0, 1)
    if mix_type == 0:
        mixed = torch_augmented
        mixed[y1:y2, x1:x2] = img[y1:y2, x1:x2]
    else:
        # Work on a copy so the dataset's stored image stays untouched.
        mixed = img.copy()
        mixed[y1:y2, x1:x2] = torch_augmented[y1:y2, x1:x2]
    if post_aug == 1:
        mixed = np.array(post_torch_transformer(mixed))
    return cv2.resize(mixed, img_size)

In [2]:
def torch_aug_with_part_bbx(img, idx):
    """Mix an image with its colour-augmented copy along its part boxes.

    If the sample has no part boxes the fully augmented copy is used.
    Otherwise ``mix_type == 0`` restores the original pixels inside every box
    of the augmented copy, and ``mix_type == 1`` pastes the augmented boxes
    into a copy of the original. ``post_torch_transformer`` is then applied
    with probability 1/2 and the result is resized to ``img_size``.

    Two defects of the previous version are fixed:
      * with ``mix_type == 1`` and post-augmentation it returned the plain
        augmented copy instead of the post-augmented mix;
      * with ``mix_type == 1`` it wrote into the caller's ``img`` in place.

    Args:
        img: Image array indexed as ``[row, column, ...]``.
        idx: Key into the notebook-level ``part_oversampled``, whose entries
            are sequences of ``(x1, x2, y1, y2)`` boxes.

    Returns:
        The mixed image resized to ``img_size``.
    """
    torch_augmented = np.array(pre_torch_transformer(img))
    part_list = part_oversampled[idx]
    mix_type = random.randint(0, 1)
    post_aug = random.randint(0, 1)
    if len(part_list) == 0 or mix_type == 0:
        mixed = torch_augmented
        for x1, x2, y1, y2 in part_list:
            mixed[y1:y2, x1:x2] = img[y1:y2, x1:x2]
    else:
        # Work on a copy so the dataset's stored image stays untouched.
        mixed = img.copy()
        for x1, x2, y1, y2 in part_list:
            mixed[y1:y2, x1:x2] = torch_augmented[y1:y2, x1:x2]
    if post_aug == 1:
        mixed = np.array(post_torch_transformer(mixed))
    return cv2.resize(mixed, img_size)

In [None]:
def edge_aug(img):
    """Overlay an edge map (Canny, Sobel or Laplacian) on the image.

    One of the three edge detectors is chosen at random and its response is
    added to the image. With probability 1/2 the result also goes through
    ``timm_aug``; it is always resized to ``img_size`` at the end.

    For ``uint8`` images the addition saturates at 255. Plain ``uint8``
    addition wraps around, so a Canny edge value of 255 would turn a pixel
    into ``pixel - 1`` rather than highlighting it.

    Args:
        img: Image array (expected to be ``uint8`` with a channel axis).

    Returns:
        The edge-enhanced image resized to ``img_size``.
    """
    edge_type = random.randint(0, 2)
    post_aug = random.randint(0, 1)
    if edge_type == 0:
        edge = cv2.Canny(img, random.choice([60, 80]), random.choice([60, 80]))
        # Canny yields a single-channel map; replicate it across 3 channels.
        edge = np.stack((edge,) * 3, axis=-1)
    elif edge_type == 1:
        edge = cv2.Sobel(img, -1, 0, 1)
    else:
        edge = cv2.Laplacian(img, -1)
    if img.dtype == np.uint8:
        summed = img.astype(np.int16) + edge
        img_edge_added = np.clip(summed, 0, 255).astype(np.uint8)
    else:
        img_edge_added = edge + img
    if post_aug == 1:
        img_augmented = timm_aug(img_edge_added)
    else:
        img_augmented = img_edge_added
    return cv2.resize(img_augmented, img_size)

In [None]:
def resize_only(img):
    """Resize the image to the notebook-level img_size without any augmentation."""
    return cv2.resize(img, img_size)

In [None]:
def scaling(x, sigma=0.05):
    """Multiply every element of ``x`` by its own N(1, sigma) random factor.

    Uses NumPy's global random state; ``x`` must expose ``.shape``.
    """
    jitter = np.random.normal(1., sigma, x.shape)
    return np.multiply(x, jitter)
def temporal_aug(envs):
    """Randomly jitter the environment series.

    A uniform draw (rounded to 2 decimals) below 0.4 returns ``envs``
    unchanged. Otherwise every column that contains at least one non-zero
    value is multiplied element-wise by random factors from ``scaling``;
    all-zero columns are left as they are.

    Returns:
        ``envs`` itself, or a new array holding the jittered values.
    """
    if np.round(np.random.rand(), 2) < 0.4:
        return envs
    frame = pd.DataFrame(envs)
    has_signal = (frame != 0).any(axis=0)
    jittered = scaling(frame.loc[:, has_signal])
    frame[jittered.columns] = jittered
    return frame.values

#### 모델 및 커스텀 데이터셋 정의

In [None]:
# Number of distinct integer class ids found in the training labels.
num_class = len(np.unique(labels))
# Hidden size of the LSTM in rnn_decoder.
rnn_hidden_dim = 512
# Dropout probability used by rnn_decoder.
dropout_rate = 0.4
# Width of the CNN output fed into the final layer; presumably the size of the
# timm model's classifier output — TODO confirm against the chosen backbone.
cnn_output_dim = 1000
# Width of the environment embedding produced by rnn_decoder.rnn_fc.
rnn_output_dim = 128
# Time steps and feature count of one pre-processed environment series.
env_temporal_len = max_len
env_feature_len = len(csv_features)

In [None]:
class rnn_decoder(nn.Module):
    """Classification head that fuses CNN features with an LSTM encoding of the environment series.

    The environment tensor goes through an LSTM, is flattened per leading
    index and projected to ``rnn_output_dim``. It is then concatenated with
    the CNN output and mapped to ``num_class`` logits, with dropout applied
    to the logits.

    NOTE(review): the LSTM is built without ``batch_first`` and receives
    ``dec_inp`` as-is. If ``dec_inp`` is laid out as (batch, features, time),
    dimension 0 is treated as the sequence axis — confirm this is intended.
    Changing it would invalidate existing checkpoints.
    """

    def __init__(self, env_temporal_len, rnn_hidden_dim, rnn_output_dim, env_feature_len, num_class, dropout_rate):
        super().__init__()
        # Submodule names and creation order are kept so saved state dicts still load.
        self.lstm = nn.LSTM(input_size=env_temporal_len, hidden_size=rnn_hidden_dim)
        self.rnn_fc = nn.Linear(in_features=env_feature_len * rnn_hidden_dim, out_features=rnn_output_dim)
        # cnn_output_dim is read from notebook scope; it is not a constructor argument.
        self.final_layer = nn.Linear(in_features=cnn_output_dim + rnn_output_dim, out_features=num_class)
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, enc_out, dec_inp):
        """Return logits for CNN features ``enc_out`` and environment tensor ``dec_inp``."""
        lstm_out, _state = self.lstm(dec_inp)
        flattened = lstm_out.view(lstm_out.size(0), -1)
        env_embedding = self.rnn_fc(flattened)
        fused = torch.cat((enc_out, env_embedding), dim=1)
        return self.dropout(self.final_layer(fused))

In [None]:
# Name of the timm architecture used as the image encoder (see network.__init__).
image_model = 'tinynet_a' 

In [None]:
class custom_dataset(Dataset):
    """Dataset that pairs images with environment series and optional labels.

    Modes:
        'train': returns ``(img, envs, label)``. One of seven image
            augmentations is drawn uniformly per item and the environment
            series goes through ``temporal_aug``.
        'valid': returns ``(img, envs, label)`` with a plain resize.
        'test':  returns ``(img, envs)`` with a plain resize.

    Args:
        imgs: Indexable collection of images.
        envs: Indexable collection of environment series, aligned with ``imgs``.
        labels: Indexable collection of labels aligned with ``imgs``. Required
            for 'train' and 'valid'; may be omitted for 'test'.
        mode: One of 'train', 'valid', 'test'.

    Raises:
        ValueError: If labels are missing for a labelled mode, or (on item
            access) if ``mode`` is not one of the supported values.
    """

    def __init__(self, imgs, envs, labels=None, mode='train'):
        if mode in ('train', 'valid') and labels is None:
            raise ValueError(f"labels are required when mode='{mode}'")
        self.imgs = imgs
        self.envs = envs
        self.labels = labels
        self.mode = mode

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        img = self.imgs[idx]
        if self.mode == 'train':
            # Uniform draw over the seven augmentation strategies.
            aug_type = random.randint(0, 6)
            if aug_type == 0:
                # mixup reads the notebook-level oversampled arrays and
                # returns its own blended label.
                img, label = mixup_aug(idx, beta=1)
            else:
                if aug_type == 1:
                    img = timm_aug(img)
                elif aug_type == 2:
                    img = skimg_aug(img)
                elif aug_type == 3:
                    img = torch_aug_with_bbx(img, idx)
                elif aug_type == 4:
                    img = torch_aug_with_part_bbx(img, idx)
                elif aug_type == 5:
                    img = edge_aug(img)
                else:
                    img = resize_only(img)
                label = self.labels[idx]
            img = transforms.ToTensor()(img)
            envs = temporal_aug(self.envs[idx])
            return img, envs, label
        if self.mode == 'valid':
            img = transforms.ToTensor()(resize_only(img))
            return img, self.envs[idx], self.labels[idx]
        if self.mode == 'test':
            img = transforms.ToTensor()(resize_only(img))
            return img, self.envs[idx]
        # Previously an unknown mode fell through and returned None.
        raise ValueError(f"unknown mode: {self.mode!r}")

class network(nn.Module):
    """Full model: a timm image encoder followed by the rnn_decoder fusion head.

    All constructor settings (architecture name, dimensions, dropout) are
    read from notebook-level variables.
    """

    def __init__(self):
        super().__init__()
        # Attribute names are kept so saved state dicts still load.
        self.cnn = timm.create_model(image_model, pretrained=True)
        self.rnn = rnn_decoder(
            env_temporal_len,
            rnn_hidden_dim,
            rnn_output_dim,
            env_feature_len,
            num_class,
            dropout_rate,
        )

    def forward(self, img, env):
        """Encode ``img`` with the CNN and fuse it with ``env`` in the decoder."""
        image_features = self.cnn(img)
        return self.rnn(image_features, env)

In [None]:
torch.cuda.empty_cache()
# Use the GPU when one is visible, otherwise fall back to CPU so the notebook
# can still run on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu'); device

device(type='cuda')

In [None]:
# Optimiser step size and the label-smoothing factor passed to CrossEntropyLoss.
learning_rate = 5e-5
# NOTE(review): 0.95 is an unusually large smoothing value — confirm it is intended.
label_smoothing = 0.95

In [None]:
# Build the model on the target device together with the training utilities.
# Only `model` is used by the inference cells further down.
model = network()
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing)
gscaler = torch.cuda.amp.GradScaler()

Downloading: "https://github.com/huawei-noah/CV-Backbones/releases/download/v1.2.0/tinynet_a.pth" to /root/.cache/torch/hub/checkpoints/tinynet_a.pth


#### 데이터로더

In [None]:
# Test-time loader: fixed order so predictions line up with the submission rows.
test_dataset = custom_dataset(imgs=test_imgs, envs=test_envs, labels=labels, mode='test')
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=8,
    pin_memory=True,
)
len(test_loader)

812

#### 평가

In [None]:
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU can also be loaded on a machine with a different device layout.
model.load_state_dict(torch.load(model_save_path + model_save_name, map_location=device))

<All keys matched successfully>

In [None]:
# Run the model over the test set and collect the arg-max class id per sample.
model.eval()
test_preds = []
with torch.no_grad():
    for batch in tqdm(test_loader):
        # The loader already yields tensors, so move/cast them with .to();
        # torch.tensor(tensor) would copy-construct and emit a warning.
        img = batch[0].to(device=device, dtype=torch.float32)
        env = batch[1].to(device=device, dtype=torch.float32)
        with torch.cuda.amp.autocast():
            pred = model(img, env)
        test_preds.extend(pred.argmax(1).detach().cpu().numpy().tolist())
# Map class ids back to their "crop_disease_risk" label strings.
test_preds_converted = [str(int2label[i]) for i in test_preds]
print('\n', len(test_preds_converted), sep='')

100%|██████████| 812/812 [02:23<00:00,  5.64it/s]


51906





In [None]:
# Load the sample submission; its rows define the order of the predictions.
submission_template = pd.read_csv('data/sample_submission.csv')
submission_template.head()

Unnamed: 0,image,label
0,10000,0_00_0
1,10001,0_00_0
2,10002,0_00_0
3,10003,0_00_0
4,10004,0_00_0


In [None]:
# Replace the placeholder labels with the predictions, leaving the template untouched.
submission = submission_template.assign(label=test_preds_converted)
print(len(submission))
submission.head()

51906


Unnamed: 0,image,label
0,10000,6_00_0
1,10001,5_b6_1
2,10002,4_00_0
3,10003,3_00_0
4,10004,3_b8_1


In [None]:
# Write the predictions next to the other data files, without the index column.
submission.to_csv('data/' + submission_file_name, index=False)