<a href="https://colab.research.google.com/github/minjaekim070/Bootcamp_Project/blob/main/CNN_project_2%EC%9D%BC%EC%B0%A8_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

전체 데이터셋의 다운로드가 오래 걸려서 bottle 폴더에 대한 내용을 우선적으로 정상/비정상으로 분류할 수 있도록 진행

#Library import


In [360]:
# 패키지 import
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import tensorflow as tf
from torchvision import transforms, datasets, utils
from torchsummary import summary

import matplotlib.pyplot as plt
import numpy as np

from tensorflow.keras.preprocessing import image


#이미지 업로드 및 배열 변환


In [361]:
# 이미지 경로
train_data_dir = '/content/drive/MyDrive/bottle/train/good'
large_data_dir = '/content/drive/MyDrive/bottle/test/broken_large'
small_data_dir = '/content/drive/MyDrive/bottle/test/broken_small'
contamination_data_dir='/content/drive/MyDrive/bottle/test/contamination'
test_data_dir = '/content/drive/MyDrive/bottle/test/good'


In [362]:
#이미지를 데이터로 변환
def load_image_data(image_dir):
    image_data = []
    for img_name in os.listdir(image_dir):
        img_path = os.path.join(image_dir, img_name)
        img = image.load_img(img_path, target_size=(32, 32))  # 이미지를 지정된 크기로 로드합니다
        img_array = image.img_to_array(img)  # 이미지를 배열 형태로 변환합니다
        image_data.append(img_array)
    return image_data

def load_data(data_dir, test_data_dir, large_data_dir, small_data_dir, contamination_data_dir):
    train_image_data = load_image_data(data_dir)
    test_image_data = load_image_data(test_data_dir)
    large_image_data = load_image_data(large_data_dir)
    small_image_data = load_image_data(small_data_dir)
    contamination_image_data = load_image_data(contamination_data_dir)
    return train_image_data, test_image_data, large_image_data, small_image_data, contamination_image_data

train_image_data, test_image_data, large_image_data, small_image_data, contamination_image_data = load_data(train_data_dir, test_data_dir, large_data_dir, small_data_dir, contamination_data_dir)


In [363]:
#배열로 변환, normalization
def preprocess_image_data(image_data):
    return np.array(image_data) / 255

train_image_data = preprocess_image_data(train_image_data)
test_image_data = preprocess_image_data(test_image_data)
small_image_data = preprocess_image_data(small_image_data)
large_image_data = preprocess_image_data(large_image_data)
contamination_image_data = preprocess_image_data(contamination_image_data)


In [364]:
#데이터 확인
test_image_data[0][16][16]

array([0.1882353 , 0.16470589, 0.2       ], dtype=float32)

#데이터 전처리

크게 파손, 적게 파손, 오염된 데이터 셋 모두 augmentation을 통해 데이터의 양을 늘린 뒤에, train과 test로 분리 후에 예측 결과 확인

In [365]:
#배열 변환된 데이터 셋
"""
train_image_data : 정상 훈련 데이터 (200)
test_image_data : 정상 테스트 데이터 (20)
small_image_data : 적게 파손 (20)
large_image_data : 크게 파손 (20)
contamination_image_data : 오염 (20)
"""

'\ntrain_image_data : 정상 훈련 데이터 (200)\ntest_image_data : 정상 테스트 데이터 (20)\nsmall_image_data : 적게 파손 (20)\nlarge_image_data : 크게 파손 (20)\ncontamination_image_data : 오염 (20)\n'

In [366]:
#정상 데이터셋이 너무 과도하게 많기 때문에 50개 정도로 추출
subset_train_data = train_image_data[:50]

In [367]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from PIL import Image

# 이미지 데이터 증강을 위한 ImageDataGenerator 생성
datagen = ImageDataGenerator(
    rotation_range=20,  # 이미지 회전 범위 (0~20도 사이에서 무작위로 회전)
    width_shift_range=0.1,  # 가로로 이미지 이동 범위 (전체 너비의 0.1 비율로 무작위로 이동)
    height_shift_range=0.1,  # 세로로 이미지 이동 범위 (전체 높이의 0.1 비율로 무작위로 이동)
    shear_range=0.2,  # 이미지 변형을 위한 전단 강도 (0.2 비율 내외로 무작위로 전단)
    zoom_range=0.2,  # 이미지 확대/축소 범위 (0.2 비율 내외로 무작위로 확대/축소)
    horizontal_flip=True,  # 이미지 수평 뒤집기
    fill_mode='nearest'  # 이미지 변형 시 채울 픽셀 값 설정
)

