In [1]:
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import UpSampling2D
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.core import Flatten, Dropout
from keras.optimizers import Adam
from keras.datasets import mnist
import numpy as np
from PIL import Image
import argparse
import math

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
# Number of passes train() makes over the training set.
EPOCH = 30
# train() writes a PNG snapshot of the generated batch every PNG_CREATE batches.
PNG_CREATE = 20
# Width of the discriminator's output layer (length of the one-hot label vectors).
N_CLASS = 10

In [3]:
# Generator: the network that produces the forged images.
def auto_encoder_generator_model():
    """Build the convolutional auto-encoder used as the generator.

    The encoder halves the spatial size twice (max-pooling) and the decoder
    doubles it twice (up-sampling), so a (28, 28, 1) input produces a
    (28, 28, 1) output. The closing sigmoid bounds every output to [0, 1].

    NOTE(review): other code in this file rescales images with
    ``* 127.5 + 127.5``, which suggests a [-1, 1] range; confirm that the
    sigmoid output range is what is intended.

    Prints a banner and the Keras layer summary as a side effect.

    Returns:
        The uncompiled Sequential model.
    """
    encoder_layers = [
        Conv2D(16, (3, 3), padding='same', input_shape=(28, 28, 1)),
        MaxPooling2D((2, 2), padding='same'),
        Conv2D(8, (3, 3), padding='same'),
        Activation('relu'),
        MaxPooling2D((2, 2), padding='same'),
        Conv2D(8, (3, 3), padding='same'),
        Activation('relu'),
    ]
    decoder_layers = [
        Conv2D(8, (3, 3), padding='same'),
        Activation('relu'),
        UpSampling2D(size=(2, 2)),
        Conv2D(8, (3, 3), padding='same'),
        Activation('relu'),
        UpSampling2D(size=(2, 2)),
        Conv2D(16, (3, 3), padding='same'),
        Conv2D(1, (3, 3), padding='same'),
        Activation('sigmoid'),
    ]
    autoencoder = Sequential(encoder_layers + decoder_layers)

    print('*** Auto encoder generator model ***')
    autoencoder.summary()

    return autoencoder

In [4]:
# Discriminator: the network that classifies images and detects forged ones.
def discriminator_model():
    """Build the convolutional classifier used as the discriminator.

    Takes (28, 28, 1) images and emits N_CLASS sigmoid scores per image.
    Prints a banner and the Keras layer summary as a side effect.

    Returns:
        The uncompiled Sequential model.
    """
    model = Sequential()
    model.add(Conv2D(64, (5, 5), padding='same', input_shape=(28, 28, 1)))
    model.add(LeakyReLU(0.2))
    # Down-sample with a stride-2 convolution. `strides` is the Keras 2 name
    # of the Keras 1 `subsample` argument, which Conv2D only accepts through
    # a deprecation shim.
    model.add(Conv2D(128, (5, 5), strides=(2, 2)))
    model.add(LeakyReLU(0.2))
    model.add(Flatten())
    model.add(Dense(1024))
    model.add(LeakyReLU(0.2))
    model.add(Dropout(0.5))
    model.add(Dense(N_CLASS))
    model.add(Activation('sigmoid'))
    print('*** discriminator model ***')
    model.summary()

    return model

In [5]:
def generator_containing_discriminator(g, d):
    """Stack the generator and the discriminator into a single model.

    The discriminator's ``trainable`` flag is set to False on the object that
    was passed in, so the caller's ``d`` is modified. Prints the Keras
    summary of the stacked model as a side effect.

    Args:
        g: generator model, placed first.
        d: discriminator model, placed second and marked non-trainable.

    Returns:
        The uncompiled Sequential model ``g -> d``.
    """
    d.trainable = False
    stacked = Sequential([g, d])
    stacked.summary()

    return stacked

