In [None]:
import tensorflow as tf
import os
import matplotlib.pyplot as plt
from PIL import Image
import glob
import numpy as np
from PIL import ImageFilter
import tensorflow.keras.layers as layers
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split

In [None]:
# Dataset: https://www.kaggle.com/robgonsalves/impressionistlandscapespaintings
# Download the data from the link above and upload it to the same directory
# level as Unet_landscape.
#
# The location can be overridden with the UNET_DATA_DIR environment variable;
# the default is the original hard-coded path. os.path.join(..., "") guarantees
# a trailing separator because later cells build paths by string concatenation.
# NOTE(review): the default is a *relative* path. On Colab, Google Drive is
# usually mounted at "/content/drive/MyDrive/" (leading slash) — confirm.
data_dir = os.path.join(os.environ.get("UNET_DATA_DIR", "content/drive/MyDrive/"), "")

In [None]:
# Collect every painting in the dataset folder and preview up to the first
# 100 of them in a 10x10 grid.
image_paths = sorted(glob.glob(data_dir + "impressionist_landscapes_resized_1024/*.jpg"))
if not image_paths:
    # Fail early with a clear message instead of an IndexError further down.
    raise FileNotFoundError(
        "No .jpg files found under "
        + data_dir
        + "impressionist_landscapes_resized_1024/ - check data_dir"
    )

fig = plt.figure(figsize=(14, 14))
# Slicing keeps the loop safe when fewer than 100 images are available.
for i, path in enumerate(image_paths[:100]):
    ax = fig.add_subplot(10, 10, i + 1)
    # imshow copies the pixel data, so the file can be closed right away.
    with Image.open(path) as img:
        ax.imshow(img)
    ax.axis("off")

In [None]:
# Build the ground-truth set: every painting resized to 64x64 pixels.
ground_truths = []
for path in image_paths:
    with Image.open(path) as img:
        # Skip images that are smaller than the 64x64 target.
        if img.width < 64 or img.height < 64:
            continue
        # Force three channels first: a grayscale or CMYK JPEG would otherwise
        # produce an array with a different channel count and the final
        # stacking into a single (N, 64, 64, 3) array would not work.
        resized = img.convert("RGB").resize((64, 64), Image.LANCZOS)
        ground_truths.append(np.asarray(resized, np.uint8))

ground_truths = np.asarray(ground_truths)
print(ground_truths.shape, ground_truths.dtype)

In [None]:
# Show the first 25 resized ground-truth images in a 5x5 grid.
fig, axes = plt.subplots(5, 5, figsize=(20, 20))
for i, ax in enumerate(axes.flat):
    ax.imshow(ground_truths[i])
    ax.axis("off")

