# MobileNet - Test

In [19]:
from tensorflow.keras import layers, models, applications
from tensorflow.keras.applications import mobilenet
from tensorflow.keras.utils import Sequence
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.utils import shuffle

import cv2


In [20]:
# @title 모델 생성
model = applications.MobileNetV2()
model.summary()


In [21]:
base_model = applications.MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=False,
    weights=None
)

x = base_model.output
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(1024, activation='relu', kernel_initializer='he_normal')(x)
x = layers.Dropout(0.3)(x)
output = layers.Dense(11, activation='softmax')(x)

model = models.Model(base_model.input, output)
model.summary()

In [22]:
# @title 데이터 준비

# https://drive.google.com/file/d/11n3xFXnld6hET2AV6Eph102ScxSnoYFq/view?usp=sharing

import gdown, zipfile, os

file_id = '11n3xFXnld6hET2AV6Eph102ScxSnoYFq'
output = 'file.zip'

gdown.download(f'https://drive.google.com/uc?id={file_id}', output, quiet=False)

output_dir = 'cloud_2'
os.makedirs(output_dir, exist_ok=True)

with zipfile.ZipFile(output, 'r') as z:
  z.extractall(output_dir)

Downloading...
From: https://drive.google.com/uc?id=11n3xFXnld6hET2AV6Eph102ScxSnoYFq
To: /content/file.zip
100%|██████████| 15.6M/15.6M [00:00<00:00, 118MB/s] 


In [23]:
# @title 메타데이터 생성

def load_data(base_dir='cloud_2'):
  images_path = []
  labels = []
  class_names = []

  train_base = os.path.join(base_dir, 'new_sample_img')

  for class_name in os.listdir(train_base):
    class_name_path = os.path.join(train_base, class_name)

    # 디렉토리인 경우만
    if os.path.isdir(class_name_path):
      for image_name in os.listdir(class_name_path):
        image_path = os.path.join(class_name_path, image_name)
        images_path.append(image_path)
        labels.append(class_name)

  return np.array(images_path), np.array(labels)

images_path, labels = load_data()
print(images_path.shape, labels.shape)
print(images_path[:5], labels[:5])
type(images_path), type(labels)

(472,) (472,)
['cloud_2/new_sample_img/St/St-N195.jpg'
 'cloud_2/new_sample_img/St/St-N010.jpg'
 'cloud_2/new_sample_img/St/St-N120.jpg'
 'cloud_2/new_sample_img/St/St-N194.jpg'
 'cloud_2/new_sample_img/St/St-N126.jpg'] ['St' 'St' 'St' 'St' 'St']


(numpy.ndarray, numpy.ndarray)

In [24]:
# @title CloudSequence 생성
class CloudSequence(Sequence):
    def __init__(self, images, labels, batch_size=32, augmentor=None,
                 target_size=(224, 224, 11), save_dir=None, shuffle=False, preprocess_function=None, augment_times=1):
        self.images = images_path
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.target_size = target_size
        self.save_dir = save_dir
        self.shuffle = shuffle
        self.preprocess_function = preprocess_function  # 스케일링 함수
        self.augment_times = augment_times  # 증강 반복 횟수
        self.on_epoch_end()

    def __len__(self):
        # 데이터 수 = 원본 이미지 수 × 증강 횟수
        return int(np.ceil(self.images_path.shape[0] / self.batch_size))

    def __getitem__(self, index):
        start = index * self.batch_size
        stop = (index + 1) * self.batch_size

        # 증강을 반복하여 데이터 생성
        batch_images = []
        batch_labels = []
        for _ in range(self.augment_times):
            for img_path, label in zip(self.images_path[start:stop], self.labels[start:stop]):
                image = cv2.imread(img_path)
                if self.augmentor is not None:
                    image = self.augmentor(image=image)['image']
                image = cv2.resize(image, self.target_size)
                batch_images.append(image)
                batch_labels.append(label)

        # 최종 배치 반환
        batch_images = np.array(batch_images)
        batch_labels = np.array(batch_labels)
        if self.preprocess_function:
            batch_images = self.preprocess_function(batch_images)
        return batch_images, batch_labels



In [7]:
# @title 데이터 증강 및 저장

