구글 드라이브 마운트 및 경로 설정  
(코드 실행 시 구글 계정 연결 팝업 발생)

In [None]:
from google.colab import drive
drive.mount("/content/data")

Mounted at /content/data


In [None]:
DATA_PATH = "/content/data/MyDrive/playdata 데이터 엔지니어링 21기 팀 비비고/data/"
train_csv = DATA_PATH+"train_df.csv"
test_csv = DATA_PATH+"test_df.csv"
train_data = DATA_PATH+"train/"
test_data = DATA_PATH+"test/"

## 라이브러리 불러오기

In [None]:
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np 
from tqdm import tqdm
import cv2

import random
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
# import timm

try:
    import torch
    import torchvision
    assert int(torch.__version__.split(".")[1]) >= 12, "torch version should be 1.12+"
    assert int(torchvision.__version__.split(".")[1]) >= 13, "torchvision version should be 0.13+"
    print(f"torch version: {torch.__version__}")
    print(f"torchvision version: {torchvision.__version__}")
except:
    print(f"[INFO] torch/torchvision versions not as required, installing nightly versions.")
    !pip3 install -U torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
    import torch
    import torchvision
    print(f"torch version: {torch.__version__}")
    print(f"torchvision version: {torchvision.__version__}")

device = torch.device('cuda')

torch version: 1.13.1+cu116
torchvision version: 0.14.1+cu116


## 파일 불러오기

train DataFrame 불러오기

In [None]:
train_y = pd.read_csv(train_csv) # 우리가 train할 target값이 포함되어 있는 데이터
test_y = pd.read_csv(test_csv)

# 이미지 피클 로드
import pickle
import matplotlib.pyplot as plt

with open(DATA_PATH+"pickled_image.pkl", 'rb') as f:
  train_imgs, test_imgs = pickle.load(f)


In [None]:
train_classes = train_y["label"]  # target data

from sklearn.model_selection import train_test_split
X_tr, X_te, y_tr, y_te = train_test_split(train_imgs, train_classes, test_size = 0.2, random_state = 42)

y_tr_unique = sorted(np.unique(y_tr))
y_tr_unique = {key:value for key,value in zip(y_tr_unique, range(len(y_tr)))}
y_tr_class = [y_tr_unique[k] for k in y_tr]
y_te_class = [y_tr_unique[k] for k in y_te]

In [None]:
count_cla = dict()
for X, y in zip(X_tr, y_tr_class):
    if y in count_cla:
      count_cla[y] = count_cla[y] + 1
    else: 
      count_cla[y] = 1

In [None]:
aug_num = dict()
for i in count_cla:
  aug_num[i] = 130//count_cla[i]

In [None]:
def augment(img):
    augmenters = [transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10), 
                  transforms.AutoAugment(transforms.AutoAugmentPolicy.IMAGENET),
                  transforms.AutoAugment(transforms.AutoAugmentPolicy.SVHN)]
    tran = [transforms.RandomRotation(degrees=(0, 180)),
            transforms.RandomAffine(degrees=(30, 70), translate=(0.1, 0.3), scale=(0.5, 0.75)),
            transforms.RandomApply(transforms=[transforms.RandomCrop(size=(256, 256))], p=0.5)]
    aug = augmenters[random.randint(1,3)]
    tran = tran[random.randint(1,3)]
    augmented = tran(aug(img))
    return augmented

In [None]:
from PIL import Image

X_tr_img = []
y_tr_label = []
for img, label in zip(X_tr, y_tr_class):
  img = Image.fromarray(img)
  X_tr_img.append(img)
  y_tr_label.append(label)
  for _ in range(aug_num[label]-1):
    X_tr_img.append(augment(img))
    y_tr_label.append(label)


In [None]:
len(X_tr_img)

12257

In [None]:
import torchvision.transforms as transforms
import torch
import pandas
from PIL import Image
import numpy as np

class Custom_dataset(): # 데이터셋 가공 클래스
    def __init__(self, img_data, label, mode):
        self.img_data = img_data
        self.label = label
        self.mode = mode
        self.tran = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                                    std=[0.229, 0.224, 0.225])])

    def __len__(self):
        return len(self.img_data)
    def __getitem__(self, idx):
        img = self.img_data[idx]
        lab = self.label[idx]
        if self.mode == 'test':
            img = Image.fromarray(img)

        img = self.tran(img)
        
        return img, lab 

In [None]:
import sys 

sys.path.append("/content/data/MyDrive/playdata 데이터 엔지니어링 21기 팀 비비고/유수빈/")

from service import custom, data_setup, engine

# X_tr_dataset = Custom_dataset(X_tr, y_tr_class, 'test')
X_tr_dataset = Custom_dataset(X_tr_img, np.array(y_tr_label),'train')
                                    #  ,transform = manual_transforms, mode='train')
X_te_dataset = Custom_dataset(np.array(X_te), np.array(y_te_class), 'test')

test_dataset = Custom_dataset(np.array(test_imgs), np.array(['tmp']*len(test_imgs)), 'test')


## DataLoader

In [None]:
from torch.utils.data import DataLoader
import os

batch_size = 32
epochs = 60

NUM_WORKERS = os.cpu_count()


X_tr_dataloader, X_te_dataloader = data_setup.create_dataloaders(
    X_tr_dataset, 
    X_te_dataset,
    batch_size = batch_size,
)

test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

### 모델 학습

In [None]:
weights = torchvision.models.EfficientNet_B1_Weights.IMAGENET1K_V1
print(weights.transforms())

