In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Dense, MaxPooling2D, Flatten, BatchNormalization, Reshape, LeakyReLU, multiply, Embedding, UpSampling2D, Dropout
from tensorflow.keras.models import Model, load_model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences

import os
import numpy as np
import json
import cv2
import matplotlib.pyplot as plt
import random
from tqdm import tqdm

In [3]:
def req_model(no_of_labels):
    input_layer = Input(shape=image_inp_shape_conv)
    conv_1 = Conv2D(filters=64, kernel_size=(2,2), activation='relu', padding='same')(input_layer)
    flatten_1 = Flatten()(conv_1)
    out_layer = Dense(no_of_labels, activation="softmax")(flatten_1)
    
    model = Model(input_layer, out_layer)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
#     model.summary()
    return model

In [4]:
def labels():
    if os.path.isfile(labels_file_path):
        labels_file = open(labels_file_path, "r")
        labels = json.load(labels_file)
    else:
        labels_dict = {0:"noise"}
        with open(labels_file_path, "w") as f:
            json.dump(labels_dict, f)
        labels = labels_dict
    return labels

In [5]:
def preprocess(image):
    image_arr = cv2.imdecode(np.frombuffer(image, np.uint8), -1)
    image_processed = cv2.resize(image_arr, image_inp_shape)
    image_processed = cv2.cvtColor(image_processed, cv2.COLOR_BGR2GRAY)
    image_processed = (image_processed) / 255.0
    return image_processed

def preprocess_gan(image):
    image_arr = cv2.imdecode(np.frombuffer(image, np.uint8), -1)
    image_processed = cv2.resize(image_arr, image_inp_shape)
    image_processed = cv2.cvtColor(image_processed, cv2.COLOR_BGR2GRAY).reshape((dense_output,))
    image_processed = (image_processed) / 255.0
    return image_processed

In [6]:
def predict(image):
    labels_read = labels()
    if os.path.isfile(model_file):
        model = load_model(model_file)
    else:
        model = req_model(no_of_labels=len(labels_read))
    test_image = np.expand_dims(image, axis=0)
    results = model.predict(test_image)
    predicted_label = labels_read.get(str(np.argmax(results[0])))
    return predicted_label

In [7]:
def update_model_arc(model_v1, model):
#     model_v1 = req_model(no_of_labels=2)
    for i in range(len(model_v1.layers) -1):
        model_v1.layers[i].set_weights(model.layers[i].get_weights())

    last_layer = model.layers[-1].get_weights()
    temp_final_weights = []
    labels_to_add = 1
    for i in range(len(last_layer)):
        if len(list(last_layer[i].shape)) > 1:
            temp_dense = pad_sequences(last_layer[i], maxlen=last_layer[i].shape[-1] + labels_to_add, padding='post')
        else:
            temp_dense = np.pad(last_layer[i], pad_width=labels_to_add, mode='constant', constant_values=0)[
                         labels_to_add:]
        temp_final_weights.append(temp_dense)
    model_v1.layers[-1].set_weights(temp_final_weights)
    return model_v1

def update_gan_arc(gen_v1, gen):
#     gen_v1 = generator(n_classes=4)
    for i in range(len(gen_v1.layers) -1):
        if gen_v1.layers[i].get_config().get("name") == "embedding":
            last_layer = gen.get_layer(name="embedding").get_weights()
            temp_final_weights = []
            labels_to_add = 1
            for i in range(len(last_layer)):
                temp_dense = needed_arr = np.random.rand(1,last_layer[i].shape[1])
                temp_weights = np.concatenate((last_layer[i], temp_dense), axis=0)
                temp_final_weights.append(temp_weights)
            gen_v1.get_layer(name="embedding").set_weights(temp_final_weights)
        else:
            gen_v1.layers[i].set_weights(gen.layers[i].get_weights())
    return gen_v1

In [8]:
def train(image, ground_truth, add_label:bool):
    labels_read = labels()
    if os.path.isfile(model_file):
        if add_label:
            model_old = load_model(model_file)
            model_new = req_model(no_of_labels=len(labels_read))
            model = update_model_arc(model_v1=model_new, model=model_old)
        else:
            model = load_model(model_file)
    else:
        model = req_model(no_of_labels=len(labels_read))
    test_image = np.expand_dims(image, axis=0)
    ground_truth_arr = np.asarray([float(ground_truth)])
    for i in tqdm(range(0,max_iter), desc="training req model..."):
        history = model.train_on_batch(test_image,ground_truth_arr)
    model.save("model.h5")

