### 모델 임포트

In [None]:
import json
from shutil import copy2
import pandas as pd
import numpy as np
from tqdm import tqdm
from glob import glob
from typing import *
from IPython.display import Image as IPImage
from sklearn.model_selection import train_test_split
import random
import os
from PIL import Image as Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import ConcatDataset
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt
from torchvision.transforms import functional as F

import timm  # PyTorch Image Models 라이브러리 설치 필요: pip install timm


## 이미지 분류 작업

In [None]:
# import json
# import os
# from shutil import copy2

def classify_and_modify(json_dir, image_dir, output_dir):
    """
    JSON 데이터를 기반으로 이미지를 분류하고, 수정된 JSON 데이터를 저장합니다.

    Args:
        json_dir (str): JSON 파일이 저장된 디렉토리 경로.
        image_dir (str): 이미지 파일이 저장된 디렉토리 경로.
        output_dir (str): 결과 데이터를 저장할 디렉토리 경로.
    """
    # 분류된 이미지와 수정된 JSON 파일을 저장할 디렉토리 설정
    data_image_dir = os.path.join(output_dir, "Data_image")
    data_label_dir = os.path.join(output_dir, "Data_label")
    os.makedirs(data_image_dir, exist_ok=True)  # 이미지 저장 폴더 생성
    os.makedirs(data_label_dir, exist_ok=True)  # JSON 저장 폴더 생성

    # JSON 디렉토리 내 파일 탐색
    for root, _, files in os.walk(json_dir):
        for file in files:
            if file.endswith(".json"):  # JSON 파일만 처리
                json_path = os.path.join(root, file)  # JSON 파일의 전체 경로 생성
                # print(f"Processing JSON file: {json_path}")  # 실행 여부 확인용 로그

                # JSON 파일 읽기 (UTF-8 인코딩)
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)

                # JSON에서 이미지 파일 이름과 object_included 값을 추출
                image_name = data["image"]["name"]
                object_included = data["image"]["object_included"]
                # print(f"Image name: {image_name}, Object included: {object_included}")  # 실행 여부 확인용 로그

                # object_included 값에 따라 카테고리 분류
                category = "with_crack" if object_included == "Y" else "no_crack"
                category_dir = os.path.join(data_image_dir, category)
                os.makedirs(category_dir, exist_ok=True)  # 카테고리 폴더 생성

                # 하위 폴더 탐색 함수
                def find_image_in_subfolders(image_dir, image_name):
                    for root, _, files in os.walk(image_dir):
                        if image_name in files:
                            return os.path.join(root, image_name)
                    return None

                # 이미지 파일 복사
                image_path = find_image_in_subfolders(image_dir, image_name) # 이미지 경로 생성
                if image_path:  # 이미지 파일이 존재하는 경우
                    copy2(image_path, category_dir)  # 이미지를 해당 카테고리 폴더로 복사
                    # print(f"Copied image to {category_dir}")  # 실행 여부 확인용 로그
                else:
                    # 이미지 파일이 존재하지 않을 경우 경로 출력
                    print(f"실패 이미지 없음: {image_path}")  # 오류 확인용 로그

                # 수정된 JSON 데이터 생성
                modified_data = {
                    "image": {
                        "name": data["image"].get("name", ""),  # "name" 값을 포함
                        "object_included": object_included,  # "object_included" 값 포함
                        "labels": []  # "labels" 초기화
                    }
                }
                # "annotations" 리스트 처리
                for annotation in data["image"].get("annotations", []):
                    if annotation["labelNum"] == 0:  # "labelNum" 값이 0인 경우만 처리
                        modified_data["image"]["labels"].append({
                            "label": annotation["label"],  # "label" 값 추가
                            "points": annotation["points"]  # "points" 값 추가
                        })

                # 수정된 JSON 데이터를 저장할 경로 설정
                modified_json_path = os.path.join(data_label_dir, file)
                with open(modified_json_path, "w", encoding="utf-8") as f:
                    json.dump(modified_data, f, indent=4, ensure_ascii=False)  # 수정된 JSON 저장
                    print(f"Saved modified JSON to {modified_json_path}")  # 저장 완료 로그

# 코드 실행을 위한 입력 경로와 출력 경로 설정
json_dir = r"E:\Desktop\Data_preprocessing\Label"  # JSON 파일 경로
image_dir = r"E:\Desktop\Data_preprocessing\Data"  # 이미지 파일 경로
output_dir = r"E:\Desktop\Data_preprocessing"  # 결과 데이터 저장 경로

