In [7]:
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd

In [None]:
"""
CNNによる２クラス画像分類に使うデータセットを読み込む関数を作成する。

# 状況設定
・クラスごとに別々のフォルダに画像が格納されている。ただ、クラス名の下にサブクラスフォルダがある2階層構造になっている。
・フォルダ名がクラス名になっている
・画像のファイル名は識別idになっている
・画像のサイズは同じ
・画像はRGBAの4チャネルカラー画像

# フォルダ構成例
images/class1/subclass1/1.png
                        2.png
                        3.png
             /subclass2/1.png
                        2.png
                        3.png
images/class2/subclass1/1.png
                        2.png
                        3.png
             /subclass2/1.png
                        2.png
                        3.png
# 処理の流れ
・クラスごとに画像を読み込む。CNNで使用クラスにはサブクラスで識別せず、クラス名で識別する。
・つまり、class1/subclass1/1.pngとclass1/subclass2/1.pngは同じクラスの画像として扱う。
・画像を結合した後にシャッフルする。
・画像の値を0~1に正規化する。
・画像を訓練用とテスト用に分割する。
・クラスごとにどの画像を訓練用に、どの画像をテスト用にしたかを記録する。
・その際、画像名はクラスが違うと重複する。例えば、Class1/1.pngとClass2/1.pngは別の画像だが、画像名は同じである。
・そのため、画像名の前にクラス名を付けて重複を回避する。
・返り値は、train_dataとtest_dataの2つのリストである。
・各返り値の構成は以下の通りである。
    train_data = np.ndarray([(訓練用画像パス1、訓練用画像配列1、訓練用画像ラベル1),
                                     (訓練用画像パス2、訓練用画像配列2、訓練用画像ラベル2),])
    test_data = np.ndarray([(テスト用画像パス1、テスト用画像配列1、テスト用画像ラベル1),
                                     (テスト用画像パス2、テスト用画像配列2、テスト用画像ラベル2),])

# 以下に関数を定義していく。
"""

In [None]:
"""
TensorFlowを使った２クラス画像分類CNNを作成する。
その際、学習データが多い場合にそれらのデータを一度に全てGPUのメモリ上に乗せるのではなく、
少しずつ読み込みながら学習を進めるにはTensorFlow, Kerasの何を使えばいいか？
"""

In [2]:
import os
import random
from PIL import Image
import numpy as np

def load_dataset(data_dir, train_ratio=0.8):
    classes = os.listdir(data_dir)
    num_classes = len(classes)
    class_indices = {classes[i]: i for i in range(num_classes)}
    train_data, test_data = [], []
    for class_name in classes:
        class_dir = os.path.join(data_dir, class_name)
        subclass_dirs = os.listdir(class_dir)
        for subclass_dir in subclass_dirs:
            subclass_path = os.path.join(class_dir, subclass_dir)
            image_files = os.listdir(subclass_path)
            random.shuffle(image_files)
            num_images = len(image_files)
            num_train = int(num_images * train_ratio)
            train_images = image_files[:num_train]
            test_images = image_files[num_train:]
            for image_file in train_images:
                image_path = os.path.join(subclass_path, image_file)
                image = Image.open(image_path).convert('RGBA')
                image = np.array(image)
                image = image / 255.0
                train_data.append((image_path, image, class_indices[class_name]))
            for image_file in test_images:
                image_path = os.path.join(subclass_path, image_file)
                image = Image.open(image_path).convert('RGBA')
                image = np.array(image)
                image = image / 255.0
                test_data.append((image_path, image, class_indices[class_name]))
    random.shuffle(train_data)
    random.shuffle(test_data)
    train_data = [(f"{classes[label]}/{os.path.basename(path)}", image, label) for path, image, label in train_data]
    test_data = [(f"{classes[label]}/{os.path.basename(path)}", image, label) for path, image, label in test_data]
    return np.array(train_data), np.array(test_data)

In [3]:
train_data, test_data = load_dataset(r'Z:\miyata\RiverShapeRecognition_exp\test\output')

In [4]:
train_data[0][1].shape

(504, 448, 4)

# CNNの実装

２値分類を行うCNNを実装。入力は(504, 448, 4)の画像で0~1に正規化済み


In [5]:
from tensorflow import keras
from tensorflow.keras import layers

model = keras.Sequential()

# 畳み込み層1  
model.add(layers.Conv2D(32, kernel_size=3, activation='relu', input_shape=(504, 448, 4)))
model.add(layers.MaxPool2D(2, 2))

# 畳み込み層2
model.add(layers.Conv2D(64, kernel_size=3, activation='relu')) 
model.add(layers.MaxPool2D(2, 2))

# 全結合層
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))

# 出力層
model.add(layers.Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [6]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 502, 446, 32)      1184      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 251, 223, 32)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 249, 221, 64)      18496     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 124, 110, 64)     0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 872960)            0         
                                                                 
 dense (Dense)               (None, 128)               1

In [8]:
X_train = np.array([train_data[i][1] for i in range(len(train_data))])
y_train = np.array([train_data[i][2] for i in range(len(train_data))])
X_test = np.array([test_data[i][1] for i in range(len(test_data))])
y_test = np.array([test_data[i][2] for i in range(len(test_data))])

X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.2)

print(X_train.shape)

(12, 504, 448, 4)


In [None]:
# チェックポイントの作成
# 検証セットで最高性能を達成したモデルを保存する
# 保存先は./testmodel
dirpath = "./testmodel"
if not os.path.exists(dirpath):
    os.mkdir(dirpath)
filepath = os.path.join(dirpath, "channelrecog.h5")
checkpoint_cb = keras.callbacks.ModelCheckpoint(filepath, save_best_only=True)

# 早期打ち切り
# 10エポック連続で検証セットで性能が向上しなければ、学習を打ち切る
ealystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_valid, y_valid), callbacks=[checkpoint_cb, ealystopping_cb])

# テストセットで性能を確認する
model.evaluate(X_test, y_test)

# 学習曲線をプロットする
pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()