In [9]:
def generator(n_classes:int = None):
    gen = Sequential()
    gen.add(Dense(256,input_dim=latent_dim))
    gen.add(LeakyReLU(0.2))
    gen.add(BatchNormalization(momentum=0.8))
    gen.add(Dense(512))
    gen.add(LeakyReLU(0.2))
    gen.add(BatchNormalization(momentum=0.8))
    gen.add(Dense(1024))
    gen.add(LeakyReLU(0.2))
    gen.add(BatchNormalization(momentum=0.8))
    gen.add(Dense(dense_output,activation='tanh'))

    noise = Input(shape=(latent_dim,))
    label = Input(shape=(1,),dtype='int32')
    label_embedding = Flatten()(Embedding(n_classes,latent_dim, name="embedding")(label))
    model_input = multiply([noise,label_embedding])
    image = gen(model_input)

    gen = Model([noise,label],image)
    gen.compile(loss='binary_crossentropy',optimizer='adam',metrics=["accuracy"])
    return gen


def discriminator(n_classes:int = None):
    disc = Sequential()
    disc.add(Dense(512,input_dim=dense_output))
    disc.add(LeakyReLU(0.2))
    disc.add(Dropout(0.4))
    disc.add(Dense(512))
    disc.add(LeakyReLU(0.2))
    disc.add(Dropout(0.4))
    disc.add(Dense(512))
    disc.add(LeakyReLU(0.2))
    disc.add(Dropout(0.4))
    disc.add(Dense(1,activation='sigmoid'))

    image = Input(shape=(dense_output,))
    label = Input(shape=(1,),dtype='int32')
    label_embedding=Flatten()(Embedding(n_classes,dense_output)(label))
    model_input=multiply([image,label_embedding])
    prediction=disc(model_input)

    disc = Model([image,label],prediction)
    disc.compile(loss='binary_crossentropy',optimizer='adam',metrics=['accuracy'])
    return disc


def stacked_GAN(gen,disc):
    gan_input = Input(shape=(latent_dim,))
    label = Input(shape=(1,))
    x = gen([gan_input,label])
    disc.trainable=False
    gan_out = disc([x,label])
    gan_stack = Model([gan_input,label],gan_out)
    gan_stack.compile(loss='binary_crossentropy',optimizer=Adam(lr=0.0002, beta_1=0.5))
    return gan_stack

def test(gen, label_to_make,n_classes:int=None,i:int=None):
    noise = np.random.normal(0,1,(1,latent_dim))
    label = label_to_make
    gen_image = np.squeeze(gen.predict([noise,label]),axis=0)
#   plt.imsave('./epoch_%d_tag_%s'%(i,label[0]),image.reshape(image_inp_shape),format='jpg',cmap='gray')
    return gen_image

def train_cgan(max_iter,batch_size,gen,disc,gan_stack,X_train,y_train,n_classes:int=None,gen_model_path:str=None,disc_model_path:str=None):
    valid=np.ones((batch_size,1))
    fake=np.zeros((batch_size,1))
    for i in tqdm(range(max_iter), desc="training cgan model..."):
        noise=np.random.normal(0,1,(batch_size,latent_dim))
        index=np.random.randint(0, X_train.shape[0], size=batch_size)
        image_batch = X_train[index]
        label_batch = y_train[index]

        fake_images=gen.predict([noise,label_batch])

        disc.trainable=True
        disc_loss_real=disc.train_on_batch([image_batch,label_batch],valid)
        disc_loss_fake=disc.train_on_batch([fake_images,label_batch],fake)
        disc_loss_final=0.5*np.add(disc_loss_real,disc_loss_fake)

        fake_labels=np.random.randint(0,n_classes,batch_size).reshape(-1,1)
        disc.trainable=False
        gen_loss=gan_stack.train_on_batch([noise,fake_labels],valid)

#       print('epoch_%d---->gen_loss:[%f]---->disc_loss:[%f]---->acc:[%f]'%(i,gen_loss,disc_loss_final[0],disc_loss_final[1]*100))
        if i%100==0:
            gen.save(gen_model_path)
            disc.save(disc_model_path)
#           test(gen,i, n_classes=n_classes, label_to_make=np.expand_dims(y_train, axis=0))

