<a href="https://colab.research.google.com/github/jortegon/materialsGAN/blob/main/materialsGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!lscpu |grep 'Model name'

# GPU info

In [None]:
!nvidia-smi -L
!nvidia-smi

# Setup

## Imports

In [None]:
# Debug mode (on/off)
%pdb off

In [None]:
# Main imports
import os
import time
import numpy as np
import pandas as pd
from typing import List
from pandas import DataFrame
from matplotlib import pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from IPython import display

In [None]:
from google.colab import files


def download_file(path: str):
    """
    Download file from content/

    Hands `path` to `files.download` (the `google.colab.files` helper
    imported in this cell). Nothing is returned.
    """
    files.download(path)

In [None]:
from shutil import make_archive, unpack_archive


def zip_files(files: List[str]):
    """
    Zip and download files

    For every name `f` in `files`, the entry `/content/f` is archived into
    `/content/f.zip` and the archive is passed to `download_file`.

    files: names (relative to /content/) of the files or folders to zip.
        Note that inside this function the parameter hides the
        `google.colab.files` module; the module is only used through
        `download_file`.
    """
    for f in files:
        # make_archive returns the path of the archive it wrote. Download
        # exactly that file instead of a name relative to the current
        # working directory, which only matches while cwd is /content.
        archive_path = make_archive(
            f'/content/{f}',
            'zip',
            '/content/',
            f,
        )
        download_file(archive_path)


def unzip_files(files: List[str]):
    """
    Extract `/content/<name>.zip` into /content/ for every name in `files`
    """
    destination = '/content/'
    for name in files:
        archive = f'/content/{name}.zip'
        unpack_archive(archive, destination, 'zip')

In [None]:
from tensorflow.keras.utils import plot_model


def my_plot_model(model):
    """
    Draw the model design into `<model.name>.png` and download that file
    """
    png_name = f'{model.name}.png'
    plot_options = {
        'to_file': png_name,
        'show_shapes': True,
        'show_layer_names': True,
        'expand_nested': True,
    }
    plot_model(model, **plot_options)
    download_file(png_name)

In [None]:
from matplotlib.image import imsave


def save_img(source, name: str, dir: str):
    """
    Save the first image of `source` (channel 0) as a grayscale PNG

    The file is written to `<dir>/img_<name>.png`; that path is returned.
    """
    first_image = source[0, :, :, 0]
    path = f'{dir}/img_{name}.png'
    imsave(path, first_image, cmap='gray')
    return path

In [None]:
def show_img(source):
    """
    Replace the current cell output with the first image of `source`
    """
    # wait=True: the old output is only cleared once new output arrives
    display.clear_output(wait=True)

    plt.imshow(source[0, :, :, 0], cmap='gray')
    plt.axis('off')
    plt.show()

    # short pause after drawing
    time.sleep(0.1)

In [None]:
def timed(func):
    """
    Prints elapsed time for function

    Decorator: the returned wrapper forwards every argument to `func`,
    prints `<function name>: <seconds> secs` and returns the result of
    `func` unchanged.
    """
    # Local import: the notebook's import cell does not bring in functools
    from functools import wraps

    @wraps(func)  # keep __name__ / __doc__ of the decorated function
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so system clock changes cannot skew
        # the measured interval
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f'{func.__name__}: {elapsed} secs')
        return result
    return wrapper

In [None]:
@timed
def fit_model(model, kwargs: dict):
    """
    Call `model.fit` with `kwargs` unpacked as keyword arguments

    Returns whatever `model.fit` returns; the call is timed by `@timed`.
    """
    return model.fit(**kwargs)

## Tensorboard

### Setup

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
from datetime import datetime
from tensorflow.keras.callbacks import TensorBoard


