<a href="https://colab.research.google.com/github/veryHapppy/study_ai/blob/main/Kaggle/leaf_disease.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!pip install optuna
#!pip install plotly
#!pip install konlpy
#!pip install transformers datasets accelerate
#!pip install evaluate
#!pip install timm
from google.colab import drive
#drive.mount('/content/drive')

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import torch.nn.functional as F
import optuna
import optuna.visualization as vis
from PIL import Image
import torchvision.transforms as transforms
import numpy as np
import pandas as pd
import itertools
from sklearn.metrics import confusion_matrix
from konlpy.tag import Okt
import urllib.request
import pickle
from collections import Counter
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import Dataset
from sklearn.model_selection import train_test_split
import evaluate
from transformers import pipeline
import os
from torch.utils.data import Dataset, DataLoader, random_split
import zipfile
import json
import torchvision.models as models
from torchvision.datasets import ImageFolder
import timm
from timm.data import Mixup
from timm.loss import SoftTargetCrossEntropy
import gc

In [None]:
# Disabled helper snippet, kept as a bare string literal so it is never executed.
# When pasted into a cell it lists the pretrained timm model names that match
# the pattern '*swinv2*384*'.
'''
import timm

# 1. 'swinv2'와 '384'가 모두 포함된 모델 찾기
models = timm.list_models('*swinv2*384*', pretrained=True)

# 2. 결과 출력
for m in models:
    print(m)
'''

"\nimport timm\n\n# 1. 'swinv2'와 '384'가 모두 포함된 모델 찾기\nmodels = timm.list_models('*swinv2*384*', pretrained=True)\n\n# 2. 결과 출력\nfor m in models:\n    print(m)\n"

In [None]:
class LeafDiseaseDataset(Dataset):
    """Image/label dataset indexed by a CSV file.

    Column 0 of the CSV is used as the image file name (joined onto
    ``root_dir`` as-is, no extension is appended) and column 1 is converted
    to an ``int`` label.

    Args:
        root_dir: Directory that contains the image files.
        csv_file: Path to the CSV index described above.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, root_dir, csv_file, transform=None):
        self.df = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        # One sample per CSV row.
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx`` of the CSV."""
        file_name = self.df.iloc[idx, 0]
        target = int(self.df.iloc[idx, 1])

        # Always decode to 3-channel RGB, whatever mode the file is stored in.
        picture = Image.open(os.path.join(self.root_dir, str(file_name))).convert('RGB')

        if not self.transform:
            return picture, target
        return self.transform(picture), target


# Per-channel mean/std used for normalization in both pipelines
# (the values commonly used for ImageNet-pretrained backbones).
_NORM_MEAN = (0.485, 0.456, 0.406)
_NORM_STD = (0.229, 0.224, 0.225)