In [6]:
def combine_images(generated_images):
    """Tile a batch of images into one 2-D mosaic.

    The grid has ``int(sqrt(n))`` columns and as many rows as are needed to
    hold all ``n`` images; images are placed row by row and unused cells stay
    zero. Only channel 0 of each image is copied.

    Args:
        generated_images: array indexed as (image, row, column, channel).

    Returns:
        A 2-D array with the same dtype as the input.
    """
    count = generated_images.shape[0]
    n_cols = int(math.sqrt(count))
    n_rows = -(-count // n_cols)  # ceiling of count / n_cols
    tile_h, tile_w = generated_images.shape[1:3]

    mosaic = np.zeros((n_rows * tile_h, n_cols * tile_w),
                      dtype=generated_images.dtype)
    for position in range(count):
        grid_row, grid_col = divmod(position, n_cols)
        top = grid_row * tile_h
        left = grid_col * tile_w
        mosaic[top:top + tile_h, left:left + tile_w] = generated_images[position, :, :, 0]

    return mosaic

In [7]:
def _one_hot_labels(labels, n_class):
    """Return the (len(labels), n_class) integer one-hot encoding of class ids."""
    labels = np.asarray(labels, dtype=int)
    if labels.size and (labels.min() < 0 or labels.max() >= n_class):
        raise ValueError("labels must lie in [0, %d)" % n_class)
    return np.eye(n_class, dtype=int)[labels]


def _shifted_target_labels(one_hot):
    """Return one-hot labels moved to the next class (0 -> 1, ..., last -> 0)."""
    return np.roll(one_hot, 1, axis=1)


def train(BATCH_SIZE):
    """Train the auto-encoder generator against the discriminator on MNIST.

    For every batch the generator transforms the real images, the
    discriminator is trained on real images (true labels) together with the
    generated images (shifted "target" labels), and then the generator is
    trained through the frozen discriminator.

    Side effects: downloads/loads MNIST, prints progress for every batch,
    writes ``<epoch>_<index>.png`` every PNG_CREATE batches, and writes the
    weight files ``generator`` and ``discriminator`` every 10 batches.

    Args:
        BATCH_SIZE: number of training images per batch; a trailing partial
            batch is dropped.
    """
    ####
    # Load the data
    ####
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    print ("データのサイズ")
    print ("X_train: %s, y_train:%s" % (X_train.shape, y_train.shape))
    print ("X_test:  %s, y_test: %s" % (X_test.shape, y_test.shape))
    X_train = (X_train.astype(np.float32) - 127.5)/127.5  # scale pixels to [-1, 1]
    X_train = X_train[:, :, :, None]  # append a channel axis of size 1

    # One-hot encode the digit labels.
    ylabel_train = _one_hot_labels(y_train, N_CLASS)

    # Label each generated image should be mistaken for (arbitrary choice:
    # the next digit, so a 0 should be read as a 1, and a 9 as a 0).
    target_ylabel_train = _shifted_target_labels(ylabel_train)

    ####
    # Prepare the models
    ####
    d = discriminator_model()
    g = auto_encoder_generator_model()

    # Generator followed by discriminator; the helper sets d.trainable = False,
    # so d is frozen when d_on_g is compiled below.
    d_on_g = generator_containing_discriminator(g, d)

    # Adam is used for the discriminator, the generator and the stacked model.
    d_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    g_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    d_on_g_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    g.compile(loss='binary_crossentropy', optimizer=g_optim)
    d_on_g.compile(loss='binary_crossentropy', optimizer=d_on_g_optim)
    d.trainable = True  # re-enable discriminator training before compiling d itself
    d.compile(loss='binary_crossentropy', optimizer=d_optim)

    n_batches = X_train.shape[0] // BATCH_SIZE

    ####
    # Loop over epochs
    ####
    for epoch in range(EPOCH):
        print("Epoch is", epoch)
        print("Number of batches", n_batches)

        ####
        # Loop over batches
        ####
        for index in range(n_batches):
            batch = slice(index*BATCH_SIZE, (index+1)*BATCH_SIZE)
            image_batch = X_train[batch]
            ylabel_train_batch = ylabel_train[batch]
            target_ylabel_train_batch = target_ylabel_train[batch]

            # Let the auto-encoder transform the real images.
            generated_images = g.predict(image_batch, verbose=0)

            ####
            # Periodically dump the generated batch as a PNG
            ####
            if index % PNG_CREATE == 0:
                image = combine_images(generated_images)
                # NOTE(review): this rescaling maps [-1, 1] to [0, 255];
                # confirm it matches the generator's output range.
                image = image*127.5+127.5
                print('*** Generate Image by Auto encoder ***')
                Image.fromarray(image.astype(np.uint8)).save(str(epoch)+"_"+str(index)+".png")

            ####
            # Training step
            ####
            print ("image_batch: %s, generated_images:%s" % (image_batch.shape, generated_images.shape))
            print ("ylabel_train_batch: %s, target_ylabel_train_batch:%s" % (ylabel_train_batch.shape, target_ylabel_train_batch.shape))
            # Real images keep their true label, generated ones get the target label.
            X = np.concatenate((image_batch, generated_images))
            y = np.concatenate((ylabel_train_batch, target_ylabel_train_batch))

            # Discriminator step: learn the label attached to each input image.
            d_loss = d.train_on_batch(X, y)
            print("batch %d D_loss : %f" % (index, d_loss))

            # Generator step through the frozen discriminator. The targets must
            # be the labels of THIS batch; passing the full training-set label
            # array gives a sample-count mismatch with image_batch.
            # NOTE(review): the true labels mirror the "real" label of a
            # standard GAN generator step; switch to target_ylabel_train_batch
            # if the generator should be pushed toward the shifted class.
            d.trainable = False  # freeze the discriminator
            g_loss = d_on_g.train_on_batch(image_batch, ylabel_train_batch)
            d.trainable = True  # unfreeze the discriminator
            print("batch %d G_loss : %f" % (index, g_loss))

            # Checkpoint the weights every 10 batches (overwriting earlier files).
            if index % 10 == 9:
                g.save_weights('generator', True)
                d.save_weights('discriminator', True)

In [None]:
# Run the full training loop with batches of 100 images.
train(BATCH_SIZE=100)

データのサイズ
X_train: (60000, 28, 28), y_train:(60000,)
X_test:  (10000, 28, 28), y_test: (10000,)


  


*** discriminator model ***
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 28, 28, 64)        1664      
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 28, 28, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 12, 12, 128)       204928    
_________________________________________________________________
leaky_re_lu_2 (LeakyReLU)    (None, 12, 12, 128)       0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 18432)             0         
_________________________________________________________________
dense_1 (Dense)              (None, 1024)              18875392  
_________________________________________________________________
leaky_re_lu_3 (LeakyReLU)    (None, 1024)       