def set_tensorboard_callback():
    """
    Create a TensorBoard callback that logs under logs/fit/<timestamp>
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return TensorBoard(log_dir="logs/fit/" + stamp, histogram_freq=1)


tensorboard_callback = set_tensorboard_callback()

In [None]:
# Clear any logs from previous runs
import shutil

# rmtree removes the folder without spawning a shell; ignore_errors mirrors
# `rm -rf`, which also succeeds when ./logs/ does not exist yet
shutil.rmtree('./logs/', ignore_errors=True)

### Dashboard

In [None]:
%tensorboard --logdir logs/fit

# Source DataFrame

## Data Sources

In [None]:
# Import data from Drive
from google.colab import drive


drive_dir = '/content/drive'
drive.mount(drive_dir)

In [None]:
# Load SEM images
def get_path(drive_dir: str):
    """
    Build the dataset path on Drive and list the folders found inside it

    original_data/ holds 11 classes:
    700, 750, 800,
    N700, N750, N800,
    SKPH, SKPHD,
    SPY - pyrolyzed sargassum
    K750, KSPY.
    Only the '700/' folder is selected here.

    Returns the dataset path and the names of its sub-directories.
    """
    dataset_path = (
        drive_dir + '/My Drive/New_Tesis/data/original_data/' + '700/'
    )

    # Classes = ['M0', 'M1', 'M2', 'M3']
    class_names = []
    for entry in os.listdir(dataset_path):
        # plain files are ignored, only folders count as classes
        if os.path.isdir(os.path.join(dataset_path, entry)):
            class_names.append(entry)
    return dataset_path, class_names


DATASET_PATH, CLASSES = get_path(drive_dir)

## Init PARAMS

In [None]:
from tensorflow.keras import backend as K


def set_params(path: str) -> dict:
    """
    Collect the notebook-wide settings in a single dict
    """
    img_size = 320          # @param {type:"integer"}
    batch_size = 32         # @param {type:"integer"}
    # max colab 64
    latent_dim = 64         # @param {type:"integer"}
    channels = 1

    # round the requested side down to a multiple of 32
    side = 32 * (img_size // 32)

    # the channel axis position follows the Keras backend setting
    if K.image_data_format() == 'channels_first':
        input_shape = (channels, side, side)
    else:
        input_shape = (side, side, channels)

    return {
        'DATASET_PATH': path,
        'IMG_SIZE': side,
        'BATCH_SIZE': batch_size,
        'LATENT_DIM': latent_dim,
        'INPUT_SHAPE': input_shape,
    }


PARAMS = set_params(DATASET_PATH)

In [None]:
@timed
def init_dataframe(path: str) -> DataFrame:
    """
    Index every .png found in the class folders of `path`

    One row per image with its file name ('id'), full path ('path'),
    class folder ('class') and an empty label ('lbl').
    """
    found = [
        (name, os.path.join(path, cls, name), cls)
        for cls in CLASSES
        for name in os.listdir(os.path.join(path, cls))
        if name.endswith('.png')
    ]
    return DataFrame({
        'id': [name for name, _, _ in found],
        'path': [img_path for _, img_path, _ in found],
        'class': [cls for _, _, cls in found],
        'lbl': [None] * len(found),
    })

In [None]:
MAIN_DF = init_dataframe(DATASET_PATH)
MAIN_DF.info()
MAIN_DF.describe()
MAIN_DF.to_csv('main_df.csv', index=False)

# Manual Select

## Methods

In [None]:
# CSC Generator
DATAGEN = ImageDataGenerator(
    rescale=1./255,
    validation_split=0.25
  )

In [None]:
# Ask
def ask(img, name: str, i=0) -> str:
    """
    Show `img`, prompt the user about it and return the lower-cased reply
    """
    show_img(img)
    reply = input(f"{i}:{name} works (w) or nah?\n")
    return str(reply).lower()


# Ratio msgs
def manual_lim(lbl_counts: List[int], lim=0.95) -> bool:
    """
    Selection loop control

    Compares the second label count against the first one and reports
    whether their ratio is above `lim`.

    lbl_counts: label counts, as a list or a pandas Series (for instance
        the result of `value_counts`); only the first two values are used.
    lim: ratio above which the manual selection can finish.

    Returns True when `lbl_counts[1] / lbl_counts[0]` exceeds `lim` and
    False otherwise, including when fewer than two counts are available.
    """
    # Work on plain positional values: integer indexing on a Series with a
    # label index is not reliably positional.
    counts = list(lbl_counts)
    if len(counts) < 2 or counts[0] == 0:
        # a single label (or none) was seen so far: there is no ratio yet
        print('only one label so far: not there yet!!')
        return False

    result = counts[1] / counts[0]
    msg = f'{counts[1]}/{counts[0]} {result:.2f}: '
    if result > lim:
        display.clear_output(wait=True)
        print(msg + 'finishing manual dataframe!')
        return True
    print(msg + 'not there yet!!')
    return False


# Manual selections
@timed
def manual_select(df: DataFrame) -> DataFrame:
    """
    Manual selection loop

    Shows shuffled images from `df` one at a time and stores the answer in
    the 'lbl' column of `df`: 'YES' when the user types 'w', 'NO' for any
    other answer. Typing 'exit' stops the loop. Typing 'back' asks again
    about the previous image, stores that answer for the previous image,
    and then asks about the current image once more.

    The loop also stops once more than `df_max` images were shown, or when
    `manual_lim` reports that the two label counts are close enough.

    df: frame with at least the 'id', 'path' and 'lbl' columns. It is
        modified in place.

    Returns a copy of `df` limited to the rows that received a label.
    """
    def set_label(img_id, answer: str):
        # .loc writes into df itself; the chained form df['lbl'][n] = ...
        # may end up writing into a temporary copy
        img_n = df[df['id'] == img_id].index.item()
        df.loc[img_n, 'lbl'] = 'YES' if answer == 'w' else 'NO'

    manual_generator = DATAGEN.flow_from_dataframe(
        dataframe=df,
        directory=DATASET_PATH,
        x_col="path", y_col="id",
        batch_size=1,
        shuffle=True,
        class_mode='raw',
        target_size=(
            PARAMS.get('IMG_SIZE'), PARAMS.get('IMG_SIZE'),
        ),
    )

    # manual lims: 10% / 20% of the images, at least 100, at most all
    lim = len(manual_generator)
    df_min = min(max(int(lim * 0.1), 100), lim)
    df_max = min(max(int(lim * 0.2), 100), lim)

    # manual loop
    previous = None
    for i in range(lim):
        # next
        img, ids = next(manual_generator)
        img_id = ids[0]

        # show and ask
        response = ask(img, img_id, i)
        if response == 'back' and previous is not None:
            # re-label the previous image, then return to the current one
            prev_img, prev_id = previous
            set_label(prev_id, ask(prev_img, prev_id, i - 1))
            response = ask(img, img_id, i)
        if response == 'exit':
            break

        set_label(img_id, response)

        # limit control
        if i < df_min:
            print(f'{i}/{df_min}: not enough data!!')
        elif i > df_max:
            print(f'{i}: finishing, too much data!')
            break
        else:
            # ratio: only meaningful once both labels were used
            counts = df['lbl'].value_counts().tolist()
            if len(counts) >= 2 and manual_lim(counts):
                break

        # wait result
        time.sleep(0.2)
        previous = (img, img_id)

    # copy and clean df
    manual_df = df.copy()
    manual_df.dropna(inplace=True, subset=['lbl'])
    manual_df.info()

    return manual_df

## Select

In [None]:
manual_df = manual_select(MAIN_DF)

In [None]:
manual_df.to_csv('manual_df.csv', index=False)
download_file('manual_df.csv')

In [None]:
manual_df = pd.read_csv('manual_df.csv')
manual_df.describe()

# CSC

## Classifier

In [None]:
# Classifier (API mode)


@timed
def build_classifier(input_shape: List[int]):
    """
    Build and compile the binary classifier (functional API)

    Two Conv2D(kernel 8, relu) + MaxPool2D(2) stages with 8 and 16
    filters, then Flatten, Dense(32, relu), Dropout(0.2) and a single
    sigmoid output unit.
    """
    inputs = tf.keras.Input(shape=input_shape)

    # convolutional stages
    x = inputs
    for filters in (8, 16):
        x = layers.Conv2D(filters, 8, activation='relu')(x)
        x = layers.MaxPool2D(2)(x)

    # dense head
    x = layers.Flatten()(x)
    x = layers.Dense(32, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs, name='classifier')
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['acc'],
    )
    return model

In [None]:
# Summary
csc = build_classifier(PARAMS.get('INPUT_SHAPE'))
my_plot_model(csc)
csc.summary()

## Train

In [None]:
# Open
class_df = pd.read_csv('manual_df.csv')

### Callbacks

In [None]:
class myEarlyCallback(tf.keras.callbacks.Callback):
    """
    Stops the train loop when accuracy stops improving

    Training is stopped once neither 'acc' nor 'val_acc' improved for more
    than `patience` consecutive epochs. The weights of the epoch with the
    best 'val_acc' are kept and restored when this callback stops training.
    """

    def __init__(self, patience: int):
        super().__init__()
        self.patience = patience
        self.best_weights = None

    def on_train_begin(self, logs=None):
        # Number of consecutive epochs without any improvement.
        self.wait = 0
        # The epoch the training stops at.
        self.stopped_epoch = 0
        # Initialize best
        self.best_acc = 0
        self.best_val_acc = 0

    @staticmethod
    def _fmt(value) -> str:
        """Format a metric for printing; a missing metric shows as n/a."""
        return 'n/a' if value is None else f'{value:.2f}'

    def on_epoch_end(self, epoch, logs=None):
        # Get logs info (a metric is None when it was not logged)
        logs = logs or {}
        acc = logs.get('acc')
        val_acc = logs.get('val_acc')

        # Greater
        is_acc = acc is not None and acc > self.best_acc
        is_val = val_acc is not None and val_acc > self.best_val_acc
        if is_acc or is_val:
            if is_acc:
                self.best_acc = acc
            if is_val:
                self.best_val_acc = val_acc
                self.best_weights = self.model.get_weights()
            # Any improvement restarts the count. Decrementing instead
            # would let long improving runs build up negative credit and
            # delay the stop far beyond `patience`.
            self.wait = 0
            return

        self.wait += 1
        # Patience stop
        if self.wait > self.patience:
            print(
                f'\nPatience met - acc:{self._fmt(acc)}, '
                f'val_acc:{self._fmt(val_acc)}!'
            )
            self.stopped_epoch = epoch
            self.model.stop_training = True
            # best_weights stays None when val_acc never improved
            if self.best_weights is not None:
                self.model.set_weights(self.best_weights)

    def on_train_end(self, logs=None):
        if self.stopped_epoch > 0:
            print("Epoch %05d: early stopping" % (self.stopped_epoch + 1))

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint


def csc_callbacks() -> List:
    """
    Callbacks used while training the classifier

    Checkpoint of the best 'acc' into CSC.tf, the custom early stop with a
    patience of 10 and the shared TensorBoard callback.
    """
    checkpoint = ModelCheckpoint(
        'CSC.tf', monitor='acc', save_best_only=True, verbose=1,
    )
    early_stop = myEarlyCallback(patience=10)
    return [checkpoint, early_stop, tensorboard_callback]

In [None]:
def csc_kwargs(df: DataFrame, epochs: int):
    """
    Custom classifier kwargs generator

    Builds the training and validation flows from `df` (columns 'path' and
    'lbl', split by the `validation_split` of DATAGEN) and returns the
    keyword arguments for `model.fit`.

    df: labelled frame to train on.
    epochs: number of epochs to request.

    Returns a dict with 'x', 'steps_per_epoch', 'validation_data',
    'validation_steps', 'epochs', 'callbacks' and 'verbose'.
    """
    def make_flow(subset: str, batch_size: int):
        # both flows share every option except the subset and batch size
        return DATAGEN.flow_from_dataframe(
            dataframe=df,
            directory=DATASET_PATH,
            x_col="path", y_col="lbl",
            subset=subset,
            batch_size=batch_size,
            seed=42,
            shuffle=True,
            class_mode="binary",
            color_mode='grayscale',
            target_size=(
                PARAMS['IMG_SIZE'], PARAMS['IMG_SIZE']
            ),
        )

    # Batch size: a fifth of the rows for training, a fifth of that for
    # validation. Clamp to 1 so small frames cannot produce a batch size
    # of 0 (which would make the step computation divide by zero).
    manual_batch = max(1, len(df) // 5)
    valid_batch = max(1, manual_batch // 5)

    train_flow = make_flow("training", manual_batch)
    valid_flow = make_flow("validation", valid_batch)

    # at least one step, even when a subset is smaller than its batch
    step_size_train = max(1, train_flow.n // train_flow.batch_size)
    step_size_valid = max(1, valid_flow.n // valid_flow.batch_size)

    return {
        'x': train_flow,
        'steps_per_epoch': step_size_train,
        'validation_data': valid_flow,
        'validation_steps': step_size_valid,
        'epochs': epochs,
        'callbacks': csc_callbacks(),
        'verbose': 1,
    }

### history

In [None]:
eps = 100     # @param {type:"integer"}
fit_kwargs = csc_kwargs(class_df, epochs=eps)

In [None]:
history = fit_model(csc, fit_kwargs)

In [None]:
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('model accuracy')
plt.ylabel('acc')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

### Save

In [None]:
CSC_NAME = 'csc'    # @param {type: 'string'}
csc.save(CSC_NAME)

In [None]:
zip_files(['csc', 'CSC.tf'])

## Predictions

### Load

In [None]:
unzip_files(['csc', 'CSC.tf'])

In [None]:
class_df = pd.read_csv('manual_df.csv')

### Restore

In [None]:
unzip_name = 'csc'     # @param {type: 'string'}

saved_csc = tf.keras.models.load_model(
    f'/content/{unzip_name}'
)
saved_csc.summary()

In [None]:
eps = 20     # @param {type:"integer"}
fit_kwargs = csc_kwargs(class_df, epochs=eps)
history = fit_model(saved_csc, fit_kwargs)

### Predict

In [None]:
@timed
def predict(model):
    """
    Run `model.predict` over every image listed in MAIN_DF (unshuffled)
    """
    side = PARAMS['IMG_SIZE']
    flow_options = {
        'dataframe': MAIN_DF,
        'directory': DATASET_PATH,
        'x_col': 'path',
        'y_col': 'id',
        'shuffle': False,
        'class_mode': None,
        'color_mode': 'grayscale',
        'target_size': (side, side),
    }
    predict_flow = DATAGEN.flow_from_dataframe(**flow_options)
    return model.predict(predict_flow)

In [None]:
# First prediction [csc, saved_csc]
results = predict(saved_csc)

### Relearn

In [None]:
# Relearn DF
@timed
def relearn(df: DataFrame, results: np.ndarray) -> DataFrame:
    """
    Fill the missing labels of `df` from the classifier predictions

    Adds a 'val' column holding the first value of each row of `results`,
    labels the rows that have no label yet as 'NO' (val below the
    threshold) or 'YES' (val at or above it), then saves and downloads
    the frame as relearn_df.csv.

    `df` is modified in place and returned; pass a copy to keep the
    original untouched.
    """
    # threshold: mean plus variance of the predictions
    # (alternatives tried before: the plain mean, max - min / 2)
    learn_lim = results.mean() + results.std() ** 2

    df['val'] = [row[0] for row in results.tolist()]

    unlabelled = df['lbl'].isna()
    below = df['val'] < learn_lim
    at_or_above = df['val'] >= learn_lim
    df.loc[unlabelled & below, 'lbl'] = 'NO'
    df.loc[unlabelled & at_or_above, 'lbl'] = 'YES'

    # save and download
    csv_name = 'relearn_df.csv'
    df.to_csv(csv_name, index=False)
    download_file(csv_name)
    print(
        f'Saved relearn_df.csv (learn_lim :{learn_lim:.5f})'
    )
    df.info()

    return df

In [None]:
relearn_df = relearn(MAIN_DF.copy(), results)

### Learnt

In [None]:
def csc_results(df: DataFrame, shows: int):
    """
    Browse labelled images one by one

    Shows shuffled images of `df` with their label until the user types
    'exit' or more than `shows` images of each label were displayed.
    """
    side = PARAMS['IMG_SIZE']
    learnt_generator = DATAGEN.flow_from_dataframe(
        dataframe=df,
        directory=DATASET_PATH,
        x_col='path', y_col='lbl',
        batch_size=1,
        shuffle=True,
        class_mode='raw',
        color_mode='grayscale',
        target_size=(side, side),
    )

    # how many more images of each kind may still be shown
    remaining = {'yes': shows, 'no': shows}
    for i in range(len(learnt_generator)):
        img, lbl = next(learnt_generator)
        remaining['yes' if lbl == 'YES' else 'no'] -= 1

        show_img(img)
        answer = input(f'{i} - Label: {lbl}\nPress any key to continue\n')
        if answer == 'exit':
            break
        if remaining['yes'] < 0 and remaining['no'] < 0:
            break

In [None]:
read_relearn_df = pd.read_csv('relearn_df.csv')
csc_results(read_relearn_df, shows=5)

# C shared lib

## C

In [None]:
%%file f2s.c
#include <stdlib.h>
#include <string.h>

    void *
    caracterizacion(char *pix, int width, int height)
{
    /*
    Line-counting routine: for every pixel and every offset ls it counts
    both the endpoint matches (start and end pixel) and the matches over
    the whole line, scanning first along the row and then along the column.

    Returns a freshly malloc'ed table of 5 rows of `largo` ints each:
      cont[0]: pixel is non-zero and the pixel ls steps away is non-zero too
      cont[1]: pixel is non-zero and every pixel up to ls is non-zero
      cont[2]: same as cont[0] for zero pixels
      cont[3]: same as cont[1] for zero pixels
      cont[4]: number of (pixel, ls) pairs visited
    Nothing in this function frees the table; the malloc results are not
    checked.
    NOTE(review): the bitwise tests assume every pixel is 0 or 1 - confirm
    against the caller.
    */
    unsigned char *pixels = (unsigned char *)pix;
    int r, c, ls;
    int largo = width > height ? width : height;
    int largo2;
    int **cont;
    int first_pix, negado;
    cont = (int **)malloc(5 * sizeof(int *));
    /*
    This is done so that the correlation functions are only computed up
    to half the size, so that the linear dependence between the functions
    of each phase is preserved.
    */
    largo = largo / 2;

    /* allocate and zero the five counter rows */
    for (r = 0; r < 5; r++)
    {
        *(cont + r) = (int *)malloc(largo * sizeof(int));
        memset((void *)(*(cont + r)), 0, largo * sizeof(int));
    }
    for (r = 0; r < height; r++)
    {
        for (c = 0; c < width; c++)
        {
            first_pix = pixels[r * width + c];
            if (first_pix)
            {
                /* first_pix is 1 */
                negado = first_pix;
                /* horizontal scan, clipped at the right border */
                largo2 = largo < width - c ? largo : width - c;
                for (ls = 0; ls < largo2; ls++)
                {
                    cont[4][ls]++;
                    /* counts the ones at the endpoints */
                    cont[0][ls] += (first_pix & pixels[r * width + c + ls]);
                    /* checks whether the line is still all ones */
                    negado = negado & pixels[r * width + c + ls];
                    cont[1][ls] += negado;
                }
                negado = first_pix;
                /* vertical scan, clipped at the bottom border */
                largo2 = largo < height - r ? largo : height - r;
                for (ls = 0; ls < largo2; ls++)
                {
                    cont[4][ls]++;
                    /* counts the ones at the endpoints */
                    cont[0][ls] += (first_pix & pixels[(r + ls) * width + c]);
                    /* checks whether the line is still all ones */
                    negado = negado & pixels[(r + ls) * width + c];
                    cont[1][ls] += negado;
                }
            }
            else
            {
                /* first_pix is zero */
                first_pix = 1;
                negado = 1;
                /* horizontal scan, clipped at the right border */
                largo2 = largo < width - c ? largo : width - c;
                for (ls = 0; ls < largo2; ls++)
                {
                    cont[4][ls]++;
                    /*
                    counts the zeros at the endpoints; only the last bit
                    can vary, so look for it being different from 1
                    */
                    cont[2][ls] += (first_pix ^ pixels[r * width + c + ls]);
                    /* checks whether the line is still all zeros */
                    negado = negado & (first_pix ^ pixels[r * width + c + ls]);
                    cont[3][ls] += negado;
                }
                negado = 1;
                /* vertical scan, clipped at the bottom border */
                largo2 = largo < height - r ? largo : height - r;
                for (ls = 0; ls < largo2; ls++)
                {
                    cont[4][ls]++;
                    /*
                    counts the zeros at the endpoints; only the last bit
                    can vary, so look for it being different from 1
                    */
                    cont[2][ls] += (first_pix ^ pixels[(r + ls) * width + c]);
                    /* checks whether the line is still all zeros */
                    negado = negado & (first_pix ^ pixels[(r + ls) * width + c]);
                    cont[3][ls] += negado;
                }
            }
        }
    }
    return cont;
}

In [None]:
import subprocess

# Build the shared library from f2s.c. check=True raises
# CalledProcessError when gcc fails, so a broken build is reported here
# instead of surfacing later as a failure to load f2s.so.
subprocess.run(
    ['gcc', '-c', '-Wall', '-Werror', '-fpic', 'f2s.c'], check=True
)
subprocess.run(
    ['gcc', '-shared', '-o', 'f2s.so', 'f2s.o'], check=True
)

## Python

In [None]:
import ctypes


def load_f2s():
    """
    Open ./f2s.so through ctypes

    `caracterizacion` is declared as returning a pointer to pointers to
    C ints.
    """
    int_rows = ctypes.POINTER(ctypes.POINTER(ctypes.c_int))
    c_lib = ctypes.CDLL("./f2s.so")
    c_lib.caracterizacion.restype = int_rows
    return c_lib


C_LIB = load_f2s()

In [None]:
def caracterization(img, size: int):
    """
    Run the C characterization on `img` and return a (5, size / 2) array

    The image is binarised against its own mean, passed to
    `caracterizacion` in the shared library, and every row of the result
    is divided by the last row (the counts in row 4).
    """
    # boolean mask: True where the pixel is above the image mean
    # (other thresholds tried before: max - mean / 2, max - min / 2)
    b_img = img > img.mean()

    # ctypes view of the mask, handed to C as the pixel buffer
    char_array = np.ctypeslib.as_ctypes(b_img)
    f2s_c_data = C_LIB.caracterizacion(char_array, size, size)

    # copy the 5 x (size / 2) C table into a numpy array
    max_index = int(size / 2)
    f2s_py = np.zeros((5, max_index))
    for ftype, index in np.ndindex(5, max_index):
        f2s_py[ftype][index] = f2s_c_data[ftype][index]

    return f2s_py / np.array(f2s_py[4, :])

In [None]:
@timed
def caract_df(csv_file: str, path: str) -> DataFrame:
    """
    Read `csv_file` and add an 'ff' column with the characterization of
    every image, taken in the order of the frame
    """
    df = pd.read_csv(csv_file)
    caract_gen = DATAGEN.flow_from_dataframe(
        df,
        directory=path,
        x_col="path", y_col="id",
        class_mode='raw',
        batch_size=1,
        shuffle=False,
        color_mode='grayscale',
        target_size=(
            PARAMS['IMG_SIZE'], PARAMS['IMG_SIZE'],
        ),
    )

    # one batch (a single image) per step; the id half is not needed
    df['ff'] = [
        caracterization(next(caract_gen)[0], PARAMS['IMG_SIZE'])
        for _ in range(len(caract_gen))
    ]
    return df

## Correlations

In [None]:
def plot_avg(results: List):
    """
    Plot the first four rows of `results` as curves on a single axes
    """
    fig = plt.figure(constrained_layout=True)
    ax = fig.add_subplot()

    ax.plot(results[:4, :].T)
    ax.set(xlabel="Distance", ylabel="Value", title="Avg Correlations")

    plt.show()


def plot_separate(r: List):
    """
    Plot rows 0-1 of `r` ('1s') and rows 2-3 ('0s') on two shared-axis panels
    """
    fig, (ax1, ax2) = plt.subplots(
        1, 2,
        sharex=True, sharey=True,
        constrained_layout=True
    )

    # (axes, title, (row, colour) of F2P, (row, colour) of FLP)
    panels = (
        (ax1, '1s', (0, 'tab:blue'), (1, 'tab:orange')),
        (ax2, '0s', (2, 'tab:green'), (3, 'tab:red')),
    )
    for ax, title, f2p, flp in panels:
        ax.set_xlabel("Distance")
        ax.set_ylabel("Value")
        ax.set_title(title)
        ax.plot(r[f2p[0], :].T, f2p[1], label='F2P')
        ax.plot(r[flp[0], :].T, flp[1], label='FLP')
        ax.legend()

    plt.show()


@timed
def plots(df: DataFrame, lbl: str):
    """
    Average the 'ff' entries of the rows labelled `lbl` and plot the result
    """
    selected = df['ff'][df['lbl'] == lbl]
    r = selected.mean()

    for draw in (plot_avg, plot_separate):
        draw(r)

In [None]:
df = pd.read_csv('results_df.csv')
df.head()

In [None]:
r = df['ff'][df['lbl'] == "YES"]

In [None]:
df['ff'][df['lbl'] == "YES"].iloc[0]

### FFS (Relearn)

In [None]:
results_df = caract_df(
    'relearn_df.csv',
    DATASET_PATH,
)
results_df.head()

In [None]:
# `plots` requires the label to select; 'YES' pairs with the 'NO' cell below
plots(results_df, 'YES')

In [None]:
plots(results_df, 'NO')

In [None]:
results_df.to_csv('results_df.csv', index=False)

### Test (Manual)

In [None]:
test_df = caract_df('manual_df.csv', DATASET_PATH)
test_df.head()

In [None]:
plots(test_df, 'YES')

In [None]:
plots(test_df, 'NO')

In [None]:
test_df.to_csv('test_df.csv', index=False)

# GAN


## Custom Class

In [None]:
def get_random_vectors(batch_size=1, latent_dim=64):
    """Draw a (batch_size, latent_dim) tensor of normal noise"""
    noise_shape = (batch_size, latent_dim)
    return tf.random.normal(shape=noise_shape)


seed = get_random_vectors(
    PARAMS.get('BATCH_SIZE'),
    PARAMS.get('LATENT_DIM')
)

### Models

In [None]:
def build_discriminator(input_shape=(32, 32, 1), latent_dim=128):
    """
    Build the GAN discriminator as a Sequential model

    Five Conv2D + BatchNormalization + LeakyReLU(0.2) stages, global max
    pooling, Dense(latent_dim), Dropout(0.2) and one output unit without
    activation.
    """
    # (filters, kernel size, strides) of every conv stage
    conv_specs = [
        (1, 8, 1),
        (latent_dim//8, 8, 2),
        (latent_dim//4, 4, 2),
        (latent_dim//2, 2, 2),
        (latent_dim, 2, 2),
    ]

    stack = [layers.InputLayer(input_shape=input_shape)]
    for filters, kernel, strides in conv_specs:
        stack.append(
            layers.Conv2D(filters, kernel, strides=strides, padding="same")
        )
        stack.append(layers.BatchNormalization())
        stack.append(layers.LeakyReLU(alpha=0.2))

    # dense head and output
    stack.extend([
        layers.GlobalMaxPooling2D(),
        layers.Dense(latent_dim),
        layers.Dropout(0.2),
        layers.Dense(1),
    ])

    model = tf.keras.models.Sequential(name='discriminator')
    for layer in stack:
        model.add(layer)
    return model

In [None]:
discriminator = build_discriminator(PARAMS.get('INPUT_SHAPE'))
my_plot_model(discriminator)
discriminator.summary()

In [None]:
def build_generator(img_size=32, latent_dim=128):
    """
    Build the GAN generator as a Sequential model

    A Dense projection reshaped to (img_size // 32, img_size // 32,
    latent_dim), five stride-2 Conv2DTranspose + BatchNormalization +
    LeakyReLU(0.2) stages, and a one-channel tanh Conv2DTranspose output.
    """
    # side of the feature map before the stride-2 stages
    shaped_size = img_size // 32

    stack = [
        layers.InputLayer(input_shape=(latent_dim,)),
        layers.Dense(shaped_size**2 * latent_dim),
        layers.Dropout(0.2),
        layers.Reshape((shaped_size, shaped_size, latent_dim)),
    ]

    # 5 convt layers
    for _ in range(5):
        stack.append(
            layers.Conv2DTranspose(
                latent_dim, 2, strides=2, padding="same"
            )
        )
        stack.append(layers.BatchNormalization())
        stack.append(layers.LeakyReLU(alpha=0.2))

    # output layer
    stack.append(
        layers.Conv2DTranspose(
            1, 2, strides=1, padding="same", activation='tanh'
        )
    )

    model = tf.keras.models.Sequential(name='generator')
    for layer in stack:
        model.add(layer)
    return model

In [None]:
generator = build_generator(
    PARAMS['INPUT_SHAPE'][1],
    PARAMS.get('LATENT_DIM')
)
my_plot_model(generator)
generator.summary()

### Class

In [None]:
class GAN(tf.keras.Model):
    """
    Custom GAN class model

    Wraps a generator and a discriminator and overrides `train_step` so
    that `fit` alternates one discriminator update and one generator
    update per batch.
    """

    def __init__(
        self,
        generator,
        discriminator,
        params: dict,
        name='GAN',
    ):
        """
        generator / discriminator: the two Keras models to train.
        params: settings dict; only 'LATENT_DIM' is read here.
        name: model name, forwarded to Keras.
        """
        # `name` was previously accepted but never passed on
        super().__init__(name=name)
        self.latent_dim = params.get('LATENT_DIM')
        # notebook-level `seed`: the fixed noise batch used for previews
        self.seed = seed
        self.discriminator = discriminator
        self.generator = generator

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store the two optimizers and the loss, then compile the model."""
        self.loss_fn = loss_fn
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        super().compile()

    def call(self, inputs) -> List[tf.Tensor]:
        """
        Generate one fake image per input and score both sets

        Returns [fakes, [discriminator(inputs), discriminator(fakes)]].
        """
        fakes = self.generator(
            get_random_vectors(
                tf.shape(inputs)[0], self.latent_dim
            )
        )
        predictions = [
            self.discriminator(inputs),
            self.discriminator(fakes)
        ]
        return [fakes, predictions]

    def train_step(self, real_images):
        """
        One discriminator update followed by one generator update

        Returns the two losses as {"d_loss": ..., "g_loss": ...}.
        """
        # the data flow may yield (images, labels); only images are used
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = get_random_vectors(batch_size, self.latent_dim)

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat(
            [generated_images, real_images],
            axis=0
        )

        # Assemble labels discriminating real from fake images
        # (ones for the generated half, zeros for the real half)
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))],
            axis=0
        )

        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        # Only the forward pass needs recording; gradients are taken and
        # applied once the tape context is closed.
        d_vars = self.discriminator.trainable_variables
        d_grads = tape.gradient(d_loss, d_vars)
        self.d_optimizer.apply_gradients(zip(d_grads, d_vars))

        # Sample random points in the latent space (again)
        random_latent_vectors = get_random_vectors(batch_size, self.latent_dim)

        # Assemble labels that say "all real images"
        # (zeros is the label used for real images above)
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator
        # (note that we should *not* update the weights of the discriminator)
        with tf.GradientTape() as tape:
            generated_images = self.generator(random_latent_vectors)
            predictions = self.discriminator(generated_images)
            g_loss = self.loss_fn(misleading_labels, predictions)
        g_vars = self.generator.trainable_variables
        g_grads = tape.gradient(g_loss, g_vars)
        self.g_optimizer.apply_gradients(zip(g_grads, g_vars))

        return {"d_loss": d_loss, "g_loss": g_loss}

