In [1]:
from PIL import Image
import os
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import keras
from tensorflow.keras.optimizers import Adam
from keras.models import Sequential, Model
from keras.layers import Dense, LeakyReLU, Reshape, Flatten, Input
from keras.layers import Add,Layer,UpSampling2D,Conv2D, MaxPooling2D, Activation, Dropout, Conv2DTranspose,BatchNormalization,SpectralNormalization,AveragePooling2D
from keras import layers
import tensorflow as tf
import time
from IPython import display
#import cv2
import shutil
import gc
from tensorflow.keras import backend as K
from concurrent.futures import ThreadPoolExecutor
from keras.applications.inception_v3 import InceptionV3
from skimage.transform import resize
from scipy.linalg import sqrtm

import math

# Hyperparameters

In [9]:
## Hyperparameters: every value a reader might tune lives in this cell.
dataset_path = "/kaggle/input/anime-faces/data/data"  # directory holding the training images
save_train_loaction = "./train"  # output root; images/ and model/ sub-folders are created under it
image_shape = (64,64,3)  # final image shape (height, width, channels)
BUFFER_SIZE = 23000  # shuffle buffer size used by the in-memory dataset path

noise_dim = 256  # length of the generator's latent vector
num_examples_to_generate = 25  # number of latent vectors in the fixed preview `seed`
learning_rate = 0.0001  # Adam learning rate for both generator and critic
CheckpointInterval = 1  # save a checkpoint every N epochs
OutputInterval = 1  # show preview images (and reset the metrics) every N epochs
ImageSaveInterval = 1  # write the preview to disk every N epochs; only checked when a preview is shown
# Set to a directory of earlier checkpoints to resume from it, or None to skip the copy.
#old_checkpoint_location = "/kaggle/input/progress-1/train"
old_checkpoint_location = None
# presumably (number of training images) // (batch size) - TODO confirm the 20300 figure
steps_per_epoch = 20300// 32
# Number of growth stages: one per resolution from 4x4 up to the final width
# (log2(64) - 1 = 5 -> 4, 8, 16, 32, 64).
no_blocks = int(math.log2(image_shape[1])) - 1
n_batch = [32, 32, 16, 8, 8, 8,8]  # batch size schedule, one entry per growth stage
Epochs = [20,20,40,40,100,100]  # epoch schedule, one entry per growth stage


GP_WEIGHT = 1.0  # multiplier applied to the gradient penalty in the critic loss


# True  -> load every image into memory with PIL before building the dataset.
# False -> stream the images from disk through a tf.data pipeline.
USE_PIPELINING = False

In [None]:
# Make sure every growth stage has a batch size and an epoch count: a schedule
# shorter than `no_blocks` is extended in place by repeating its last entry.
for schedule in (n_batch, Epochs):
    shortfall = no_blocks - len(schedule)
    if shortfall > 0:
        schedule.extend([schedule[-1]] * shortfall)

In [None]:
# Pick the distribution strategy used by the rest of the notebook.
# NOTE: despite its name, `tpu_strategy` holds a MirroredStrategy when no TPU
# is found, so every later `tpu_strategy.scope()` / `.run()` works on either.
try:
    # Try to detect and initialize a TPU
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # Detect TPU
    tf.tpu.experimental.initialize_tpu_system(tpu)  # Initialize TPU
    tpu_strategy = tf.distribute.TPUStrategy(tpu)  # Use TPU strategy
    print("Running on TPU")
except ValueError:
    # Only ValueError is treated as "no TPU available"; any other failure
    # during TPU initialization propagates.
    tpu_strategy = tf.distribute.MirroredStrategy()  # Use GPU or CPU strategy
    print("Running on GPU or CPU")


In [None]:
# Checkpoints from a previous (e.g. headless) run live in a separate input
# directory, so they are copied once into the working/output directory.
# The copy is skipped when the working directory already exists, so that its
# contents are not overwritten.
if old_checkpoint_location is None:
    print("Initializing from scratch.")
elif os.path.isdir(save_train_loaction):
    # Previously this case also printed "Initializing from scratch.", which was
    # misleading: the existing working directory is kept and used as-is.
    print("Working directory already exists - old checkpoint location not copied.")
else:
    print("Old CheckPoint Location - Copied to current working/output directory")
    shutil.copytree(old_checkpoint_location, save_train_loaction, dirs_exist_ok=True)

In [None]:
# Create the output tree: the root folder plus its images/ and model/ children.
for sub_dir in ("", "/images", "/model"):
    os.makedirs(save_train_loaction + sub_dir, exist_ok=True)

In [None]:
## helper function
def has_file_with_prefix(directory, prefix):
    """Tell whether `directory` contains an entry whose name starts with `prefix`.

    Args:
        directory: path of the directory to inspect.
        prefix: string the entry name must start with.

    Returns:
        True if the directory exists and at least one of its entries matches;
        False if it does not exist or nothing matches.
    """
    if os.path.isdir(directory):
        return any(entry.startswith(prefix) for entry in os.listdir(directory))
    return False


