In [3]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
!tar -xf "/content/drive/My Drive/Colab Notebooks/Speculo/processed.tar.gz"

In [5]:
%tensorflow_version 2.x
from PIL import Image
import os
import numpy as np
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Input, Dense, Conv2D, Conv2DTranspose, \
    MaxPooling2D, BatchNormalization, Flatten, Reshape, Activation, Dropout
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from sklearn.utils import shuffle


class Speculo:
    def __init__(self):
        self.image_size = (64, 64, 1)
        self.optimizer = 'adam'
        self.loss_function = 'mse'
        self.input_img = Input(shape=self.image_size, name="input")
        self.filters = (128, 256, 512)
        self.latent_size = 256

        model_number = 8
        if os.path.isdir("logs"):
            model_number += len(os.listdir("logs/")) + 1
        self.name = f"v{model_number}-{self.optimizer}-{self.loss_function}-bw"

        self.model = None

    def _build_model(self):
        chan_dim = -1

        x = self.input_img

        for f in self.filters:
            x = Conv2D(f, (3, 3), activation='relu', padding='same')(x)
            x = MaxPooling2D((2, 2))(x)
            x = Dropout(0.1)(x)

        volume_size = K.int_shape(x)
        x = Flatten()(x)
        x = Dropout(0.2)(x)
        x = Dense(self.latent_size, name="latent_space")(x)

        x = Dense(np.prod(volume_size[1:]))(x)
        x = Reshape((volume_size[1], volume_size[2], volume_size[3]))(x)

        for f in self.filters[::-1]:
            x = Conv2DTranspose(f,(3, 3), strides=2, activation='relu', padding='same')(x)
            x = BatchNormalization()(x)
            x = Dropout(0.1)(x)

        x = Conv2DTranspose(self.image_size[2], (3, 3), activation='relu', padding='same')(x)
        output = Activation("sigmoid", name="output")(x)
        
        return Model(inputs=self.input_img, outputs=output)

    def autoencoder(self):
        autoencoder = self._build_model()
        # autoencoder.build(self.input_img)
        autoencoder.compile(optimizer=self.optimizer, loss=self.loss_function, metrics=['accuracy'])
        return autoencoder

    def _load_image_set(self, directory, noise_factors=None):
        x = []
        y = []
        fronts = sorted(os.listdir(f"processed/{directory}/Front/"))
        for i, person_dir in enumerate(sorted(os.listdir(f"processed/{directory}"))):
            if person_dir == "Front":
                continue
            else:
                y_image = Image.open(f"processed/{directory}/Front/{fronts[i - 1]}")
                y_image = y_image.resize(self.image_size[:2], Image.ANTIALIAS)
                if self.image_size[2] == 1:
                    y_image = y_image.convert('L')
                for image in os.listdir(f"processed/{directory}/{person_dir}"):
                    x_image = Image.open(f"processed/{directory}/{person_dir}/{image}")
                    x_image = x_image.resize(self.image_size[:2], Image.ANTIALIAS)
                    if self.image_size[2] == 1:
                        x_image = x_image.convert('L')
                    x.append(np.array(x_image))
                    y.append(np.array(y_image))

        x = np.array(x).astype("float32") / 255.0
        y = np.array(y).astype("float32") / 255.0
        x = x.reshape([-1, self.image_size[0], self.image_size[1], self.image_size[2]])
        y = y.reshape([-1, self.image_size[0], self.image_size[1], self.image_size[2]])

        if noise_factors:
            noisy_x = []
            noisy_y = []
            for noise_factor in noise_factors:
                noisy_x.append(x + (noise_factor / 10) * np.random.normal(loc=0.0, scale=1.0, size=x.shape))
                noisy_y.append(y)
            noisy_x = np.reshape(noisy_x, [-1, self.image_size[0], self.image_size[1], self.image_size[2]])
            noisy_y = np.reshape(noisy_y, [-1, self.image_size[0], self.image_size[1], self.image_size[2]])
            return shuffle(np.clip(noisy_x, 0., 1.), noisy_y)
        return shuffle(x, y)

    def _create_dataset(self):
        x_train, y_train = self._load_image_set("train", noise_factors=(0.3, 0.6, 0.9, 1))
        x_test, y_test = self._load_image_set("test", noise_factors=(0.3, 0.6))
        return x_train, y_train, x_test, y_test

    def _load_model(self):
        self.model = load_model("models/v8-adam-mse-bw.h5")
        return self.model

    def _get_latent_space(self):
        autoencoder = self._load_model()
        print(autoencoder.layers)
        encoder = Model(autoencoder.input,
                        autoencoder.get_layer("encoder").input)
        # encoder = K.function([autoencoder.input, K.learning_phase()], [autoencoder.get_layer("encoder").output])
        # m = Sequential()
        # for layer in autoencoder.layers[42:69]:
        #     m.add(layer)
        # m.build()
        return encoder

    def predict(self, image):
        autoencoder = self._load_model()
        output = autoencoder.predict(np.reshape(image, [1, self.image_size[0], self.image_size[1], self.image_size[2]]))
        output = (output * 255).astype("uint8")
        return np.reshape(output, self.image_size)

    def train(self):
        x_train, y_train, x_test, y_test = self._create_dataset()
        model = self.autoencoder()

        checkpoint = ModelCheckpoint(f"models/{self.name}.h5", monitor='loss', verbose=1, save_best_only=True,
                                     mode='min')
        tensorboard = TensorBoard(log_dir=f'logs/{self.name}', histogram_freq=0, write_graph=False)
        early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=5, verbose=1, mode='auto')

        model.fit(x_train, y_train,
                  epochs=100,
                  batch_size=128,
                  shuffle=True,
                  validation_data=(x_test, y_test),
                  callbacks=[checkpoint, tensorboard, early_stopping])

        model.save(f"models/{self.name}.h5")