In [None]:
# Best - 1e4, 3e4

DIS_LR = 0.0001   # @param {type:"number"}
d_optimizer = tf.keras.optimizers.Adam(learning_rate=DIS_LR)

GEN_LR = 0.0003   # @param {type:"number"}
g_optimizer = tf.keras.optimizers.Adam(learning_rate=GEN_LR)

In [None]:
# Build GAN
gan = GAN(
    generator,
    discriminator,
    PARAMS,
)
gan.compile(
    d_optimizer=d_optimizer,
    g_optimizer=g_optimizer,
    loss_fn=tf.keras.losses.BinaryCrossentropy(from_logits=True)
)

## Setup


### Callbacks

In [None]:
GAN_CKPT_DIR = './gan_ckpts'

In [None]:
class myGANCallback(tf.keras.callbacks.Callback):
    """
    Show and save imgs per epoch (checkpoint)

    After every epoch a preview is generated from the model's fixed seed
    and displayed. When the generator and discriminator losses are closer
    than `loss_lim`, a checkpoint and the preview are saved and `loss_lim`
    is tightened by 10%; otherwise the preview is saved every 10th epoch.
    """
    def __init__(self, dir: str, manager):
        """
        dir: folder where the preview images are written.
        manager: object whose `save()` writes a checkpoint.
        """
        super().__init__()
        self.dir = dir
        self.manager = manager
        self.loss_lim = 1.

    def on_epoch_end(self, epoch: int, logs=None):
        """
        chkpt and imgs save
        """
        logs = logs or {}
        result = self.model.generator(self.model.seed)
        show_img(result)

        g_loss = logs.get('g_loss')
        d_loss = logs.get('d_loss')

        # a missing loss simply never counts as "close"
        losses_close = (
            g_loss is not None
            and d_loss is not None
            and abs(g_loss - d_loss) < self.loss_lim
        )
        if losses_close:
            # ckpt
            self.manager.save()
            save_img(result, epoch, self.dir)
            # loss lim rate
            self.loss_lim *= 0.9
        elif (epoch % 10) == 0:
            save_img(result, epoch, self.dir)

        # TODO: dynamic learning rate (idea: multiply the learning rate of
        # both optimizers by 0.99 after every epoch)