In [None]:
def load_and_preprocess_image(image_path):
    """Read one JPEG file and return it resized and scaled to [-1, 1].

    Args:
        image_path: path of the image file (decoded as a 3-channel JPEG).

    Returns:
        Tensor of shape (image_shape[0], image_shape[1], 3) with values
        rescaled by `x / 127.5 - 1`.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    target_size = (image_shape[0], image_shape[1])
    resized = tf.image.resize(decoded, target_size)
    # Shift 0..255 pixel values into the tanh output range of the generator.
    return (resized / 127.5) - 1



def change_dataset(new_shape,batch_size,dataset):
    """Resize a batched image dataset and re-batch it with a new batch size.

    Args:
        new_shape: (height, width) passed to `tf.image.resize`.
        batch_size: size of the batches in the returned dataset; incomplete
            trailing batches are dropped.
        dataset: batched `tf.data.Dataset` of images.

    Returns:
        A prefetched dataset of resized batches that repeats indefinitely.
    """
    def _resize(batch):
        return tf.image.resize(batch, new_shape)

    resized = dataset.map(_resize, num_parallel_calls=tf.data.AUTOTUNE)
    # Flatten back to single images so the batch size can be changed.
    rebatched = resized.unbatch().batch(batch_size, drop_remainder=True)
    return rebatched.prefetch(tf.data.AUTOTUNE).repeat()
    
def create_image_dataset(data_dir, batch_size=32, shuffle_buffer_size=1000):
    """Build a streaming dataset of preprocessed images from a directory.

    Args:
        data_dir: directory whose entries (`{data_dir}/*`) are the image files.
        batch_size: number of images per batch; incomplete batches are dropped.
        shuffle_buffer_size: buffer size used to shuffle the file paths.

    Returns:
        A prefetched, batched dataset of images produced by
        `load_and_preprocess_image` that repeats indefinitely.
    """
    file_paths = tf.data.Dataset.list_files(f"{data_dir}/*")
    pipeline = (
        file_paths
        .shuffle(shuffle_buffer_size)
        .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(batch_size, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )
    return pipeline.repeat()

In [None]:
#load data
def load_image(file_path):
    """Open the image at `file_path` with PIL and return its pixels as a NumPy array."""
    picture = Image.open(file_path)
    return np.array(picture)

def load_data(dataset_path):
    """Load every file in `dataset_path` as an image array using a thread pool.

    Args:
        dataset_path: directory whose entries are all loaded with `load_image`.

    Returns:
        List of image arrays, in the order returned by `os.listdir`.
    """
    files = []
    for entry in os.listdir(dataset_path):
        files.append(os.path.join(dataset_path, entry))

    with ThreadPoolExecutor() as pool:
        # tqdm wraps the lazy result iterator to show loading progress.
        return list(tqdm(pool.map(load_image, files), total=len(files)))



In [None]:
# Batch size of the initial dataset. `BATCH_SIZE` was previously referenced
# without being defined anywhere in the notebook (NameError on a fresh kernel);
# the first entry of the batch schedule is used.
BATCH_SIZE = n_batch[0]

if USE_PIPELINING :
    # In-memory path: load every image with PIL, then build the dataset from
    # the resulting array. Assumes all files decode to the same shape so that
    # np.array() yields one dense array - TODO confirm for this dataset.
    x_train = np.array(load_data(dataset_path))
    image_dataset = (
        # Fixed: the original passed the undefined name `train_images`.
        tf.data.Dataset.from_tensor_slices(x_train)
        # Scale to [-1, 1] like the streaming path does, so both branches feed
        # the same value range to the display and training code.
        .map(lambda img: (tf.cast(img, tf.float32) / 127.5) - 1,
             num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(BUFFER_SIZE)
        .batch(BATCH_SIZE, drop_remainder=True)
    )
else:
    # Streaming path: read, decode and normalize images through tf.data.
    image_dataset = create_image_dataset(dataset_path, batch_size=BATCH_SIZE)

In [None]:
## display 25 photos in 5x5 format
def display_images(dataset, num_images=25):
    """Show the first `num_images` images of a batched dataset in a square grid.

    Images are expected in the [-1, 1] range and are mapped back to 0..255 for
    display. The grid side is the smallest square that fits the collected
    images (5x5 for the default of 25); unused cells are left blank.

    Args:
        dataset: iterable of image batches (eager tensors).
        num_images: maximum number of images to show.
    """
    images = []
    for img_batch in dataset:
        images.extend(img_batch[:num_images - len(images)])
        if len(images) >= num_images:
            break

    if not images:
        # Nothing to draw (empty dataset or num_images <= 0).
        return

    # The grid was previously hard-coded to 5x5, which raised IndexError
    # whenever fewer than 25 images were available or requested.
    side = math.ceil(math.sqrt(len(images)))
    fig, ax = plt.subplots(side, side, figsize=(2 * side, 2 * side), squeeze=False)
    for idx, cell in enumerate(ax.flat):
        cell.axis('off')
        if idx < len(images):
            cell.imshow(((images[idx]+1)*127.5).numpy().astype(int))
    plt.tight_layout()
    plt.show()
display_images(image_dataset)

In [None]:
# create distributed dataset
def make_distributed(dataset):
    """Distribute `dataset` across the active strategy and return an iterator over it."""
    distributed = tpu_strategy.experimental_distribute_dataset(dataset)
    return iter(distributed)

# Model Definition

## Helper Layers

In [2]:
# pixel-wise feature vector normalization layer
class PixelNormalization(Layer):
    """Pixel-wise feature vector normalization.

    Each feature vector (last axis) is divided by the square root of the mean
    of its squared values, with a small epsilon added for numerical stability.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return `inputs` normalized along the last axis."""
        squared = inputs ** 2.0
        # The epsilon keeps the division finite when a feature vector is all zeros.
        mean_square = tf.reduce_mean(squared, axis=-1, keepdims=True) + 1.0e-8
        return inputs / tf.sqrt(mean_square)

    def compute_output_shape(self, input_shape):
        """The normalization does not change the tensor shape."""
        return input_shape

In [3]:
# mini-batch standard deviation layer
class MinibatchStdev(Layer):
    """Mini-batch standard deviation layer.

    Appends one extra channel holding a single statistic: the standard
    deviation over the batch axis, averaged over all remaining positions.
    Expects 4-D inputs, since the statistic is tiled over the first three axes.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        """Return `inputs` with the batch-stdev channel concatenated last."""
        batch_mean = tf.reduce_mean(inputs, axis=0, keepdims=True)
        variance = tf.reduce_mean(tf.square(inputs - batch_mean), axis=0, keepdims=True)
        # Epsilon avoids sqrt(0) for features that are constant across the batch.
        stdev = tf.sqrt(variance + 1e-8)

        # Collapse to one scalar (kept 4-D) and broadcast it over every sample
        # and spatial position as a single-channel map.
        avg_stdev = tf.reduce_mean(stdev, keepdims=True)
        dims = tf.shape(inputs)
        stat_map = tf.tile(avg_stdev, (dims[0], dims[1], dims[2], 1))

        return tf.keras.backend.concatenate([inputs, stat_map], axis=-1)

    def compute_output_shape(self, input_shape):
        """Same as the input shape with one additional channel."""
        out_shape = list(input_shape)
        out_shape[-1] += 1
        return tuple(out_shape)