# Training pipeline: random crop to 384x384 plus light geometric and
# photometric augmentation, then tensor conversion and normalization.
_train_steps = [
    transforms.RandomResizedCrop(384, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]
train_transform = transforms.Compose(_train_steps)

# Validation/test pipeline: no augmentation, only a deterministic
# resize to 448 followed by a 384x384 center crop.
_eval_steps = [
    transforms.Resize(448),
    transforms.CenterCrop(384),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]
test_transform = transforms.Compose(_eval_steps)

# Dataset creation (adjust the paths to your own Drive layout).
zip_path = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/train.zip'
extract_path = '/content/dataset/train'  # local path inside the Colab VM

# Unpack the training images from Drive onto the local disk.
with zipfile.ZipFile(zip_path, 'r') as z:
    z.extractall(extract_path)

# Full labelled dataset. No transform here: the train/val specific transforms
# are attached after the split so each subset gets its own pipeline.
full_dataset = LeafDiseaseDataset(root_dir=extract_path, csv_file='/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/train.csv', transform=None)

# 80% of the samples for training, the remainder for validation.
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Seed the split. The Optuna study below is persisted to SQLite and resumed
# across notebook sessions; with an unseeded split every session would score
# its trials on a different validation set, making trial values incomparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)

class ApplyTransform(Dataset):
    """Wrap an indexable dataset of ``(x, y)`` pairs and transform ``x`` lazily.

    Lets two subsets that share one underlying dataset use different
    transforms.

    Args:
        subset: Any object supporting ``len()`` and integer indexing that
            yields ``(x, y)`` pairs.
        transform: Optional callable applied to ``x`` on access; ``y`` is
            passed through untouched.
    """

    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        sample, target = self.subset[index]
        # A falsy transform (e.g. None) means "return the sample unchanged".
        return (self.transform(sample) if self.transform else sample), target

# Attach the split-specific pipelines: augmentation for the training subset,
# the deterministic pipeline for the validation subset.
train_dataset = ApplyTransform(train_dataset, train_transform)
val_dataset = ApplyTransform(val_dataset, test_transform)

# Run on the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
class PetFindermodel(nn.Module):
    """Two-branch network: a timm image backbone fused with a metadata MLP.

    The image is encoded by the timm backbone (created with ``num_classes=0``,
    i.e. without its classification head), the metadata vector by a small MLP,
    and the two feature vectors are concatenated and passed to a final MLP.

    Args:
        model_name: timm model identifier for the image backbone.
        pretrained: Whether to load pretrained backbone weights.
        num_meta_features: Width of the metadata vector passed to ``forward``
            (default 16, the previously hard-coded value).
        num_classes: Number of outputs of the final layer (default 5, the
            previously hard-coded value).
    """

    def __init__(self, model_name='swinv2_base_window12to24_192to384.ms_in22k_ft_in1k', pretrained=True,
                 num_meta_features=16, num_classes=5):
        super().__init__()

        self.backbone = timm.create_model(model_name, pretrained=pretrained, num_classes=0)
        # Feature width depends on the chosen backbone, so read it from the
        # model instead of assuming a fixed size.
        num_img_features = self.backbone.num_features

        self.meta_fc = nn.Sequential(
            nn.Linear(num_meta_features, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2)
        )

        self.combined_fc = nn.Sequential(
            nn.Linear(num_img_features + 64, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)  # one output per class (raw scores)
        )

    def forward(self, image, meta):
        """Return raw scores of shape ``(batch, num_classes)``.

        Args:
            image: Batch of images accepted by the backbone.
            meta: Metadata tensor of shape ``(batch, num_meta_features)``.
        """
        # Image branch -> (batch, backbone.num_features)
        img_features = self.backbone(image)

        # Metadata branch -> (batch, 64)
        meta_features = self.meta_fc(meta)

        # Concatenate along the feature dimension.
        combined = torch.cat((img_features, meta_features), dim=1)

        return self.combined_fc(combined)

In [None]:
def train_and_eval(model, train_loader, test_loader, optimizer, criterion):
    scaler = torch.amp.GradScaler() # AMP를 위한 스케일러
    accumulation_steps = 8 # 4번 모아서 업데이트 (배치 4 -> 16 효과)
    # 3 에폭 정도만 학습해서 성능을 확인 (시간 절약)
    for epoch in range(2):
        model.train()
        optimizer.zero_grad()
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)
            with torch.amp.autocast('cuda'):
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss = loss / accumulation_steps
            scaler.scale(loss).backward()

            if (i + 1) % accumulation_steps == 0:
                scaler.step(optimizer) # 업데이트
                scaler.update() # 스케일러 갱신
                optimizer.zero_grad()

    # 최종 검증 단계
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            with torch.amp.autocast('cuda'):
                outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    return accuracy

