In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
from glob import glob
import cv2
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image
import pandas as pd
import csv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Flatten, Conv2D, Dense
from keras.optimizers import Adam

# VAE 모델 구현

##  Create a sampling layer

In [None]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

## Build the Encoder

In [None]:
latent_dim = 2

encoder_inputs = keras.Input(shape=(28, 28, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

## Build the decoder

In [None]:
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
x = layers.Reshape((7, 7, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

## Define VAE

In [None]:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
      if isinstance(data, tuple):
          data = data[0]

      # 라벨 있는 데이터셋인 경우
      if len(data.shape) == 4:
          with tf.GradientTape() as tape:
              z_mean, z_log_var, z = self.encoder(data)
              reconstruction = self.decoder(z)
              reconstruction_loss = tf.reduce_mean(
                  keras.losses.binary_crossentropy(data, reconstruction)
              )
              kl_loss = -0.5 * tf.reduce_mean(
                  1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
              )
              total_loss = reconstruction_loss + kl_loss

          gradients = tape.gradient(total_loss, self.trainable_variables)
          self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
          return {
              "loss": total_loss,
              "reconstruction_loss": reconstruction_loss,
              "kl_loss": kl_loss,
          }

      # 라벨 없는 데이터셋인 경우 (라벨이 없으므로 라벨 관련 손실 함수는 계산하지 않음)
      else:
          with tf.GradientTape() as tape:
              z_mean, z_log_var, z = self.encoder(data)
              reconstruction = self.decoder(z)
              reconstruction_loss = tf.reduce_mean(
                  keras.losses.binary_crossentropy(data, reconstruction)
              )

          gradients = tape.gradient(reconstruction_loss, self.trainable_variables)
          self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
          return {"loss": reconstruction_loss}


    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        return reconstructed

# 라벨링 된 데이터 Preprocessing

In [None]:
# 경로 안에 있는 모든 이미지를 데이터화하는 함수

def preprocess_image(image_path, target_size=(28, 28)):
    # 이미지 파일 경로 불러오기
    image_list = glob(os.path.join(image_path, '*.png'))

    # 빈 numpy 배열 생성
    img_array = np.empty((len(image_list), *target_size, 1), dtype=np.uint8)

    for i, img in enumerate(image_list):
        image = Image.open(img)
        image = image.resize(target_size)
        img_array[i] = np.array(image)[:, :, np.newaxis]

    return img_array


In [None]:
from sklearn.model_selection import train_test_split

# 정상 up 이미지 로드
X_normal_up = preprocess_image('/content/drive/MyDrive/Project/good_up')
# 정상 under 이미지 로드
X_normal_under = preprocess_image('/content/drive/MyDrive/Project/good_under')
# 비정상 up 이미지 로드
X_abnormal_up = preprocess_image('/content/drive/MyDrive/Project/notgood_up')
# 비정상 under 이미지 로드
X_abnormal_under = preprocess_image('/content/drive/MyDrive/Project/notgood_under')

# 각 클래스에 대한 클래스 레이블 생성
# "정상 up" 클래스: 0
# "정상 under" 클래스: 1
# "비정상 up" 클래스: 2
# "비정상 under" 클래스: 3
y_normal_up = np.zeros(len(X_normal_up))
y_normal_under = np.ones(len(X_normal_under))
y_abnormal_up = np.full(len(X_abnormal_up), 2)
y_abnormal_under = np.full(len(X_abnormal_under), 3)

# 각 클래스별 데이터를 하나로 합치기
# X_up은 "정상 up"과 "비정상 up" 데이터를 하나로 합침
X_up = np.concatenate((X_normal_up, X_abnormal_up), axis=0)
# X_under -> "정상 under"과 "비정상 under" 데이터를 하나로 합침
X_under = np.concatenate((X_normal_under, X_abnormal_under), axis=0)
# y_up -> "정상 up"과 "비정상 up"을 하나로 합쳐서 y_up 안에는 0과 1 두 개의 레이블이 존재
y_up = np.concatenate((y_normal_up, y_abnormal_up), axis=0)
# y_under -> "정상 under"과 "비정상 under"을 하나로 합쳐서 y_under 안에는 0과 1 두 개의 레이블이 존재
y_under = np.concatenate((y_normal_under, y_abnormal_under), axis=0)

# 각 데이터를 Train, Test 데이터로 나누기
X_train_up, X_test_up, y_train_up, y_test_up = train_test_split(X_up, y_up, test_size=0.2, random_state=42, shuffle=False)
X_train_under, X_test_under, y_train_under, y_test_under = train_test_split(X_under, y_under, test_size=0.2, random_state=42, shuffle=False)

In [None]:
# 이미지 정규화
X_train_up = X_train_up / 255
X_test_up = X_test_up / 255

X_train_under = X_train_under / 255
X_test_under = X_test_under / 255

In [None]:
print(np.shape(X_train_up))

(99, 28, 28, 1)


### VAE 모델 인스턴스 생성

In [None]:
vae = VAE(encoder, decoder)

### Compile

In [None]:
# Adam Optimizer와 MSE Loss 함수 사용
vae.compile(optimizer = Adam(), loss = 'mse')

### 라벨링 된 데이터 학습

In [None]:
# X_train_up와 X_train_under를 하나의 데이터셋으로 합치기
X_train_combined = np.concatenate((X_train_up, X_train_under), axis=0)
X_train_combined = tf.convert_to_tensor(X_train_combined, dtype=tf.float32)


# tf.data.Dataset으로 변환
train_dataset = tf.data.Dataset.from_tensor_slices(X_train_combined)
train_dataset = train_dataset.shuffle(buffer_size=len(X_train_combined)).batch(32)

In [None]:
print(type(X_train_combined))
print(type(train_dataset))

<class 'tensorflow.python.framework.ops.EagerTensor'>
<class 'tensorflow.python.data.ops.batch_op._BatchDataset'>


In [None]:
vae.fit(train_dataset, epochs=10, validation_data=(X_test_up, X_test_up))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7876412d2020>

## 라벨 없는 데이터

In [None]:
# Unlabeled 데이터
X_unlabel = preprocess_image('/content/drive/MyDrive/Project/photo2')
X_unlabel = X_unlabel / 255

In [None]:
# Unlabeled 데이터셋을 텐서로 변환
X_unlabel = tf.convert_to_tensor(X_unlabel, dtype=tf.float32)

# train_dataset에 라벨 없는 데이터셋 추가
train_dataset_unlabel = tf.data.Dataset.from_tensor_slices(X_unlabel)
train_dataset_unlabel = train_dataset_unlabel.batch(32)

In [None]:
# 기존 라벨 있는 데이터셋과 라벨 없는 데이터셋을 합침
train_dataset = tf.data.Dataset.zip((train_dataset, train_dataset_unlabel))

In [None]:
# VAE 모델 훈련
vae.fit(train_dataset, epochs=10, validation_data=(X_test_up, X_test_up))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7876413aba00>

# 분류할 이미지 넣고 분류시키기

In [None]:
real_test = preprocess_image('/content/drive/MyDrive/Project/photo1')
real_test = real_test / 255
# real_test데이터셋을 텐서로 변환
real_test = tf.convert_to_tensor(real_test, dtype=tf.float32)

# train_dataset에 라벨 없는 데이터셋 추가
real_test_dataset = tf.data.Dataset.from_tensor_slices(real_test)
real_test_dataset = real_test_dataset.batch(32)

In [None]:
# VAE 모델의 잠재 특성을 추출하는 인코더 모델 정의
encoder_model = keras.Model(inputs=vae.encoder.input, outputs=vae.encoder.get_layer('sampling').output)

# 테스트 데이터에 대한 잠재 특성 추출
latent_test = encoder_model.predict(real_test)

# 분류 모델 정의
classification_model = keras.Sequential([
    keras.layers.Input(shape=(latent_dim,)),  # 잠재 특성의 차원에 맞게 입력 레이어 설정
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(4, activation='softmax')  # num_classes는 분류할 클래스 수
])

# 분류 모델 컴파일
classification_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 테스트 데이터에 대한 분류 결과 예측
predictions = classification_model.predict(latent_test)

# real_test를 다시 NumPy 배열로 변환
real_test_np = real_test.numpy()

# 분류 결과에 따라 이미지 저장
for i in range(len(real_test_np)):
    class_index = np.argmax(predictions[i])

    # 분류 결과에 따라 이미지 저장
    if class_index == 0:
        save_path = '/content/drive/MyDrive/Project/results/up_good'
    elif class_index == 1:
        save_path = '/content/drive/MyDrive/Project/results/under_good'
    elif class_index == 2:
        save_path = '/content/drive/MyDrive/Project/results/up_notgood'
    else:
        save_path = '/content/drive/MyDrive/Project/results/under_notgood'

    # 저장할 폴더 내 이미지 파일들 개수 확인
    file_list = os.listdir(save_path)
    num_files = len(file_list)

    # 이미지 저장을 위해 다시 NumPy 배열로 변환하고 모드를 'L'로 변경
    result_image = np.array(real_test_np[i][0] * 255, dtype=np.uint8)
    result_image = Image.fromarray(result_image, mode='L')

    # 이미지 저장
    result_image.save(os.path.join(save_path, f'results_{num_files + 1}.png'))



KeyboardInterrupt: ignored