<a href="https://colab.research.google.com/github/omriak/omri_project_files/blob/main/main_trainingV0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    COLAB = True
    print("Note: using Google CoLab")
    %tensorflow_version 2.x
except:
    print("Note: not using Google CoLab")
    COLAB = False

Mounted at /content/drive
Note: using Google CoLab


Imports section & global parameters

In [2]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import Input, Reshape, Dropout, Dense, Resizing
from tensorflow.keras.layers import Flatten, BatchNormalization
from tensorflow.keras.layers import Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam
import numpy as np
from PIL import Image
from tqdm import tqdm
import os 
import time
import cv2 as cv
import matplotlib.pyplot as plt
# Run functions decorated with @tf.function eagerly (op by op) instead of as
# traced graphs.
tf.config.run_functions_eagerly(True)

# main path: directory on the mounted Drive whose files are read with np.load
# by the data-loading cell.
path_folder = os.path.join('/content/drive/MyDrive/Project_OmriAkrish/project_code_files/training_data')



Function section

In [10]:
def upsample_100(data):
    """Resample ``data`` along its first axis to exactly 100 entries.

    Output slot ``i`` is a copy of input slice ``floor(len(data) * i / 100)``,
    i.e. nearest-lower-index resampling; inputs longer than 100 are thinned,
    shorter ones have slices repeated.

    Args:
        data: 3-D array indexed as ``data[k, :, :]``.

    Returns:
        float64 array of shape ``(100, data.shape[1], data.shape[2])``.
    """
    in_shape = np.shape(data)
    # Source index for every one of the 100 output slots, computed in one go.
    source_rows = np.floor(np.arange(100) * in_shape[0] / 100).astype(int)
    resampled = np.zeros(shape=(100, in_shape[1], in_shape[2]))
    resampled[:, :, :] = data[source_rows, :, :]
    return resampled

def DFS():
    """Return the fixed 49-entry index sequence, converted to zero-based.

    The table below is written with one-based indices (1..25); every entry is
    shifted down by one before being returned.
    NOTE(review): presumably a depth-first walk over skeleton joints - confirm
    against the data source.
    """
    one_based_walk = (
        2, 21, 9, 10, 11, 12, 25, 12, 24, 12, 11, 10, 9,
        21, 3, 4, 3, 21, 5, 6, 7, 8, 23, 8, 22, 8, 7, 6, 5, 21,
        2, 1, 13, 14, 15, 16, 15, 14, 13, 1, 17, 18, 19, 20, 19,
        18, 17, 1, 2,
    )
    return np.array(one_based_walk) - 1

# Nicely formatted time string

def hms_string(sec_elapsed):
    """Format a duration in seconds as ``h:mm:ss.ss``.

    Args:
        sec_elapsed: elapsed time in seconds (int or float).

    Returns:
        String with unpadded hours, two-digit minutes and seconds shown with
        two decimals, zero-padded to width 5.
    """
    seconds_per_hour = 60 * 60
    hours = int(sec_elapsed / seconds_per_hour)
    minutes = int((sec_elapsed % seconds_per_hour) / 60)
    seconds = sec_elapsed % 60
    return f"{hours}:{minutes:>02}:{seconds:>05.2f}"

def build_image_resizer(Oheight,Owidth,Iheight,Iwidth,R):
    """Build a one-layer Keras model that bilinearly resizes images.

    Args:
        Oheight, Owidth: output height and width.
        Iheight, Iwidth: input height and width.
        R: number of input channels.

    Returns:
        A ``Sequential`` model holding a single ``Resizing`` layer.
    """
    resize_layer = Resizing(
        Oheight,
        Owidth,
        interpolation="bilinear",
        input_shape=(Iheight, Iwidth, R),
    )
    return Sequential([resize_layer])