# 함수 호출
classify_and_modify(json_dir, image_dir, output_dir)

# 디렉토리 내용 확인
print(f"JSON directory contents: {os.listdir(json_dir)}")  # JSON 파일 디렉토리 내용 출력
print(f"Image directory contents: {os.listdir(image_dir)}")  # 이미지 파일 디렉토리 내용 출력


### 이미지 Crop 작업

In [None]:
# import os
# import cv2
# import json
# import numpy as np

# 원본 이미지가 있는 폴더 경로
with_crack_dir = r'E:\Desktop\Data_preprocessing\Data_image\with_crack'

# 크롭된 이미지를 저장할 폴더 경로
with_crack_crop_dir = r'E:\Desktop\Data_preprocessing\Data_image\with_crack_crop'

# JSON 파일들이 위치한 폴더 경로
json_folder = r'E:\Desktop\Data_preprocessing\Data_label'

# 크롭된 이미지를 저장할 폴더 생성 (폴더가 없으면 생성)
os.makedirs(with_crack_crop_dir, exist_ok=True)

# JSON 폴더에서 모든 JSON 파일을 읽음
json_files = [f for f in os.listdir(json_folder) if f.endswith('.json')]

# 모든 JSON 파일 처리
for json_file in json_files:
    json_path = os.path.join(json_folder, json_file)  # JSON 파일의 전체 경로

    # JSON 파일 읽기
    with open(json_path, 'r') as f:
        points_data = json.load(f)  # JSON 데이터 읽기

    # 이미지 이름 가져오기
    image_name = points_data["image"].get("name", None)  # "image" 섹션에서 "name" 값 읽기
    if image_name is None:
        print(f"JSON 파일에 이미지 이름이 없습니다: {json_file}")
        continue

    # 이미지 경로 생성
    image_path = os.path.join(with_crack_dir, image_name)
    print(f"처리 중인 이미지 경로: {image_path}")  # 경로 출력

    # 이미지 읽기
    image = cv2.imread(image_path)
    if image is None:
        print(f"이미지를 불러오지 못했습니다: {image_name}")
        continue

     # JSON에서 "labels" 필드 처리
    labels = points_data["image"].get("labels", [])
    for label in labels:
        points = label.get("points", [])
        if not points:
            print(f"Label에 points 데이터가 없습니다: {label}")
            continue
        # 중심점 계산
        points = np.array(points)  # 포인트 데이터를 numpy 배열로 변환
        center_x, center_y = points.mean(axis=0).astype(int)  # 포인트의 평균값으로 중심점 계산

        # 크롭 영역 계산
        crop_size = 640  # 크롭할 이미지 크기 (640x640)
        half_size = crop_size // 2  # 크롭 크기의 절반값
        x_start = max(center_x - half_size, 0)  # 크롭 시작 x 좌표 (0 이하로 내려가지 않음)
        y_start = max(center_y - half_size, 0)  # 크롭 시작 y 좌표 (0 이하로 내려가지 않음)
        x_end = x_start + crop_size  # 크롭 끝 x 좌표
        y_end = y_start + crop_size  # 크롭 끝 y 좌표

        # 이미지 경계를 초과하지 않도록 좌표를 조정
        x_start = max(0, min(x_start, image.shape[1] - crop_size))  # x 좌표 조정
        y_start = max(0, min(y_start, image.shape[0] - crop_size))  # y 좌표 조정
        x_end = x_start + crop_size  # 조정된 끝 x 좌표
        y_end = y_start + crop_size  # 조정된 끝 y 좌표
        
        print(f"크롭 좌표: x_start={x_start}, y_start={y_start}, x_end={x_end}, y_end={y_end}")

        # 이미지 크롭
        cropped_image = image[y_start:y_end, x_start:x_end]  # 이미지 배열에서 크롭 영역 추출

        # 크롭된 이미지 저장
        crop_file_name = f"{os.path.splitext(image_name)[0]}_crop_{label['label']}.jpg"
        save_path = os.path.join(with_crack_crop_dir, image_name)
        cv2.imwrite(save_path, cropped_image)  # 크롭된 이미지를 저장
        print(f"{crop_file_name} 크롭 완료 및 저장: {save_path}")  # 처리 완료 메시지 출력


### json 데이터 내용을 수정 및 Data set 생성

In [None]:
# import os
# import json
# import cv2
# import pandas as pd