# augmented 이미지 개수
desired_num_augmented_images = 10


In [368]:
def augment_images(image_data, desired_num_augmented_images):
    augmented_images = []
    num_generated_images = 0

    while num_generated_images < desired_num_augmented_images:
        image = image_data[0]
        generated_images = datagen.flow(np.expand_dims(image, axis=0), batch_size=1)

        augmented_images.extend(generated_images[0])
        num_generated_images = len(augmented_images)

    augmented_images = np.array(augmented_images[:desired_num_augmented_images])
    return augmented_images

# broken_small 이미지 증강
augmented_small_images = augment_images(small_image_data, desired_num_augmented_images)

# broken_large 이미지 증강
augmented_large_images = augment_images(large_image_data, desired_num_augmented_images)

# contamination 이미지 증강
augmented_contamination_images = augment_images(contamination_image_data, desired_num_augmented_images)


In [369]:
print(test_image_data.shape[0])
print(small_image_data.shape[0])
print(large_image_data.shape[0])
print(contamination_image_data.shape[0])
print(augmented_small_images.shape[0])
print(augmented_large_images.shape[0])
print(augmented_contamination_images.shape[0])

20
22
20
21
10
10
10


In [370]:
# small_image_data와 augmented_small_images 합치기
combined_small_data = np.concatenate((small_image_data, augmented_small_images), axis=0)
# 데이터 섞기
np.random.shuffle(combined_small_data)

# large_image_data와 augmented_large_images 합치기
combined_large_data = np.concatenate((large_image_data, augmented_large_images), axis=0)
# 데이터 섞기
np.random.shuffle(combined_large_data)

# contamination_image_data와 augmented_contamination_images 합치기
combined_contamination_data = np.concatenate((contamination_image_data, augmented_contamination_images), axis=0)
# 데이터 섞기
np.random.shuffle(combined_contamination_data)

combined_broken_data = np.concatenate((combined_small_data, combined_large_data), axis=0)

In [371]:
#배열 변환된 데이터 셋
"""
subset_train_data : 정상 훈련 데이터
test_image_data : 정상 테스트 데이터
combined_broken_data : augmentation 진행한 broken_small, broken_large data
combined_contamination_data : augmentation 진행한 contamination_data
"""

'\nsubset_train_data : 정상 훈련 데이터 \ntest_image_data : 정상 테스트 데이터 \ncombined_broken_data : augmentation 진행한 broken_small, broken_large data\ncombined_contamination_data : augmentation 진행한 contamination_data\n'

In [372]:
#Train set 구성

# 데이터 개수 구하기
broken_num_samples = combined_broken_data.shape[0]
contamination_num_samples = combined_contamination_data.shape[0]

# 90%의 데이터 개수 계산
b_90 = int(0.90 * broken_num_samples)
c_90 = int(0.90 * contamination_num_samples)

# 90% 데이터 추출
b_new_data = combined_broken_data[:b_90]
c_new_data = combined_contamination_data[:c_90]

# subset_train_data와 combined_small_data 합치기
resized_images = np.concatenate((subset_train_data, b_new_data, c_new_data), axis=0)

In [373]:
all_labels = []
for i in range(0,subset_train_data.shape[0]):
  all_labels.append([1,0,0])
for j in range(0,b_new_data.shape[0]):
  all_labels.append([0,1,0])
for k in range(0,c_new_data.shape[0]):
  all_labels.append([0,0,1])
all_labels = np.array(all_labels)

In [374]:
#Test set 구성
b_test_data = combined_broken_data[b_90:]
c_test_data = combined_contamination_data[c_90:]
test_set_data = np.concatenate((test_image_data, b_test_data, c_test_data),axis=0)

test_labels = []
for i in range(0,test_image_data.shape[0]):
  test_labels.append([1,0,0])
for j in range(0,b_test_data.shape[0]):
  test_labels.append([0,1,0])
for k in range(0,c_test_data.shape[0]):
  test_labels.append([0,0,1])
