In [0]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [1]:
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers import LeakyReLU
from keras.layers import UpSampling2D, Conv2D
from keras.models import Model
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np

import datetime, sys, os

from skimage.io import imread
from skimage.transform import resize
from glob import glob



Using TensorFlow backend.


In [0]:
class DataGenerator():
  def __init__(self):
    pass

  def process(self, batch_path, is_train):
    imgs_A, imgs_B = [], []

    for img_path in batch_path:
      img_A = imread(img_path, as_gray=True)
      img_B = imread(os.path.join('/content/drive/My Drive/portrait/trainB', os.path.basename(img_path)), as_gray=True)

      if is_train and np.random.random() < 0.5:
        img_A = np.fliplr(img_A)
        img_B = np.fliplr(img_B)

      imgs_A.append(np.expand_dims(img_A, axis=-1))
      imgs_B.append(np.expand_dims(img_B, axis=-1))

    imgs_A = np.array(imgs_A) / 127.5 - 1.
    imgs_B = np.array(imgs_B) / 127.5 - 1.

    return imgs_A, imgs_B

  def load_data(self, batch_size=1, is_train=True):
    listA = glob('/content/drive/My Drive/portrait/trainA/*.jpg')

    batch_path = np.random.choice(listA, size=batch_size)

    imgs_A, imgs_B = self.process(batch_path, is_train)

    return imgs_A, imgs_B

  def load_batch(self, batch_size=1, is_train=True):
    listA = glob('/content/drive/My Drive/portrait/trainA/*.jpg')

    self.n_batches = int(len(listA) / batch_size)

    for i in range(self.n_batches-1):
      batch_path = listA[i*batch_size:(i+1)*batch_size]
      
      imgs_A, imgs_B = self.process(batch_path, is_train)

      yield imgs_A, imgs_B

