## 드라이브 마운트

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 라이브러리 임포트

In [None]:
# 필요한 라이브러리 import
import os
import json
import datetime
import shutil
import time
import copy
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import numpy as np

In [None]:
# 이미지 분류 데이터 전처리 코드

import os
import json
import datetime
import shutil
from sklearn.model_selection import train_test_split

class Config:
    drive_root = '/content/drive/MyDrive/의장공정/데이터/Validation'
    original_data_path = os.path.join(drive_root, '원천데이터')
    label_data_path = os.path.join(drive_root, '라벨링데이터')

    # 작업 디렉토리 설정
    work_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification'
    dataset_dir = os.path.join(work_dir, 'dataset')
    model_dir = os.path.join(work_dir, 'models')
    result_dir = os.path.join(work_dir, 'results')

    # 데이터 분할 비율
    train_ratio = 0.7
    valid_ratio = 0.15
    test_ratio = 0.15

    # 현재 시간
    now = datetime.datetime.now()
    timestamp = now.strftime('%y%m%d_%H%M')

def create_directories():
    """필요한 디렉토리들을 생성합니다."""
    dirs_to_create = [
        Config.work_dir,
        Config.dataset_dir,
        Config.model_dir,
        Config.result_dir,
        os.path.join(Config.dataset_dir, 'train'),
        os.path.join(Config.dataset_dir, 'valid'),
        os.path.join(Config.dataset_dir, 'test')
    ]

    for dir_path in dirs_to_create:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
            print(f"Created directory: {dir_path}")

def split_and_copy_dataset(paired_data, class_names):
    """데이터를 train/valid/test로 분할하고 복사합니다."""
    print("데이터셋 분할 및 복사 중...")

    # 클래스별로 데이터 분할
    train_data = []
    valid_data = []
    test_data = []

    # 클래스별로 분할
    for class_name in class_names:
        class_data = [item for item in paired_data if item['label'] == class_name]

        if len(class_data) == 0:
            continue

        # 데이터가 너무 적으면 train에만 할당
        if len(class_data) <= 2:
            train_data.extend(class_data)
            print(f"클래스 {class_name}: train={len(class_data)}, valid=0, test=0 (데이터 부족)")
            continue

        # train/valid/test 분할
        train_class, temp = train_test_split(
            class_data,
            test_size=(Config.valid_ratio + Config.test_ratio),
            random_state=42
        )

        if len(temp) > 1:
            valid_class, test_class = train_test_split(
                temp,
                test_size=Config.test_ratio/(Config.valid_ratio + Config.test_ratio),
                random_state=42
            )
        else:
            valid_class = temp
            test_class = []

        train_data.extend(train_class)
        valid_data.extend(valid_class)
        test_data.extend(test_class)

        print(f"클래스 {class_name}: train={len(train_class)}, valid={len(valid_class)}, test={len(test_class)}")

    # 파일 복사
    datasets = {
        'train': train_data,
        'valid': valid_data,
        'test': test_data
    }

    for split_name, data_list in datasets.items():
        split_dir = os.path.join(Config.dataset_dir, split_name)

        print(f"\n{split_name} 데이터 복사 중... ({len(data_list)}개)")
        for item in data_list:
            src_path = item['image_path']
            dst_path = os.path.join(split_dir, item['image_name'])

            try:
                shutil.copy2(src_path, dst_path)
            except Exception as e:
                print(f"파일 복사 오류: {e}")

    return len(train_data), len(valid_data), len(test_data)