def objective_step1(trial):
    """Optuna objective: validation accuracy for one (batch_size, lr, weight_decay).

    Builds a fresh pretrained SwinV2 classifier with 5 outputs, warms up only
    the classification head for a few batches, then unfreezes everything and
    scores the configuration with ``train_and_eval``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Validation accuracy in percent, or ``0.0`` if the trial ran out of
        GPU memory.

    Raises:
        RuntimeError: Any runtime error other than CUDA out-of-memory.
    """
    # 1. Sample hyperparameters.
    batch_size = trial.suggest_categorical("batch_size", [2,4])
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)

    # 2. Data loaders.
    train_loader = DataLoader(train_dataset, batch_size=batch_size,
                              shuffle=True, num_workers=0, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

    # 3. Fresh model for every trial.
    model_name='swinv2_base_window12to24_192to384.ms_in22k_ft_in1k'
    model = timm.create_model(model_name, pretrained=True, num_classes=5)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

    # Freeze the backbone; only the new head is trained during warm-up.
    for param in model.parameters(): param.requires_grad = False
    for param in model.head.parameters(): param.requires_grad = True

    optimizer_head = optim.Adam(model.head.parameters(), lr=1e-3)
    optimizer_full = None  # bound up-front so the cleanup below is unconditional
    # fp16 autocast backward needs loss scaling to avoid gradient underflow.
    warmup_scaler = torch.amp.GradScaler()
    try:
        model.train()
        for i, (images, labels) in enumerate(train_loader):
            if i > 30: break  # short warm-up: stop after 31 batches
            images, labels = images.to(device), labels.to(device)
            optimizer_head.zero_grad()
            with torch.amp.autocast('cuda'):
                outputs = model(images)
                loss = criterion(outputs, labels)
            warmup_scaler.scale(loss).backward()
            warmup_scaler.step(optimizer_head)
            warmup_scaler.update()

        # Unfreeze everything for the actual fine-tuning run.
        for param in model.parameters(): param.requires_grad = True

        optimizer_full = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

        return train_and_eval(model, train_loader, val_loader, optimizer_full, criterion)
    except RuntimeError as e:
        if "out of memory" not in str(e):
            raise  # bare raise keeps the original traceback
        print(f"| OOM 발생 - Trial {trial.number} 건너뜀 |")
        return 0.0  # score OOM trials low and move on to the next one
    finally:
        # Drop every reference that pins GPU memory before emptying the cache,
        # so the next trial starts from a clean allocator.
        del model, optimizer_head, optimizer_full, train_loader, val_loader
        gc.collect()
        torch.cuda.empty_cache()


# Directory on Drive where the study database is kept.
save_dir = "/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification"
os.makedirs(save_dir, exist_ok=True)

# SQLite-backed storage so an interrupted study can be resumed later.
db_path = os.path.join(save_dir, "Leaf_Disease.db")
storage_name = f"sqlite:///{db_path}"

study_settings = dict(
    study_name="optimization_v2",  # name under which the study is stored
    storage=storage_name,          # persist trials in the SQLite file
    direction="maximize",
    load_if_exists=True,           # continue the existing study if present
)
study1 = optuna.create_study(**study_settings)
study1.optimize(objective_step1, n_trials=20)  # number of trials for this run

# Pull the winning configuration out of the study.
winning_params = study1.best_params
best_lr, best_batch_size, best_weight_decay = (
    winning_params[key] for key in ('lr', 'batch_size', 'weight_decay')
)
print(f"1단계 완료, 최적의 lr: {best_lr}, batch_size: {best_batch_size}, weight_decay: {best_weight_decay}")

[I 2026-01-08 08:48:30,351] Using an existing study with name 'optimization_v2' instead of creating a new one.


1
2
3


[I 2026-01-08 10:07:58,780] Trial 4 finished with value: 88.0607476635514 and parameters: {'batch_size': 4, 'lr': 0.00014597217243992892, 'weight_decay': 1.1070759528758532e-05}. Best is trial 4 with value: 88.0607476635514.


1
2
3


[I 2026-01-08 11:27:08,641] Trial 5 finished with value: 78.69158878504673 and parameters: {'batch_size': 4, 'lr': 0.0004402954738575345, 'weight_decay': 8.070064192352575e-05}. Best is trial 4 with value: 88.0607476635514.


1
2
3


[I 2026-01-08 12:57:15,968] Trial 6 finished with value: 79.92990654205607 and parameters: {'batch_size': 2, 'lr': 0.0001409649636458517, 'weight_decay': 6.770169185954349e-06}. Best is trial 4 with value: 88.0607476635514.


1
2


In [None]:
# Persist the best hyperparameters of the study as JSON on Drive.
best_params_file = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/best_params_step1.json'
with open(best_params_file, 'w') as out_file:
    json.dump(study1.best_params, out_file)

In [None]:
# Build and train the final model with the best hyperparameters from the study.
model_name='swinv2_base_window12to24_192to384.ms_in22k_ft_in1k'
final_model = timm.create_model(model_name, pretrained=True, num_classes=5)
final_model = final_model.to(device)

history = {'train_loss': [], 'val_acc': []}

train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

train_dataset = ApplyTransform(train_dataset, transform=train_transform)
val_dataset = ApplyTransform(val_dataset, transform=test_transform)

# drop_last=True: timm's Mixup requires an even batch size, so an odd-sized
# final batch (training-set size not divisible by the batch size) would abort
# phase 2 at the end of the first epoch.
train_loader = DataLoader(train_dataset, batch_size=best_batch_size,
                          shuffle=True, num_workers=2, pin_memory=True,
                          drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

optimizer = optim.Adam(final_model.head.parameters(), lr=1e-3)

mixup_fn = Mixup(
    mixup_alpha=0.8,
    cutmix_alpha=1.0,
    prob=0.5,        # apply mixing to a batch with 50% probability
    switch_prob=0.5,
    mode='batch',
    label_smoothing=0.1,
    num_classes=5
)
# Mixup produces soft (mixed) targets, which need a soft-target loss.
criterion_mixup = SoftTargetCrossEntropy()

# Hard-label loss used during the head-only warm-up (no Mixup there).
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# Phase 1: freeze the backbone and train only the classification head.
for param in final_model.parameters(): param.requires_grad = False
for param in final_model.head.parameters(): param.requires_grad = True

print("Starting Phase 1...")
for epoch in range(5):
    final_model.train()
    total_loss = 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = final_model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Phase 1 - Epoch [{epoch+1}/5] Loss: {total_loss/len(train_loader):.4f}")

# Phase 2: unfreeze everything and fine-tune the whole network.
for param in final_model.parameters(): param.requires_grad = True

optimizer = optim.Adam(final_model.parameters(), lr=best_lr, weight_decay=best_weight_decay)
# mode='max' because the scheduler is stepped with validation accuracy.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3)
# Same (non-deprecated) GradScaler entry point that train_and_eval uses.
scaler = torch.amp.GradScaler()
accumulation_steps = 8
best_acc = 0.0
epochs = 30
for epoch in range(epochs):
    final_model.train()
    total_loss = 0
    optimizer.zero_grad()
    pending = 0  # micro-batches accumulated since the last optimizer step
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        images, mix_labels = mixup_fn(images, labels)
        with torch.amp.autocast('cuda'):
            outputs = final_model(images)
            loss = criterion_mixup(outputs, mix_labels)
            loss = loss / accumulation_steps
        scaler.scale(loss).backward()
        # Undo the accumulation scaling so the logged loss is per-batch.
        total_loss += (loss.item() * accumulation_steps)

        pending += 1
        if pending == accumulation_steps:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            pending = 0

    # Apply the trailing partial accumulation group instead of discarding it
    # when the next epoch calls zero_grad().
    if pending:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

    # Validation after every epoch.
    final_model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            with torch.amp.autocast('cuda'):
                outputs = final_model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    avg_loss = total_loss / len(train_loader)

    scheduler.step(accuracy)
    history['train_loss'].append(avg_loss)
    history['val_acc'].append(accuracy)

    if accuracy > best_acc:
        best_acc = accuracy

    checkpoint = {
    'epoch': epoch,
    'model_state_dict': final_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'best_acc': best_acc,
    # Stored so a resumed run can restore the curves collected so far.
    'history': history,
    }
    # Overwrite 'last_checkpoint.pth' on Drive after every epoch.
    torch.save(checkpoint, '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/last_checkpoint.pth')

    print(f"Epoch [{epoch+1}/{epochs}] - Loss: {avg_loss:.4f}, Acc: {accuracy:.2f}%")

print("최종 모델 학습 완료")

In [None]:
# Disabled variant of the final-training cell that resumes from
# 'last_checkpoint.pth'. It is kept as a bare string literal, so none of it
# is executed.
# NOTE(review) before re-enabling:
#   * the resume branch reads checkpoint['history'], but the checkpoint dict
#     built at the bottom of this snippet has no 'history' key;
#   * start_epoch is computed but the training loop still iterates
#     range(epochs) from 0.
'''
model_name='swinv2_base_window12to24_192to384.ms_in22k_ft_in1k'
final_model = timm.create_model(model_name, pretrained=True, num_classes=5)
final_model = final_model.to(device)

history = {'train_loss': [], 'val_acc': []}

train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

train_dataset = ApplyTransform(train_dataset, transform=train_transform)
val_dataset = ApplyTransform(val_dataset, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=best_batch_size,
                          shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

optimizer = optim.Adam(final_model.head.parameters(), lr=1e-3)

mixup_fn = Mixup(
    mixup_alpha=0.8,
    cutmix_alpha=1.0,
    prob=0.5,        # 50% 확률로 섞음
    switch_prob=0.5,
    mode='batch',
    label_smoothing=0.1,
    num_classes=5
)
criterion_mixup = SoftTargetCrossEntropy()
optimizer = optim.Adam(final_model.parameters(), lr=best_lr, weight_decay=best_weight_decay)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3)
scaler = torch.cuda.amp.GradScaler() # AMP를 위한 스케일러
accumulation_steps = 8
best_acc = 0.0

load_path = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/last_checkpoint.pth'

if os.path.exists(load_path):
    checkpoint = torch.load(load_path)
    final_model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    start_epoch = checkpoint['epoch'] + 1
    best_acc = checkpoint['best_acc']
    history = checkpoint['history']
    print(f"[{start_epoch+1}] 에폭부터 이어서 시작합니다. 현재 최고 정확도: {best_acc:.2f}%")
else:
    start_epoch = 0
    print("처음부터 학습을 시작합니다.")

epochs = 30
for epoch in range(epochs):
    final_model.train()
    total_loss = 0
    optimizer.zero_grad()
    for i, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)
        images, mix_labels = mixup_fn(images, labels)
        with torch.amp.autocast('cuda'):
            outputs = final_model(images)
            loss = criterion_mixup(outputs, mix_labels)
            loss = loss / accumulation_steps
        scaler.scale(loss).backward()
        total_loss += (loss.item() * accumulation_steps)

        if (i + 1) % accumulation_steps == 0:
            scaler.step(optimizer) # 업데이트
            scaler.update() # 스케일러 갱신
            optimizer.zero_grad()

# 최종 검증 단계
    final_model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            with torch.amp.autocast('cuda'):
                outputs = final_model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    avg_loss = total_loss / len(train_loader)

    scheduler.step(accuracy)
    history['train_loss'].append(avg_loss)
    history['val_acc'].append(accuracy)

    if accuracy > best_acc:
        best_acc = accuracy

    checkpoint = {
    'epoch': epoch,
    'model_state_dict': final_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'best_acc': best_acc,
    }
    # 구글 드라이브 경로에 'last_checkpoint.pth'로 저장
    torch.save(checkpoint, '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/last_checkpoint.pth')

    print(f"Epoch [{epoch+1}/{epochs}] - Loss: {avg_loss:.4f}, Acc: {accuracy:.2f}%")

print("최종 모델 학습 완료")
    '''

In [None]:
save_path = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification'

# Make sure the target directory exists before writing into it.
os.makedirs(save_path, exist_ok=True)

# Store the current weights of final_model (state dict only).
final_weights_file = f"{save_path}/final_model.pth"
torch.save(final_model.state_dict(), final_weights_file)

In [None]:
class LeafTestDataset(Dataset):
    """Unlabelled image dataset over the files of a single directory.

    Every file in ``root_dir`` whose extension is ``.png``, ``.jpg`` or
    ``.jpeg`` (matched case-insensitively, so ``.JPG`` is included) becomes
    one sample. Files are ordered by name so the sample order is stable.

    Args:
        root_dir: Directory containing the test images.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    # Accepted image extensions, compared against the lower-cased file name.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # Sorted for a deterministic order; lower() so upper-case extensions
        # are not silently skipped.
        self.file_list = sorted(
            f for f in os.listdir(root_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``(image, file_name)`` for the ``idx``-th file."""
        img_name = self.file_list[idx]
        img_path = os.path.join(self.root_dir, img_name)
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        # The file name is returned alongside the image so predictions can
        # be matched back to their source file.
        return image, img_name

def generate_submission(model, test_loader, save_path):
    """Predict a class index for every test image and write a submission CSV.

    Args:
        model: Classifier mapping an image batch to ``(batch, classes)`` scores.
        test_loader: Iterable of ``(images, image_names)`` batches.
        save_path: Destination path of the CSV file.

    The CSV has two columns: ``image_id`` (file name) and ``label``
    (index of the highest-scoring class).
    """
    # Run on whatever device the model lives on instead of depending on a
    # module-level global; CUDA autocast is only enabled on a GPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")
    use_cuda_amp = run_device.type == 'cuda'

    model.eval()
    all_preds = []
    all_ids = []
    with torch.no_grad():
        for images, img_names in test_loader:
            images = images.to(run_device)

            with torch.amp.autocast('cuda', enabled=use_cuda_amp):
                outputs = model(images)  # (batch, classes) scores

            # Index of the highest score per sample.
            preds = outputs.argmax(dim=1)

            # tolist() yields plain Python ints rather than numpy scalars.
            all_preds.extend(preds.cpu().tolist())
            all_ids.extend(img_names)

    # One row per image: file name and predicted class index.
    submission = pd.DataFrame({
        'image_id': all_ids,
        'label': all_preds
    })

    submission.to_csv(save_path, index=False)
    print(f"제출 파일(확률 포함)이 저장되었습니다: {save_path}")

# --- Run inference on the test set ---

# 1. Unpack the test images (can be skipped if already extracted).
test_zip_path = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/test.zip'
test_image_dir = '/content/dataset/test_data'
with zipfile.ZipFile(test_zip_path, 'r') as archive:
    archive.extractall(test_image_dir)

# 2. Test loader using the deterministic (non-augmenting) pipeline;
#    shuffle stays off so predictions follow the sorted file order.
test_dataset = LeafTestDataset(root_dir=test_image_dir, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# 3. Write the submission file to Drive.
submission_csv = '/content/drive/MyDrive/Colab Notebooks/[CNN]Cassava Leaf Disease Classification/submission.csv'
generate_submission(final_model, test_loader, submission_csv)