In [None]:
# Create the degraded network inputs: halve the resolution, blur, then scale
# back up to the original size so the image becomes coarse.
mosaics = []
for truth in ground_truths:
    with Image.fromarray(truth) as img:
        half_size = (img.width // 2, img.height // 2)
        mosaic = (
            img.resize(half_size, Image.NEAREST)
            .filter(ImageFilter.GaussianBlur(1.5))
            .resize(img.size, Image.NEAREST)
        )
        mosaics.append(np.asarray(mosaic))
mosaics = np.asarray(mosaics)
print(mosaics.shape, mosaics.dtype)

In [None]:
# Show the first 25 degraded (mosaic) images in a 5x5 grid.
fig, axes = plt.subplots(5, 5, figsize=(20, 20))
for i, ax in enumerate(axes.flat):
    ax.imshow(mosaics[i])
    ax.axis("off")

In [None]:
def se_block(inputs, ch, ratio=8):
    """Channel squeeze-and-excitation: rescale each channel by a learned gate.

    Args:
        inputs: feature-map tensor whose channel count equals ``ch``.
        ch: number of channels in ``inputs``.
        ratio: reduction factor of the bottleneck Dense layer.

    Returns:
        ``inputs`` multiplied channel-wise by sigmoid gates in (0, 1).
    """
    # Never let the bottleneck collapse to zero units when ch < ratio.
    hidden = max(1, ch // ratio)
    x = layers.GlobalAveragePooling2D()(inputs)
    x = layers.Dense(hidden, activation="relu")(x)
    x = layers.Dense(ch, activation="sigmoid")(x)
    # Make the broadcast over the spatial axes explicit, matching
    # sc_squeeze_and_excitation, instead of relying on rank broadcasting.
    x = layers.Reshape((1, 1, ch))(x)
    x = layers.Multiply()([inputs, x])
    return x

In [None]:
# Captures whole-image features better than a plain CNN can.
def sc_squeeze_and_excitation(inputs, ch, ratio=8):
    """Combine a channel gate (cSE) and a spatial gate (sSE) and sum them.

    Args:
        inputs: feature-map tensor whose channel count equals ``ch``.
        ch: number of channels in ``inputs``.
        ratio: reduction factor of the channel-gate bottleneck (default 8,
            the value that used to be hard-coded).

    Returns:
        Tensor with the same shape as ``inputs``: cSE(inputs) + sSE(inputs).
    """
    # Never let the bottleneck collapse to zero units when ch < ratio.
    hidden = max(1, ch // ratio)

    # cSE: one sigmoid weight per channel, computed from the global average.
    x = layers.GlobalAveragePooling2D()(inputs)
    x = layers.Dense(hidden, activation="relu")(x)
    x = layers.Dense(ch, activation="sigmoid")(x)
    x = layers.Reshape((1, 1, ch))(x)
    x = layers.Multiply()([inputs, x])

    # sSE: one sigmoid weight per spatial position, from a 1x1 convolution.
    y = layers.Conv2D(1, 1, activation="sigmoid")(inputs)
    y = layers.Multiply()([inputs, y])

    # Sum of both recalibrated feature maps.
    x = layers.Add()([x, y])
    return x

In [None]:
# Rearranges the channels of the input map into the output map's spatial
# positions; this is reportedly faster than earlier upsampling approaches.
def pixel_shuffle(inputs, out_ch):
    """Upsample by a factor of 2 using a 1x1 convolution and depth-to-space.

    The convolution produces ``out_ch * 4`` channels, which
    ``tf.nn.depth_to_space`` with block size 2 moves into the spatial
    dimensions, leaving ``out_ch`` channels.
    """
    expanded = layers.Conv2D(out_ch*4, 1, activation="relu")(inputs)
    return layers.Lambda(lambda z: tf.nn.depth_to_space(z, 2))(expanded)

In [None]:
# Basic convolution building block.
def conv_bn_relu(inputs, ch):
    """Apply a 3x3 "same"-padded convolution, batch normalization, then ReLU.

    Args:
        inputs: input feature-map tensor.
        ch: number of output filters of the convolution.
    """
    conv = layers.Conv2D(ch, 3, padding="same")(inputs)
    normed = layers.BatchNormalization()(conv)
    activated = layers.ReLU()(normed)
    return activated


In [None]:
# Loss function
def loss_function(y_true, y_pred):
    """Mean absolute error, averaged over axes 1-3 (one value per sample)."""
    abs_error = tf.abs(y_true - y_pred)
    return tf.reduce_mean(abs_error, axis=(1, 2, 3))

In [None]:
# Evaluation metric
def psnr(y_true, y_pred):
    """Peak signal-to-noise ratio between target and prediction.

    ``max_val=1.0`` means pixel values are expected on a 0-1 scale.
    """
    score = tf.image.psnr(y_true, y_pred, max_val=1.0)
    return score

In [None]:
# Assemble the U-Net style restoration network (64x64x3 in, 3-channel
# sigmoid output).
stage_channels = [64 * 2 ** level for level in range(3)]  # 64, 128, 256

inputs = layers.Input((64, 64, 3))

# Encoder: two conv blocks plus an attention block per stage; the stage output
# is kept for the skip connection before 2x average pooling.
encoders = []
x = inputs
for ch in stage_channels:
    x = conv_bn_relu(x, ch)
    x = conv_bn_relu(x, ch)
    x = sc_squeeze_and_excitation(x, ch)
    encoders.append(x)
    x = layers.AveragePooling2D(2)(x)

# Middle layers: three pairs of 512-channel conv blocks.
# NOTE(review): earlier revisions looped over the values [1, 2, 4] without
# using them — possibly intended as dilation rates; confirm whether dilated
# convolutions were meant here.
for _ in range(3):
    x = conv_bn_relu(x, 512)
    x = conv_bn_relu(x, 512)

# Decoder: upsample, concatenate the matching encoder output, then refine.
# Stages are visited in reverse order (256, 128, 64).
for ch, skip in zip(reversed(stage_channels), reversed(encoders)):
    x = pixel_shuffle(x, ch)
    x = layers.Concatenate()([x, skip])
    x = conv_bn_relu(x, ch)
    x = conv_bn_relu(x, ch)
    x = sc_squeeze_and_excitation(x, ch)

# Output layer: 3 channels squashed to (0, 1) by the sigmoid.
x = layers.Conv2D(3, 3, padding="same", activation="sigmoid")(x)

model = tf.keras.Model(inputs, x)

model.summary()

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# Convert both image stacks to float32 and divide by 255 so pixel values are
# on the 0-1 scale used by the sigmoid output and the PSNR metric.
# Despite the "_train" suffix these hold the full dataset; the split comes later.
mosaic_train = mosaics.astype(np.float32) / 255.0
gt_train = ground_truths.astype(np.float32) / 255.0

In [None]:
# Hold out 33% of the (input, target) pairs; the fixed seed keeps the split
# reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    mosaic_train,
    gt_train,
    test_size=0.33,
    random_state=42,
)

In [None]:
# Stop training once val_loss has not improved for 5 consecutive epochs.
# restore_best_weights=True rolls the model back to the best epoch; without it
# the weights kept (and later saved) are from 5 epochs after the best one.
es_cb = EarlyStopping(
    monitor="val_loss",
    patience=5,
    verbose=1,
    mode="auto",
    restore_best_weights=True,
)

In [None]:
# Train with Adam on the per-sample L1 loss and report PSNR as the metric.
model.compile(
    optimizer="Adam",
    loss=loss_function,
    metrics=[psnr],
)

In [None]:
# Fit for at most 50 epochs; the held-out split doubles as validation data
# for the early-stopping callback.
history = model.fit(
    X_train,
    y_train,
    batch_size=64,
    epochs=50,
    validation_data=(X_test, y_test),
    callbacks=[es_cb],
)

In [None]:
# Visual check: ground truth vs. model output vs. degraded input.
# Predict only the first 100 held-out samples instead of running the whole
# test set through the network and discarding the rest.
y_pred = model.predict(X_test[:100])

column_titles = ("ground truth", "prediction", "input")
column_images = (y_test, y_pred, X_test)

fig = plt.figure(figsize=(20, 20))
# One row per sample; min() keeps this safe for very small test sets.
for i in range(min(3, len(y_pred))):
    for col, (title, images) in enumerate(zip(column_titles, column_images)):
        ax = fig.add_subplot(3, 3, 3 * i + col + 1)
        ax.imshow(images[i])
        if i == 0:
            # Label the columns once so the figure is readable on its own.
            ax.set_title(title)
        ax.axis("off")

In [None]:
# Write the trained model to "Unet_landscape.h5" in the current working directory.
# NOTE(review): the model uses a Lambda layer plus a custom loss and metric
# (loss_function, psnr); reloading presumably needs custom_objects or
# compile=False — confirm against the Keras version in use.
model.save("Unet_landscape.h5")