def create_mapping_files_v3():
    """전체 폴더를 탐색해서 JSON 파일 찾고 매핑 생성"""
    dataset_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification/dataset'
    validation_dir = '/content/drive/MyDrive/의장공정/데이터/Validation'

    print("올바른 JSON 경로로 매핑 파일 재생성 시작...")

    # 모든 JSON 파일 찾기
    print("JSON 파일 탐색 중...")
    all_json_files = []
    for root, dirs, files in os.walk(validation_dir):
        for file in files:
            if file.endswith('.json'):
                all_json_files.append(os.path.join(root, file))

    print(f"JSON 파일 발견: {len(all_json_files)}개")

    # JSON 파일명 -> 경로 딕셔너리 생성
    label_dict = {}
    for json_path in all_json_files:
        base_name = os.path.splitext(os.path.basename(json_path))[0]
        label_dict[base_name] = json_path

    print(f"JSON 딕셔너리 생성 완료: {len(label_dict)}개")

    # 각 split별로 매핑 생성
    splits = ['train', 'valid', 'test']

    for split in splits:
        print(f"\n=== {split} 매핑 생성 중 ===")

        split_dir = os.path.join(dataset_dir, split)
        if not os.path.exists(split_dir):
            print(f"{split} 폴더가 없습니다.")
            continue

        # 해당 split의 파일들 가져오기
        image_files = [f for f in os.listdir(split_dir)
                      if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))]

        print(f"{split} 파일 수: {len(image_files)}")

        # 매핑 딕셔너리 생성
        split_mapping = {}
        matched_count = 0

        # 배치로 처리 (1000개씩)
        batch_size = 1000
        for i in range(0, len(image_files), batch_size):
            batch_files = image_files[i:i+batch_size]

            print(f"  배치 {i//batch_size + 1}: {len(batch_files)}개 처리 중...")

            for image_file in batch_files:
                # 파일명에서 JSON 파일명 추출
                base_name = os.path.splitext(image_file)[0]

                # 해당 JSON 파일이 있는지 확인
                if base_name in label_dict:
                    try:
                        # JSON 파일 읽기
                        with open(label_dict[base_name], 'r', encoding='utf-8-sig') as f:
                            label_data = json.load(f)

                        # 라벨 추출
                        category_name = None
                        quality = None

                        # categories 딕셔너리 생성
                        categories_dict = {}
                        if 'categories' in label_data:
                            for cat in label_data['categories']:
                                categories_dict[cat['id']] = cat['name']

                        # annotations에서 정보 추출
                        if 'annotations' in label_data and len(label_data['annotations']) > 0:
                            annotation = label_data['annotations'][0]

                            if 'category_id' in annotation and annotation['category_id'] in categories_dict:
                                category_name = categories_dict[annotation['category_id']]

                            if 'attributes' in annotation and 'quality' in annotation['attributes']:
                                quality = annotation['attributes']['quality']

                        # 라벨 생성
                        if category_name and quality:
                            label = f"{category_name}_{quality}"
                            split_mapping[image_file] = label
                            matched_count += 1

                    except Exception as e:
                        # JSON 읽기 실패시 무시
                        pass

        print(f"  처리 완료: {len(image_files)}개 중 {matched_count}개 매칭")
        print(f"  매칭률: {matched_count/len(image_files)*100:.1f}%")

        # 매핑 파일을 로컬에 저장 (Drive 용량 절약)
        mapping_file_path = f'/content/{split}_mapping.json'
        with open(mapping_file_path, 'w', encoding='utf-8') as f:
            json.dump(split_mapping, f, ensure_ascii=False, indent=2)

        print(f"  매핑 파일 저장: {mapping_file_path}")

        # 매칭된 클래스들 확인
        matched_classes = set(split_mapping.values())
        print(f"  발견된 클래스 수: {len(matched_classes)}")
        if matched_classes:
            print(f"  클래스 예시: {list(matched_classes)[:3]}")

    print("\n매핑 파일 생성 완료!")

# 실행 함수
def run_preprocessing(paired_data, class_names):
    """전체 전처리 실행"""
    print("=== 데이터 전처리 시작 ===")

    # 1. 디렉토리 생성
    create_directories()

    # 2. 데이터셋 분할 및 복사
    train_count, valid_count, test_count = split_and_copy_dataset(paired_data, class_names)

    # 3. 매핑 파일 생성
    create_mapping_files_v3()

    print(f"\n=== 전처리 완료 ===")
    print(f"Train: {train_count}, Valid: {valid_count}, Test: {test_count}")

    return train_count, valid_count, test_count