In [4]:
# weighted sum output
class WeightedSum(Add):
    """Merge layer that blends exactly two inputs with a non-trainable weight.

    The output is `(1 - alpha) * first + alpha * second`, where `alpha` is a
    variable that can be updated from outside the layer.
    """

    def __init__(self, alpha=0.0, **kwargs):
        super().__init__(**kwargs)
        self.alpha = tf.Variable(alpha, name='ws_alpha',trainable=False)

    def _merge_function(self, inputs):
        """Blend the two tensors in `inputs` using the current `alpha`."""
        # only a pair of inputs is supported
        assert (len(inputs) == 2)
        first, second = inputs
        return ((1.0 - self.alpha) * first) + (self.alpha * second)

# update the alpha value on each instance of WeightedSum
def update_fadein(models, step, n_steps):
    """Set the blend weight on every `WeightedSum` layer of the given models.

    Alpha grows linearly from 0 at `step == 0` to 1 at `step == n_steps - 1`.

    Args:
        models: iterable of models whose `.layers` are scanned.
        step: index of the current training step within the fade-in phase.
        n_steps: total number of steps in the fade-in phase.
    """
    # A single-step schedule used to divide by zero; treat it as fully faded in.
    if n_steps <= 1:
        alpha = 1.0
    else:
        alpha = step / float(n_steps - 1)
    for model in models:
        for layer in model.layers:
            if isinstance(layer, WeightedSum):
                tf.keras.backend.set_value(layer.alpha, alpha)

## Generator

