##GANLearner - 敵対的生成ネットワークを用いた深層学習や、画像生成機能を提供するクラス

In [1]:
import os
import re
import math
import datetime
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
 
class GANLearner:
    """敵対的生成ネットワークを用いた深層学習や、画像生成機能を提供するクラス。
 
        Parameters
        ----------
        generator : tensorflow.keras.models.Sequential\n
            生成器。識別器との結合は行われていない状態の生成器を指定する。\n
        discriminator :  tensorflow.keras.models.Sequential\n
            識別器のモデルを指定する。\n
        batch_size : int\n
            一度の学習で計算するデータ数を指定する。\n
            default : 128\n
            validation : >0\n
        z_dim : int\n
            生成画像の元となる特徴量を指定する。
            default : 100\n
            validation : >0\n
    """
 
    # property
    __generator = None
    __discriminator = None
    __batch_size = 0
    __z_dim = 0
    __total_epochs = 0
 
    @property
    def generator(self):
        """生成器を取得する。"""
        return self.__generator
 
    @property
    def discriminator(self):
        """識別器を取得する。"""
        return self.__discriminator
 
    @property
    def batch_size(self):
        """int : バッチサイズを取得、または設定する。\n
            validation : >0"""
        return self.__batch_size
 
    @batch_size.setter
    def batch_size(self, value):
        """int : バッチサイズを取得、または設定する。\n
            validation : >0"""
        self.__batch_size = self.__validateNaturalNumber(value)
 
    @property
    def z_dim(self):
        """int : 潜在変数の次元数を取得、または設定する。\n
            validation : >0"""
        return self.__z_dim
 
    @z_dim.setter
    def z_dim(self, value):
        """int : 潜在変数の次元数を取得、または設定する。\n
            validation : >0"""
        self.__z_dim = self.__validateNaturalNumber(value)
 
    @property
    def total_epochs(self):
        """int : インスタンスが生成されてから実行されたトレーニング数を取得する。\n"""
        return self.__total_epochs
 
    # コンストラクタ
    def __init__(self, generator, discriminator, batch_size, z_dim):
        self.batch_size = batch_size
        self.z_dim = z_dim
        self.__discriminator = discriminator
        self.__generator = generator
    
    # 入力チェック（自然数）
    def __validateNaturalNumber(self, value):
        if not isinstance(value, int):
            raise TypeError("argument type must be int.")
        elif value <= 0:
            raise ValueError("need more than 1 value.")
        else:
            return value
 
    # 生成器と識別器の結合とコンパイルを行う。
    def __recompile(self, generator, discriminator):
        
        # コンパイル確認
        if not generator._is_compiled or not discriminator._is_compiled:
            raise RuntimeError("You must compile a model before training.")
 
        # discriminatorがcompile済のため、trainableをfalseにしても、discriminatorは学習できる
        discriminator.trainable = False
 
        # 結合
        combined = Sequential([generator, discriminator])
 
        # 結合済のgeneratorをcompile
        if generator.compiled_metrics == None:
            combined.compile(
                loss=generator.loss,
                optimizer=generator.optimizer)
        else:
            combined.compile(
                loss=generator.loss,
                optimizer=generator.optimizer,
                metrics=generator.compiled_metrics._user_metrics)
        
        return combined
 
    def defit(self, x_train, epochs=1, *steps):
        """生成器と識別器を用いた学習を、エポック数回だけ実行する。
 
            Parameters
            ----------
            x_train : numpy.ndarray\n
                学習データ\n
            epochs :  int\n
                エポック数。学習を繰り返す回数を指定する。\n
                default : 1\n
                validation : >0\n
            steps : *int
                未使用（デバッグ用）
        """
        for i in self.__defit(self.__generator, self.__discriminator, x_train, self.__validateNaturalNumber(epochs), self.__batch_size, self.__z_dim):
            if i in steps:
                pass # 未使用（デバッグ用）
 
    # defit本体。epoch単位で結果をyieldする。
    def __defit(self, generator, discriminator, x_train, epochs, batch_size, z_dim):
        
        # GモデルとDモデルを結合しコンパイル
        combined = self.__recompile(generator, discriminator)
 
        # カウンタ
        cnt = 0
        y_batch = int(batch_size / 2)
 
        # 結果表示用テンプレート
        print_header = lambda : print(f"{self.total_epochs} [", end="")
        print_footer = lambda : print(f"] D_MODEL - loss:{d_loss_ave[0]:.4f}, acc:{d_loss_ave[1] * 100:.2f}  G_MODEL - loss:{np.average(g_loss):.4f}")
        
        while cnt < epochs:
 
            # 進捗
            self.__total_epochs += 1
            print_header()
            
            # 損失関数
            d_loss = np.zeros((batch_size, 2))
            g_loss = np.zeros(batch_size)
 
            for i in range(batch_size):
                
                # Generatorから生成 (num=batch_size/2)
                noise = np.random.normal(0, 1, (y_batch, z_dim))
                fake_imgs = generator.predict(noise)
                
                # 教師データ取得 (num=batch_size/2)
                idx = np.random.randint(0, x_train.shape[0], y_batch)
                imgs = x_train[idx]
 
                # Discriminatorの学習 - 訓練データを"1"と認識するよう、欠損値関数を元に学習
                d_loss_real = discriminator.train_on_batch(imgs, np.ones((y_batch, 1)))
 
                # Discriminatorの学習 - 訓練データを"0"と認識するよう、欠損値関数を元に学習
                d_loss_fake = discriminator.train_on_batch(fake_imgs, np.zeros((y_batch, 1)))
 
                # Generatorの学習 - ランダムな画像データを生成
                noise = np.random.normal(0, 1, (batch_size, z_dim))
 
                # Generatorの学習 - 生成データをDモデルが"1"と認識するように、損失関数を元に学習
                g_loss[i] = combined.train_on_batch(noise, np.array([1] * batch_size))
 
                # Discriminatorの損失関数の平均
                d_loss[i] = np.add(d_loss_real, d_loss_fake) / 2
 
                # 進捗表示
                if i % int(batch_size / 25) == 0: print("=", end="")
            
            # 損失関数表示
            d_loss_ave = np.average(d_loss, axis=0)
            print_footer()
            
            # 戻り値
            yield cnt
            cnt += 1
 
    def generateImage(self, fileName=None, num=20):
        """学習データを基に、生成器で画像を生成する。
 
            Parameters
            ----------
            fileName : string\n
                保存する画像ファイルのフルパスを指定する。指定しない場合は保存しない。\n
                default : None\n
            num :  int\n
                生成する画像の個数を指定する。\n
                default : 20\n
                validation : >0\n
            returns
            -------
            fig : figManager.canvas.figure\n
                生成した画像を返す。
        """
 
        # plotのrow、col
        c = math.ceil(math.sqrt(self.__validateNaturalNumber(num)))
        r = math.ceil(num / c)
 
        # numpy.random.normal(平均, 標準偏差 (配列の個数, 次元数))
        noise = np.random.normal(0, 1, (num, self.__z_dim))
 
        # 画像生成
        gen_imgs = self.__generator.predict(noise)
 
        # 生成画像を0-1に再スケール
        gen_imgs = 0.5 * gen_imgs + 0.5
 
        # plotの初期化
        plt.ioff()
        fig, axs = plt.subplots(r, c)
 
        # 画像表示用ラムダ式
        func = lambda ax, idx: [ax.axis("off"), ax.imshow(gen_imgs[idx, :, :, 0], cmap="gray")]
 
        # 画像表示
        if num == 1:
            func(axs, 0)
        elif num == 2:
            func(axs[0], 0)
            func(axs[1], 1)
        else:
            cnt = 0
            for i in range(r):
                for j in range(c):
                    if cnt < num: func(axs[i, j], cnt)
                    else: axs[i, j].axis("off")
                    cnt += 1
 
        # 保存してclose
        if fileName != None: fig.savefig(fileName)
        return fig
 
    def saveWeights(self, path):
        """学習データを保存する。
 
            Parameters
            ----------
            path : string\n
                保存するファイルのパス、またはディレクトリを指定する。\n
                ディレクトリを指定した場合、保存ファイル名は{yyyymmddhhmmss}_mnist_{total_epochs}.h5となる。\n
            returns
            -------
            filepath : string\n
                保存したファイルのパスを返す。
        """
 
        # 保存ファイル名のテンプレ
        SAVE_FILE_NAME = lambda : os.path.join(path, f"{dt}_mnist_{self.total_epochs}.h5")
 
        # ディレクトリの場合はテンプレで保存
        if os.path.isdir(path):
            dt = datetime.datetime.today().strftime('%Y%m%d%H%M%S')
            filename = SAVE_FILE_NAME()
        else:
            filename = path
        
        # 再コンパイルして保存
        combined = self.__recompile(self.__generator, self.__discriminator)
        combined.save_weights(filename)
 
        # 保存したファイルパスを戻す
        return filename
 
    def loadWeights(self, path):
        """学習データを読込む。
 
            Parameters
            ----------
            path : string\n
                読込むファイルのパス、またはディレクトリを指定する。\n
                ディレクトリを指定した場合、ファイル名が\d{14}_mnist_\d+\.h5であるファイルの内、ソート後の順が最後尾のファイルを使用する。\n
            returns
            -------
            filepath : string\n
                保存したファイルのパスを返す。
            raises
            ------
            FileNotFoundError
                読込むファイルが存在しない場合発生する。
        """
 
        # 読込みファイル名の正規表現
        LOAD_FILE_NAME = "\d{14}_mnist_(\d+)\.h5"
 
        # ディレクトリの場合はLOAD_FILE_NAMEをソート後の最後尾
        if os.path.isdir(path):
 
            # ファイルのみ取得
            files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
            if len(files) == 0:
                raise FileNotFoundError(f"No such file or directory: '{path}/*'")
            
            # 学習ファイルを検索
            files = [f for f in files if re.match(LOAD_FILE_NAME, f)]
            if len(files) == 0:
                raise FileNotFoundError(f"No such file or directory: '{path}/*'")
            
            # 最新のファイル（最後尾）を取得
            files = sorted(files)
            filename = os.path.join(path, files[-1])
 
            # epoch番号取得
            num = re.match(LOAD_FILE_NAME, files[-1])
            self.__total_epochs = int(num.group(1))
            
        else:
            filename = path
 
        # 再コンパイルして読込み
        combined = self.__recompile(self.__generator, self.__discriminator)
        combined.load_weights(filename)
 
        # 読込んだファイルパスを戻す
        return filename