test_labels = np.array(test_labels)

In [375]:
print(resized_images.shape)
print(all_labels.shape)
print(test_set_data.shape)
print(test_labels.shape)

(132, 32, 32, 3)
(132, 3)
(31, 32, 32, 3)
(31, 3)


#모델 구성 및 결과 예측

In [376]:
from sklearn.model_selection import GridSearchCV
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.models import Sequential


# CNN 모델을 함수 형태로 정의
def create_model():
    model = Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(128, (3,3), activation='relu'))
    model.add(layers.MaxPooling2D((2,2)))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(3, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# KerasClassifier를 사용하여 Keras 모델을 래핑
keras_model = KerasClassifier(build_fn=create_model)

# 그리드 탐색을 위한 hyperparameter 조합 설정
param_grid = {
    'batch_size': [32, 64],
    'epochs': [10, 20, 30]
}

# GridSearchCV를 사용하여 그리드 탐색 수행
grid_search = GridSearchCV(estimator=keras_model, param_grid=param_grid, cv=3)
grid_search_result = grid_search.fit(resized_images, all_labels)

# 최적의 hyperparameter 조합과 결과 출력
print("Best Score: ", grid_search_result.best_score_)
print("Best Params: ", grid_search_result.best_params_)

  keras_model = KerasClassifier(build_fn=create_model)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/2

Best Params:  {'batch_size': 32, 'epochs': 30} 으로 확인했으므로 이를 기반으로 모델 학습

In [381]:
model = Sequential()
model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(128, (3,3), activation='relu'))
model.add(layers.MaxPooling2D((2,2)))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(3, activation='softmax'))
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
#모델 학습
model.fit(resized_images, all_labels, epochs=30, batch_size=32, verbose=1)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.callbacks.History at 0x7a4a78394c10>

In [382]:
# test set에 대한 예측 수행
predictions = model.predict(test_set_data)

# 최종 예측 레이블 생성
predicted_labels = []
for prediction in predictions:
    if prediction[0] > prediction[1] and prediction[0] > prediction[2]:
        predicted_labels.append([1, 0, 0])
    elif prediction[1] > prediction[0] and prediction[1] > prediction[2]:
        predicted_labels.append([0, 1, 0])
    else:
        predicted_labels.append([0, 0, 1])
predicted_labels = np.array(predicted_labels)

#예측률 확인
count = 0
for i in range(0,predicted_labels.shape[0]):
  if np.array_equal(predicted_labels[i], test_labels[i]):
        count += 1

prediction_accuracy = count / predicted_labels.shape[0] * 100
print('test set에 대한 예측률: {}%'.format(prediction_accuracy))


test set에 대한 예측률: 96.7741935483871%


In [383]:
#원본 데이터에 대한 예측 및 평가 함수
def predict_and_evaluate(model, data, target_labels):
    predictions = model.predict(data)

    predicted_labels = []
    for prediction in predictions:
        if prediction[0] > prediction[1] and prediction[0] > prediction[2]:
            predicted_labels.append([1, 0, 0])
        elif prediction[1] > prediction[0] and prediction[1] > prediction[2]:
            predicted_labels.append([0, 1, 0])
        else:
            predicted_labels.append([0, 0, 1])
    predicted_labels = np.array(predicted_labels)

    count = 0
    for i in range(predicted_labels.shape[0]):
        if np.array_equal(predicted_labels[i], target_labels):
            count += 1

    prediction_accuracy = count / predicted_labels.shape[0] * 100
    return prediction_accuracy

In [384]:
#원본 데이터에 대한 예측 및 정확도 평가

# broken_large
accuracy_large = predict_and_evaluate(model, large_image_data, [0, 1, 0])
print('broken_large에 대한 예측률: {}%'.format(accuracy_large))

# broken_small
accuracy_small = predict_and_evaluate(model, small_image_data, [0, 1, 0])
print('broken_small에 대한 예측률: {}%'.format(accuracy_small))

#contamination
accuracy_contamination = predict_and_evaluate(model, contamination_image_data, [0,0,1])
print('contamination에 대한 예측률:{}%'.format(accuracy_contamination))

broken_large에 대한 예측률: 100.0%
broken_small에 대한 예측률: 95.45454545454545%
contamination에 대한 예측률:76.19047619047619%