In [26]:
# adding a generator block
def add_generator_block(old_model,name):
    """Grow a generator by one resolution-doubling block.

    Assumes `old_model` ends with [block output, to-RGB conv, tanh], as the
    generators built in this notebook do.

    Args:
        old_model: generator for the previous resolution.
        name: prefix used for the names of the new layers.

    Returns:
        [normal_model, fadein_model]: the straight-through generator at the new
        resolution, and a variant whose output blends the old to-RGB path
        (applied to the upsampled features) with the new block via `WeightedSum`.
    """
    init = tf.keras.initializers.RandomNormal(stddev=0.02)
    # No kernel constraint is applied (max_norm(1.0) was tried and disabled).
    const = None

    def conv_stage(tensor, index):
        # 3x3 conv -> pixel norm -> leaky ReLU
        tensor = Conv2D(128, (3,3), padding='same', kernel_initializer=init, kernel_constraint=const, name=f"{name}_Conv{index}")(tensor)
        tensor = PixelNormalization(name=f"{name}_PixelNorm{index}")(tensor)
        return LeakyReLU(negative_slope=0.2, name=f"{name}_LeakyReLU{index}")(tensor)

    # Features just before the old model's to-RGB conv, doubled in size.
    previous_features = old_model.layers[-3].output
    upsampled = UpSampling2D()(previous_features)

    features = conv_stage(upsampled, 1)
    features = conv_stage(features, 2)

    # New to-RGB head for the higher resolution.
    new_rgb = Conv2D(3, (1,1), padding='same', kernel_initializer=init, kernel_constraint=const, name=f"{name}_RGB1")(features)
    new_rgb = Activation('tanh',name=f"{name}_Activation")(new_rgb)
    normal_model = Model(old_model.input, new_rgb,name = f"{name}_normal")

    # Old to-RGB conv re-used on the upsampled features for the fade-in path.
    old_to_rgb = old_model.layers[-2]
    old_rgb = old_to_rgb(upsampled)
    old_rgb = Activation('tanh',name=f"{name}_Activation_Trin")(old_rgb)

    blended = WeightedSum()([old_rgb, new_rgb])
    fadein_model = Model(old_model.input, blended,name = f"{name}_fadeIn")
    return [normal_model, fadein_model]

In [27]:
# define generator models
def define_generator( n_blocks, in_dim=4):
    """Build the list of generators, one entry per growth stage.

    Args:
        n_blocks: number of growth stages (resolutions) to build.
        in_dim: spatial size of the first stage (in_dim x in_dim).

    Returns:
        List of `[normal_model, fadein_model]` pairs. The first entry holds the
        base model twice because the base stage has no fade-in variant.
    """
    init = tf.keras.initializers.RandomNormal(stddev=0.02)
    # No kernel constraint is applied (max_norm(1.0) was tried and disabled).
    const = None
    base = 'Block_0'

    def conv_stage(tensor, index):
        # 3x3 conv -> pixel norm -> leaky ReLU
        tensor = Conv2D(128, (3,3), padding='same', kernel_initializer=init, kernel_constraint=const, name=f"{base}_Conv{index}")(tensor)
        tensor = PixelNormalization(name=f"{base}_PixelNorm{index}")(tensor)
        return LeakyReLU(negative_slope=0.2, name=f"{base}_LeakyReLU{index}")(tensor)

    # Project the latent vector to an in_dim x in_dim x 128 feature map.
    latent = Input(shape=(noise_dim,))
    features = Dense(128 * in_dim * in_dim, kernel_initializer=init, kernel_constraint=const,name = 'Dense1')(latent)
    features = Reshape((in_dim, in_dim, 128),name = 'Reshape1')(features)

    features = conv_stage(features, 1)
    features = conv_stage(features, 2)

    # 1x1 to-RGB conv followed by tanh.
    rgb = Conv2D(3, (1,1), padding='same', kernel_initializer=init, kernel_constraint=const,name=f"{base}_RGB")(features)
    rgb = Activation('tanh',name=f"{base}_Activation")(rgb)
    base_model = Model(latent, rgb)
    model_list = [[base_model, base_model]]

    # Each further stage grows from the previous stage's normal model.
    for block_index in range(1, n_blocks):
        previous_normal = model_list[-1][0]
        model_list.append(add_generator_block(previous_normal, f"Block_{block_index}"))

    return model_list

## Discriminator

In [7]:
# adding a discriminator block
def add_discriminator_block(old_model, n_input_layers=3):
    """Grow a discriminator to accept inputs of twice the previous size.

    Args:
        old_model: discriminator for the previous resolution. Its first
            `n_input_layers` layers are expected to be the input layer, the
            1x1 input convolution and its LeakyReLU, as in the models built
            in this notebook.
        n_input_layers: number of leading layers of `old_model` that are
            replaced by the new block and therefore skipped when re-using it.

    Returns:
        [normal_model, fadein_model]: the straight-through discriminator, and a
        variant that blends the old input path (on a downsampled image) with
        the new block via `WeightedSum` before the shared remaining layers.
    """
    init = tf.keras.initializers.RandomNormal(stddev=0.02)
    # No kernel constraint is applied (max_norm(1.0) was tried and disabled).
    const  = None
    in_shape = list(old_model.input.shape)

    # define new input shape as double the size
    input_shape = (in_shape[-2]*2, in_shape[-2]*2, in_shape[-1])
    in_image = Input(shape=input_shape)

    # define new input processing layer
    d = Conv2D(128, (1,1), padding='same', kernel_initializer=init, kernel_constraint=const)(in_image)
    # `negative_slope` replaces the deprecated `alpha` keyword, matching the
    # other LeakyReLU layers in this notebook.
    d = LeakyReLU(negative_slope=0.2)(d)

    # define new block
    d = Conv2D(128, (3,3), padding='same', kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(negative_slope=0.2)(d)
    d = Conv2D(128, (3,3), padding='same', kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(negative_slope=0.2)(d)
    d = AveragePooling2D((2,2))(d)
    block_new = d

    # skip the input, 1x1 and activation for the old model
    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)
    model1 = Model(in_image, d)

    # Fade-in path: shrink the image and run it through the old input layers.
    downsample = AveragePooling2D((2,2))(in_image)

    block_old = old_model.layers[1](downsample)
    block_old = old_model.layers[2](block_old)
    d = WeightedSum()([block_old, block_new])

    # Both paths share the remaining layers of the old model.
    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)

    model2 = Model(in_image, d)
    return [model1, model2]