##models - generator, discriminatorを定義する

In [2]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import UpSampling2D, Conv2D

def GAN_MODEL_1(noise_shape, img_shape):

    # Gモデル
    generator = Sequential()
    generator.add(Dense(256, activation="relu", input_shape=noise_shape))   # 入力層
    generator.add(Dense(512, activation="relu"))                            # 中間層
    generator.add(Dense(1024, activation="relu"))                           # 中間層
    generator.add(Dense(np.prod(img_shape), activation="tanh"))             # 中間層
    generator.add(Reshape(img_shape))                                       # 出力層
    generator.summary()

    # Dモデル
    discriminator = Sequential()
    discriminator.add(Flatten(input_shape=img_shape))
    discriminator.add(Dense(512, activation="relu"))    # 入力層
    discriminator.add(Dense(256, activation="relu"))    # 中間層
    discriminator.add(Dense(1, activation="sigmoid"))   # 出力層
    discriminator.summary()

    # 最適化手法
    optimizer = Adam(0.0002, 0.5)

    # モデル構築
    discriminator.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    generator.compile(loss="binary_crossentropy", optimizer=optimizer)

    return generator, discriminator


def DCGAN_MODEL_1(noise_shape, img_shape):
    
    generator = Sequential()
    generator.add(Dense(1024, activation="relu", input_shape=noise_shape))
    generator.add(BatchNormalization()) #平均を0、分散が1に
    generator.add(Dense(7 * 7 * 128, activation="relu"))
    generator.add(BatchNormalization())
    generator.add(Reshape((7, 7, 128), input_shape=(7 * 7 * 128, ))) # 畳み込み層に7*7の画像を与える
    generator.add(UpSampling2D((2, 2))) # UpSamplingをし、画像を14*14にする 間の要素は0で埋める
    generator.add(Conv2D(64, kernel_size=(5, 5), padding="same", activation="relu"))
    generator.add(UpSampling2D((2, 2))) # UpSamplingをし、画像を28*28にする
    generator.add(Conv2D(1, kernel_size=(5, 5), padding="same", activation="tanh")) # tanh:双曲線正接関数 (0,0基点の点対称S字型曲線で-1.0～1.0の値をとる)
    generator.summary()

    leakyReLU = LeakyReLU(alpha=0.2)
    discriminator = Sequential()
    discriminator.add(Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding="same", activation=leakyReLU, input_shape=(img_shape))) # 畳み込む（ゼロパディングをし5*5のフィルタを2マスごとに動かす）
    discriminator.add(Conv2D(128, kernel_size=(5, 5), strides=(2, 2), activation=leakyReLU)) # 畳み込み2回目
    discriminator.add(Flatten()) # 入力の平坦化
    discriminator.add(Dense(256, activation=leakyReLU))
    discriminator.add(Dropout(0.5))
    discriminator.add(Dense(1, activation="sigmoid"))
    discriminator.summary()

    # 最適化関数
    optimizer = Adam(lr=0.0001, beta_1=0.5)

    # モデル構築
    discriminator.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
    generator.compile(loss="binary_crossentropy", optimizer=optimizer)

    return generator, discriminator


