In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.utils import Progbar
from tensorflow.keras.initializers import RandomNormal

import numpy as np
import matplotlib.pyplot as plt
import os
import datetime
import time
import random
import os, os.path
#import tensorflow_addons as tfa

import math
import json

import sklearn.metrics
from statistics import mean

In [None]:
print(tf.__version__)

In [None]:
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

IMG_WIDTH = 224
IMG_HEIGHT = 224

IMG_CHANNELS = 1

PATCH_SIZE = 64
BATCH_SIZE = 64

NOISE_DIM = 200

In [None]:
PATH_TRAINING = '../../study/biometric/seminar/data/LivDet-Fingerprint/livdet21/Dermalog/'

In [None]:
#create shuffeled TF-Datasets containing the paths to the files
def shuffle_paths(ds_paths, shuffle_loops):
    #see https://stackoverflow.com/questions/46444018/meaning-of-buffer-size-in-dataset-map-dataset-prefetch-and-dataset-shuffle/48096625#48096625
    list_paths = list(ds_paths.as_numpy_iterator())
    for i in range(shuffle_loops):
        random.seed(4)
        random.shuffle(list_paths)
    return tf.data.Dataset.from_tensor_slices(list_paths)

#train paths only contain live fingerprints
ds_train_paths = tf.data.Dataset.list_files(str(PATH_TRAINING + '*/Live/*.png'), seed=4)
ds_train_paths = shuffle_paths(ds_train_paths,10)

# Data Input & Preprocessing

In [None]:
def get_label(file_path):
    # convert the path to a list of path components
    parts = tf.strings.split(file_path, os.path.sep)
    # The second to last is the class-directory
    if parts[-2] == 'Live':
        return 1
    return 0

def decode_bmp(file_path):
    file = tf.io.read_file(file_path)
    img = tf.image.decode_bmp(file, channels=1)
    return img    

def decode_png(file_path):
    file = tf.io.read_file(file_path)
    img = tf.image.decode_png(file, channels=1)
    return img

def equalize_hist(img):
    img_zero_map = (img != 255);
    hist, bins = np.histogram(img[img_zero_map], 256,[0,255])
    cdf = hist.cumsum()
    cdf = (cdf - cdf.min())*255/(cdf.max()-cdf.min())
    cdf = np.ma.filled(cdf, 255).astype('uint8')
    return cdf[img]

def exctract_roi(img):
    # Extract region of intrest by simply removing empty surrounding pixels.
    img_height = img.shape[0]
    img_width = img.shape[1]

    y_start = 0
    y_stop = img_height
    x_start = 0
    x_stop = img_width
    
    for i in range(img_height):
        if tf.math.reduce_sum(img[i], axis=None, keepdims=False, name=None) != img_width:
            y_start = i
            break

    for i in range(img_height-1, 0, -1):
        if tf.math.reduce_sum(img[i], axis=None, keepdims=False, name=None) != img_width:
            y_stop = i
            break

    for i in range(img_width):
        if tf.math.reduce_sum(img[:,i], axis=None, keepdims=False, name=None) != img_height:
            x_start = i
            break

    for i in range(img_width-1, 0, -1):
        if tf.math.reduce_sum(img[:,i], axis=None, keepdims=False, name=None) != img_height:
            x_stop = i
            break

    img = img[y_start:y_stop,x_start:x_stop]
    img = (img-1)*-1
        
    return img

def get_random_patch(image):
    # Cropping random patch from input image
    non_zero_count = 0
    loop_count = 0
    #making sure the crop does not contain mainly void.
    while non_zero_count < 1900:
        loop_count += 1
        cropped = tf.image.random_crop(image, [PATCH_SIZE, PATCH_SIZE,IMG_CHANNELS], seed=None, name=None)
        #cropped = tf.image.resize(cropped, [IMG_HEIGHT, IMG_WIDTH])
        non_zero_count = tf.math.count_nonzero(cropped)
        if loop_count > 10:
            return cropped
    return cropped

def process_path(file_path):
    # Wire preprocessing together
    label = get_label(file_path)
    img = decode_png(file_path)
    
    #normalize img data to [0,1) scale
    img = tf.image.convert_image_dtype(img, tf.float32)
    
    img = exctract_roi(img)

    #for transfer learning which nets which are trained on RGB imges
    if IMG_CHANNELS == 3:
        img = tf.concat([img,img,img], 2)
        
    return img, label