In [8]:
# define the discriminator models for each image resolution
def define_discriminator(n_blocks, input_shape=(4,4,3)):
    """Build the list of discriminators (critics), one entry per growth stage.

    Args:
        n_blocks: number of growth stages (resolutions) to build.
        input_shape: image shape accepted by the base model.

    Returns:
        List of `[normal_model, fadein_model]` pairs. The first entry holds the
        base model twice because the base stage has no fade-in variant. Each
        model outputs one unbounded score per image (Dense(1), no activation).
    """
    init = tf.keras.initializers.RandomNormal(stddev=0.02)
    # No kernel constraint is applied (max_norm(1.0) was tried and disabled).
    const  = None
    model_list = list()
    in_image = Input(shape=input_shape)

    # 1x1 input convolution, then the minibatch-stdev statistic channel
    d = Conv2D(128, (1,1), padding='same', kernel_initializer=init, kernel_constraint=const)(in_image)
    d = LeakyReLU(negative_slope=0.2)(d)
    d = MinibatchStdev()(d)

    d = Conv2D(128, (3,3), padding='same', kernel_initializer=init, kernel_constraint=const)(d)
    # `negative_slope` replaces the deprecated `alpha` keyword, matching the
    # other LeakyReLU layers in this notebook.
    d = LeakyReLU(negative_slope=0.2)(d)
    d = Conv2D(128, (4,4), padding='same', kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(negative_slope=0.2)(d)

    d = Flatten()(d)
    out_class = Dense(1)(d)

    model = Model(in_image, out_class)

    model_list.append([model, model])

    # Each further stage grows from the previous stage's normal model.
    for i in range(1, n_blocks):
        old_model = model_list[i - 1][0]
        models = add_discriminator_block(old_model)
        model_list.append(models)

    return model_list

## Model data

In [None]:
# Create models, metrics and optimizers inside the strategy scope so their
# variables are created under the active distribution strategy.
with tpu_strategy.scope():
    # One [normal, fade-in] model pair per growth stage.
    g_models = define_generator(no_blocks)
    d_models = define_discriminator(no_blocks)


    # Running means of the losses, reported on the training progress bar.
    gen_training_loss = tf.keras.metrics.Mean('generator_training_loss', dtype=tf.float32)
    disc_training_loss = tf.keras.metrics.Mean('critic_training_loss', dtype=tf.float32)
    # Fixed latent vectors so preview images are comparable across epochs.
    seed = tf.random.normal([num_examples_to_generate, noise_dim])
    # Fraction of samples whose critic score has the expected sign (see train_step).
    accuracy  = tf.keras.metrics.BinaryAccuracy()
    # NOTE: trainer() replaces both optimizers with fresh instances on every call.
    # NOTE(review): 10e-8 equals 1e-7 - confirm whether 1e-8 was intended.
    generator_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0, beta_2=0.99, epsilon=10e-8)
    discriminator_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0, beta_2=0.99, epsilon=10e-8)
 
 

In [None]:
def display_images_all(generator_list,disc_list,no_blocks,random = None):
    """Plot one sample from each growth stage's generator with its critic score.

    The figure is shown and also saved under the model output folder.

    Args:
        generator_list: list of `[normal, fade-in]` generator pairs.
        disc_list: list of `[normal, fade-in]` discriminator pairs.
        no_blocks: number of stages to plot (the first `no_blocks` entries).
        random: latent vectors to feed the generators; a single random vector
            is drawn when omitted. Only the first generated image is shown.
    """
    if random is None:
        random = tf.random.normal((1,noise_dim))

    n_cols = 3
    # Rows needed to fit every stage. `squeeze=False` keeps `axs` 2-D even for
    # a single row, which previously broke the `axs[i, j]` indexing.
    n_rows = max(1, math.ceil(no_blocks / n_cols))
    fig, axs = plt.subplots(n_rows, n_cols, figsize = (6,6), squeeze=False)

    for idx, ax in enumerate(axs.flat):
        ax.axis('off')
        if idx >= no_blocks:
            continue
        gen_imgs = generator_list[idx][0](random,training =False)
        ax.set_title('Shape = '+str(list(gen_imgs.shape)[1:]), fontsize=8)
        ax.imshow((np.array(gen_imgs[0]) * 127.5 + 127.5).astype(int))
        disc = disc_list[idx][0](gen_imgs)
        # Take the score of the displayed (first) image as an explicit scalar
        # instead of calling int() on the whole (batch, 1) tensor.
        rating = int(tf.reshape(disc, [-1])[0])
        ax.text(0.5, -0.1, 'Discriminator rating = ' + str(rating), fontsize=8, ha='center', transform=ax.transAxes)

    plt.show()

    # NOTE(review): the leading '%' in the file name looks like a leftover
    # format placeholder; kept unchanged so the output path stays the same.
    fig.savefig(save_train_loaction+"/model/%models_output.png" ,dpi=300)
    plt.close("all")