import os
import cv2
import numpy as np
import albumentations as A

# 증강 구성
augmentor = A.Compose([
    A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=90, p=0.5),
    A.HorizontalFlip(p=0.3),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.4)
])

augment_times = 10  # 증강 횟수
save_dir = "cloud_2/cloud_aug_2"  # 저장 디렉토리
target_size = (224, 224)  # 고정 크기
os.makedirs(save_dir, exist_ok=True)

# 증강 데이터 생성 및 저장
augmented_images = []
augmented_labels = []

for img_path, label in zip(images_path, labels):
    # 원본 이미지 읽기
    image = cv2.imread(img_path)
    if image is None:
        print(f"Error reading image: {img_path}")
        continue

    # 리사이즈
    image = cv2.resize(image, target_size)

    # 원본 이미지 저장
    label_dir = os.path.join(save_dir, str(label))
    os.makedirs(label_dir, exist_ok=True)
    original_image_path = os.path.join(label_dir, os.path.basename(img_path))
    cv2.imwrite(original_image_path, image)

    # 원본 추가
    augmented_images.append(image)
    augmented_labels.append(label)

    # 증강 데이터 생성 및 저장
    for i in range(augment_times):
        aug_image = augmentor(image=image)['image']
        aug_image = cv2.resize(aug_image, target_size)  # 고정 크기로 리사이즈
        aug_image_name = f"aug_{i}_{os.path.basename(img_path)}"
        aug_image_path = os.path.join(label_dir, aug_image_name)
        cv2.imwrite(aug_image_path, aug_image)

        # 증강 이미지 추가
        augmented_images.append(aug_image)
        augmented_labels.append(label)

# NumPy 배열로 변환
augmented_images = np.array(augmented_images)
augmented_labels = np.array(augmented_labels)

# 출력 확인
print(f"Augmented images shape: {augmented_images.shape}")
print(f"Augmented labels shape: {augmented_labels.shape}")



  check_for_updates()


Augmented images shape: (2832, 224, 224, 3)
Augmented labels shape: (2832,)


In [8]:
# @title AUG 메타데이터 생성
from sklearn.model_selection import train_test_split

def load_data(base_dir='cloud_2'):
  augmented_images = []
  augmented_labels = []
  class_names = []

  train_base = os.path.join(base_dir, 'cloud_aug_2')

  for class_name in os.listdir(train_base):
    class_name_path = os.path.join(train_base, class_name)

    # 디렉토리인 경우만
    if os.path.isdir(class_name_path):
      for image_name in os.listdir(class_name_path):
        image_path = os.path.join(class_name_path, image_name)
        augmented_images.append(image_path)
        augmented_labels.append(class_name)

  return np.array(augmented_images), np.array(augmented_labels)

images, labels = load_data()
print(images.shape, labels.shape)
type(images), type(labels)


(2832,) (2832,)


(numpy.ndarray, numpy.ndarray)

In [25]:
# @title CloudSequence 생성
class CloudSequence(Sequence):
    def __init__(self, images, labels, batch_size=32,
                 target_size=(224, 224), preprocess_function=None):
        self.images = images
        self.labels = labels
        self.batch_size = batch_size
        self.target_size = target_size
        self.preprocess_function = preprocess_function  # 스케일링 함수

    def __len__(self):
        # 데이터 수 = 원본 이미지 수 × 증강 횟수
        return int(np.ceil(self.images.shape[0]/ self.batch_size))

    def __getitem__(self, index):
        start = index * self.batch_size
        stop = (index + 1) * self.batch_size

        # 증강을 반복하여 데이터 생성
        batch_images = []
        batch_labels = []
        for img_path, label in zip(self.images[start:stop], self.labels[start:stop]):
            image = cv2.imread(img_path)
            image = cv2.resize(image, self.target_size)
            batch_images.append(image)
            batch_labels.append(label)

        # 최종 배치 반환
        batch_images = np.array(batch_images)
        batch_labels = np.array(batch_labels)
        return np.array(batch_images), np.array(batch_labels)


In [26]:
train_aug_images=images
train_aug_labels=labels
train_aug_images.shape, train_aug_labels.shape

((2832,), (472,))