In [None]:
def gan_ckpt():
    """
    Set up GAN checkpointing and restore the latest checkpoint

    Tracks both optimizers and both models, keeps at most 5 checkpoints
    in GAN_CKPT_DIR and returns the checkpoint manager.
    """
    tracked = {
        'g_optimizer': g_optimizer,
        'd_optimizer': d_optimizer,
        'generator': generator,
        'discriminator': discriminator,
    }
    checkpoint = tf.train.Checkpoint(**tracked)
    manager = tf.train.CheckpointManager(
        checkpoint, directory=GAN_CKPT_DIR, max_to_keep=5,
    )

    latest = manager.latest_checkpoint
    checkpoint.restore(latest)
    return manager

In [None]:
def gan_callbacks(dir: str) -> list:
    """
    Callbacks used while training the GAN

    The preview/checkpoint callback writing into `dir`, plus the shared
    TensorBoard callback.
    """
    image_saver = myGANCallback(dir, gan_ckpt())
    return [image_saver, tensorboard_callback]

### Fit

In [None]:
def prep_fn(img):
    """
    Preprocess function: map pixel values from [0, 255] to [-1, 1] (float32)
    """
    unit_range = img.astype(np.float32) / 255.0
    return (unit_range - 0.5) * 2


def get_gan_gen(df: DataFrame):
    """
    Build the image flow that feeds the GAN

    Only the rows of `df` labelled 'YES' are used; images are loaded in
    grayscale and passed through `prep_fn`.
    """
    # augmentation (rotation_range=90, horizontal / vertical flips) is
    # currently switched off
    datagen_gan = ImageDataGenerator(preprocessing_function=prep_fn)

    side = PARAMS['IMG_SIZE']
    flow_options = {
        'dataframe': df,
        'directory': DATASET_PATH,
        'x_col': 'path',
        'y_col': 'lbl',
        'batch_size': PARAMS['BATCH_SIZE'],
        'seed': 42,
        'shuffle': True,
        'color_mode': 'grayscale',
        'classes': ['YES'],
        'target_size': (side, side),
    }
    return datagen_gan.flow_from_dataframe(**flow_options)