# 실행 (이전 EDA에서 나온 paired_data, class_names 필요)
if __name__ == "__main__":
    # paired_data, class_names는 EDA에서 얻어야 함
    train_count, valid_count, test_count = run_preprocessing(paired_data, class_names)

=== 이미지 분류 학습 시작 ===
데이터 로딩 및 분석 중...
원천데이터 이미지 수: 21553
라벨링데이터 파일 수: 21553
이미지-라벨 매칭 중...
라벨링 데이터 구조 분석 중... (샘플 5개)

--- 샘플 1: 204_101_20_1bace470-0fad-4b08-a0f2-f399063a9f6f.json ---
JSON 구조:
{
  "info": {
    "contributor": "미래아이티컨소시엄",
    "date_created": "2021-12-02 08:54:24.241591",
    "name": "부품 품질 검사 영상 데이터(자동차)",
    "description": "부품 품질 검사 영상 데이터(자동차)",
    "version": "1.0",
    "url": "miraeit.net/"
  },
  "images": [
    {
      "license": 1,
      "file_name": "204_101_20_1bace470-0fad-4b08-a0f2-f399063a9f6f.jpg",
      "width": 4000,
      "date_captured": "2021-11-25 13:57:21",
      "id": 1,
      "height": 3000
    }
  ],
  "licenses": [
    {
      "name": "CC BY-N...
주요 키들: ['info', 'images', 'licenses', 'categories', 'annotations']

--- 샘플 2: 204_101_20_1bf6a4a7-ef91-4efb-aee4-965af8326c49.json ---
JSON 구조:
{
  "info": {
    "contributor": "미래아이티컨소시엄",
    "date_created": "2021-12-24 13:52:22.338968",
    "name": "부품 품질 검사 영상 데이터(자동차)",
    "description": "부품 품질

### 훈련 데이터가 없다고 판단한 원인 분석
: 클래스 명과 실제 파일명이 매칭이 안됨

In [None]:
# 실제 클래스명들 확인
train_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification/dataset/train'
files = os.listdir(train_dir)

print("=== 실제 클래스명 vs 파일명 분석 ===")

# 실제 생성된 클래스명들 (로그에서 확인됨)
actual_class_names = [
    '고정 불량_불량품', '고정 불량_양품', '고정핀 불량_불량품', '고정핀 불량_양품',
    '단차_불량품', '단차_양품', '스크래치_불량품', '스크래치_양품',
    '실링 불량_불량품', '실링 불량_양품', '연계 불량_불량품', '연계 불량_양품',
    '외관 손상_불량품', '외관 손상_양품', '유격 불량_불량품', '유격 불량_양품'
]

print("실제 클래스명들:")
for name in actual_class_names[:5]:
    print(f"  {name}")

print(f"\n파일명 예시:")
for file in files[:3]:
    print(f"  {file}")

print(f"\n클래스명 매칭 테스트:")
for file in files[:3]:
    base_name = os.path.splitext(file)[0]
    matched = False

    for class_name in actual_class_names:
        if class_name in base_name:
            print(f"  {file} → {class_name} 매칭됨!")
            matched = True
            break

    if not matched:
        print(f"  {file} → 매칭 안됨 ❌")

=== 실제 클래스명 vs 파일명 분석 ===
실제 클래스명들:
  고정 불량_불량품
  고정 불량_양품
  고정핀 불량_불량품
  고정핀 불량_양품
  단차_불량품

파일명 예시:
  202_201_20_b898f3ba-698a-465d-acc7-6552d243d4b4.jpg
  202_201_20_35900030-01eb-499e-9de7-9cb82a324e6a.jpg
  202_201_20_58ac6a9a-41d8-42c7-8f63-ac729e0a8076.jpg

클래스명 매칭 테스트:
  202_201_20_b898f3ba-698a-465d-acc7-6552d243d4b4.jpg → 매칭 안됨 ❌
  202_201_20_35900030-01eb-499e-9de7-9cb82a324e6a.jpg → 매칭 안됨 ❌
  202_201_20_58ac6a9a-41d8-42c7-8f63-ac729e0a8076.jpg → 매칭 안됨 ❌


### 해결: 매핑 파일 생성


In [None]:
# 올바른 JSON 경로로 매핑 파일 다시 생성
import os
import json

def create_mapping_files_v3():
    """전체 폴더를 탐색해서 JSON 파일 찾고 매핑 생성"""

    # 경로 설정
    dataset_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification/dataset'
    validation_dir = '/content/drive/MyDrive/의장공정/데이터/Validation'

    print("올바른 JSON 경로로 매핑 파일 재생성 시작...")

    # 모든 JSON 파일 찾기
    print("JSON 파일 탐색 중...")
    all_json_files = []
    for root, dirs, files in os.walk(validation_dir):
        for file in files:
            if file.endswith('.json'):
                all_json_files.append(os.path.join(root, file))

    print(f"JSON 파일 발견: {len(all_json_files)}개")

    # JSON 파일명 -> 경로 딕셔너리 생성
    label_dict = {}
    for json_path in all_json_files:
        base_name = os.path.splitext(os.path.basename(json_path))[0]
        label_dict[base_name] = json_path

    print(f"JSON 딕셔너리 생성 완료: {len(label_dict)}개")

    # 각 split별로 매핑 생성
    splits = ['train', 'valid', 'test']

    for split in splits:
        print(f"\n=== {split} 매핑 생성 중 ===")

        split_dir = os.path.join(dataset_dir, split)
        if not os.path.exists(split_dir):
            print(f"{split} 폴더가 없습니다.")
            continue

        # 해당 split의 파일들 가져오기
        image_files = [f for f in os.listdir(split_dir)
                      if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))]

        print(f"{split} 파일 수: {len(image_files)}")

        # 매핑 딕셔너리 생성
        split_mapping = {}
        matched_count = 0

        # 배치로 처리 (1000개씩)
        batch_size = 1000
        for i in range(0, len(image_files), batch_size):
            batch_files = image_files[i:i+batch_size]

            print(f"  배치 {i//batch_size + 1}: {len(batch_files)}개 처리 중...")

            for image_file in batch_files:
                # 파일명에서 JSON 파일명 추출
                base_name = os.path.splitext(image_file)[0]

                # 해당 JSON 파일이 있는지 확인
                if base_name in label_dict:
                    try:
                        # JSON 파일 읽기
                        with open(label_dict[base_name], 'r', encoding='utf-8-sig') as f:
                            label_data = json.load(f)

                        # 라벨 추출
                        category_name = None
                        quality = None

                        # categories 딕셔너리 생성
                        categories_dict = {}
                        if 'categories' in label_data:
                            for cat in label_data['categories']:
                                categories_dict[cat['id']] = cat['name']

                        # annotations에서 정보 추출
                        if 'annotations' in label_data and len(label_data['annotations']) > 0:
                            annotation = label_data['annotations'][0]

                            if 'category_id' in annotation and annotation['category_id'] in categories_dict:
                                category_name = categories_dict[annotation['category_id']]

                            if 'attributes' in annotation and 'quality' in annotation['attributes']:
                                quality = annotation['attributes']['quality']

                        # 라벨 생성
                        if category_name and quality:
                            label = f"{category_name}_{quality}"
                            split_mapping[image_file] = label
                            matched_count += 1

                    except Exception as e:
                        # JSON 읽기 실패시 무시
                        pass

        print(f"  처리 완료: {len(image_files)}개 중 {matched_count}개 매칭")
        print(f"  매칭률: {matched_count/len(image_files)*100:.1f}%")

        # 매핑 파일을 로컬에 저장 (Drive 용량 절약)
        mapping_file_path = f'/content/{split}_mapping.json'
        with open(mapping_file_path, 'w', encoding='utf-8') as f:
            json.dump(split_mapping, f, ensure_ascii=False, indent=2)

        print(f"  매핑 파일 저장: {mapping_file_path}")

        # 매칭된 클래스들 확인
        matched_classes = set(split_mapping.values())
        print(f"  발견된 클래스 수: {len(matched_classes)}")
        if matched_classes:
            print(f"  클래스 예시: {list(matched_classes)[:3]}")

    print("\n매핑 파일 생성 완료!")

# 실행
create_mapping_files_v3()

올바른 JSON 경로로 매핑 파일 재생성 시작...
JSON 파일 탐색 중...
JSON 파일 발견: 21553개
JSON 딕셔너리 생성 완료: 21553개

=== train 매핑 생성 중 ===
train 파일 수: 15076
  배치 1: 1000개 처리 중...
  배치 2: 1000개 처리 중...
  배치 3: 1000개 처리 중...
  배치 4: 1000개 처리 중...
  배치 5: 1000개 처리 중...
  배치 6: 1000개 처리 중...
  배치 7: 1000개 처리 중...
  배치 8: 1000개 처리 중...
  배치 9: 1000개 처리 중...
  배치 10: 1000개 처리 중...
  배치 11: 1000개 처리 중...
  배치 12: 1000개 처리 중...
  배치 13: 1000개 처리 중...
  배치 14: 1000개 처리 중...
  배치 15: 1000개 처리 중...
  배치 16: 76개 처리 중...
  처리 완료: 15076개 중 15076개 매칭
  매칭률: 100.0%
  매핑 파일 저장: /content/train_mapping.json
  발견된 클래스 수: 24
  클래스 예시: ['실링 불량_불량품', '헤밍 불량_양품', '고정 불량_불량품']

=== valid 매핑 생성 중 ===
valid 파일 수: 3232
  배치 1: 1000개 처리 중...
  배치 2: 1000개 처리 중...
  배치 3: 1000개 처리 중...
  배치 4: 232개 처리 중...
  처리 완료: 3232개 중 3232개 매칭
  매칭률: 100.0%
  매핑 파일 저장: /content/valid_mapping.json
  발견된 클래스 수: 24
  클래스 예시: ['실링 불량_불량품', '헤밍 불량_양품', '고정 불량_불량품']

=== test 매핑 생성 중 ===
test 파일 수: 3245
  배치 1: 1000개 처리 중...
  배치 2: 1000개 처리 중...
  배치 3: 1000개

In [None]:
# 현재 복사된 파일 개수 확인
import os

dataset_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification/dataset'

splits = ['train', 'valid', 'test']

print("=== 현재 복사된 파일 개수 확인 ===")
total_files = 0

for split in splits:
    split_dir = os.path.join(dataset_dir, split)

    if os.path.exists(split_dir):
        files = [f for f in os.listdir(split_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))]
        file_count = len(files)
        total_files += file_count

        print(f"{split}: {file_count:,}개")

        # 첫 3개 파일명 확인
        if files:
            print(f"  예시 파일명:")
            for i, file in enumerate(files[:3]):
                print(f"    {file}")
        print()
    else:
        print(f"{split}: 폴더 없음")

print(f"전체 복사된 파일: {total_files:,}개")

# 예상된 개수와 비교
expected_total = 15076 + 3232 + 3245  # 21,553개
print(f"예상 개수: {expected_total:,}개")
print(f"차이: {total_files - expected_total:,}개")

=== 현재 복사된 파일 개수 확인 ===
train: 15,076개
  예시 파일명:
    202_201_20_b898f3ba-698a-465d-acc7-6552d243d4b4.jpg
    202_201_20_35900030-01eb-499e-9de7-9cb82a324e6a.jpg
    202_201_20_58ac6a9a-41d8-42c7-8f63-ac729e0a8076.jpg

valid: 3,232개
  예시 파일명:
    202_201_20_91b4fad8-7837-460a-a5ef-ff7fedeae580.jpg
    202_201_20_080f058e-19df-48c5-97c6-ed3aa19f430b.jpg
    202_201_20_68be6931-2aab-458d-8668-ec22c1c4dc96.jpg

test: 3,245개
  예시 파일명:
    202_201_20_394cc5ab-04a1-45c5-8246-14820e52399c.jpg
    202_201_20_af702c24-c9f0-419c-9f3f-9626be95f50e.jpg
    202_201_20_2249e3d8-2f9e-45ea-9f21-e59cebc8c4c9.jpg

전체 복사된 파일: 21,553개
예상 개수: 21,553개
차이: 0개


매핑 파일 생성 완료 ✔️
ImageDataSet 클래스 수정

In [None]:
# 매핑 파일을 사용하는 수정된 ImageDataset 클래스
import json
from torch.utils.data import Dataset
from PIL import Image
import os

class ImageDatasetWithMapping(Dataset):
    def __init__(self, data_dir, mapping_file_path, class_names, transform=None):
        self.data_dir = data_dir
        self.class_names = class_names
        self.transform = transform
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(class_names)}

        # 매핑 파일 로드
        with open(mapping_file_path, 'r', encoding='utf-8') as f:
            self.filename_to_label = json.load(f)

        self.image_paths = []
        self.labels = []

        self._load_from_mapping()

    def _load_from_mapping(self):
        """매핑 파일을 사용해서 이미지와 라벨 로드"""
        valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')

        for file_name in os.listdir(self.data_dir):
            if file_name.lower().endswith(valid_extensions):
                # 매핑 파일에서 라벨 찾기
                if file_name in self.filename_to_label:
                    label = self.filename_to_label[file_name]

                    # 클래스 인덱스 확인
                    if label in self.class_to_idx:
                        image_path = os.path.join(self.data_dir, file_name)
                        self.image_paths.append(image_path)
                        self.labels.append(self.class_to_idx[label])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]

        try:
            image = Image.open(image_path).convert('RGB')
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            # 에러 발생시 기본 이미지 생성
            image = Image.new('RGB', (224, 224), color='white')

        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# 수정된 create_data_loaders 함수