# Use the configured number of stages rather than a hard-coded 5.
display_images_all(g_models,d_models,no_blocks)

# Training functions

In [None]:
@tf.function
def gradient_penalty(  real_images, fake_images,discriminator):
    """Gradient penalty on random interpolations of real and fake images.

    Args:
        real_images: batch of real images.
        fake_images: batch of generated images with the same shape.
        discriminator: critic model used to score the interpolated images.

    Returns:
        Sum over the batch of `(||grad|| - 1)^2`, divided by the global batch
        size (per-replica batch size times the number of replicas).
    """
    batch = len(fake_images)
    # One mixing coefficient per sample, broadcast over H, W and C.
    mix = tf.random.uniform([batch, 1, 1, 1], 0.0, 1.0)
    interpolated = real_images + mix * (fake_images - real_images)

    with tf.GradientTape() as gp_tape:
        # `interpolated` is not a variable, so it must be watched explicitly.
        gp_tape.watch(interpolated)
        critic_scores = discriminator(interpolated, training=True)

    (grads,) = gp_tape.gradient(critic_scores, [interpolated])
    # Per-sample L2 norm of the gradient with respect to the input image.
    grad_norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
    return tf.nn.compute_average_loss(
        (grad_norm - 1.0) ** 2,
        global_batch_size = batch*tpu_strategy.num_replicas_in_sync,
    )

In [None]:
# Generator loss for WGAN
@tf.function
def generator_loss(fake_output):
    """Generator loss: negated critic score of the fakes, averaged over the global batch."""
    global_batch = len(fake_output)*tpu_strategy.num_replicas_in_sync
    mean_fake_score = tf.nn.compute_average_loss(fake_output, global_batch_size = global_batch)
    return -mean_fake_score

# Critic (discriminator) loss for WGAN
@tf.function
def discriminator_loss(real_output, fake_output):
    """Critic loss: (fake score - real score), averaged over the global batch."""
    global_batch = len(fake_output)*tpu_strategy.num_replicas_in_sync
    score_gap = fake_output - real_output
    return tf.nn.compute_average_loss(score_gap, global_batch_size = global_batch)


In [None]:

def train_step(Iter):
    """Run one critic update followed by one generator update on every replica.

    Uses the module-level `generator`, `discriminator` and the two optimizers
    (all set by `trainer`), and updates the loss/accuracy metrics.

    Args:
        Iter: iterator over the distributed dataset; one batch is consumed.
    """

    def step_fn(images):
        noise = tf.random.normal([len(images), noise_dim])

        # ---- critic update ----
        with tf.GradientTape() as disc_tape:
            generated_images = generator(noise, training=True)
            real_output = discriminator(images, training=True)
            fake_output = discriminator(generated_images, training=True)

            disc_loss = discriminator_loss(real_output, fake_output)/2.0
            gp = gradient_penalty( images, generated_images,discriminator)
            disc_loss = disc_loss + gp * GP_WEIGHT

        gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
        discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

        # The loss helpers divide by the global batch size, so scale back up to
        # report a per-replica mean.
        disc_training_loss.update_state(disc_loss * tpu_strategy.num_replicas_in_sync)

        # "Accuracy": fakes should score <= 0 and reals > 0.
        accuracy.update_state(tf.zeros_like(fake_output), tf.where(fake_output > 0, 1, 0))
        accuracy.update_state(tf.ones_like(real_output), tf.where(real_output > 0, 1, 0))

        # ---- generator update (once per step) ----
        # The generator loss only needs the critic's score on fresh fakes; the
        # extra critic pass on real images and the unused critic loss that used
        # to be computed here were removed.
        with tf.GradientTape() as gen_tape:
            noise = tf.random.normal([ len(images), noise_dim])
            generated_images = generator(noise, training=True)
            fake_output = discriminator(generated_images, training=True)
            gen_loss = generator_loss(fake_output)

        gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
        generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))

        gen_training_loss.update_state(gen_loss * tpu_strategy.num_replicas_in_sync)

    tpu_strategy.run(step_fn, args=(next(Iter),))


In [None]:
##checkpointing

# Checkpoints are written to (and restored from) the model/ output folder.
checkpoint_dir = save_train_loaction+'/model'
# Number of epochs completed in the current trainer() call (assigned after each epoch).
last_epoch = tf.Variable(0, name='last_epoch', trainable=False)

# Index of the growth stage to train next; train() resumes from this value.
Current_Block_no = tf.Variable(0, name='Current_Block_no', trainable=False)