In [None]:
def gan_kwargs(name: str, eps: int, df_name='relearn_df.csv') -> dict:
    """
    Build the keyword arguments for one GAN fit run.

    Creates the output directory ``GAN_train/<name>``, reads the training
    dataframe from ``df_name`` and bundles the input flow, the epoch count
    and the training callbacks.

    Args:
        name: run name, used as the sub-directory of ``GAN_train``.
        eps: number of epochs stored under the 'epochs' key.
        df_name: CSV file loaded with pandas to build the input flow.

    Returns:
        dict with the keys 'x', 'epochs', 'callbacks' and 'verbose'.

    Raises:
        FileExistsError: if ``GAN_train/<name>`` already exists, so that a
            previous run's outputs are never mixed with a new one.
    """
    run_dir = f'GAN_train/{name}'
    try:
        # makedirs creates a missing GAN_train parent too, but (without
        # exist_ok) still refuses to reuse an existing run directory.
        os.makedirs(run_dir)
    except FileExistsError as e:
        # Raise instead of calling exit(): exit() is the interactive helper
        # from the site module and must not be used to abort from a function
        # (in a notebook it can shut the kernel down instead of the cell).
        raise FileExistsError(f'{e}: {run_dir} already exists') from e

    gan_df = pd.read_csv(df_name)
    return {
        'x': get_gan_gen(gan_df),
        'epochs': eps,
        'callbacks': gan_callbacks(run_dir),
        'verbose': 1,
    }