# 이미지 데이터와 라벨 JSON 파일 경로
image_dir = r"E:\Desktop\Data_preprocessing\Data_image\with_crack_crop"
label_dir = r"E:\Desktop\Data_preprocessing\Data_label"

# 결과 저장 경로
output_csv_path = r"E:\Desktop\Data_preprocessing\dataset.csv"

# JSON 폴더에서 모든 JSON 파일을 읽음
json_files = [f for f in os.listdir(label_dir) if f.endswith('.json')]

# 데이터셋을 저장할 리스트
dataset = []

# 모든 JSON 파일 처리
for json_file in json_files:
    json_path = os.path.join(label_dir, json_file)  # JSON 파일의 전체 경로

    # JSON 파일 읽기
    with open(json_path, 'r') as f:
        label_data = json.load(f)  # JSON에서 데이터 읽기

    # 이미지 이름 가져오기
    image_name = label_data["image"].get("name", None)
    if image_name is None:
        print(f"JSON 파일에 이미지 이름이 없습니다: {json_file}")
        continue

    # 이미지 경로 생성
    image_path = os.path.join(image_dir, image_name)
    if not os.path.exists(image_path):
        print(f"이미지 파일이 없습니다: {image_path}")
        continue

    # 라벨 값 가져오기
    labels = label_data["image"].get("labels", [])
    for label in labels:
        label_value = label.get("label", None)
        if label_value is None:
            print(f"Label 데이터가 없습니다: {label}")
            continue

        # 데이터셋에 추가
        dataset.append({
            "image_path": image_path,
            "label": label_value
        })

# 데이터셋을 Pandas DataFrame으로 변환
df = pd.DataFrame(dataset)

# 데이터셋을 CSV 파일로 저장
df.to_csv(output_csv_path, index=False)
print(f"데이터셋 생성 완료: {output_csv_path}")


### 데이터셋 이미지 확인

In [None]:
sampled_data = df.sample(10)

# 이미지 경로와 라벨 추출
image_paths = sampled_data['image_path'].tolist()
labels = sampled_data['label'].tolist()

# 이미지를 열고 플롯 준비
images = [Image.open(img_path) for img_path in image_paths]

# 플롯 설정
fig, axes = plt.subplots(2, 5, figsize=(20, 10))  # 2x5 격자
axes = axes.flatten()

# 각 격자에 이미지와 라벨 표시
for img, label, ax in zip(images, labels, axes):
    ax.imshow(img)
    ax.axis('off')
    ax.set_title(label, fontsize=10)

plt.tight_layout()
plt.show()

### Data EDA

#### 데이터 불균형 맞추기

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os
from PIL import Image
import pandas as pd
from torchvision import transforms
import random
from sklearn.model_selection import train_test_split


# # CSV 데이터 읽기
# df = pd.read_csv(csv_path)

# 라벨별 개수 계산
label_counts = df['label'].value_counts()

# 막대그래프 시각화
plt.figure(figsize=(10, 6))
label_counts.sort_index().plot(kind='bar')  # 라벨 순서대로 정렬하여 막대그래프 생성
plt.title("Number of Images per Label", fontsize=16)
plt.xlabel("Label", fontsize=14)
plt.ylabel("Number of Images", fontsize=14)
plt.xticks(rotation=0)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

In [None]:
# 이미지 감소 
image_crack = df[(df['label']==0)]
image_crack_reduce = image_crack.sample(frac=0.4,random_state=50)
image_other = df[(df['label']!=0)]
image_reduce = pd.concat([image_crack_reduce,image_other])
df_reduce = image_reduce

## 모델 설계

#### dataset csv image_path경로 , label 인코딩

In [None]:
csv_path = 'E:\Desktop\Data_preprocessing\dataset.csv'
df =  pd.read_csv(csv_path)

# dataset csv 경로 변경
# df['image_path'] = df['image_path'].apply(lambda x:os.path.join('/content/Data_image/with_crack_crop',x.split('\\')[-1]))

# 라벨 값 맵핑

# 1.라벨 고유값 확인
print(df['label'].unique())

# 2.라벨 값을 숫자로 변환하기 위해 매핑 생성
label_to_number = {label: idx for idx, label in enumerate(df['label'].unique())}
print("\n라벨 매핑:")
print(label_to_number)

# 3.숫자로 변환
df['label'] = df['label'].map(label_to_number)

# 4.변환된 데이터 확인
print("\n숫자로 변환된 데이터:")
print(df.head())