def gan_rescale(image, label):
    return image*2-1, label

In [None]:
#define mappable functions to run custom python code in tf
def mappable_get_random_patch(image,label):
    random_patch = tf.py_function(func=get_random_patch,
                                inp=[image],
                                Tout=(tf.float32))
    result_tensor = random_patch, label
    result_tensor[0].set_shape((PATCH_SIZE, PATCH_SIZE, IMG_CHANNELS))
    result_tensor[1].set_shape(())
    return result_tensor


def mappable_fn_patch(x):
    result_tensor = tf.py_function(func=process_path,
                                inp=[x],
                                Tout=(tf.float32,tf.uint8))
    return result_tensor

In [None]:
def augment(image,label):
    image = tf.image.random_flip_left_right(image, seed=None)
    
    #degrees = tf.random.uniform([], minval=-20, maxval=20, dtype=tf.dtypes.float32, seed=None, name=None)
    #image = tfa.image.transform_ops.rotate(image, degrees * math.pi/180)
    
    brightness = tf.random.uniform([], minval=0.75, maxval=1.25, dtype=tf.dtypes.float32, seed=None, name=None)
    image = image*brightness
    image = tf.clip_by_value(image, 0, 1, name=None)
    
    return image, label

## Prepare Datasets for Patch Image Classification

In [None]:
ds_patch_train = (ds_train_paths
            .map(mappable_fn_patch, num_parallel_calls=AUTOTUNE)
            .cache()
            .shuffle(buffer_size=4000)
            .map(augment, num_parallel_calls=AUTOTUNE) # randomizes the image based on augmentation rules
            .map(mappable_get_random_patch)
            .map(gan_rescale)
            .batch(BATCH_SIZE, drop_remainder=True)
            .prefetch(buffer_size=AUTOTUNE)
           )

# DCGAN

In [None]:
init = RandomNormal(stddev=0.02)

def make_generator_model():
    model = tf.keras.Sequential()
    model.add(layers.Dense(4*4*1024, use_bias = False, input_shape = (200,), kernel_initializer=init))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Reshape((4, 4, 1024)))
    
    model.add(layers.Conv2DTranspose(512, (5, 5), strides = (2,2), padding = "same", use_bias = False, kernel_initializer=init))
    assert model.output_shape == (None, 8, 8, 512)
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    
    model.add(layers.Conv2DTranspose(256, (5,5), strides = (2,2), padding = 'same', use_bias = False, kernel_initializer=init))
    assert model.output_shape == (None, 16, 16, 256)
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    
    model.add(layers.Conv2DTranspose(128, (5,5), strides = (2,2), padding = 'same', use_bias = False, kernel_initializer=init))
    assert model.output_shape == (None, 32, 32, 128)
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    
    model.add(layers.Conv2DTranspose(1, (5,5), strides = (2,2), padding = 'same', use_bias = False, activation = 'tanh', kernel_initializer=init))
    assert model.output_shape == (None, 64, 64, 1)
    
    return model


def make_discriminator_model():
    model = tf.keras.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[64, 64, 1], kernel_initializer=init))
    assert model.output_shape == (None, 32, 32, 64)
    model.add(layers.LeakyReLU(alpha=0.2))
    #model.add(layers.Dropout(0.3))

    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same', kernel_initializer=init))
    assert model.output_shape == (None, 16, 16, 128)
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(alpha=0.2))
    #model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(256, (5, 5), strides=(2, 2), padding='same', kernel_initializer=init))
    assert model.output_shape == (None, 8, 8, 256)
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(512, (5, 5), strides=(2, 2), padding='same', kernel_initializer=init))
    assert model.output_shape == (None, 4, 4, 512)
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))

    model.add(layers.Flatten())
    model.add(layers.Dense(1,activation='linear'))

    return model

# Defining WGAN

This code is copied from: https://keras.io/examples/generative/

