In [None]:
import os
import pandas as pd
import numpy as np
import multiprocessing as mp
import cv2

# Sklearn
from sklearn.pipeline import make_pipeline
from sklearn import preprocessing
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split

# Keras
from keras import Model
from keras import regularizers, callbacks
from keras.layers import Conv2D, MaxPooling2D, Input, UpSampling2D, Dense
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.callbacks import TensorBoard

# Plot
import matplotlib.pyplot as plt

# Test dataset
from keras.datasets import mnist


# Locate the raw images: a `Raw_data` folder inside the current working directory.
path = os.getcwd()
raw_data_directory = os.path.join(path, 'Raw_data')

# Tolerate a missing folder (the "mnist" option never reads these files), so the
# notebook still survives Restart & Run All without personal data on disk.
# The listing is sorted because os.listdir returns entries in arbitrary order;
# a stable order keeps the seeded train/test split and any cached histogram
# array aligned with `list_filename` from one run to the next.
if os.path.isdir(raw_data_directory):
    list_filename = sorted(os.listdir(raw_data_directory))
else:
    list_filename = []

# Absolute path of every entry, in the same order as `list_filename`.
list_filename_full = [os.path.join(raw_data_directory, f) for f in list_filename]

def extract_hist(f, scale_percent = 50, n_bins = None):
    """Return the concatenated, peak-normalised per-channel histogram of an image file.

    Parameters
    ----------
    f : str
        Path of the image; it is read with ``cv2.imread`` (default flags).
    scale_percent : int or float, default 50
        Size of the image actually histogrammed, as a percentage of the
        original width and height (100 keeps the original size).
    n_bins : int, optional
        Number of bins per channel. When omitted, the notebook-level
        ``hist_size`` option is used, as before.

    Returns
    -------
    numpy.ndarray
        The three channel histograms stacked one after the other with
        ``np.concatenate`` and divided by (largest bin count + 0.001).

    Raises
    ------
    ValueError
        If ``cv2.imread`` cannot decode ``f`` (it returns ``None`` in that case).
    """
    if n_bins is None:
        # Fall back to the global option so existing calls keep working.
        n_bins = hist_size

    im = cv2.imread(f)
    if im is None:
        # Without this check the failure surfaces as an opaque IndexError below.
        raise ValueError("cv2.imread could not read image: {!r}".format(f))

    # Nearest-neighbour resize: no new intensity values are invented.
    width = int( np.shape(im)[1] * scale_percent / 100 )
    height = int( np.shape(im)[0] * scale_percent / 100 )
    im = cv2.resize(im, (width, height), interpolation=cv2.INTER_NEAREST)

    # One histogram per channel (index 0, 1, 2). The value range is always the
    # full 8-bit range [0, 256), independent of the number of bins; tying the
    # upper bound to the bin count would silently drop bright pixels whenever
    # n_bins != 256.
    hist_channel = [cv2.calcHist([im], [channel], None, [n_bins], [0, 256]) for channel in range(3)]

    # Normalise by the tallest bin over all channels; the small offset avoids 0/0.
    hist = np.concatenate(hist_channel) / (np.max(hist_channel) + 0.001)
    return hist


In [None]:
# Training data

# options
dataset = "mnist" # Choose either "mnist" or "personal_data" (images stored in the Raw_data folder)
hist_data = 'compute' # Choose either "compute" to (re)compute the histograms or "load" to reload the last saved ones
hist_size = 256 # Number of histogram bins
N_images = len(list_filename) # All images

if dataset == "mnist":
    (x_train, _), (x_test, _) = mnist.load_data()

    def mnist_histograms(images):
        """Return one intensity histogram per image, each divided by the image's pixel count."""
        per_image = [cv2.calcHist([im], [0], None, [hist_size], [0,256]) / np.prod(im.shape) for im in images]
        return np.squeeze(np.array(per_image))

    # Extract histogram for mnist
    x_train = mnist_histograms(x_train)
    x_test = mnist_histograms(x_test)

elif dataset == "personal_data":
    if hist_data == 'compute':
        # Compute histograms in parallel; the context manager releases the
        # worker processes even if one of the images makes extract_hist raise.
        with mp.Pool(mp.cpu_count()) as pool:
            hist = pool.starmap(extract_hist, [(f, 100) for f in list_filename_full[0:N_images]])
        hist = np.squeeze(np.array(hist))

        # Save these histograms so a later run can use hist_data = 'load'
        np.save('histogram.npy', hist)

    elif hist_data == 'load':
        # Load the previously saved histograms
        hist = np.load('histogram.npy')

    else:
        raise ValueError("hist_data must be 'compute' or 'load', got {!r}".format(hist_data))

    x_train, x_test = train_test_split(hist, test_size=0.2, random_state=123)