In [3]:
class Pix2Pix():
  def __init__(self):
    self.img_rows = 256
    self.img_cols = 256
    self.channels = 1
    self.img_shape = (self.img_rows, self.img_cols, self.channels)

    self.data_loader = DataGenerator()

    # calculate output shape of D (PatchGAN)
    patch = int(self.img_rows / 2**4)
    self.disc_patch = (patch, patch, 1)

    # number of filters in the first layer of G and D
    self.gf = 64
    self.df = 64

    # build discriminator
    self.discriminator = self.build_discriminator()
    self.discriminator.compile(
      loss='binary_crossentropy',
      optimizer='adam',
      metrics=['accuracy']
    )

    # build generator
    self.generator = self.build_generator()

    img_A = Input(shape=self.img_shape)
    img_B = Input(shape=self.img_shape)

    fake_B = self.generator(img_A)

    # do not train discriminator while training generator
    self.discriminator.trainable = False

    valid = self.discriminator([img_A, fake_B])

    self.combined = Model(inputs=[img_A, img_B], outputs=[valid, fake_B])
    self.combined.compile(
      loss=['binary_crossentropy', 'binary_crossentropy'],
      loss_weights=[1, 100],
      optimizer='adam'
    )

  def build_generator(self):
    def conv2d(layer_input, filters, kernel_size=4, bn=True):
      d = Conv2D(filters, kernel_size=kernel_size, strides=2, padding='same')(layer_input)
      d = LeakyReLU(alpha=0.2)(d)
      if bn:
        d = BatchNormalization(momentum=0.8)(d)
      return d

    def deconv2d(layer_input, skip_input, filters, kernel_size=4, dropout_rate=0):
      u = UpSampling2D(size=2)(layer_input)
      u = Conv2D(filters, kernel_size=kernel_size, strides=1, padding='same', activation='relu')(u)
      if dropout_rate:
        u = Dropout(dropout_rate)(u)
      u = BatchNormalization(momentum=0.8)(u)
      u = Concatenate()([u, skip_input])
      return u

    # image input
    d0 = Input(shape=self.img_shape)

    # downsampling
    d1 = conv2d(d0, self.gf, bn=False)
    d2 = conv2d(d1, self.gf*2)
    d3 = conv2d(d2, self.gf*4)
    d4 = conv2d(d3, self.gf*8)
    d5 = conv2d(d4, self.gf*8)
    d6 = conv2d(d5, self.gf*8)
    d7 = conv2d(d6, self.gf*8)

    # upsampling
    u1 = deconv2d(d7, d6, self.gf*8)
    u2 = deconv2d(u1, d5, self.gf*8)
    u3 = deconv2d(u2, d4, self.gf*8)
    u4 = deconv2d(u3, d3, self.gf*4)
    u5 = deconv2d(u4, d2, self.gf*2)
    u6 = deconv2d(u5, d1, self.gf)

    u7 = UpSampling2D(size=2)(u6)

    output_img = Conv2D(self.channels, kernel_size=4, strides=1, padding='same', activation='tanh')(u7)

    return Model(d0, output_img)

  def build_discriminator(self):

    def d_layer(layer_input, filters, kernel_size=4, bn=True):
      d = Conv2D(filters, kernel_size=kernel_size, strides=2, padding='same')(layer_input)
      d = LeakyReLU(alpha=0.2)(d)
      if bn:
        d = BatchNormalization(momentum=0.8)(d)
      return d

    img_A = Input(shape=self.img_shape)
    img_B = Input(shape=self.img_shape)

    combined_imgs = Concatenate(axis=-1)([img_A, img_B])

    d1 = d_layer(combined_imgs, self.df, bn=False)
    d2 = d_layer(d1, self.df*1)
    d3 = d_layer(d2, self.df*2)
    d4 = d_layer(d3, self.df*4)

    validity = Conv2D(1, kernel_size=4, strides=1, padding='same')(d4)

    return Model([img_A, img_B], validity)

  def train(self, epochs, batch_size=1, sample_interval=50):
    start_time = datetime.datetime.now()

    # adversarial loss ground truths
    valid = np.ones((batch_size,) + self.disc_patch)
    fake = np.zeros((batch_size,) + self.disc_patch)

    for epoch in range(epochs):
      for batch_i, (imgs_A, imgs_B) in enumerate(self.data_loader.load_batch(batch_size)):

        # ---------------------
        #  Train Discriminator
        # ---------------------
        fake_B = self.generator.predict(imgs_A)

        if np.random.random() < 0.5:
          d_loss = self.discriminator.train_on_batch([imgs_A, imgs_B], valid)
        else:
          d_loss = self.discriminator.train_on_batch([imgs_A, fake_B], fake)
        # d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # -----------------
        #  Train Generator
        # -----------------
        g_loss = self.combined.train_on_batch([imgs_A, imgs_B], [valid, imgs_B])

        # print
        elapsed_time = datetime.datetime.now() - start_time

        print('[Epoch %d/%d] [Batch %d/%d] [D loss: %f, acc: %3d%%] [G loss: %f] time: %s' % (epoch, epochs, batch_i, self.data_loader.n_batches, d_loss[0], 100*d_loss[1], g_loss[0], elapsed_time))

        # save sample images
        if batch_i % sample_interval == 0:
          self.sample_images(epoch, batch_i, d_loss)

        # save sample images when discriminator has low accuracy
        if epoch > 9 and d_loss[1] < 0.6:
          self.sample_images(epoch, batch_i, d_loss, low=True)

  def sample_images(self, epoch, batch_i, d_loss, low=False):
    os.makedirs('samples', exist_ok=True)

    imgs_A, imgs_B = self.data_loader.load_data(batch_size=3, is_train=False)
    fake_B = self.generator.predict(imgs_A)

    gen_imgs = np.concatenate([imgs_A, fake_B, imgs_B])

    # rescale images to 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    titles = ['Input', 'Generated', 'Ground Truth']
    fig, axs = plt.subplots(3, 3)

    for i in range(3):
      for j in range(3):
        axs[i, j].imshow(gen_imgs[3*i+j].squeeze(), cmap='gray')
        axs[i, j].set_title(titles[i])
        axs[i, j].axis('off')

    if low:
      fig.savefig('samples/low_%d_%d_%d.png' % (epoch, batch_i, d_loss[1] * 100))
    else:
      fig.savefig('samples/%d_%d_%d.png' % (epoch, batch_i, d_loss[1] * 100))

    plt.close()

if __name__ == '__main__':
  gan = Pix2Pix()
  gan.train(epochs=300, batch_size=10, sample_interval=2)














Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where



  'Discrepancy between trainable weights and collected trainable'




[Epoch 0/300] [Batch 0/2] [D loss: 2.589688, acc:  57%] [G loss: 384.232697] time: 0:00:32.188579
[Epoch 1/300] [Batch 0/2] [D loss: 9.999869, acc:  15%] [G loss: 375.673828] time: 0:00:35.700440
[Epoch 2/300] [Batch 0/2] [D loss: 3.452996, acc:   2%] [G loss: 551.385864] time: 0:00:39.473428
[Epoch 3/300] [Batch 0/2] [D loss: 7.293860, acc:   7%] [G loss: 330.582153] time: 0:00:40.334268
[Epoch 4/300] [Batch 0/2] [D loss: 3.634545, acc:  48%] [G loss: 301.946075] time: 0:00:42.237414
[Epoch 5/300] [Batch 0/2] [D loss: 6.747430, acc:   6%] [G loss: 231.204178] time: 0:00:43.087818
[Epoch 6/300] [Batch 0/2] [D loss: 6.579127, acc:   5%] [G loss: 246.322433] time: 0:00:44.994953
[Epoch 7/300] [Batch 0/2] [D loss: 6.350515, acc:   5%] [G loss: 201.882065] time: 0:00:46.999315
[Epoch 8/300] [Batch 0/2] [D loss: 6.359149, acc:   4%] [G loss: 206.873962] time: 0:00:47.883530
[Epoch 9/300] [Batch 0/2] [D loss: 4.332545, acc:   2%] [G loss: 187.861526] time: 0:00:51.465802
[Epoch 10/300] [Ba