ImageClassification(
    crop_size=[240]
    resize_size=[256]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BICUBIC
)


In [None]:
# EfficienNet B0 모델 선언

weights = torchvision.models.EfficientNet_B1_Weights.IMAGENET1K_V1
model = torchvision.models.efficientnet_b1(weights=weights).to(device)

for param in model.features.parameters():
    param.requires_grad = False

torch.manual_seed(42)
torch.cuda.manual_seed(42)

output_shape = len(y_tr_unique) 

model.classifier = torch.nn.Sequential(
    torch.nn.Dropout(p=0.2, inplace=True), 
    torch.nn.Linear(in_features=1280, out_features=output_shape, # 15
                    bias=True)).to(device)

Downloading: "https://download.pytorch.org/models/efficientnet_b1_rwightman-533bc792.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b1_rwightman-533bc792.pth


  0%|          | 0.00/30.1M [00:00<?, ?B/s]

In [None]:
def score_function(real, pred):
    score = f1_score(real, pred, average="macro")
    return score

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()


In [None]:
import torch

from tqdm.auto import tqdm
from typing import Dict, List, Tuple
from sklearn.metrics import f1_score

def score_function(real, pred):
    score = f1_score(real, pred, average="macro")
    return score

def train_step(model: torch.nn.Module, 
dataloader: torch.utils.data.DataLoader, 
loss_fn: torch.nn.Module, 
optimizer: torch.optim.Optimizer,
device: torch.device) -> Tuple[float, float]:

    model.train()
    train_loss, train_acc = 0, 0
    train_pred = []
    train_y = []
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        y_pred = model(X)

        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)
        train_pred += y_pred.argmax(1).detach().cpu().numpy().tolist()
        train_y += y.detach().cpu().numpy().tolist()
        # .detach().cpu().numpy().tolist()

    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc, train_pred, train_y

def test_step(model: torch.nn.Module, 
                dataloader: torch.utils.data.DataLoader, 
                loss_fn: torch.nn.Module,
                device: torch.device) -> Tuple[float, float]:
    model.eval() 
    test_loss, test_acc = 0, 0
    test_pred = []
    test_y = []

    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            test_pred_logits = model(X)

            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            test_pred += test_pred_logits.argmax(1).detach().cpu().numpy().tolist()
            test_y += y.detach().cpu().numpy().tolist()

    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc, test_pred, test_y

def train(model: torch.nn.Module, 
            train_dataloader: torch.utils.data.DataLoader, 
            test_dataloader: torch.utils.data.DataLoader, 
            optimizer: torch.optim.Optimizer,
            loss_fn: torch.nn.Module,
            epochs: int,
            device: torch.device) -> Dict[str, List]:
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc, train_pred, train_y = train_step(model=model,
                                            dataloader=train_dataloader,
                                            loss_fn=loss_fn,
                                            optimizer=optimizer,
                                            device=device)
        test_loss, test_acc, test_pred, test_y = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"F1: {score_function(train_y, train_pred):.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f} | "
            f"F1: {score_function(test_y, test_pred):.4f}  "
        )

        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    return results


In [None]:
results = train(model = model, 
                train_dataloader = X_tr_dataloader,
                test_dataloader = X_te_dataloader,
                optimizer = optimizer,
                loss_fn = loss_fn,
                epochs = epochs,
                device = device)


  0%|          | 0/60 [00:00<?, ?it/s]

Epoch: 1 | train_loss: 1.8452 | train_acc: 0.4641 | F1: 0.4063 | test_loss: 0.5233 | test_acc: 0.8792 | F1: 0.3831  
Epoch: 2 | train_loss: 1.1759 | train_acc: 0.6056 | F1: 0.5682 | test_loss: 0.4702 | test_acc: 0.8827 | F1: 0.4251  
Epoch: 3 | train_loss: 1.0246 | train_acc: 0.6530 | F1: 0.6213 | test_loss: 0.4295 | test_acc: 0.8912 | F1: 0.4501  
Epoch: 4 | train_loss: 0.9521 | train_acc: 0.6748 | F1: 0.6467 | test_loss: 0.4002 | test_acc: 0.8920 | F1: 0.4683  


### 추론

In [None]:
model.eval()
f_pred = []

with torch.no_grad():
    for batch in (test_loader):
        x = torch.tensor(batch[0], dtype = torch.float32, device = device)
        with torch.cuda.amp.autocast():
            pred = model(x)
        f_pred.extend(pred.argmax(1).detach().cpu().numpy().tolist())

In [None]:
label_decoder = {val:key for key, val in y_tr_unique.items()}

f_result = [label_decoder[result] for result in f_pred]

### 제출물 생성

In [None]:
DATA_PATH = "/content/data/MyDrive/playdata 데이터 엔지니어링 21기 팀 비비고/data/"
submission = pd.read_csv(DATA_PATH+"sample_submission.csv")

submission["label"] = f_result

submission

Unnamed: 0,index,label
0,0,cable-combined
1,1,pill-good
2,2,cable-good
3,3,pill-good
4,4,screw-thread_top
...,...,...
2149,2149,zipper-good
2150,2150,cable-good
2151,2151,leather-good
2152,2152,leather-cut


In [None]:
DATA_PATH = "/content/data/MyDrive/playdata 데이터 엔지니어링 21기 팀 비비고/유수빈/submission/"
submission.to_csv(DATA_PATH+"20230318_8.csv", index = False)