TensorFlow 2.x selected.


In [6]:
speculo = Speculo()
print(speculo.autoencoder().summary())

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input (InputLayer)           [(None, 64, 64, 1)]       0         
_________________________________________________________________
conv2d (Conv2D)              (None, 64, 64, 128)       1280      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 32, 32, 128)       0         
_________________________________________________________________
dropout (Dropout)            (None, 32, 32, 128)       0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 32, 32, 256)       295168    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 16, 16, 256)       0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 16, 16, 256)       0     

In [9]:
speculo.train()

Train on 9992 samples, validate on 552 samples
Epoch 1/100
Epoch 00001: loss improved from inf to 0.03704, saving model to models/v8-adam-mse-bw.h5
Epoch 2/100
Epoch 00002: loss improved from 0.03704 to 0.03287, saving model to models/v8-adam-mse-bw.h5
Epoch 3/100
Epoch 00003: loss improved from 0.03287 to 0.03257, saving model to models/v8-adam-mse-bw.h5
Epoch 4/100
Epoch 00004: loss improved from 0.03257 to 0.03210, saving model to models/v8-adam-mse-bw.h5
Epoch 5/100
Epoch 00005: loss improved from 0.03210 to 0.03153, saving model to models/v8-adam-mse-bw.h5
Epoch 6/100
Epoch 00006: loss improved from 0.03153 to 0.03103, saving model to models/v8-adam-mse-bw.h5
Epoch 7/100
Epoch 00007: loss did not improve from 0.03103
Epoch 8/100
Epoch 00008: loss improved from 0.03103 to 0.02984, saving model to models/v8-adam-mse-bw.h5
Epoch 9/100
Epoch 00009: loss improved from 0.02984 to 0.02974, saving model to models/v8-adam-mse-bw.h5
Epoch 10/100
Epoch 00010: loss improved from 0.02974 to 0.

In [0]:
%tensorboard --logdir logs