<a href="https://colab.research.google.com/github/kbleejohn/twitteR/blob/master/f105_efficientnet_right_bottom_basicmodel_v0_1_upgraded.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the dataset and output paths used below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install efficientnet_pytorch

Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: efficientnet_pytorch
  Building wheel for efficientnet_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet_pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16428 sha256=a3913014b57dbfe28fc49ffda8835f3a452dc603e1873253f2b664a08d4d6e6d
  Stored in directory: /root/.cache/pip/wheels/03/3f/e9/911b1bc46869644912bda90a56bcf7b960f20b5187feea3baf
Successfully built efficientnet_pytorch
Installing collected packages: efficientnet_pytorch
Successfully installed efficientnet_pytorch-0.7.1


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
from torchvision import models
from efficientnet_pytorch import EfficientNet

In [None]:
# Path of the dataset directory (read by ImageFolder below)
dataset_dir = "/content/drive/MyDrive/Colab Notebooks/_0_0_Central_Inspection Image/F105_Class/Right_Bottom_Small"

In [None]:
# Augmentation + preprocessing pipeline meant to emphasise defective regions.
# Steps run in list order: random geometric/colour perturbations, then resize,
# tensor conversion and channel-wise normalisation.
transform = transforms.Compose(
    transforms=[
        transforms.RandomRotation(degrees=10),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        # Additional brightness/contrast-only jitter, applied with probability 0.3
        transforms.RandomApply(
            transforms=[transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0, hue=0)],
            p=0.3,
        ),
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        # Normalise with the ImageNet mean / std values
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

In [None]:
# Support for saving and re-loading the prepared dataset object via pickle
import pickle
# Location of the pickle file used as the dataset cache
preprocessed_data_path = "/content/drive/MyDrive/Colab Notebooks/_0_0_Central_Inspection Image/F105_Class/RB_Basic_preprocessed_data.pkl"

In [None]:
# If a previously pickled dataset exists, load it; otherwise build it and pickle it.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file — only use a cache file you trust.
# NOTE(review): what gets pickled is the ImageFolder object itself; presumably its transform runs lazily
#   per item, so this likely caches the file listing rather than preprocessed images — confirm.
# NOTE(review): the following cell assigns `dataset` again, so whatever this cell loaded is overwritten.
if os.path.exists(preprocessed_data_path):
    with open(preprocessed_data_path, 'rb') as f:
        dataset = pickle.load(f)
else:
    # Build the dataset with ImageFolder
    dataset = ImageFolder(dataset_dir, transform=transform)

    # Persist the dataset object for later runs
    with open(preprocessed_data_path, 'wb') as f:
        pickle.dump(dataset, f)

In [None]:
# Build the dataset with ImageFolder, applying `transform` to each loaded image
dataset = ImageFolder(dataset_dir, transform=transform)

In [None]:
# Read the class names exposed by the dataset (`dataset.classes`) and show them
class_names = dataset.classes
print("Class names:", class_names)

Class names: ['aNormal', 'bDefective']


In [None]:
# Read the class-name -> integer-label mapping exposed by the dataset and show it
class_to_idx = dataset.class_to_idx
print("Class to index mapping:", class_to_idx)

Class to index mapping: {'aNormal': 0, 'bDefective': 1}


In [None]:
# Sizes for the train / validation / test split (8:1:1 ratio)
total_size = len(dataset)
train_size = int(0.8 * total_size)
# The samples left after the train share are halved (floor division) for validation ...
val_size = (total_size - train_size) // 2
# ... and the test split takes the rest, so the three sizes always sum to total_size.
test_size = total_size - train_size - val_size

In [None]:
# Partition the dataset into train / validation / test subsets.
# The generator is seeded so the partition is identical after a kernel restart; an unseeded
# split draws a different test set on every run, which makes reported metrics incomparable.
# NOTE(review): all three subsets are carved out of the same `dataset`, so the random
#   augmentations in `transform` presumably also run on validation and test samples — confirm
#   whether a separate, deterministic transform is wanted for evaluation.
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

In [None]:
# Wrap each split in a DataLoader (batches of 32); only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Load a pretrained EfficientNet (B7 variant, per the model name below) configured for 2 output classes
model = EfficientNet.from_pretrained('efficientnet-b7', num_classes=2)

Downloading: "https://github.com/lukemelas/EfficientNet-PyTorch/releases/download/1.0/efficientnet-b7-dcc49843.pth" to /root/.cache/torch/hub/checkpoints/efficientnet-b7-dcc49843.pth
100%|██████████| 254M/254M [00:15<00:00, 17.1MB/s]


Loaded pretrained weights for efficientnet-b7


In [None]:
# Train on CUDA when it is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the model to the selected device (as the last expression, the result is echoed as cell output)
model.to(device)

EfficientNet(
  (_conv_stem): Conv2dStaticSamePadding(
    3, 64, kernel_size=(3, 3), stride=(2, 2), bias=False
    (static_padding): ZeroPad2d((0, 1, 0, 1))
  )
  (_bn0): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
  (_blocks): ModuleList(
    (0): MBConvBlock(
      (_depthwise_conv): Conv2dStaticSamePadding(
        64, 64, kernel_size=(3, 3), stride=[1, 1], groups=64, bias=False
        (static_padding): ZeroPad2d((1, 1, 1, 1))
      )
      (_bn1): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
      (_se_reduce): Conv2dStaticSamePadding(
        64, 16, kernel_size=(1, 1), stride=(1, 1)
        (static_padding): Identity()
      )
      (_se_expand): Conv2dStaticSamePadding(
        16, 64, kernel_size=(1, 1), stride=(1, 1)
        (static_padding): Identity()
      )
      (_project_conv): Conv2dStaticSamePadding(
        64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False
  

class weights 계산에 시간이 너무 많이 걸리므로 생략

In [None]:
# from sklearn.utils.class_weight import compute_class_weight
# import numpy as np

# # 클래스 레이블을 가져와 class_weights를 계산합니다.
# class_labels = [label for _, label in train_dataset]
# class_weights = compute_class_weight('balanced', np.unique(class_labels), class_labels)
# class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

# criterion = nn.CrossEntropyLoss(weight=class_weights)

In [None]:
# # print(class_weights)
# class_weights = [0.5]

In [None]:
# Define the loss function and the optimisation method
# criterion = nn.CrossEntropyLoss(weight=class_weights)
# Unweighted cross-entropy (the class-weighted variant above is disabled)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Plateau scheduler in 'min' mode; it has no effect unless scheduler.step(<metric>) is called during training
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

In [None]:
# State for keeping the best model: lowest validation loss seen so far and the checkpoint file name
best_val_loss = float('inf')
model_save_path = "saved_model_f105_RB_Basic.pt"

In [None]:
# Number of passes over the training set
num_epochs = 10

In [None]:
# Early-stopping state
best_val_loss = float('inf')
patience = 5  # value to be chosen by the user: epochs without improvement tolerated before stopping
no_improvement_epochs = 0

In [None]:
# Train the model and checkpoint the weights that achieve the lowest validation loss.
best_val_loss = float('inf')
model_save_path = "saved_model_f105_RB_Basic.pt"

num_epochs = 10
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}")

    # ---- validation pass (evaluated after every epoch) ----
    model.eval()
    val_loss = 0.0
    y_true, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Accumulate the batch loss. Without this, val_loss stays at 0.0, the
            # "best model" test only passes on the first epoch, and later epochs are never saved.
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    val_loss /= len(val_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Val Loss: {val_loss:.4f}")

    # Feed the monitored metric to the plateau scheduler so it can lower the learning rate
    scheduler.step(val_loss)

    # Save the model whenever this epoch's validation loss is the best so far
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), model_save_path)


KeyboardInterrupt: ignored

In [None]:
# Training loop with early stopping.
# Each epoch runs a training pass and a validation pass; the model is checkpointed whenever the
# validation loss improves, and training stops once `patience` consecutive epochs bring no improvement.
# The trackers are reset here so the cell is self-contained and re-runnable; note that the first
# epoch of this cell therefore overwrites any checkpoint already stored at model_save_path.
best_val_loss = float('inf')
no_improvement_epochs = 0

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}")

    # ---- validation pass: val_loss must be a real number, not a placeholder ----
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss += criterion(outputs, labels).item()
    val_loss /= len(val_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Val Loss: {val_loss:.4f}")

    # Feed the monitored metric to the plateau scheduler
    scheduler.step(val_loss)

    # ---- early stopping ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improvement_epochs = 0
        # Keep the best model seen so far
        torch.save(model.state_dict(), model_save_path)
    else:
        no_improvement_epochs += 1
        if no_improvement_epochs >= patience:
            print("Early stopping due to no improvement in validation loss")
            break

TypeError: ignored

In [None]:
    # Average the accumulated training loss over the number of training batches and report it.
    # `running_loss` and `epoch` are not set in this cell; they come from whichever loop cell ran before it.
    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}")

In [None]:

    # Evaluate the model on the validation set: sum the per-batch loss and collect labels/predictions
    model.eval()
    val_loss = 0.0
    y_true, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Accumulate the batch loss; otherwise val_loss stays at 0.0 and every
            # later comparison against best_val_loss is meaningless.
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

In [None]:

    # Divide val_loss by the number of validation batches
    val_loss /= len(val_loader)

In [None]:
    # Save the model when the current validation loss is the best seen so far
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Write the model's state_dict to the checkpoint path
        torch.save(model.state_dict(), model_save_path)

In [None]:
# After training, restore the best-performing weights from the checkpoint file.
# map_location=device lets a checkpoint written on a GPU runtime be loaded on a CPU-only one.
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.to(device)

In [None]:
# Run the model over the test set and gather true / predicted labels,
# which later cells use to compute the defective-class metrics.
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for batch_inputs, batch_labels in test_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        logits = model(batch_inputs)
        # Second element of torch.max(..., 1) is the index of the largest logit per row
        predicted = torch.max(logits, 1)[1]
        y_true += list(batch_labels.cpu().numpy())
        y_pred += list(predicted.cpu().numpy())

In [None]:
import csv

In [None]:
# # Two example lists to combine
# list1 = [1, 2, 3, 4, 5]
# list2 = ['A', 'B', 'C', 'D', 'E']

In [None]:
# Pair each true label with its prediction: a list of (y_true, y_pred) tuples
combined_data = list(zip(y_true, y_pred))

In [None]:
# Destination CSV file for the (y_true, y_pred) pairs
output_file = "/content/drive/MyDrive/Colab Notebooks/_0_0_Central_Inspection Image/F105_Class/combined_data.csv"

In [None]:
# Dump the label pairs to CSV: one header row, then one row per test sample
with open(output_file, 'w', newline='') as csv_handle:
    csv_out = csv.writer(csv_handle)
    csv_out.writerows([['y_true', 'y_pred'], *combined_data])

In [None]:
# Report where the CSV was written
print(f"Data has been successfully written to {output_file}.")

In [None]:
from sklearn.metrics import recall_score, f1_score
# Positions of the samples whose true label is 1, and the true / predicted labels at those positions
defective_indices = [pos for pos, label in enumerate(y_true) if label == 1]
y_true_defective = [y_true[pos] for pos in defective_indices]
y_pred_defective = [y_pred[pos] for pos in defective_indices]

In [None]:
# Score the defective class (label 1) over the whole test set.
# Scoring only the rows whose true label is 1 removes every possible false positive, so
# precision is trivially 1 and the F1-score is inflated. Recall is unchanged by using the full
# set, while F1 now reflects normal samples wrongly flagged as defective.
# zero_division=0 returns 0 instead of warning when the test split has no defective samples/predictions.
recall = recall_score(y_true, y_pred, pos_label=1, zero_division=0)
f1 = f1_score(y_true, y_pred, pos_label=1, zero_division=0)

In [None]:
# Report the recall and F1-score computed for the defective class
print(f"Defective Data Recall: {recall:.4f}")
print(f"Defective Data F1-score: {f1:.4f}")