In [1]:
KAGGLE_UPLOAD = False

if KAGGLE_UPLOAD:
    # /content/ 폴더에 kaggle.json을 업로드 (매번 colab노트북 생성시 반복)
    from google.colab import files
    files.upload()
else:
    kaggle_username = 'ventureparker'
    from google.colab import userdata
    kaggle_key = userdata.get(kaggle_username)

In [2]:
import json
import os

def create_kaggle_json(username, key, file_path):
    # 데이터 생성
    kaggle_data = {
        "username": username,
        "key": key
    }

    # JSON 파일로 저장
    with open(file_path, 'w') as json_file:
        json.dump(kaggle_data, json_file)

    print(f"kaggle.json 파일이 '{os.path.abspath(file_path)}' 경로에 생성되었습니다.")



# kaggle.json 파일 생성 함수 호출
if KAGGLE_UPLOAD==False:
    # 사용자 정보 입력
    key = kaggle_key  # 여기에 key값을 입력하세요
    filename = 'kaggle.json'
    create_kaggle_json(kaggle_username, key, filename)

kaggle.json 파일이 '/content/kaggle.json' 경로에 생성되었습니다.


In [3]:
!cat kaggle.json

{"username": "ventureparker", "key": "dd42a6316bbc050370e11511e7a91e9e"}

In [4]:
import os
import shutil

os.makedirs('/root/.kaggle', exist_ok=True)
path = os.getcwd()

# 파일을 복사하는데, 이미 복사된 경우 복사하지 않는다.
if not os.path.exists(os.path.join('/root/.kaggle','kaggle.json')):
    src = os.path.join(path,'kaggle.json')
    dst = os.path.join('/root/.kaggle', 'kaggle.json')
    shutil.copy(src,dst)
    print('파일 복사 완료')

파일 복사 완료


In [5]:
!cat /root/.kaggle/kaggle.json

{"username": "ventureparker", "key": "dd42a6316bbc050370e11511e7a91e9e"}

In [6]:
!chmod 600 /root/.kaggle/kaggle.json
!kaggle datasets download -d puneet6060/intel-image-classification

Dataset URL: https://www.kaggle.com/datasets/puneet6060/intel-image-classification
License(s): copyright-authors
Downloading intel-image-classification.zip to /content
100% 346M/346M [00:06<00:00, 29.0MB/s]
100% 346M/346M [00:06<00:00, 53.0MB/s]


In [7]:
!unzip -qq intel-image-classification.zip -d /content/dataset/

In [8]:
!ls dataset

seg_pred  seg_test  seg_train


In [9]:
# train데이터를 8:2로 나눠서 train:valid
# pred데이터를 test데이터로 사용

In [10]:
import os

dataPath = '/content/dataset/'
trainPath = os.path.join(dataPath, 'seg_train/seg_train/')
testPath = os.path.join(dataPath, 'seg_test/seg_test/')

In [11]:
folderList = os.listdir(trainPath)
for folder in folderList:
    currentDir = os.path.join(trainPath,folder)
    print(len(os.listdir(currentDir)))

2382
2271
2274
2191
2404
2512


In [12]:
import random

train_ratio = 0.8
valid_ratio = 0.2

# validPath를 지정
validPath = os.path.join(dataPath,'seg_valid')

# 모든 폴더 8:2 나누는 것을 구현
import shutil

if not os.path.exists(validPath):
    # validPath 폴더를 생성
    os.makedirs(validPath, exist_ok=True)

    folderList = os.listdir(trainPath)
    for folder in folderList:
        currentDir = os.path.join(trainPath,folder)
        # 현재 폴더의 파일 목록, 갯수
        fileList = os.listdir(currentDir)
        dataCount = len(fileList)
        validCount = int(dataCount * valid_ratio)
        # validPath의 클래스 폴더를 생성
        os.makedirs(os.path.join(validPath,folder), exist_ok=True)

        # 파일 리스트에서 랜덤하게 validCount만큼 선택해서 validList 생성
        validList = random.sample(fileList, validCount)
        for validFile in validList:
            oldPath = os.path.join(currentDir, validFile)
            newPath = os.path.join(validPath, folder, validFile)
            #print(oldPath, newPath)
            shutil.move(oldPath, newPath)
        # 일단 폴더 하나 테스트
        #break

In [13]:
folderList = os.listdir(validPath)
for folder in folderList:
    currentDir = os.path.join(validPath,folder)
    print(len(os.listdir(currentDir)))

476
454
454
438
480
502


## 데이터가 준비 되었으니 ResNet50.ipynb내용 적용

In [14]:
import torch
import torchvision
import torchvision.models as models
from torchvision.models import ResNet50_Weights
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score
import numpy as np
import pandas as pd

In [15]:
data_dir = '/content/dataset'

In [16]:
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [17]:
train_data = ImageFolder(root=f"{data_dir}/seg_train/seg_train", transform=train_transform)
test_data = ImageFolder(root=f"{data_dir}/seg_test/seg_test", transform=test_transform)

train_size = int(0.8 * len(train_data))
val_size = len(train_data) - train_size
train_dataset, val_dataset = random_split(train_data, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

In [18]:
class ModifiedResNet50(nn.Module):
    def __init__(self):
        super(ModifiedResNet50, self).__init__()
        self.base_model = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        num_ftrs = self.base_model.fc.in_features
        self.base_model.fc = nn.Linear(num_ftrs, 512)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(512, 6)

    def forward(self, x):
        x = self.base_model(x)
        x = self.dropout(x)
        x = self.fc(x)
        return x

In [19]:
model = ModifiedResNet50()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

results = []

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:02<00:00, 50.5MB/s]


In [20]:
from tqdm import tqdm

def train_model(model, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in tqdm(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        val_loss, val_acc, val_f1 = evaluate_model(model, val_loader)
        results.append({
            'epoch': epoch + 1,
            'train_loss': running_loss / len(train_loader),
            'val_loss': val_loss,
            'val_accuracy': val_acc,
            'val_f1_score': val_f1
        })
        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss}, Val Accuracy: {val_acc}%, Val F1-Score: {val_f1}")

def evaluate_model(model, data_loader):
    model.eval()
    total = 0
    correct = 0
    loss_total = 0
    all_targets = []
    all_preds = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            loss_total += loss.item()
            all_targets.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
    accuracy = 100 * correct / total
    f1 = f1_score(all_targets, all_preds, average='macro')
    return loss_total / len(data_loader), accuracy, f1

In [None]:
# 학습 진행
train_model(model, criterion, optimizer, num_epochs=10)

100%|██████████| 141/141 [02:11<00:00,  1.07it/s]


Epoch 1, Loss: 0.4550134102609141, Val Loss: 0.23220602133207852, Val Accuracy: 92.69813000890471%, Val F1-Score: 0.9259432179769825


 26%|██▌       | 37/141 [00:34<01:39,  1.05it/s]

In [None]:
# 테스트 진행
evaluate_model(model, test_loader)

In [None]:
def plot_confusion_matrix(labels, predictions, class_names):
    cm = confusion_matrix(labels, predictions)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

_, predictions, labels = evaluate_model(model, test_loader)
plot_confusion_matrix(labels, predictions, train_data.classes)