In [None]:
import os, io, math, tarfile, gzip, zipfile
import numpy as np
import pylab as plt
from PIL import Image
from datetime import datetime
from keras.models import Sequential
from keras.layers import Dense, Activation, Reshape, Flatten, Dropout
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.optimizers import Adam

In [None]:
def mkdir(path):
    """Create the directory ``path`` if it does not exist yet.

    Missing parent directories are created as well, and calling this on an
    existing directory is a no-op.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # makedirs(exist_ok=True) avoids the check-then-create race of
    # "if not exists: mkdir" and also handles nested paths.
    os.makedirs(path, exist_ok=True)
        
def load_data_gz(path, resize):
    """Load single-channel images from a gzip-compressed raw byte file.

    The first 16 bytes of the decompressed stream are skipped and the
    remaining bytes are interpreted as unsigned 8-bit pixels.

    Args:
        path: Path of the ``.gz`` file to read.
        resize: Shape of one image as passed to ``reshape`` (e.g. ``(28, 28)``).

    Returns:
        Integer array of shape ``(num_images, *resize)``.
    """
    # Read from the `path` argument; the `with` block closes the file itself.
    with gzip.open(path, 'rb') as f:
        imgs = np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1, *resize)
    print("データ件数：%s、縦：%s、横：%s、チャンネル数：%s" % (imgs.shape[0], imgs.shape[1], imgs.shape[2], 1))
    return imgs.astype(int)

def load_data_zip(path, resize):
    """Load every image stored in a zip archive as a resized RGB array.

    Args:
        path: Path of the zip archive.
        resize: Target size handed to ``PIL.Image.resize``, i.e.
            ``(width, height)``.

    Returns:
        Float array of shape ``(num_loaded, height, width, 3)`` holding the
        images that could be decoded; entries that fail are reported and
        skipped.
    """
    # PIL sizes are (width, height) while the pixel array is (height, width, 3).
    width, height = resize
    with zipfile.ZipFile(path, 'r') as zf:
        # Directory entries carry no image data, so keep them out of the totals.
        names = [n for n in zf.namelist() if not n.endswith('/')]
        file_num, img_num = len(names), 0
        imgs = np.empty((file_num, height, width, 3))
        for f in names:
            try:
                # convert('RGB') lets grayscale / palette / RGBA files fit the
                # 3-channel buffer instead of being dropped.
                img = Image.open(io.BytesIO(zf.read(f))).convert('RGB').resize(resize)
                imgs[img_num] = np.array(img)
                img_num += 1
            except Exception:
                # Best effort: report the entry and continue, but let
                # KeyboardInterrupt / SystemExit propagate.
                print(f, "は読み込めませんでした。")
        print("%s/%s ファイル読み込めました。" % (img_num, file_num))
        return imgs[:img_num]

def load_data_tar(path, names, resize):
    """Load the ``.jpg`` members of a tar archive whose path matches ``names``.

    Args:
        path: Path of the tar archive.
        names: Substrings; a member is considered only if its name contains
            at least one of them.
        resize: Target size handed to ``PIL.Image.resize``, i.e.
            ``(width, height)``.

    Returns:
        Integer array of shape ``(num_loaded, height, width, 3)``.
    """
    # PIL sizes are (width, height) while the pixel array is (height, width, 3).
    width, height = resize
    # Collect into a list so the number of images is not limited by a
    # pre-computed capacity.
    loaded = []
    # The context manager closes the archive even if iteration fails.
    with tarfile.open(path, 'r') as tar:
        for ti in tar:
            if not any(n in ti.name for n in names):
                continue
            if ".jpg" not in ti.name:
                print("%sを読み込んでいます。" % ti.name)
                continue
            try:
                raw = tar.extractfile(ti).read()
                # convert('RGB') keeps grayscale / CMYK JPEGs usable.
                img = Image.open(io.BytesIO(raw)).convert('RGB').resize(resize)
                loaded.append(np.array(img))
            except Exception:
                # Best effort: report the member and continue, but let
                # KeyboardInterrupt / SystemExit propagate.
                print("%sは読み込めませんでした。" % ti.name)
    if loaded:
        imgs = np.stack(loaded)
    else:
        imgs = np.empty((0, height, width, 3))
    print("データ件数：%s、縦：%s、横：%s、チャンネル数：%s" % (imgs.shape[0], imgs.shape[1], imgs.shape[2], imgs.shape[3]))
    return imgs.astype(int)

def combine_images(generated_images, generated_images_path,
                   epoch="", index="", save=True, show=True):
    """Draw a batch of images as a grid and optionally save / show it.

    Args:
        generated_images: Array whose first axis indexes the images.
        generated_images_path: Prefix (typically a directory ending in ``/``)
            prepended to the saved file name.
        epoch: Epoch label. Numbers are zero-padded to four digits; any other
            value is used verbatim in the file name.
        index: Batch label, formatted like ``epoch``.
        save: Write the grid to ``<path><epoch>_<index>.png`` when true.
        show: Display the figure when true.

    Returns:
        None. An empty batch is a no-op.
    """
    def _label(value):
        # "%04d" rejects non-numeric values such as the "" defaults; fall back
        # to the plain string so saving without labels does not crash.
        try:
            return "%04d" % value
        except TypeError:
            return str(value)

    total = generated_images.shape[0]
    if total == 0:
        # Nothing to draw; also avoids dividing by zero columns below.
        return
    cols  = int(math.sqrt(total))
    rows  = math.ceil(float(total)/cols)
    fig = plt.figure(figsize=(4, 4), dpi=170)
    if epoch != "" and index != "":
        fig.suptitle('epoch=%04d,index=%04d' % (epoch, index), fontsize=10)
    # Set the gray colormap once instead of once per image.
    plt.gray()
    for i in range(total):
        ax = fig.add_subplot(rows, cols, i + 1)
        ax.imshow(generated_images[i].astype(int))
        ax.set_xticks([])
        ax.set_yticks([])
    if save:
        filename = generated_images_path + "%s_%s.png" % (_label(epoch), _label(index))
        fig.savefig(filename)
    if show:
        plt.show()
    # Release the figure so repeated calls (e.g. during training with
    # show=False) do not accumulate open figures.
    plt.close(fig)

In [None]:
class dcgan():
    """DCGAN (generator + discriminator) built from Keras ``Sequential`` models.

    Typical use: construct, call ``create_models()``, then either ``fit()``
    to train or ``set_weights()`` followed by ``generate_images()``.
    """

    # Length of the uniform noise vector fed to the generator.
    NOISE_DIM = 100
    # Number of images sampled by generate_images() before fit() sets a batch size.
    DEFAULT_BATCH_SIZE = 64

    def __init__(self, path, inputs_shape):
        """Create the output directories and store the image shape.

        Args:
            path: Base output directory (expected to end with ``/``); images go
                to ``<path>generated_images/`` and weights to ``<path>models/``.
            inputs_shape: Image shape ``(height, width, channels)``.
        """
        self.path_imgs    = path + "generated_images/"
        self.path_models  = path + "models/"
        mkdir(path)
        mkdir(self.path_imgs)
        mkdir(self.path_models)
        self.inputs_shape = inputs_shape
        # The generator upsamples twice by 2, so it starts from a quarter of
        # the first image dimension (the same value is used for both axes).
        self.units        = int(inputs_shape[0]/4)
        self.history      = []
        # Defined here so generate_images() works on a model that was only
        # loaded via set_weights() and never trained in this session.
        self.batch_size   = self.DEFAULT_BATCH_SIZE

    def generator_model(self):
        """Build the generator: noise vector -> image with values in [-1, 1] (tanh)."""
        model = Sequential()
        model.add(Dense(input_dim=self.NOISE_DIM, units=1024))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(Dense(self.units*self.units*128))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(Reshape((self.units, self.units, 128), input_shape=(self.units*self.units*128,)))
        model.add(UpSampling2D(size=(2, 2)))
        model.add(Conv2D(64, (5, 5), padding='same'))
        model.add(BatchNormalization())
        model.add(Activation('relu'))
        model.add(UpSampling2D(size=(2, 2)))
        # One output channel per image channel.
        model.add(Conv2D(self.inputs_shape[2], (5, 5), padding='same'))
        model.add(Activation('tanh'))
        return model

    def discriminator_model(self):
        """Build the discriminator: image -> sigmoid score (trained with 1 = real, 0 = generated)."""
        model = Sequential()
        model.add(Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=self.inputs_shape))
        model.add(LeakyReLU(0.2))
        model.add(Conv2D(128, (5, 5), strides=(2, 2)))
        model.add(LeakyReLU(0.2))
        model.add(Conv2D(256, (5, 5), strides=(2, 2)))
        model.add(LeakyReLU(0.2))
        model.add(Flatten())
        model.add(Dense(256))
        model.add(LeakyReLU(0.2))
        model.add(Dropout(0.5))
        model.add(Dense(1))
        model.add(Activation('sigmoid'))
        return model

    def create_models(self):
        """Instantiate and compile ``self.D``, ``self.G`` and the stacked ``self.DCGAN``."""
        self.D = self.discriminator_model()
        self.G = self.generator_model()
        # Compile the stacked model while D is frozen, then unfreeze D and
        # compile it on its own for the discriminator updates.
        self.D.trainable = False
        self.DCGAN       = Sequential([self.G, self.D])
        self.DCGAN.compile(loss='binary_crossentropy',
                           optimizer=Adam(lr=2e-4, beta_1=0.5))
        self.D.trainable = True
        self.D.compile(loss='binary_crossentropy',
                       optimizer=Adam(lr=1e-5, beta_1=0.1))
        self.G.compile(loss='binary_crossentropy', optimizer="SGD")

    def set_weights(self, g_weights, d_weights):
        """Load generator / discriminator weights from files inside ``self.path_models``.

        Args:
            g_weights: File name of the generator weights.
            d_weights: File name of the discriminator weights.
        """
        self.G.load_weights(self.path_models + g_weights)
        self.D.load_weights(self.path_models + d_weights)

    def _to_plot_data(self, generated_images):
        """Rescale generator output from [-1, 1] to [0, 255] and shape it for plotting."""
        plot_data = generated_images.astype('float32')*127.5 + 127.5
        count = plot_data.shape[0]
        if self.inputs_shape[2]==1:
            # Drop the channel axis so grayscale images plot as 2-D arrays.
            return plot_data.reshape((count, *self.inputs_shape[:2]))
        return plot_data.reshape((count, *self.inputs_shape))

    def _save_weights(self, suffix=""):
        """Save G and D weights under a timestamped name, with an optional suffix."""
        stamp = datetime.strftime(datetime.now(), "%Y%m%d%H%M%S")
        self.G.save_weights("%sgenerator_%s%s.h5" % (self.path_models, stamp, suffix))
        self.D.save_weights("%sdiscriminator_%s%s.h5" % (self.path_models, stamp, suffix))

    def generate_images(self, save=False, show=True, batch_size=None):
        """Sample images from the generator and draw them as a grid.

        Args:
            save: Save the grid as ``9999_9999.png`` in ``self.path_imgs``.
            show: Display the grid.
            batch_size: Number of images to sample; defaults to
                ``self.batch_size`` (the last ``fit`` batch size, or
                ``DEFAULT_BATCH_SIZE`` if ``fit`` was never called).
        """
        if batch_size is None:
            batch_size = self.batch_size
        noise = np.random.uniform(-1, 1, size=(batch_size, self.NOISE_DIM))
        generated_images = self.G.predict(noise, verbose=0)
        plot_data = self._to_plot_data(generated_images)
        combine_images(plot_data, self.path_imgs, 9999, 9999, save=save, show=show)

    def fit(self, x, batch, start, end, batch_output=0, epoch_output=0):
        """Train the GAN for epochs ``start`` .. ``end - 1``.

        Args:
            x: Training images with pixel values in [0, 255].
            batch: Batch size; trailing samples that do not fill a batch are unused.
            start: First epoch number (used for labels and file names).
            end: Epoch number to stop before.
            batch_output: If non-zero, plot a sample grid and checkpoint the
                weights every ``batch_output`` batches.
            epoch_output: If non-zero, do the same at the first batch of
                epoch 0 and of every ``epoch_output``-th epoch.

        Side effects:
            Sets ``self.batch_size``; ``self.history`` becomes
            ``[generator_losses, discriminator_losses]``; weights are saved
            to ``self.path_models`` at checkpoints and at the end.
        """
        start_time = datetime.now()
        self.batch_size = batch
        g_losses = []
        d_losses = []
        # Expose the loss lists immediately so the history is available even
        # when no intermediate output is requested.
        self.history = [g_losses, d_losses]
        # Scale the training data to the range -1 .. 1 (matches the tanh output).
        x_train = (x.astype(np.float32) - 127.5)/127.5
        x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], x_train.shape[2], self.inputs_shape[2])
        # Labels: 1 for real images, 0 for generated ones.
        d_labels = np.array([1]*batch + [0]*batch)
        g_labels = np.array([1]*batch)
        # Start training.
        num_batches = x_train.shape[0] // batch
        print('Number of batches:', num_batches)
        for epoch in range(start, end):
            for index in range(num_batches):
                noise       = np.random.uniform(-1, 1, size=(batch, self.NOISE_DIM))
                image_batch = x_train[index*batch : (index+1)*batch]
                generated_images = self.G.predict(noise, verbose=0)
                at_batch_step = batch_output!=0 and index % batch_output==0
                at_epoch_step = epoch_output!=0 and index==0 and \
                    ((epoch+1) % epoch_output==0 or epoch==0)
                if at_batch_step or at_epoch_step:
                    # Plot the generated images and checkpoint the weights.
                    combine_images(self._to_plot_data(generated_images), self.path_imgs, epoch, index)
                    self._save_weights()
                    print("epoch: %d, batch: %d, time: %s" % (epoch, index, datetime.now()-start_time))
                # Update the discriminator on real + generated images.
                d_x = np.concatenate((image_batch, generated_images))
                d_loss = self.D.train_on_batch(d_x, d_labels)
                # Update the generator through the stacked model with D frozen.
                noise  = np.random.uniform(-1, 1, size=(batch, self.NOISE_DIM))
                self.D.trainable = False
                g_loss = self.DCGAN.train_on_batch(noise, g_labels)
                self.D.trainable = True
                # Record the losses.
                g_losses.append(g_loss)
                d_losses.append(d_loss)
        # Save the final weights.
        self._save_weights("_latest_model")
        print("finished. time: %s" % (datetime.now()-start_time))

## Fashion-MNIST

In [None]:
# Load the Fashion-MNIST training images (gzip file, 28x28 pixels per image).
resize = (28, 28)
path = "./data/fashion_mnist_train_data.gz"
imgs = load_data_gz(path=path, resize=resize)

In [None]:
# Preview a slice of the loaded images (64 images give a clean 8x8 grid).
start = 0    # index of the first image to show
end = 64     # index to stop before
combine_images(imgs[start:end], generated_images_path="", save=False)

In [None]:
# Build the DCGAN for 28x28 single-channel images; outputs go under models_path.
inputs_shape = (28, 28, 1)
models_path = "./fashion_mnsit/"
dcgan_fashion_mnist = dcgan(path=models_path, inputs_shape=inputs_shape)
dcgan_fashion_mnist.create_models()

In [None]:
#############################################
# Settings
BATCH_SIZE = 32
NUM_EPOCH = 2
OUTPUT_NUM = 300  # plot & save intermediate results every 300 training batches
#############################################

# Continue numbering after the highest epoch prefix among the PNG files
# already saved as "<epoch>_<index>.png".
start_epoch = 0
continuous = [
    int(name.split("_")[0])
    for name in os.listdir(dcgan_fashion_mnist.path_imgs)
    if "png" in name
]
if continuous:
    start_epoch = max(continuous) + 1
end_epoch = start_epoch + NUM_EPOCH

dcgan_fashion_mnist.fit(
    x=imgs,
    batch=BATCH_SIZE,
    start=start_epoch,
    end=end_epoch,
    batch_output=OUTPUT_NUM,
)

## Food-101

In [None]:
# Load the food images from a zip archive, resized to 64x64.
# The path below is a placeholder: put the archive's file name in it.
resize = (64, 64)
path = "./data/　　ここにファイル名をいれる　　.zip"
imgs = load_data_zip(path=path, resize=resize)

In [None]:
# Preview a slice of the loaded images (64 images give a clean 8x8 grid).
start = 0    # index of the first image to show
end = 64     # index to stop before
combine_images(imgs[start:end], generated_images_path="", save=False)

In [None]:
# Build the DCGAN for 64x64 three-channel images; outputs go under models_path.
inputs_shape = (64, 64, 3)
models_path = "./food/"
dcgan_food = dcgan(path=models_path, inputs_shape=inputs_shape)
dcgan_food.create_models()

In [None]:
#############################################
# Settings
BATCH_SIZE = 32
NUM_EPOCH = 200
OUTPUT_NUM = 10  # plot & save intermediate results every 10 epochs
#############################################

# Continue numbering after the highest epoch prefix among the PNG files
# already saved as "<epoch>_<index>.png".
start_epoch = 0
continuous = [
    int(name.split("_")[0])
    for name in os.listdir(dcgan_food.path_imgs)
    if "png" in name
]
if continuous:
    start_epoch = max(continuous) + 1
end_epoch = start_epoch + NUM_EPOCH

dcgan_food.fit(
    x=imgs,
    batch=BATCH_SIZE,
    start=start_epoch,
    end=end_epoch,
    epoch_output=OUTPUT_NUM,
)

## 学習済みモデルでの画像生成を行う場合に使用する

In [None]:
#############################################
# Settings: weight file names inside "<models_path>models/" (fill in before running)
G_WEIGHTS = ""
D_WEIGHTS = ""
#############################################

# Rebuild the Fashion-MNIST models and load the saved weights into them.
inputs_shape = (28, 28, 1)
models_path = "./fashion_mnsit/"
dcgan_fashion_mnist = dcgan(path=models_path, inputs_shape=inputs_shape)
dcgan_fashion_mnist.create_models()
dcgan_fashion_mnist.set_weights(d_weights=D_WEIGHTS, g_weights=G_WEIGHTS)

In [None]:
# Sample a grid of images from the loaded generator (display only, nothing is saved).
dcgan_fashion_mnist.generate_images(save=False, show=True)

In [None]:
#############################################
# Settings: weight file names inside "<models_path>models/" (fill in before running)
G_WEIGHTS = ""
D_WEIGHTS = ""
#############################################

# Rebuild the food models and load the saved weights into them.
inputs_shape = (64, 64, 3)
models_path = "./food/"
dcgan_food = dcgan(path=models_path, inputs_shape=inputs_shape)
dcgan_food.create_models()
dcgan_food.set_weights(d_weights=D_WEIGHTS, g_weights=G_WEIGHTS)

In [None]:
# Sample a grid of images from the loaded generator (display only, nothing is saved).
dcgan_food.generate_images(save=False, show=True)