### Restore

In [None]:
# Create the root folder for training outputs; exist_ok makes the cell safe
# to re-run (os.mkdir raised FileExistsError on the second execution).
os.makedirs('GAN_train', exist_ok=True)

In [None]:
# Unpack /content/gan_ckpts.zip into /content/ (see unzip_files)
unzip_files(['gan_ckpts'])

## Training

### Single

In [None]:
# Single run: Colab form fields for the run name and the epoch count
FIT_NAME = "test"  # @param {type:"string"}
gan_eps = 10        # @param {type:"integer"}

# Creates GAN_train/<FIT_NAME> and builds the kwargs handed to fit_model
train_kwargs = gan_kwargs(FIT_NAME, gan_eps)

In [None]:
# Fit the GAN with the prepared kwargs (fit_model is defined outside this
# section). Here gan_history is a single result, not a list.
gan_history = fit_model(gan, train_kwargs)

### Loop

In [None]:
@timed
def train_loop(
    gan_model, version: int, loops: int = 10, eps: int = 100
) -> List:
    """
    Run several consecutive GAN fits and archive the outputs after each.

    Every iteration trains under the run name ``train_<version>_<i>``,
    then zips (and downloads, see zip_files) the ``GAN_train`` and
    ``gan_ckpts`` folders.

    Args:
        gan_model: model handed to fit_model.
        version: number embedded in each run name.
        loops: how many fits to run.
        eps: epochs per fit (previously fixed at 100).

    Returns:
        List with the value returned by fit_model for each loop, in order.
    """
    history = []
    for i in range(loops):
        fit_name = f"train_{version}_{i}"
        train_kwargs = gan_kwargs(fit_name, eps)

        # Custom fit
        history.append(fit_model(gan_model, train_kwargs))

        # Save after every loop, not only at the end
        zip_files(['GAN_train', 'gan_ckpts'])
    return history