# Stored in the checkpoint; no code in this notebook updates it.
best_fid_score = tf.Variable(np.inf, name='best_fid_score', trainable=False)
# Tracks every generator/discriminator model plus the progress variables.
# The optimizers are not part of the checkpoint.
checkpoint = tf.train.Checkpoint(
                                 generator=g_models,
                                 discriminator=d_models,
                                 last_epoch = last_epoch,
                                best_fid_score =best_fid_score,
                                 Current_Block_no =Current_Block_no
                                )


## Create the checkpoint manager for the output directory and restore the most recent saved weights, if any

manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=3)
# `latest_checkpoint` is None when the directory holds no checkpoint.
checkpoint.restore(manager.latest_checkpoint)
if manager.latest_checkpoint:
    print("Restored from {}".format(manager.latest_checkpoint))
    print("Starting from Block no:", int(Current_Block_no))
else:
    print("Current Checkpoint Location Empty as Well")
    print("Initializing from scratch.")



In [None]:
def save_images(generator,epoch,random = None,fadeIn = False):
    """Show a 3x3 grid of generated images and optionally save it to disk.

    Args:
        generator: model used to produce the images.
        epoch: epoch number; the figure is saved when it is a multiple of
            `ImageSaveInterval`, and it is part of the file name.
        random: latent vectors (at least 9 are needed); nine random vectors
            are drawn when omitted.
        fadeIn: when true, the file name is tagged as a fade-in sample.
    """
    if random is None:
        random = tf.random.normal((9,noise_dim))

    gen_imgs = generator(random,training =False)
    shape_label = str(list(gen_imgs.shape)[1:])

    fig, axs = plt.subplots(3,3, figsize = (6,6))
    for cell in range(9):
        row, col = divmod(cell, 3)
        # Map the [-1, 1] generator output back to 0..255 integers.
        pixels = (np.array(gen_imgs[cell]) * 127.5 + 127.5).astype(int)
        axs[row, col].imshow(pixels)
        axs[row, col].axis('off')
    plt.suptitle('Shape = '+shape_label)

    plt.show()
    if epoch%ImageSaveInterval == 0 :
        stage_tag = "fadIn-" if fadeIn else ""
        fig.savefig(save_train_loaction+f"/images/{epoch}-{stage_tag}{shape_label}.png"  ,dpi=300)
    plt.close("all")


In [None]:

def trainer(gen,disc,dataset, epochs,fadein = False):
    """Train one generator/discriminator pair for a number of epochs.

    Publishes `gen`/`disc` and fresh optimizers as module-level globals (read
    by `train_step`), then runs `steps_per_epoch` training steps per epoch,
    checkpointing and showing preview images at the configured intervals.

    Args:
        gen: generator model to train.
        disc: discriminator (critic) model to train.
        dataset: iterator over the distributed dataset, passed to `train_step`.
        epochs: number of epochs to run.
        fadein: when true, the `WeightedSum` alpha of both models is advanced
            linearly before every step.
    """
    global generator,discriminator,generator_optimizer,discriminator_optimizer
    generator = gen
    discriminator = disc
    progress_bar = tqdm(range(0,epochs))
    # New optimizers for every stage, created under the strategy scope.
    with tpu_strategy.scope():
        generator_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0, beta_2=0.99, epsilon=10e-8)
        discriminator_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0, beta_2=0.99, epsilon=10e-8)

    # Wrapped anew on each call - presumably so the graph is traced against
    # the models and optimizers just assigned above.
    compiled_train_step = tf.function(train_step)
    for steps in progress_bar:
        progress_bar.set_description(f"Epoch {steps}")
        for j in range(steps_per_epoch):
            # Run step function with a distribution strategy
            if fadein:
                update_fadein([generator, discriminator], float(steps*steps_per_epoch+j), epochs*steps_per_epoch)

            compiled_train_step(dataset)
            if j%1000 == 0:
                progress_bar.set_postfix({
                    'Generator Loss': f"{gen_training_loss.result():.4f}",
                    'Discriminator Loss': f"{disc_training_loss.result():.4f}",
                    'Accuracy': f"{accuracy.result():.4f}"
                })
        last_epoch.assign(steps+1)
        # Save a checkpoint every `CheckpointInterval` epochs
        if steps  % CheckpointInterval == 0:
            print("Saving current progress")
            #save_best_fid_score()
            manager.save()
        if steps%OutputInterval == 0:
            display.clear_output(wait=False)
            save_images(generator, steps,seed,fadein)

            gen_training_loss.reset_state()
            # Fixed: the call parentheses were missing, so the critic loss
            # metric was never reset and kept averaging over all epochs.
            disc_training_loss.reset_state()
            accuracy.reset_state()
              
