In [None]:
import torch
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset,Subset, random_split
import torchvision
from torchvision import datasets, models
from torchvision import transforms as T
import torchvision.transforms.functional as F
import torch.nn as nn
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
import matplotlib.pyplot as plt
from IPython.display import display
import lightning as L
from lightning.pytorch import loggers as pl_loggers
from lightning.pytorch.callbacks import ModelCheckpoint
import torchmetrics, argparse

from torchvision.datasets import ImageFolder

from PIL import Image
import os

import multiprocessing
num_workers = multiprocessing.cpu_count()
print(num_workers)
import timm

In [10]:
class CFG:
    ver = 4.1
    seed = 42
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    base_dir =  "/root/signate_tecno/"
    base_input_dir = base_dir + "input"
    input_dir = base_dir + "input/train"
    test_dir = base_dir + "input/test"
    output_dir = base_dir + "output/"
    sub_dir  = base_dir + "submit/"
    log_dir = base_dir + "logs/"
    model_dir = base_dir + "model/"
    ckpt_dir = base_dir + "ckpt/"

    MODEL = "vit-tiny"
    DATASET = "TECNO"


    learning_rate = 1e-3
    weight_decay = 1e-5
    optimizer = "SGD"
    data_aug = "RandAug"

In [11]:
class ViTNet(L.LightningModule):
    def __init__(self,learning_rate = 1e-3, weight_decay = 1e-5, optimizer_name = "SGD", data_aug = "RandAug"):
        super().__init__()
        self.model = timm.create_model("vit_tiny_patch16_384" , pretrained = True, num_classes = 2)
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.optimizer_name = optimizer_name
        self.data_aug = data_aug
        self.save_hyperparameters()
        self.acc = torchmetrics.classification.Accuracy(task= 'binary')
        self.class_acc = torchmetrics.classification.Accuracy(task = 'binary')
        self.loss_fn = nn.CrossEntropyLoss()
        self.predictions = []

    def forward(self,x):
        out = self.model(x)
        return out

    def _eval(self,batch,phase, on_step , on_epoch):
        x,y = batch
        out = self(x)
        loss = self.loss_fn(out, y)
        preds = torch.argmax(out, dim=1)
        acc = self.acc(preds, y)
        self.log(f"{phase}_loss", loss)
        self.log(f"{phase}_acc", acc, on_step = on_step, on_epoch = on_epoch)
        if phase == "val":
            self.class_acc(preds,y)
            self.log('hp_metric', acc, on_step = False, on_epoch = True,prog_bar = True, logger = True)
        return loss

    def training_step ( self,batch, batch_idx):
        loss = self._eval(batch, "train", on_step = False, on_epoch = True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._eval(batch, "val", on_step = False, on_epoch = True)
        return loss

    def test_step(self, batch, batch_idx):
        x = batch
        out = self(x)
        self.predictions.append(out)
        return out

    def on_test_epoch_end(self):
        all_preds = torch.cat(self.predictions, dim=0)
        probs = torch.softmax(all_preds, dim=1)[:, 1]  # クラス1の確率を取得
        self.predictions.clear()  # 保存された出力をクリア

        return probs.cpu().numpy()

    def configure_optimizers(self):
        if self.optimizer_name == "SGD":
            optimizer = optim.SGD(self.parameters(), lr = self.learning_rate, weight_decay = self.weight_decay)
        elif self.optimizer_name == "Adam":
            optimizer = optim.Adam(self.parameters(), lr = self.learning_rate, weight_decay = self.weight_decay)
        elif self.optimizer_name == "AdamW":
            optimizer = optim.AdamW(self.parameters(), lr = self.learning_rate, weight_decay = self.weight_decay)

        return optimizer

net = ViTNet(learning_rate = CFG.learning_rate,
             weight_decay = CFG.weight_decay,
             optimizer_name = CFG.optimizer,
             data_aug = CFG.data_aug)


In [12]:
import pandas as pd
sample_submit = pd.read_csv(CFG.base_input_dir + "/sample_submit.csv", header=None)

In [13]:
class TestDatset(Dataset):
    def __init__(self,df,data_dir,transform=None):
        super().__init__()
        self.df = df
        self.image_paths = df[0]
        self.transform = transform
    def __len__(self):
        return len(self.df)
    def __getitem__(self, index):
        image_path = CFG.test_dir +"/"+self.image_paths[index]
        image = Image.open(image_path)
        if self.transform:
            image = self.transform(image)
        return image

In [14]:
class TestDataModule(L.LightningDataModule):
    def __init__(self,df,batch_size = 32, data_dir = "./input", ds = None):
        super().__init__()
        self.df = df
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.test_transform = T.Compose([
                                        T.Resize((384,384)),
                                        T.ToTensor(),
                                        T.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
    def setup(self,stage = None):
        if stage == "test" or stage is None:
            self.test_dataset = TestDatset(self.df,self.data_dir,self.test_transform)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers = num_workers)

In [15]:
test_dm = TestDataModule(df = sample_submit,data_dir = CFG.test_dir)
test_dm.prepare_data()
test_dm.setup(stage = "test")

In [None]:
checkpoint_path = CFG.ckpt_dir + f"TECNO-{CFG.ver}-{CFG.MODEL}-fold1.ckpt"
net = ViTNet.load_from_checkpoint(checkpoint_path)
net.eval()
net.freeze()


predict_list, targets_list = [], []

for process, images in enumerate(test_dm.test_dataloader()):
    images = images.to(CFG.device)
    with torch.no_grad():
        outputs = net(images)
        predicts = outputs.softmax(dim = 1)
    predicts = predicts.cpu().detach().numpy()
    predict_list.append(predicts)

predict_list = np.concatenate(predict_list,axis = 0)

predict_list = predict_list[:,1]
# prompt: predict_listを元に0,1に2値変換してください

predicted_labels = (predict_list > 0.5).astype(int)

sample_submit[1] = predicted_labels

In [None]:
n_folds = 5
predict_list = np.zeros(len(sample_submit))

for fold in range(n_folds):
    checkpoint_path = CFG.ckpt_dir + f"TECNO-{CFG.ver}-{CFG.MODEL}-fold{fold}.ckpt"
    net = ViTNet.load_from_checkpoint(checkpoint_path)
    net.eval()
    net.freeze()

    fold_predict_list = []
    test_dm = TestDataModule(df = sample_submit,data_dir = CFG.test_dir)
    test_dm.prepare_data()
    test_dm.setup(stage = "test")

    for process, images in enumerate(test_dm.test_dataloader()):
        images = images.to(CFG.device)
        with torch.no_grad():
            outputs = net(images)
            predicts = outputs.softmax(dim=1)
        predicts = predicts.cpu().detach().numpy()
        fold_predict_list.append(predicts)

    fold_predict_list = np.concatenate(fold_predict_list, axis=0)
    fold_predict_list = fold_predict_list[:, 1]
    predict_list += fold_predict_list

predict_list /= n_folds

predicted_labels = (predict_list > 0.5).astype(int)
sample_submit[1] = predicted_labels

In [None]:
len(predict_list)

In [None]:
sample_submit[1]

In [19]:
sample_submit[1] = predicted_labels
sample_submit.to_csv(f"{CFG.sub_dir}/{CFG.ver}-{CFG.MODEL}-{CFG.seed}.csv", index=False, header=None)

In [None]:
sample_submit.head()