def build_generator(seed_size):
    """Build the generator network.

    Args:
        seed_size: length of the input vector; it is z + c, where c is the
            class segment.

    Returns:
        A ``Sequential`` model: a dense projection reshaped to 4x4x512, then
        four (2x bilinear upsampling + 3x3 'same' convolution) stages ending
        in a 2-channel ``tanh`` output (4 -> 8 -> 16 -> 32 -> 64 pixels).
    """
    # Input layer: project the seed and reshape it into a 4x4 feature map.
    layers = [
        Dense(4*4*512, activation="relu", input_dim=seed_size),
        Reshape((4, 4, 512)),
    ]

    # Mid CNN layers: upsample -> conv -> batch norm -> ReLU, halving the
    # filter count at every stage.
    for filters in (256, 128, 64):
        layers.extend([
            UpSampling2D(size=(2, 2), interpolation='bilinear'),
            Conv2D(filters, kernel_size=(3, 3), strides=(1, 1), padding="same", use_bias=False),
            BatchNormalization(epsilon=10**-5, momentum=0.1),
            Activation("relu"),
        ])

    # Final CNN layer: no batch norm, tanh output.
    layers.extend([
        UpSampling2D(size=(2, 2), interpolation='bilinear'),
        Conv2D(2, kernel_size=(3, 3), strides=(1, 1), padding="same", use_bias=False),
        Activation("tanh"),
    ])
    return Sequential(layers)


def build_discriminator(image_shape):
    """Build the discriminator network.

    Args:
        image_shape: shape of one input image (without the batch axis).

    Returns:
        A ``Sequential`` model: four stride-2 4x4 convolutions with
        LeakyReLU(0.2) (batch norm on all but the first), flattened into a
        single sigmoid unit.
    """
    # Input stage: the only convolution without batch normalisation.
    layers = [
        Conv2D(64, kernel_size=(4, 4), strides=(2, 2), input_shape=image_shape, padding="same", use_bias=False),
        LeakyReLU(alpha=0.2),
    ]

    # Remaining down-sampling stages, doubling the filter count each time.
    for filters in (128, 256, 512):
        layers.extend([
            Conv2D(filters, kernel_size=(4, 4), strides=(2, 2), padding="same", use_bias=False),
            BatchNormalization(epsilon=10**-5, momentum=0.1),
            LeakyReLU(alpha=0.2),
        ])

    # Classification head.
    layers.extend([
        Flatten(),
        Dense(1, activation='sigmoid'),
    ])
    return Sequential(layers)

#training module


def discriminator_loss(real_output, fake_output):
    """Discriminator loss: cross-entropy on real plus cross-entropy on fake.

    Real outputs are scored against all-ones targets and fake outputs against
    all-zeros targets, using the module-level ``cross_entropy`` loss object.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

def generator_loss(fake_output):
    """Generator loss: cross-entropy of the fake outputs against all-ones targets.

    Uses the module-level ``cross_entropy`` loss object.
    """
    wanted_labels = tf.ones_like(fake_output)
    return cross_entropy(wanted_labels, fake_output)

def add_class(images):
    """Append six constant class channels (0, 0, 0, 0, 0, 1) to a batch.

    Args:
        images: object exposing ``.numpy()`` that yields a 4-D array
            indexed as (axis0, axis1, axis2, channels).

    Returns:
        NumPy array with the same first three axes and six extra channels on
        axis 3: five planes of zeros followed by one plane of ones.
    """
    pixels = images.numpy()
    dims = pixels.shape
    # Build the whole class map at once: all zeros, last plane set to one.
    class_map = np.zeros((dims[0], dims[1], dims[2], 6))
    class_map[..., 5] = 1
    return np.concatenate((pixels, class_map), axis=3)
    



# `tf.function` normally traces this step into a graph. When
# tf.config.run_functions_eagerly(True) is active, it runs eagerly instead.
@tf.function
def train_step(images,seed):
    """Run one optimisation step for both the generator and the discriminator.

    Reads the module-level ``generator``, ``discriminator``,
    ``generator_optimizer`` and ``discriminator_optimizer``.

    Args:
        images: batch of real images fed to the discriminator.
        seed: batch of generator inputs (noise plus class segment).

    Returns:
        Tuple ``(gen_loss, disc_loss)`` for this step.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(seed, training=True)

        # Extend the generator output with one plane of ones and the six class
        # planes (0, 0, 0, 0, 0, 1). The result must be assigned back,
        # otherwise the discriminator receives the bare generator output.
        batch = tf.shape(generated_images)[0]
        ones_plane = tf.ones((batch, 64, 64, 1), dtype=generated_images.dtype)
        zero_planes = tf.zeros((batch, 64, 64, 5), dtype=generated_images.dtype)
        generated_images = tf.concat(
            [generated_images, ones_plane, zero_planes, ones_plane], 3)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    # Differentiate and update outside the tape context so the tapes do not
    # record the gradient computation and the optimizer updates.
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    generator_optimizer.apply_gradients(
        zip(gradients_of_generator, generator.trainable_variables),
        experimental_aggregate_gradients=False)
    discriminator_optimizer.apply_gradients(
        zip(gradients_of_discriminator, discriminator.trainable_variables),
        experimental_aggregate_gradients=False)
    return gen_loss,disc_loss