##学習準備

In [3]:
import numpy as np
from tensorflow.keras.datasets import mnist

#入力画像
IMG_ROWS  = 28 
IMG_COLS  = 28
IMG_CHNL  = 1
IMG_SHAPE = (IMG_ROWS, IMG_COLS, IMG_CHNL)

# 潜在変数の次元数
Z_DIM = 100

# バッチサイズ
BAT_SIZE = 128

# テストデータ (mnist)
(x_train, _), (_, _) = mnist.load_data()
x_train = (x_train.astype(np.float32) - 127.5) / 127.5
x_train = np.expand_dims(x_train, axis=3)

# Gモデル、 Dモデル
generator, discriminator = DCGAN_MODEL_1((Z_DIM,), IMG_SHAPE)

# 学習用モジュール
gan = GANLearner(
    generator=generator,
    discriminator=discriminator,
    batch_size=BAT_SIZE,
    z_dim=Z_DIM
)


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 1024)              103424    
                                                                 
 batch_normalization (BatchN  (None, 1024)             4096      
 ormalization)                                                   
                                                                 
 dense_1 (Dense)             (None, 6272)              6428800   
                                                                 
 batch_normalization_1 (Batc  (None, 6272)             25088     
 hNormalization)                                                 
                                                                 
 reshape (Reshape)           (None, 7, 7, 128)         0         
                                                                 
 up_sampling2d (UpSampling2D  (None, 14, 14, 128)      0

  super(Adam, self).__init__(name, **kwargs)


##トレーニング実行 & 画像生成

In [5]:
import datetime

# 初回のみ
try:
    root = root
except NameError:
    try:
        # Google Driveのマウント
        from google.colab import drive
        drive.mount("/content/drive")
        root = "drive/My Drive/GAN"
    except ModuleNotFoundError:
        # ローカルの場合
        root = "."
    finally:
        # ログ
        dt = datetime.datetime.today().strftime('%Y%m%d%H%M%S')
        gan.log_file_path = f"{root}/log/{dt}.log"
        
        # 学習ファイル読込み
        try:
            gan.loadWeights(f"{root}/wgh")
            print("前回のチェックポイントから再開")
        except FileNotFoundError:
            print("最初から学習開始")

# 学習開始
for _ in range(10):
    gan.defit(x_train, epochs=10)
    gan.saveWeights(f"{root}/wgt")
    gan.generateImage(f"{root}/img/{datetime.datetime.today().strftime('%Y%m%d%H%M%S')}_mnist_{gan.total_epochs}.png", 20)