def create_data_loaders_with_mapping(class_names):
    """매핑 파일을 사용하는 데이터 로더 생성 함수"""
    from torchvision import transforms
    from torch.utils.data import DataLoader
    import torch

    # Config 클래스 (기존과 동일)
    dataset_dir = '/content/drive/MyDrive/의장공정/데이터/image_classification/dataset'

    # 데이터 변환 정의
    train_transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop((224, 224)),
        transforms.RandomHorizontalFlip(0.5),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # 데이터셋 생성
    loaders = {}

    for split in ['train', 'valid', 'test']:
        split_dir = os.path.join(dataset_dir, split)
        mapping_file_path = f'/content/{split}_mapping.json'

        if os.path.exists(split_dir) and os.path.exists(mapping_file_path):
            dataset = ImageDatasetWithMapping(
                data_dir=split_dir,
                mapping_file_path=mapping_file_path,
                class_names=class_names,
                transform=train_transform if split == 'train' else test_transform
            )

            if len(dataset) > 0:
                loaders[split] = DataLoader(
                    dataset,
                    batch_size=32,
                    shuffle=(split == 'train'),
                    num_workers=2,
                    pin_memory=True if torch.cuda.is_available() else False
                )

                print(f"{split.capitalize()} dataset: {len(dataset)} samples")

    return loaders.get('train'), loaders.get('valid'), loaders.get('test')

train_loader, valid_loader, test_loader = create_data_loaders_with_mapping(class_names)



Train dataset: 15076 samples
Valid dataset: 3232 samples
Test dataset: 3245 samples