In [27]:
# https://drive.google.com/file/d/1kHufRqwi6clK6hWB2uF56XXzW1lXa36u/view?usp=sharing

import gdown, zipfile, os

file_id = '1kHufRqwi6clK6hWB2uF56XXzW1lXa36u'
output = 'file.zip'

gdown.download(f'https://drive.google.com/uc?id={file_id}', output, quiet=False)

output_dir = 'test_data_2'
os.makedirs(output_dir, exist_ok=True)

with zipfile.ZipFile(output, 'r') as z:
  z.extractall(output_dir)

Downloading...
From: https://drive.google.com/uc?id=1kHufRqwi6clK6hWB2uF56XXzW1lXa36u
To: /content/file.zip
100%|██████████| 4.17M/4.17M [00:00<00:00, 153MB/s]


In [28]:
# @title 메타데이터 생성
from sklearn.model_selection import train_test_split

def load_data(base_dir='test_data_2'):
  val_aug_images = []
  val_aug_labels = []
  class_names = []

  train_base = os.path.join(base_dir,"test_cloud_img_2")

  for class_name in os.listdir(train_base):
    class_name_path = os.path.join(train_base, class_name)

    # 디렉토리인 경우만
    if os.path.isdir(class_name_path):
      for image_name in os.listdir(class_name_path):
        image_path = os.path.join(class_name_path, image_name)
        val_aug_images.append(image_path)
        val_aug_labels.append(class_name)

  return np.array(val_aug_images), np.array(val_aug_labels)

val_aug_images_path, val_aug_labels = load_data()
print(val_aug_images_path.shape, val_aug_labels.shape)
print(val_aug_images_path, val_aug_labels)
type(val_aug_images_path), type(val_aug_labels)

(18,) (18,)
['test_data_2/test_cloud_img_2/St/St_2.jpg'
 'test_data_2/test_cloud_img_2/St/St_1jpg.jpg'
 'test_data_2/test_cloud_img_2/St/St_3.jpg'
 'test_data_2/test_cloud_img_2/Ci/Ci_1.jpg'
 'test_data_2/test_cloud_img_2/Ci/Ci_2.jpg'
 'test_data_2/test_cloud_img_2/Ci/Ci_3.jpg'
 'test_data_2/test_cloud_img_2/Cu/Cu_1.jpg'
 'test_data_2/test_cloud_img_2/Cu/Cu_3.jpg'
 'test_data_2/test_cloud_img_2/Cu/Cu_2.jpg'
 'test_data_2/test_cloud_img_2/Ac/Ac_1.jpg'
 'test_data_2/test_cloud_img_2/Ac/Ac.jpg'
 'test_data_2/test_cloud_img_2/Ac/Ac_3.jpg'
 'test_data_2/test_cloud_img_2/Ct/Ct_1.jpg'
 'test_data_2/test_cloud_img_2/Ct/Ct_2.jpg'
 'test_data_2/test_cloud_img_2/Ct/Ct_3.jpg'
 'test_data_2/test_cloud_img_2/Cs/Cs_.jpg'
 'test_data_2/test_cloud_img_2/Cs/Cs_3.jpg'
 'test_data_2/test_cloud_img_2/Cs/Cs_2.jpg'] ['St' 'St' 'St' 'Ci' 'Ci' 'Ci' 'Cu' 'Cu' 'Cu' 'Ac' 'Ac' 'Ac' 'Ct' 'Ct'
 'Ct' 'Cs' 'Cs' 'Cs']


(numpy.ndarray, numpy.ndarray)

In [29]:
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
train_aug_labels = label_encoder.fit_transform(train_aug_labels)
train_aug_labels = np.array(train_aug_labels, dtype='int32')
print(np.unique(train_aug_labels))
print(train_aug_labels.dtype)

val_aug_labels = label_encoder.transform(val_aug_labels)
val_aug_labels = np.array(val_aug_labels, dtype='int32')
print(np.unique(val_aug_labels))
print(val_aug_labels.dtype)



[0 1 2 3 4 5]
int32
[0 1 2 3 4 5]
int32


In [30]:
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
tr_seq=CloudSequence(train_aug_images, train_aug_labels, preprocess_function=preprocess_input)
val_seq=CloudSequence(val_aug_images_path, val_aug_labels)