0 : Crack (균열)
1 : Reticular Crack (망상 균열)
2 : Spalling (스폴링)
3 : Detachment (박리)
4 : Material Separation (재료 분리)
5 : Exhilaration (팽창)
6 : Damage (손상)
7 : Rebar (철근)
8 : Efflorescence (백태)
9 : Leak (누수)

#### Train / Valid 데이터 셋 분리

In [None]:
train_df, valid_df = train_test_split(
     df, test_size=0.2,
     random_state=0,
     shuffle=True,
     stratify=df["label"],
)

# Reduced data set

# train_df, valid_df = train_test_split(
#     df_reduce,
#     test_size=0.2,
#     random_state=0,
#     shuffle=True,
#     stratify=df_reduce["label"],
# )

#### DataSet, DataLoader 생성

In [None]:

# 사용자 정의 데이터셋 클래스 생성 (CustomDataset)
class CustomDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        # 데이터셋 초기화
        # dataframe: 이미지 경로와 레이블이 포함된 데이터프레임
        # transform: 이미지에 적용할 전처리(transform) 함수
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        # 데이터셋의 총 샘플 수 반환
        return len(self.dataframe)

    def __getitem__(self, idx):
        # 주어진 인덱스(idx)에 해당하는 샘플을 반환
        # 이미지 경로를 데이터프레임에서 가져옴
        image_path = self.dataframe.iloc[idx]['image_path']
        # 레이블 정보를 정수형으로 가져옴
        label = int(self.dataframe.iloc[idx, 1])

        # 이미지 파일을 열고 RGB 모드로 변환
        img = Image.open(image_path).convert("RGB")  # 이미지를 RGB로 로드


        # transform이 지정되어 있다면 이미지에 전처리를 적용
        if self.transform:
            img = self.transform(img)

        # 이미지와 레이블을 반환
        return img, label
    

# Dataset 및 DataLoader 생성
train_dataset = CustomDataset(train_df, transform=train_transform)
valid_dataset = CustomDataset(valid_df, transform=valid_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

#### AugmentedDataset 클래스

In [None]:
# 특정 레이블을 가진 데이터만 포함하는 AugmentedDataset 클래스 생성
class AugmentedDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        # 데이터프레임에서 특정 레이블(5, 6, 7, 8)만 필터링하여 데이터셋 생성
        self.dataframe = dataframe[dataframe['label'].isin([5, 6, 7, 8])]
        self.transform = transform
        self.repeat = 5 # aug 증가량 조절절

    def __len__(self):
        # 데이터셋의 총 샘플 수 반환
        return len(self.dataframe) * self.repeat

    def __getitem__(self, idx):
          # 반복된 데이터셋 인덱스를 원본 인덱스로 변환
        original_idx = idx % len(self.dataframe)
        
        # 주어진 인덱스(idx)에 해당하는 샘플을 반환
        # 이미지 경로를 데이터프레임에서 가져옴
        img_name = self.dataframe.iloc[original_idx]['image_path']
        # 이미지 파일을 열고 RGB 모드로 변환
        img = Image.open(img_name).convert('RGB')
        # 레이블 정보를 정수형으로 가져옴
        label = int(self.dataframe.iloc[original_idx]['label'])

        # transform이 지정되어 있다면 이미지에 전처리를 적용
        if self.transform:
            img = self.transform(img)

        # 이미지와 레이블을 반환
        return img, label

#### 전처리 파이프라인

In [None]:
# 전처리 파이프라인 정의
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 텐서로 변환 및 자동으로 픽셀 값 /255 정규화
    transforms.Normalize(mean=[0.485, 0.456, 0.406], # 이미지넷 기준
                        std=[0.229, 0.224, 0.225])
])

# 데이터 증강을 위한 transform 정의
augment_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 이미지를 224x224로 리사이즈
    transforms.RandomRotation(90),  # 이미지를 -90도에서 90도 사이로 랜덤 회전
    transforms.RandomHorizontalFlip(p=0.5), # 이미지 반전
    transforms.ToTensor(),  # 이미지를 텐서로 변환
    transforms.Normalize(mean=[0.485, 0.456, 0.406], # 이미지넷 기준
                        std=[0.229, 0.224, 0.225]),  # 채널별 평균과 표준편차로 정규화
])

valid_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 검증 데이터 크기 조정
    transforms.ToTensor(),  # 텐서로 변환 및 자동 /255 정규화
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])