In [None]:
# Run the training loop; gan_history is indexed per loop by the plot cell
version =          1# @param {type:"integer"}
gan_history = train_loop(
    gan, version,
    loops=1         # @param {type:"integer"}
)

In [None]:
# Generator vs. discriminator loss of the FIRST loop only (gan_history[0]);
# later loops, if any, are not drawn.
# NOTE(review): indexing with [0] assumes gan_history comes from the "Loop"
# cell; the "Single" cell assigns a bare fit_model result — confirm.
plt.plot(gan_history[0].history['g_loss'])
plt.plot(gan_history[0].history['d_loss'])
plt.title('GAN train loss')
plt.ylabel('loss')
plt.xlabel('epoch')
# Legend order follows the plot order above
plt.legend(
    ['generator', 'discriminator'],
    loc='upper left'
)
plt.show()

## Save

In [None]:
# Save the trained GAN under GAN_NAME (path relative to the working dir)
GAN_NAME = 'gan'    # @param {type: 'string'}
gan.save(GAN_NAME)

In [None]:
# Zip the saved model folder and trigger its download (see zip_files)
zip_files([GAN_NAME])

# Results

## Samples

### Call

In [None]:
from tensorflow.python.ops.numpy_ops import np_config


np_config.enable_numpy_behavior()


@timed
def make_gans_batch(model, name: str, samples=10):
    """
    Generate ``samples`` images with ``model`` and index them in a CSV.

    Each iteration feeds BATCH_SIZE random latent vectors to the model,
    stores the image written by save_img (which keeps only the first
    element of the batch) in ``GAN_samples/<name>`` and displays the
    result. The returned DataFrame has the columns 'id' (file name),
    'path' (prefixed with /content/) and 'lbl' (always 'YES'); it is also
    written to ``GAN_samples/<name>/samples_<name>_df.csv``.
    """
    out_dir = f'GAN_samples/{name}'
    try:
        os.mkdir(out_dir)
    except FileExistsError as err:
        # Reusing an existing folder is allowed; just report it
        print(err)

    ids, paths = [], []
    for idx in range(samples):
        noise = get_random_vectors(
            PARAMS.get('BATCH_SIZE'),
            PARAMS.get('LATENT_DIM'),
        )
        generated = model(noise)
        # TODO: control FFS
        img_path = save_img(generated, idx, out_dir)
        paths.append(f'/content/{img_path}')
        ids.append(img_path.split('/')[-1])
        show_img(generated)

    samples_df = DataFrame({'id': ids, 'path': paths})
    samples_df['lbl'] = ['YES'] * samples

    csv_path = f'{out_dir}/samples_{name}_df.csv'
    samples_df.to_csv(csv_path, index=False)
    return samples_df

In [None]:
# Create the root folder for generated samples; exist_ok makes the cell safe
# to re-run (os.mkdir raised FileExistsError on the second execution).
os.makedirs('GAN_samples', exist_ok=True)

### Sample

In [None]:
# Generate SAMPLES_SIZE images with the trained generator; they are written
# to GAN_samples/<BATCH_NAME> together with an index CSV (see make_gans_batch)
BATCH_NAME = 'results'   # @param {type: 'string'}
SAMPLES_SIZE = 2000     # @param {type: 'integer'}

samples_df = make_gans_batch(
    gan.generator,
    BATCH_NAME,
    SAMPLES_SIZE
)

In [None]:
# Pass the index CSV written by make_gans_batch, and its folder, to caract_df.
# NOTE(review): caract_df is defined outside this section; presumably it adds
# the 'ff' column that the plotting cells read — confirm.
samples_df = caract_df(
    f'/content/GAN_samples/{BATCH_NAME}/samples_{BATCH_NAME}_df.csv',
    f'/content/GAN_samples/{BATCH_NAME}'
)

In [None]:
def plot_avg(results: List):
    """
    Draw the first four rows of ``results`` as curves on one axes.

    ``results`` is indexed as a 2-D array (``results[:4, :]``); the slice
    is transposed so that each row becomes one line.
    """
    figure = plt.figure(constrained_layout=True)
    axes = figure.add_subplot()

    curves = results[:4, :].T
    axes.plot(curves)

    axes.set_xlabel("Distance")
    axes.set_ylabel("Value")
    axes.set_title("Avg Correlations")

    plt.show()


def plot_separate(r: List):
    """
    Draw rows 0-3 of ``r`` on two side-by-side axes sharing both scales.

    Left panel ('1s'): row 0 as F2P and row 1 as FLP.
    Right panel ('0s'): row 2 as F2P and row 3 as FLP.
    """
    fig, panels = plt.subplots(
        1, 2,
        sharex=True, sharey=True,
        constrained_layout=True
    )

    # (panel title, ((row index, colour, legend label), ...)) per subplot
    layout = (
        ('1s', ((0, 'tab:blue', 'F2P'), (1, 'tab:orange', 'FLP'))),
        ('0s', ((2, 'tab:green', 'F2P'), (3, 'tab:red', 'FLP'))),
    )
    for panel, (heading, curves) in zip(panels, layout):
        panel.set_xlabel("Distance")
        panel.set_ylabel("Value")
        panel.set_title(heading)
        for row, colour, tag in curves:
            panel.plot(r[row, :].T, colour, label=tag)
        panel.legend()

    plt.show()