In [10]:
def wrapper_train(image, ground_truth, add_label:bool):
    latent_dim = 100
    batch_size = 16
    labels_read = labels()
    test_image = np.expand_dims(image, axis=0)
    ground_truth_arr = np.asarray([float(ground_truth)])
    X_train, y_train, num_classes = test_image, ground_truth_arr, len(labels_read)

    if os.path.isfile(gen_model_path):
        if add_label:
            gen_old = load_model(gen_model_path)
            gen_new = generator(n_classes=num_classes)
            gen = update_gan_arc(gen_v1=gen_new, gen=gen_old)
        else:
            gen = load_model(gen_model_path)
    else:
        gen = generator(n_classes=num_classes)

    if os.path.isfile(disc_model_path):
        if add_label:
            disc_old = load_model(disc_model_path)
            disc_new = discriminator(n_classes=num_classes)
            disc = update_gan_arc(gen_v1=disc_new, gen=disc_old)
        else:
            disc = load_model(disc_model_path)
    else:
        disc = discriminator(n_classes=num_classes)

    gan_stack = stacked_GAN(gen,disc)
    train_cgan(max_iter,batch_size,gen,disc,gan_stack, n_classes=num_classes,
               gen_model_path=gen_model_path, disc_model_path=disc_model_path,
               X_train = X_train, y_train = y_train)

In [11]:
def rev_labels(labels_dict):
    rev_labels_read = {}
    for key,val in labels_dict.items():
        rev_labels_read[val] = key
    return rev_labels_read

In [13]:
labels_file_path = "./labels.json"
model_file = "./model.h5"
gen_model_path = './generator.h5'
disc_model_path = './discriminator.h5'
image_inp_shape = (150,150)
image_inp_shape_conv = (150,150,1)
dense_output = int(image_inp_shape[0] * image_inp_shape[1])
latent_dim = 100
max_iter=200

for i in range(0,5):
    dataset_path = "."
    folder_name = input("feed the folder name: ")
    folder_path = os.path.join(dataset_path, folder_name)
    image_path = os.path.join(folder_path, random.choice(os.listdir(folder_path)))
    print(image_path)
    image = open(image_path, "rb").read()
    image_arr = preprocess(image=image)
    image_arr_gan = preprocess_gan(image=image)
    label = folder_name
    prediction = predict(image=image_arr)
    if prediction == label:
        print("Rewarded")
    else:
        if "noise" in list(labels().values()):
            print("Punishment to remove noise")
            labels_read = {"0":label}
            with open(labels_file_path, "w") as f:
                json.dump(labels_read, f)
            rev_labels_read=rev_labels(labels_dict=labels_read)
            # TRAIN MODEL
            train(image=image_arr, ground_truth=rev_labels_read.get(label),add_label=False)
        else:
            labels_read = labels()
            rev_labels_read=rev_labels(labels_dict=labels_read)
            if label in rev_labels_read:
                print("Punishment to just retrain")
                
                # TRAIN MODEL AND CGAN
                train(image=image_arr, 
                      ground_truth=rev_labels_read.get(label),
                      add_label=False)
                
                wrapper_train(image=image_arr_gan, 
                              ground_truth=rev_labels_read.get(label),
                              add_label=False)
                
                for key,val in rev_labels_read.items():
                    if key != label:
                        gen = load_model(gen_model_path)
                        label_to_send = np.expand_dims(np.asarray([float(rev_labels_read.get(key))]), axis=0)
                        re_train_image = test(gen=gen, label_to_make=label_to_send)
                        updated_image = re_train_image.reshape((image_inp_shape_conv))
                        train(image=updated_image, 
                              ground_truth=rev_labels_read.get(key),
                              add_label=False)
            else:
                print("Punishment to use cgan to remove bias")
                labels_read.update({len(labels_read):label})
                with open(labels_file_path, "w") as f:
                    json.dump(labels_read, f)
                rev_labels_read=rev_labels(labels_dict=labels_read)
                
                # TRAIN THE MODEL AND CGAN WITH UPDATED ARC AND USE CGAN FOR REMOVING BIAS
                train(image=image_arr, 
                      ground_truth=rev_labels_read.get(label), 
                      add_label=True)

                wrapper_train(image=image_arr_gan, 
                              ground_truth=rev_labels_read.get(label), 
                              add_label=True)

feed the folder name:  


./labels.json


error: OpenCV(4.4.0) /tmp/pip-req-build-vu_aq9yd/opencv/modules/imgproc/src/resize.cpp:3929: error: (-215:Assertion failed) !ssize.empty() in function 'resize'


In [None]:
image_path = "/forest/10736.jpg"
image = open(image_path, "rb")
image_arr = preprocess(image=image.read())
label = folder_name
prediction = predict(image=image_arr)
prediction