def one_seed():
    """Return one generator input: 100 N(0, 1) samples followed by the class
    code (0, 0, 0, 0, 0, 1), as a 1-D array of length 106.
    """
    noise = np.random.normal(0, 1, 100)
    class_code = np.asarray((0, 0, 0, 0, 0, 1))
    return np.concatenate((noise, class_code))

def batch_seed(BATCH_SIZE):
    """Return a batch of generator inputs.

    Args:
        BATCH_SIZE: number of rows to generate.

    Returns:
        Array of shape ``(BATCH_SIZE, 106)``: 100 N(0, 1) samples per row
        followed by the class code (0, 0, 0, 0, 0, 1).
    """
    noise = np.random.normal(0, 1, (BATCH_SIZE, 100))
    # Same class code on every row: five zeros, then a one.
    class_code = np.zeros((noise.shape[0], 6))
    class_code[:, 5] = 1
    return np.concatenate((noise, class_code), axis=1)

def train(dataset, epochs):
    """Train the GAN and report the mean losses of every epoch.

    Uses the module-level ``BATCH_SIZE``, ``batch_seed`` and ``train_step``.

    Args:
        dataset: iterable yielding batches of real images; it is iterated
            once per epoch.
        epochs: number of passes over ``dataset``.

    Returns:
        Tuple ``(gen_loss_total_list, disc_loss_total_list)`` holding one mean
        generator loss and one mean discriminator loss per epoch.
    """
    start = time.time()
    gen_loss_total_list = []
    disc_loss_total_list = []

    for epoch in range(epochs):
        epoch_start = time.time()
        gen_loss_list = []
        disc_loss_list = []

        for image_batch in dataset:
            # Draw fresh noise for every step. Sampling once before the loops
            # would train the generator on the same BATCH_SIZE latent vectors
            # for the whole run.
            seed = batch_seed(BATCH_SIZE)
            gen_loss, disc_loss = train_step(image_batch, seed)
            gen_loss_list.append(gen_loss)
            disc_loss_list.append(disc_loss)

        g_loss = sum(gen_loss_list) / len(gen_loss_list)
        d_loss = sum(disc_loss_list) / len(disc_loss_list)

        epoch_elapsed = time.time()-epoch_start
        print (f'Epoch {epoch+1}, gen loss={g_loss},disc loss={d_loss},'f' {hms_string(epoch_elapsed)}')

        gen_loss_total_list.append(g_loss)
        disc_loss_total_list.append(d_loss)

    elapsed = time.time()-start
    print (f'Training time: {hms_string(elapsed)}')
    return gen_loss_total_list , disc_loss_total_list

def get_class_all(n,numofclass,betchsize):
  """Build a one-hot class map of shape (betchsize, 64, 64, numofclass).

  Args:
    n: one-based class index, 1 <= n <= numofclass.
    numofclass: total number of classes (channels in the result).
    betchsize: size of the leading (batch) axis.

  Returns:
    Tensor whose channel ``n - 1`` is all ones and every other channel is
    all zeros.

  Raises:
    ValueError: if ``n`` is outside ``1..numofclass``.
  """
  if not 1 <= n <= numofclass:
    raise ValueError(f"class index n={n} must be in 1..{numofclass}")
  # Zero-width tensors are valid concat operands, so the first and last class
  # need no special-casing.
  before = tf.zeros((betchsize,64,64,n-1))
  hot = tf.ones((betchsize,64,64,1))
  after = tf.zeros((betchsize,64,64,numofclass-n))
  # tf.concat takes the tensors as ONE list argument followed by the axis.
  return tf.concat([before,hot,after],3)