# Dataset 및 DataLoader 생성
#train set
    # CustomDataset 인스턴스 생성 (기본 학습 데이터셋)
train_dataset = CustomDataset(train_df, transform=train_transform)
    # AugmentedDataset 인스턴스 생성 (증강된 학습 데이터셋)
augmented_dataset = AugmentedDataset(train_df, transform=augment_transform)
    # 기본 학습 데이터셋과 증강된 데이터셋을 결합하여 하나의 데이터셋으로 생성
combined_train_dataset = ConcatDataset([train_dataset, augmented_dataset])

#Valid set
valid_dataset = CustomDataset(valid_df, transform=valid_transform)

#Loader
train_loader = DataLoader(combined_train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(CustomCNN, self).__init__()
        
        # Convolutional Block 1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=1)
        self.dropout1 = nn.Dropout(0.3)
        
        # Convolutional Block 2
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=1)
        self.dropout2 = nn.Dropout(0.3)
        
        # Convolutional Block 3
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2, padding=1)
        self.dropout3 = nn.Dropout(0.3)
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(128 * 28 * 28, 256)  # Input size depends on the input image dimensions
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Convolutional Blocks
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.dropout1(x)
        
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.dropout2(x)
        
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.dropout3(x)
        
        # Flatten
        x = torch.flatten(x, 1)  # Flatten all dimensions except batch
        
        # Fully Connected Layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x


### ResNet 모델 학습

In [None]:
# 사전 학습된 ResNet18 모델 불러오기
model = models.resnet18(pretrained=True)

# 모델의 마지막 완전 연결 계층(fc)을 사용자 정의 계층으로 대체
# 이 계층은 in_features에서 10개의 출력으로 매핑합니다 (10개 클래스 분류를 위함)
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 10),
)

# 교차 엔트로피 손실 함수 초기화
criterion = nn.CrossEntropyLoss()
# 최적화 알고리즘으로 Adam 사용
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-5)

# 베스트 가중치 로드 
# model.load_state_dict(torch.load('/content/best_model_before(res).pth'))  

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

### Xception 모델 학습

In [None]:
# 사전 학습된 Xception 모델 불러오기
model = timm.create_model('xception', pretrained=True)

# 모델의 마지막 계층 수정 (출력 클래스 수에 맞게 조정)
model.fc = nn.Linear(
    model.num_features, 10)  # 10개의 클래스 분류

# 교차 엔트로피 손실 함수 초기화
criterion = nn.CrossEntropyLoss()

# 최적화 알고리즘으로 Adam 사용
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-5)

# GPU 또는 CPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

### 모델 학습

In [None]:
# 학습 파라미터 설정
num_epochs =50   # 학습 반복 횟수
best_val_acc = 0.0  # 최상의 검증 정확도를 저장하는 변수
patience = 5  # 얼리 스탑핑을 위한 patience 설정 (개선되지 않은 횟수)
no_improve = 0  # 개선되지 않은 에포크 수를 카운트하는 변수

# 훈련 및 검증 손실을 추적하기 위한 리스트
train_acces = []
valid_acces = []
train_losses = []
valid_losses = []