In [31]:
tr_seq.__getitem__(0)[0].shape

(32, 224, 224, 3)

In [32]:
print("train 라벨 값 확인:", np.unique(train_aug_labels))
print("val 라벨 값 확인:", np.unique(val_aug_labels))



train 라벨 값 확인: [0 1 2 3 4 5]
val 라벨 값 확인: [0 1 2 3 4 5]


In [33]:
batch_images=[]
for img_path in images_path:

  image = cv2.imread(img_path)

  image = cv2.resize(image, (224,224))
  image = np.array(image)
  batch_images.append(image)

In [18]:
# @title 모델 학습
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam

model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001), # 'adam' -> Adam() 실제 객체로 변환후 학습이 진행되고 있다?
    metrics=['accuracy']
)

early_stopping_cb = EarlyStopping(patience=6, verbose=1, restore_best_weights=True)
reduce_lr_on_plateau_cb = ReduceLROnPlateau(patience=3, factor=0.5, verbose=1)


# 학습
history = model.fit(
    tr_seq,
    epochs=30,
    batch_size=32,
    validation_data=val_seq,
    callbacks=[early_stopping_cb, reduce_lr_on_plateau_cb]
    )



Epoch 1/30


  self._warn_if_super_not_called()


[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 431ms/step - accuracy: 0.1456 - loss: 3.7249 - val_accuracy: 0.1667 - val_loss: 2.0648 - learning_rate: 0.0010
Epoch 2/30
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 106ms/step - accuracy: 0.1860 - loss: 1.8661 - val_accuracy: 0.1667 - val_loss: 1.8733 - learning_rate: 0.0010
Epoch 3/30
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 103ms/step - accuracy: 0.1866 - loss: 1.8200 - val_accuracy: 0.1667 - val_loss: 1.8444 - learning_rate: 0.0010
Epoch 4/30
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 104ms/step - accuracy: 0.1940 - loss: 1.8387 - val_accuracy: 0.1667 - val_loss: 1.8584 - learning_rate: 0.0010
Epoch 5/30
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 104ms/step - accuracy: 0.2654 - loss: 1.7641 - val_accuracy: 0.1667 - val_loss: 1.8635 - learning_rate: 0.0010
Epoch 6/30
[1m89/89[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1

In [None]:

# 학습결과 시각화
pd.DataFrame(history.history).plot()
plt.show()

# 평가
loss, accuracy = model.evaluate(test_aug_images_path, test_aug_labels)
print(f'loss: {loss:.4f}, accuracy: {accuracy:.4f}')


In [None]:
"https://drive.google.com/file/d/1ZKIFRjaxSAssTO36STrG0XrHurUqfK1B/view?usp=sharing"

import gdown, zipfile, os

file_id = '1ZKIFRjaxSAssTO36STrG0XrHurUqfK1B'
output = 'file.zip'

gdown.download(f'https://drive.google.com/uc?id={file_id}', output, quiet=False)

output_dir = 'cloud'
os.makedirs(output_dir, exist_ok=True)

with zipfile.ZipFile(output, 'r') as z:
  z.extractall(output_dir)

In [None]:
# @title 모델 예측

pred_proba = model.predict(test_aug_images_path)
print(pred_proba.shape)

class_names = np.array(['Ac', 'As', 'Cb', 'Cc', 'Ci', 'Cs', 'Ct', 'Cu', 'Ns', 'Sc', 'St'])

def show_images(images, labels, ncols = 8):
  fig, ax = plt.subplots(nrows = 1, ncols = ncols, figsize = (20, 6))
  for i in range(ncols):
    ax[i].imshow(images[i])  # ndarray 전달해서 이미지 출력
    class_name = class_names[labels[i]]
    ax[i].set_title(class_name)

  plt.show()

pred = np.argmax(pred_proba, axis = 1)
for i in range(10):
  print(f'{i} : {class_names[pred[i]]}')

show_images(test_aug_images_path[:10], test_aug_labels[:10])
show_images(test_aug_images_path[:10], pred[:10])

In [16]:
model.save('best_cloud_mobilenet_2.keras')