In [13]:
# NOTE(review): stale scratch cell. It cannot run: the `for` statement below
# has no trailing ':' (hence the recorded SyntaxError). It also reads `n` and
# `i`, which this cell never assigns, and relies on `resizer` and
# `training_data_normalize`, which are only created in the "Main" cell
# further down.
#arrange data for training
training_data_r = resizer(training_data_normalize, training=False)
s = training_data_r.shape
for training_data_by_class in list(training_data_r)
  class1 = get_class_all(n,i%6,training_data_by_class.shape[0])
  training_data_r1 = np.append(training_data_r, class1, axis=3)
  training_data_r1[:, :, :, 2] = np.ones_like(training_data_r1[:, :, :, 2])
print(training_data_r1.shape)

SyntaxError: ignored

In [15]:
# NOTE(review): scratch cell; its recorded run ended in ValueError. It relies
# on `resizer` and `training_data_normalize`, which are only created in the
# "Main" cell further down.
training_data_r = resizer(np.squeeze(training_data_normalize), training=False)
print(training_data_r.shape)

ValueError: ignored

Main

In [None]:
#load data: one array per file found in path_folder
print("Load training image binary...")
training_data = []
# os.listdir returns entries in arbitrary order; sorting keeps the file order
# (and the position-based class assignment below) reproducible.
for action_file in tqdm(sorted(os.listdir(path_folder))):
    training_binary_path = os.path.join(path_folder,action_file)
    training_data.append(np.load(training_binary_path))
# training_data stays a plain list: the files may hold different numbers of
# samples, which cannot be stacked into one rectangular array.

#normalize each file on its own
print("Normlizing all....")
training_data_normalize = []
for action_samples in tqdm(training_data):
    # per_image_standardization treats the last three axes as one image and
    # any leading axis as a batch, so a whole file is standardized in one call.
    action_tensor = tf.cast(action_samples, tf.float32)
    training_data_normalize.append(tf.image.per_image_standardization(action_tensor))

#build resizer
print("building resizer (64,64,100,49,3) ....")
resizer = build_image_resizer(64,64,100,49,3)


#arrange data for training
# Resize file by file (the resizer expects one 4-D batch, not a Python list),
# then stack every sample along axis 0.
resized_per_file = [resizer(action_tensor, training=False).numpy()
                    for action_tensor in training_data_normalize]
training_data_r = np.concatenate(resized_per_file, axis=0)
s = training_data_r.shape

# One-hot class planes, six classes.
# NOTE(review): the class of a file is taken as (file position % 6), following
# the `i%6` expression of the earlier draft - confirm against the file naming.
# train_step appends a fixed (0, 0, 0, 0, 0, 1) class map to generated images;
# verify both sides use the same labelling.
class1 = np.zeros(s[:3] + (6,), dtype=training_data_r.dtype)
first_row = 0
for i, resized in enumerate(resized_per_file):
    class1[first_row:first_row + len(resized), :, :, i % 6] = 1
    first_row += len(resized)

training_data_r1 = np.append(training_data_r, class1, axis=3)
# Overwrite the third image channel with ones, as in the original layout.
training_data_r1[:, :, :, 2] = 1
print(training_data_r1.shape)

# Loss object shared by generator_loss / discriminator_loss, plus one Adam
# optimizer per network (same hyper-parameters, separate optimizer state).
cross_entropy = tf.keras.losses.BinaryCrossentropy()
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1.5e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1.5e-4, beta_1=0.5)

Load training image binary...


100%|██████████| 120/120 [00:01<00:00, 63.53it/s] 


In [None]:
# Shape of the first loaded array.
first_loaded = training_data[0]
print(first_loaded.shape)

(939, 100, 49, 3)