In [None]:
def generate(BATCH_SIZE, nice=False):
    """Run the trained generator on MNIST test images and save a mosaic.

    The generator is an image-to-image auto-encoder with a (28, 28, 1) input,
    so it is fed MNIST test images scaled to [-1, 1] rather than flat noise
    vectors, which its input layer cannot accept.

    Side effects: loads MNIST, reads the weight file ``generator`` (and
    ``discriminator`` when ``nice`` is true) and writes ``generated_image.png``.

    Args:
        BATCH_SIZE: number of images in the saved mosaic.
        nice: when true, generate from 20 * BATCH_SIZE inputs and keep the
            BATCH_SIZE outputs the discriminator scores highest.
    """
    g = auto_encoder_generator_model()
    g_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    g.compile(loss='binary_crossentropy', optimizer=g_optim)
    g.load_weights('generator')

    # Take the leading test images and apply the [-1, 1] scaling plus channel axis.
    n_inputs = BATCH_SIZE*20 if nice else BATCH_SIZE
    (_, _), (X_test, _) = mnist.load_data()
    source_images = (X_test[:n_inputs].astype(np.float32) - 127.5)/127.5
    source_images = source_images[:, :, :, None]

    generated_images = g.predict(source_images, verbose=1)

    if nice:
        d = discriminator_model()
        d_optim = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
        # Compile with the discriminator's own optimizer, not the generator's.
        d.compile(loss='binary_crossentropy', optimizer=d_optim)
        d.load_weights('discriminator')

        d_pret = d.predict(generated_images, verbose=1)
        # Rank by the first output unit, highest first; the stable sort keeps
        # the original order among ties.
        # NOTE(review): column 0 is one of N_CLASS class scores; confirm this
        # is the intended "nice" criterion.
        ranking = np.argsort(-d_pret[:, 0], kind='mergesort')
        # Slicing cannot over-run when fewer than BATCH_SIZE images exist.
        nice_images = generated_images[ranking[:BATCH_SIZE]]
        image = combine_images(nice_images)
    else:
        image = combine_images(generated_images)

    # NOTE(review): this rescaling maps [-1, 1] to [0, 255]; confirm it
    # matches the generator's output range.
    image = image*127.5+127.5
    Image.fromarray(image.astype(np.uint8)).save("generated_image.png")

In [None]:
# Generate a 100-image mosaic from the saved generator weights, without discriminator ranking.
generate(BATCH_SIZE=100, nice=False)