else:
    # Fail here rather than with a NameError on x_train in a later cell.
    raise ValueError("dataset must be 'mnist' or 'personal_data', got {!r}".format(dataset))


In [None]:
# Dimensionality reduction of the histograms, either with PCA or with a dense autoencoder.
ndim_red = 32 # Size of the autoencoder bottleneck (the PCA branch keeps all components)
dim_red_method = "autoencoder" # Choose either "pca" or "autoencoder"

if dim_red_method == "pca":
    # This branch works on `hist`, which is only built for the personal data set.
    if dataset != "personal_data":
        raise ValueError('dim_red_method "pca" needs dataset == "personal_data" (it uses `hist`)')

    # Standardise every bin, keeping the file name as row label.
    M = preprocessing.scale(hist)
    Mp = pd.DataFrame(M, index=list_filename)

    pca = PCA()
    Y = pca.fit_transform(Mp)

    print(pca.explained_variance_ratio_[0:20])

elif dim_red_method == "autoencoder":

    # Early stopping
    # NOTE(review): this watches the training loss although validation data is
    # passed to fit(); 'val_loss' may be what was intended -- confirm.
    early_stopping = callbacks.EarlyStopping(monitor='loss', patience=3)

    n_features = x_train.shape[1]
    input_vec = Input(shape= (n_features,) )

    # Encoder: n_features -> 4*ndim_red -> 2*ndim_red -> ndim_red
    encoded = Dense(4*ndim_red, activation='relu')(input_vec)
    encoded = Dense(2*ndim_red, activation='relu')(encoded)
    encoded = Dense(ndim_red, activation='relu')(encoded)

    # Decoder layers are kept as objects so that the very same (trained) layers
    # can be applied a second time to build the standalone decoder below.
    # 10e-5 is 1e-4.
    decoder_layers = [
        Dense(2*ndim_red, activation='relu'),
        Dense(4*ndim_red, activation='relu'),
        Dense(n_features, activation='sigmoid', activity_regularizer=regularizers.l1(10e-5)),
    ]

    # Chain the decoder layers one after the other, starting from the bottleneck.
    # (Feeding `encoded` straight into the output layer would leave the two
    # hidden decoder layers unused.)
    decoded = encoded
    for layer in decoder_layers:
        decoded = layer(decoded)

    autoencoder = Model(input_vec, decoded)
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')

    # Encoder is needed to get only the encoded data
    encoder = Model(input_vec, encoded)

    # Decoder: re-apply the autoencoder's own decoder layers to a new input so
    # that it shares their weights; a freshly created Dense layer here would be
    # untrained and decode to noise.
    input_decoder = Input(shape=(ndim_red,))
    decoder_layer = input_decoder
    for layer in decoder_layers:
        decoder_layer = layer(decoder_layer)
    decoder = Model(input_decoder, decoder_layer)

    # Training on data
    autoencoder.fit(x_train, x_train,
                    epochs=20,
                    batch_size=32,
                    shuffle=True,
                    validation_data=(x_test, x_test),
                    callbacks=[early_stopping, TensorBoard(log_dir='./hist_based/')])

else:
    raise ValueError('dim_red_method must be "pca" or "autoencoder", got {!r}'.format(dim_red_method))


In [None]:
# Compare an original histogram with its decoded (encoder -> decoder) version.
# Observation from earlier runs: complete failure. Why is histogram auto-encoding so difficult?

sample = 3

# Keep the batch axis (slice rather than index) because predict expects a batch.
enc_M0 = encoder.predict(x_train[sample:sample+1])
dec_M0 = decoder.predict(enc_M0)
# Drop the batch axis again so both curves have the shape of one histogram.
dec_M0 = dec_M0.reshape(x_train[sample].shape)

fig, ax = plt.subplots()
ax.plot(x_train[sample], color='r', label='original')
ax.plot(dec_M0, color='b', label='decoded')
ax.set(title='Histogram of training sample {}'.format(sample),
       xlabel='bin', ylabel='normalised count')
ax.legend();