@timed
def plots(df: DataFrame, lbl: str):
    """
    Plot the mean of the 'ff' column over the rows whose 'lbl' equals
    ``lbl``, first on a single axes (plot_avg) and then split in two
    panels (plot_separate).
    """
    ff_column = df['ff']
    label_mask = df['lbl'] == lbl
    mean_ff = ff_column[label_mask].mean()

    for draw in (plot_avg, plot_separate):
        draw(mean_ff)

In [None]:
# Select the test dataframe as the source for the averaged plot below.
# NOTE(review): the next cell assigns the same two names; presumably only
# one of the two selection cells is meant to be run — confirm.
df = test_df
title = 'Test samples avg'

In [None]:
# Select the generated samples as the source for the averaged plot below
# (overwrites df/title if the previous selection cell was also run).
df = samples_df
title = 'GANs samples avg'

In [None]:
# Mean of the 'ff' column over the rows labelled "YES"
r = df['ff'][df['lbl'] == "YES"].mean()

# Two stacked panels sharing only the x axis.
# Row mapping used below: 0 -> F2P 1s, 1 -> FLP 1s, 2 -> F2P 0s, 3 -> FLP 0s
fig, (ax1, ax2) = plt.subplots(
    2, 1,
    sharex=True, sharey=False,
    constrained_layout=True
)
# Top panel: F2P curves for 1s and 0s
ax1.set_xlabel("Distance")
ax1.set_ylabel("Value")
ax1.set_title('F2P')
ax1.plot(r[0, :].T, 'tab:blue', label='1s')
ax1.plot(r[2, :].T, 'tab:orange', label='0s')
ax1.legend()

# Bottom panel: FLP curves for 1s and 0s
ax2.set_xlabel("Distance")
ax2.set_ylabel("Value")
ax2.set_title('FLP')
ax2.plot(r[1, :].T, 'tab:green', label='1s')
ax2.plot(r[3, :].T, 'tab:red', label='0s')
ax2.legend()

# title comes from whichever selection cell was run last
fig.suptitle(title)
plt.show()

### Download

In [None]:
# Zip and download the checkpoints folder (see zip_files); the samples
# folder download is currently disabled.
#zip_files(['GAN_samples'])
zip_files(['gan_ckpts'])

## Restore

### Unzip

In [None]:
# Unpack /content/gan.zip into /content/ (see unzip_files)
unzip_files(['gan'])

In [None]:
# Load the saved model from /content/<unzip_name> with Keras and print its
# summary. The tf.saved_model.load alternative and the custom_objects
# argument are left commented out.
unzip_name = 'gan'     # @param {type: 'string'}

#saved_gan = tf.saved_model.load(
saved_gan = tf.keras.models.load_model(
    f'/content/{unzip_name}',
    #custom_objects={"GAN": GAN}
)
saved_gan.summary()

### FFs

In [None]:
# SOURCE
# Flow of single, shuffled test images of class 'YES'.
# The keyword is color_mode (as in get_gan_gen); the previous spelling
# "colormode" is not a flow_from_dataframe parameter, so grayscale loading
# was not applied.
sample_generator = DATAGEN.flow_from_dataframe(
    dataframe=test_df,
    directory=DATASET_PATH,
    x_col="path", y_col="lbl",
    batch_size=1,
    color_mode='grayscale',
    shuffle=True,
    classes=['YES'],
    target_size=(
        PARAMS['IMG_SIZE'], PARAMS['IMG_SIZE'],
    ),
)

In [None]:
# Draw one batch (batch_size=1) from the test flow, drop its label and
# display the image
img, _ = next(sample_generator)
show_img(img)

In [None]:
# FFS of the real test image (caracterization is defined outside this
# section)
true_ffs = caracterization(img, PARAMS['IMG_SIZE'])

In [None]:
# Select what the next plot cell shows: the FFS of the real test image
title = 'Test'
results = true_ffs

In [None]:
# Plot the first four rows of `results` as curves on a single axes
fig = plt.figure(constrained_layout=True)

ax = fig.add_subplot()
# Transposed so that each of the four rows becomes one line
ax.plot(results[:4, :].T)
ax.set_xlabel("Distance")
ax.set_ylabel("Value")
ax.set_title(f"Avg Correlations {title} sample")

plt.show()

In [None]:
# SAMPLE
# Generate a batch with the in-memory generator (the restored saved_gan
# variant is commented out) and display it.
# NOTE: this overwrites PARAMS['LATENT_DIM'] for every later cell.
PARAMS['LATENT_DIM'] = 64
#img = saved_gan.generator(
img = gan.generator(
    get_random_vectors(
        PARAMS.get('BATCH_SIZE'),
        PARAMS.get('LATENT_DIM')
    )
)
show_img(img)

In [None]:
# FFS of the generated image; the tensor is converted to a NumPy array first
false_ffs = caracterization(img.numpy(), PARAMS.get('IMG_SIZE'))

In [None]:
# Plot the four correlation rows of a single image on one axes
r = true_ffs

fig = plt.figure(constrained_layout=True)

ax = fig.add_subplot()
# Labels follow the row mapping used by plot_separate and the F2P/FLP
# panels: 0 -> F2P 1s, 1 -> FLP 1s, 2 -> F2P 0s, 3 -> FLP 0s
# (rows 1 and 2 were previously mislabelled).
ax.plot(r[0, :].T, 'tab:blue', label='F2P 1s')
ax.plot(r[1, :].T, 'tab:green', label='FLP 1s')
ax.plot(r[2, :].T, 'tab:orange', label='F2P 0s')
ax.plot(r[3, :].T, 'tab:red', label='FLP 0s')

ax.set_xlabel("Distance")
ax.set_ylabel("Value")
ax.set_title("Single img correlations")
ax.legend()

plt.show()


### Samples

In [None]:
# Batch
# Generate 100 samples into GAN_samples/<name> with the in-memory generator
# (the restored saved_gan variant is commented out)
name = 'saved'    # @param {type: 'string'}

saved_samples_df = make_gans_batch(
    #saved_gan.generator, name, 5)
    gan.generator,
    name, 100
)

In [None]:
# Caract DF
# NOTE(review): the CSV is passed here as a bare file name, whereas the
# earlier caract_df call passes the full
# /content/GAN_samples/<name>/samples_<name>_df.csv path, which is where
# make_gans_batch writes the file. caract_df is not visible in this section —
# confirm which form it expects.
saved_samples_df = caract_df(
    f'samples_{name}_df.csv',
    f'/content/GAN_samples/{name}'
)

In [None]:
# plots() requires the label to filter on; generated samples are all
# labelled 'YES' by make_gans_batch.
plots(saved_samples_df, 'YES')