for epoch in range(num_epochs):

    # 모델을 학습 모드로 전환
    model.train()
    running_loss = 0.0  # 에포크 동안의 총 손실을 저장하는 변수
    correct_train = 0  # 정확하게 예측한 학습 데이터의 개수를 저장하는 변수
    total_train = 0  # 총 학습 데이터 개수를 저장하는 변수

    # 학습 데이터를 반복하여 모델 업데이트
    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", dynamic_ncols=True):
        # 데이터를 장치로 이동 (GPU 또는 CPU)
        inputs, labels = inputs.to(device), labels.to(device)

        # 옵티마이저의 기울기 초기화
        optimizer.zero_grad()
        # 모델에 입력을 전달하여 예측값 출력
        outputs = model(inputs)
        # 손실 함수 계산
        loss = criterion(outputs, labels)
        # 손실에 대한 역전파 수행 (기울기 계산)
        loss.backward()
        # 옵티마이저를 통해 모델 파라미터 업데이
        optimizer.step()
        # 현재 배치의 손실을 누적
        running_loss += loss.item()

        # 정확도 계산을 위한 예측값 처리
        _, predicted = torch.max(outputs.data, 1) # 최대값을 가지는 클래스 예측
        total_train += labels.size(0) # 총 학습 데이터 수 누적
        correct_train += (predicted == labels).sum().item() # 맞춘 예측의 개수 누적

    # 에포크별 학습 정확도와 손실 계산
    train_acc = correct_train / total_train
    train_loss = running_loss / len(train_loader)

    # Validate
    model.eval()  # 모델을 평가 모드로 전환 (드롭아웃, 배치 정규화 등 비활성화)
    running_val_loss = 0.0  # 검증 손실을 저장하는 변수
    correct_val = 0  # 정확하게 예측한 검증 데이터의 개수를 저장하는 변수
    total_val = 0  # 총 검증 데이터 개수를 저장하는 변수
    with torch.no_grad(): # 검증 시에는 기울기를 계산하지 않음 (메모리 및 계산량 절약)
        for inputs, labels in valid_loader:
            # 데이터를 장치로 이동
            inputs, labels = inputs.to(device), labels.to(device)
            # 모델에 입력을 전달하여 예측값 출력
            outputs = model(inputs)
            # 손실 함수 계산
            loss = criterion(outputs, labels)
            # 현재 배치의 손실을 누적
            running_val_loss += loss.item()

            # 정확도 계산을 위한 예측값 처리
            _, predicted = torch.max(outputs.data, 1)  # 최대값을 가지는 클래스 예측
            total_val += labels.size(0)  # 총 검증 데이터 수 누적
            correct_val += (predicted == labels).sum().item()  # 맞춘 예측의 개수 누적

    # 에포크별 검증 정확도와 손실 계산
    val_acc = correct_val / total_val  # 검증 정확도 계산
    val_loss = running_val_loss / len(valid_loader)  # 검증 손실 평균 계산
    
    # 정확도 & 손실 기록
    train_acces.append(train_acc)
    valid_acces.append(val_acc)
    train_losses.append(train_loss)
    valid_losses.append(val_loss)

    # 학습 및 검증 결과 출력
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}, '
          f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}')

     # 최상의 검증 정확도를 기록하고 모델 저장
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        save_dir = 'E:\Desktop\Data_preprocessing\BEST_model'
        os.makedirs(save_dir, exist_ok=True)  # 경로가 없으면 생성
        torch.save(model.state_dict(), os.path.join(save_dir, 'best_model_before.pth'))
        no_improve = 0
        print(f"Epoch {epoch+1}: val_acc={val_acc:.4f}, best_val_acc={best_val_acc:.4f}")

    else:
        no_improve += 1
        if no_improve >= patience:  # 설정한 얼리 스타핑 patience에 도달하면 학습을 중단합니다.
            print(f"Epoch {epoch+1}: no_improve={no_improve}, patience={patience}")
            print("Early stopping")
            break

print('Finished Training')

### 결과 그래프 출력

In [None]:
# 그래프 그리기
actual_epochs = len(train_losses)

fig, axes = plt.subplots(1, 2, figsize=(16,8))

#Loss 그래프
axes[0].plot(range(1, actual_epochs + 1), train_losses, label='Train Loss', color='blue') # Train Loss 그래프
axes[0].plot(range(1, actual_epochs + 1), valid_losses, label='Validation Loss', color='orange') # Validation Loss 그래프
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training and Validation Loss')
axes[0].legend()

#acc 그래프
axes[1].plot(range(1, actual_epochs + 1), train_acces, label='Train Accuracy', color='blue') # Train Accuracy 그래프
axes[1].plot(range(1, actual_epochs + 1), valid_acces, label='Validation Accuracy', color='orange') # Validation Accuracy 그래프
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Training and Validation Accuracy')
axes[1].legend()


# 그래프 출력
plt.tight_layout()
plt.show()


### 혼동 행렬로 시각화

In [None]:
# Validate
all_labels = []
all_predictions = []
with torch.no_grad():
    for inputs, labels in valid_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        all_labels.append(labels.cpu().numpy())
        all_predictions.append(predicted.cpu().numpy())

all_labels = np.concatenate(all_labels)
all_predictions = np.concatenate(all_predictions)

conf_mat = confusion_matrix(all_labels, all_predictions)
conf_mat_normalized = conf_mat.astype('float') / conf_mat.sum(axis=1)[:, np.newaxis]

#그래프 출력
plt.figure(figsize=(8, 8))
sns.heatmap(conf_mat_normalized, annot=True, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Normalized Confusion Matrix')
plt.show()

## best model load & Test

In [None]:
model.load_state_dict(torch.load('/content/best_model_before.pth'))
model.eval()