In [None]:
class WGAN(keras.Model):
    def __init__(
        self,
        discriminator,
        generator,
        latent_dim,
        discriminator_extra_steps=3,
        gp_weight=10.0,
    ):
        super(WGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
        super(WGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Calculates the gradient penalty.
        This loss is calculated on an interpolated image
        and added to the discriminator loss.
        """
        # Get the interpolated image
        alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated image.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t to this interpolated image.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        # Get the batch size
        batch_size = tf.shape(real_images)[0]

        # For each batch, we are going to perform the
        # following steps as laid out in the original paper:
        # 1. Train the generator and get the generator loss
        # 2. Train the discriminator and get the discriminator loss
        # 3. Calculate the gradient penalty
        # 4. Multiply this gradient penalty with a constant weight factor
        # 5. Add the gradient penalty to the discriminator loss
        # 6. Return the generator and discriminator losses as a loss dictionary

        # Train the discriminator first. The original paper recommends training
        # the discriminator for `x` more steps (typically 5) as compared to
        # one step of the generator. Here we will train it for 3 extra steps
        # as compared to 5 to reduce the training time.
        for i in range(self.d_steps):
            # Get the latent vector
            random_latent_vectors = tf.random.uniform(
                shape=(batch_size, self.latent_dim), minval=-1,maxval=1
            )
            with tf.GradientTape() as tape:
                # Generate fake images from the latent vector
                fake_images = self.generator(random_latent_vectors, training=True)
                # Get the logits for the fake images
                fake_logits = self.discriminator(fake_images, training=True)
                # Get the logits for the real images
                real_logits = self.discriminator(real_images, training=True)

                # Calculate the discriminator loss using the fake and real image logits
                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
                # Calculate the gradient penalty
                gp = self.gradient_penalty(batch_size, real_images, fake_images)
                # Add the gradient penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight

            # Get the gradients w.r.t the discriminator loss
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Train the generator
        # Get the latent vector
        random_latent_vectors = tf.random.uniform(shape=(batch_size, self.latent_dim), minval=-1,maxval=1)
        with tf.GradientTape() as tape:
            # Generate fake images using the generator
            generated_images = self.generator(random_latent_vectors, training=True)
            # Get the discriminator logits for fake images
            gen_img_logits = self.discriminator(generated_images, training=True)
            # Calculate the generator loss
            g_loss = self.g_loss_fn(gen_img_logits)

        # Get the gradients w.r.t the generator loss
        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        # Update the weights of the generator using the generator optimizer
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )
        return {"d_loss": d_loss, "g_loss": g_loss}
    
class GANMonitor(keras.callbacks.Callback):
    def __init__(self, num_img=6, latent_dim=128):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        
        random_latent_vectors = tf.random.uniform(shape=(self.num_img, self.latent_dim), minval=-1,maxval=1)
        generated_images = self.model.generator(random_latent_vectors)
        generated_images = (generated_images * 127.5) + 127.5

        for i in range(self.num_img):
            img = generated_images[i].numpy()
            plt.imshow(img)
            plt.show()
            
cbk = GANMonitor(num_img=5, latent_dim=NOISE_DIM)

In [None]:
generator_optimizer = keras.optimizers.Adam(
        learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
discriminator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)

# Define the loss functions for the discriminator,
# which should be (fake_loss - real_loss).
# We will add the gradient penalty later to this loss function.
def discriminator_loss(real_img, fake_img):
    real_loss = tf.reduce_mean(real_img)
    fake_loss = tf.reduce_mean(fake_img)
    return fake_loss - real_loss


# Define the loss functions for the generator.
def generator_loss(fake_img):
    return -tf.reduce_mean(fake_img)

# Get the wgan model
wgan = WGAN(
    discriminator=make_discriminator_model(),
    generator=make_generator_model(),
    latent_dim=NOISE_DIM,
    discriminator_extra_steps=3,
)

In [None]:
wgan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    g_loss_fn=generator_loss,
    d_loss_fn=discriminator_loss,
)

# Start training
wgan.fit(ds_patch_train, batch_size=BATCH_SIZE, epochs=100, callbacks=[cbk])                                                  

# Validate the GAN

In [None]:
# Generate random latent space and create fingerprint patch using generator
latent_space = np.random.uniform(size=(1,200), low=-1, high=1)
img = wgan.generator(latent_space)
plt.imshow(img[0])
plt.show()

# Save the weights

In [17]:
wgan.generator.save_weights('./gen/')
wgan.discriminator.save_weights('./disc/')