# train the generator and discriminator
def train(g_models, d_models, dataset, epochs,n_batch):
    """Run the progressive-growing schedule, resuming from `Current_Block_no`.

    Stage 0 trains only the base models. Every later stage first trains the
    fade-in pair and then the normal pair at that stage's resolution.
    `Current_Block_no` is incremented after each completed stage.

    Args:
        g_models: list of `[normal, fade-in]` generator pairs.
        d_models: list of `[normal, fade-in]` discriminator pairs.
        dataset: batched image dataset at full resolution.
        epochs: epochs to train per stage (indexed by stage).
        n_batch: batch size per stage (indexed by stage).
    """

    def _scaled_iterator(model, batch_size):
        # Resize the data to the model's output resolution, re-batch it and
        # return an iterator over the distributed dataset.
        gen_shape = model.output_shape
        scaled = change_dataset((gen_shape[1],gen_shape[1]),batch_size,dataset ).repeat()
        scaled = make_distributed(scaled)
        print('Scaled Data', gen_shape)
        return scaled

    if int(Current_Block_no) == 0:
        g_normal, d_normal = g_models[0][0], d_models[0][0]
        scaled_data = _scaled_iterator(g_normal, n_batch[0])

        # train normal or straight-through models
        trainer(g_normal, d_normal,  scaled_data,epochs[0])
        Current_Block_no.assign(Current_Block_no+1)

    # process each level of growth
    for i in range(int(Current_Block_no), len(g_models)):
        # retrieve models for this level of growth
        [g_normal, g_fadein] = g_models[i]
        [d_normal, d_fadein] = d_models[i]

        # Fixed: use this stage's batch size; the loop previously always used
        # n_batch[0], so the per-stage batch schedule had no effect.
        scaled_data = _scaled_iterator(g_normal, n_batch[i])

        # train fade-in models for next level of growth
        trainer(g_fadein, d_fadein, scaled_data,epochs[i],  True)

        # train normal or straight-through models
        trainer(g_normal, d_normal, scaled_data,epochs[i])
        Current_Block_no.assign(Current_Block_no+1)
      

In [None]:
#save_best_fid_score()
          
# Run the whole growth schedule (long-running); it resumes from the stage
# stored in `Current_Block_no`.
train(g_models,d_models,image_dataset, Epochs,n_batch)


In [None]:
# Show one generated sample per growth stage after training.
display_images_all(g_models,d_models,no_blocks)

In [None]:
# Same call as the previous cell; a new random latent vector is drawn, so it
# shows a different sample.
display_images_all(g_models,d_models,no_blocks)

In [None]:
# Preview the last trained generator with the fixed seed vectors.
save_images(generator,1,seed)
# `save_best_fid_score` is not defined anywhere in this notebook (its other
# call sites are already commented out), so calling it raised NameError.
# Re-enable once the FID helper exists.
# save_best_fid_score(2)

In [None]:
# Pick the normal (non-fade-in) models of the last growth stage.
Generator = g_models[no_blocks-1][0]
Disciminator = d_models[no_blocks-1][0]
Generator.summary()

In [None]:
# Re-selects the same final-stage discriminator as the previous cell and prints its summary.
Disciminator = d_models[no_blocks-1][0]
Disciminator.summary()

In [None]:
# Save the final-stage models to the current working directory.
with tpu_strategy.scope():
    Generator.save('Generator.keras')
    Disciminator.save('Disciminator.keras')

In [None]:
# Output shape of the final generator without the batch dimension.
Generator.output.shape[1:]

In [None]:
def save_all_models(models,name):
    """Save every normal and fade-in model of a model list as .keras files.

    Args:
        models: list of `[normal, fade-in]` model pairs, one per growth stage.
        name: prefix for the file names (followed by output shape and stage index).
    """
    for i, (normal_model, fadein_model) in enumerate(models):
        with tpu_strategy.scope():
            normal_model.save(f"{save_train_loaction}/model/{name}-{normal_model.output.shape[1:]}-Block_no{i}.keras")
            fadein_model.save(f"{save_train_loaction}/model/{name}-{fadein_model.output.shape[1:]}-Intermidate-Block_no{i}.keras")
save_all_models(g_models,'Generator')
save_all_models(d_models,'Discriminator')

In [None]:
from tensorflow.keras.utils import plot_model


In [38]:
def save_all_models_images(models,name):
    """Render an architecture diagram (PNG) for every model in a model list.

    Args:
        models: list of `[normal, fade-in]` model pairs, one per growth stage.
        name: prefix for the file names (followed by output shape and stage index).
    """
    for i, (normal_model, fadein_model) in enumerate(models):
        # The fade-in variant gets an extra tag in its file name.
        for model, tag in ((normal_model, ""), (fadein_model, "Intermidate-")):
            plot_model(
                model,
                to_file=f"{name}-{model.output.shape[1:]}-{tag}Block_no{i}.png",
                show_shapes=True,        # annotate tensor shapes
                show_layer_names=True,   # annotate layer names
                expand_nested=False,     # keep nested models collapsed
                rankdir="TB",            # top-to-bottom layout ("LR" for left-to-right)
                dpi=96,                  # raise for a higher-resolution image
            )

save_all_models_images(g_models,'Generator')
save_all_models_images(d_models,'Discriminator')



In [None]:
# Install the zip command-line tool, then archive the whole working directory
# (recursively) into data.zip.
!apt install zip
!zip -r data.zip ./