In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import random
import cv2
from tqdm import tqdm
import numpy as np
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from keras.models import Model,Sequential
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint,EarlyStopping
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import img_to_array
from keras.layers import Conv2D, Flatten,Reshape,Dense,Input,Conv2DTranspose,LeakyReLU,Concatenate,Activation
import tensorflow as tf
from tensorflow.keras.optimizers import RMSprop,Adam
from IPython.display import clear_output
from tensorflow.keras.preprocessing import image_dataset_from_directory
import glob
import os
import warnings

import keras
warnings.filterwarnings('ignore')

# Preprocessing data

In [None]:
def normalize_img_mask(img, mask):
    """Rescale an image/mask pair with the same ``x / 127.5 - 1`` mapping.

    Both tensors are cast to float32 and scaled identically; the mask is NOT
    left unchanged. Assumes pixel values are in [0, 255], which then map to
    [-1, 1] -- TODO confirm against the loader.

    Returns:
        Tuple ``(img, mask)`` of rescaled float32 tensors.
    """
    def _rescale(pixels):
        # 0 -> -1.0, 127.5 -> 0.0, 255 -> 1.0
        return tf.cast(pixels, dtype=tf.float32) / 127.5 - 1.0

    return _rescale(img), _rescale(mask)

def denormalize(img):
    """Map values from the [-1, 1] range back to [0, 1] (e.g. for plotting)."""
    shifted = img + 1  # [-1, 1] -> [0, 2]
    return shifted / 2

In [None]:

# Source folders: colour frames and their grayscale versions.
image_path = "/kaggle/input/unified/blackclover/colored"
mask_path = "/kaggle/input/unified/blackclover/grayscale"

# Batch size and spatial size used for every loaded image.
BATCH_SIZE = 16
# Must equal the (272, 272, 3) input shape the generators/discriminators are
# built with (and the 272x272 resize used for single-image inference);
# Keras models with a fixed input shape reject batches of any other size.
IMG_SIZE = (272, 272)

# shuffle=False makes both loaders iterate their files in sorted order, so the
# i-th colour batch is zipped with the i-th grayscale batch below.
# NOTE(review): this only yields matching pairs if both folders use
# corresponding file names -- confirm against the dataset.
image_dataset = image_dataset_from_directory(
    image_path,
    label_mode=None,
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

mask_dataset = image_dataset_from_directory(
    mask_path,
    label_mode=None,
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Pair the batches and rescale both sides with normalize_img_mask.
dataset = tf.data.Dataset.zip((image_dataset, mask_dataset))
dataset = dataset.map(normalize_img_mask, num_parallel_calls=tf.data.AUTOTUNE)

# len(dataset) is the number of *batches*, so this is a 90/10 split by batch.
train_size = int(0.9 * len(dataset))
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

# cache() first so the decoded, normalised batches are stored once and reused
# every epoch; prefetch() last so it overlaps delivery of the next batch with
# the current training step. (The reverse order caches the output of the
# prefetch stage and leaves nothing prefetching ahead of the consumer.)
train_dataset = train_dataset.cache().prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.cache().prefetch(tf.data.AUTOTUNE)


In [None]:
# Sanity check: show the first image/mask pair of the first training batch.
for img_batch, mask_batch in train_dataset.take(1):
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    panels = (("Image", img_batch), ("Mask", mask_batch))
    for ax, (title, batch) in zip(axes, panels):
        # Undo the [-1, 1] scaling so imshow receives values in [0, 1].
        ax.imshow((batch[0] + 1) * 0.5)
        ax.set_title(title)
        ax.axis('off')

    plt.show()

# Architecture

In [None]:
from tensorflow.keras.layers import BatchNormalization

def encoder_layer(inputs, filters, kernel_size=16, strides=2, activation='leakyrelu', use_batch_norm=True):
    """Conv2D -> optional BatchNormalization -> optional activation.

    Args:
        inputs: Keras tensor fed to the convolution.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        strides: Convolution stride (the default of 2 halves the spatial size
            with 'same' padding).
        activation: 'relu' or 'leakyrelu' (slope 0.2); any other value applies
            no activation.
        use_batch_norm: Insert BatchNormalization after the convolution.

    Returns:
        The output Keras tensor.
    """
    x = Conv2D(filters=filters,
               kernel_size=kernel_size,
               strides=strides,
               padding='same')(inputs)

    if use_batch_norm:
        x = BatchNormalization()(x)

    if activation == 'relu':
        return Activation('relu')(x)
    if activation == 'leakyrelu':
        return LeakyReLU(alpha=0.2)(x)
    # Unrecognised activation name: return the (normalised) convolution output.
    return x

def decoder_layer(inputs, paired_inputs, filters, kernel_size=16, strides=2, activation='leakyrelu', use_batch_norm=False):
    """Conv2DTranspose -> optional BatchNormalization -> optional activation -> skip concat.

    Args:
        inputs: Keras tensor to upsample.
        paired_inputs: Tensor concatenated onto the upsampled output (the skip
            connection); it must be shape-compatible with that output for
            Concatenate to succeed.
        filters: Number of transposed-convolution filters.
        kernel_size: Transposed-convolution kernel size.
        strides: Transposed-convolution stride (the default of 2 doubles the
            spatial size with 'same' padding).
        activation: 'relu' or 'leakyrelu' (slope 0.2); any other value applies
            no activation.
        use_batch_norm: Insert BatchNormalization after the transposed convolution.

    Returns:
        The concatenation of the processed tensor and ``paired_inputs``.
    """
    upsampled = Conv2DTranspose(filters=filters,
                                kernel_size=kernel_size,
                                strides=strides,
                                padding='same')(inputs)

    if use_batch_norm:
        upsampled = BatchNormalization()(upsampled)

    if activation == 'relu':
        upsampled = Activation('relu')(upsampled)
    elif activation == 'leakyrelu':
        upsampled = LeakyReLU(alpha=0.2)(upsampled)

    # Merge with the matching encoder feature map.
    return Concatenate()([upsampled, paired_inputs])


In [None]:
def build_generator(input_shape, output_shape=None, kernel_size=3, name=None):
    """Build an encoder-decoder generator with skip connections.

    Five encoder stages (32 -> 512 filters; the first keeps the resolution, the
    other four halve it) are followed by four decoder stages, each concatenated
    with the encoder output of matching resolution, and a final tanh
    Conv2DTranspose that produces the output channels.

    Args:
        input_shape: Shape of one input sample, e.g. ``(height, width, channels)``.
        output_shape: Shape of one output sample; only its last entry (the
            channel count) is used. Defaults to ``input_shape`` when None.
        kernel_size: Kernel size shared by every (transposed) convolution.
        name: Optional model name.

    Returns:
        A Keras ``Model`` mapping ``inputs`` to the tanh-activated output.

    Raises:
        ValueError: If a known spatial dimension is not a multiple of 16.
    """
    # The signature advertises None as the default; without this fallback the
    # channel lookup below fails with a TypeError.
    if output_shape is None:
        output_shape = input_shape
    channels = int(output_shape[-1])

    # Four stride-2 stages shrink each spatial dimension 16x. If a dimension is
    # not a multiple of 16 the upsampled maps no longer match their skip
    # connections and Concatenate fails with a far less helpful message.
    for dim in input_shape[:2]:
        if dim is not None and dim % 16 != 0:
            raise ValueError(
                f"build_generator needs spatial dimensions divisible by 16, got {tuple(input_shape)}"
            )

    inputs = Input(shape=input_shape)

    # Encoder: e1 keeps the resolution, e2..e5 each halve it.
    e1 = encoder_layer(inputs, 32, kernel_size=kernel_size, activation='leakyrelu', strides=1)
    e2 = encoder_layer(e1, 64, activation='leakyrelu', kernel_size=kernel_size)
    e3 = encoder_layer(e2, 128, activation='leakyrelu', kernel_size=kernel_size)
    e4 = encoder_layer(e3, 256, activation='leakyrelu', kernel_size=kernel_size)
    e5 = encoder_layer(e4, 512, activation='leakyrelu', kernel_size=kernel_size)

    # Decoder: each stage doubles the resolution and is joined with the encoder
    # output of the same size.
    d1 = decoder_layer(e5, e4, 256, kernel_size=kernel_size)
    d2 = decoder_layer(d1, e3, 128, kernel_size=kernel_size)
    d3 = decoder_layer(d2, e2, 64, kernel_size=kernel_size)
    d4 = decoder_layer(d3, e1, 32, kernel_size=kernel_size)

    # tanh keeps the output in [-1, 1].
    outputs = Conv2DTranspose(channels, kernel_size=kernel_size, strides=1,
                              activation='tanh', padding='same')(d4)
    return Model(inputs, outputs, name=name)

def build_discriminator(input_shape, kernel_size=3, name=None):
    """Build a convolutional discriminator that scores patches of its input.

    Four encoder stages without normalisation (32, 64, 128 filters at stride 2,
    then 256 filters at stride 1) are followed by a 3-filter Conv2D. That final
    layer has no activation, so the model returns unbounded scores rather than
    probabilities.

    Args:
        input_shape: Shape of one input sample, e.g. ``(height, width, channels)``.
        kernel_size: Kernel size shared by every convolution.
        name: Optional model name.

    Returns:
        A Keras ``Model`` mapping an image to a 3-channel score map.
    """
    inputs = Input(shape=input_shape)
    # encoder_layer's switch for normalisation is `use_batch_norm`; the former
    # `instance_norm=False` keyword does not exist on it and raised TypeError.
    x = encoder_layer(inputs, 32, kernel_size=kernel_size, activation='leakyrelu', use_batch_norm=False)
    x = encoder_layer(x, 64, kernel_size=kernel_size, activation='leakyrelu', use_batch_norm=False)
    x = encoder_layer(x, 128, kernel_size=kernel_size, activation='leakyrelu', use_batch_norm=False)
    x = encoder_layer(x, 256, kernel_size=kernel_size, strides=1, activation='leakyrelu', use_batch_norm=False)
    x = Conv2D(3, kernel_size=kernel_size, strides=1, padding='same')(x)
    discriminator = Model(inputs, x, name=name)
    return discriminator



# Main Class

In [None]:
class cycle_gan(tf.keras.Model):
    """CycleGAN trainer wrapping two generators and two discriminators.

    ``generator_t`` maps domain x -> y and ``generator_s`` maps y -> x.
    ``discriminator_t`` scores y-domain images and ``discriminator_s`` scores
    x-domain images. Each batch is a ``(real_x, real_y)`` pair.
    """

    def __init__(self, generator_t, generator_s, discriminator_t, discriminator_s,
                 lambda_cycle=10.0, lambda_identity=0.5):
        """Store the four networks and the loss weights.

        Args:
            generator_t: Model translating x -> y.
            generator_s: Model translating y -> x.
            discriminator_t: Model scoring y-domain images.
            discriminator_s: Model scoring x-domain images.
            lambda_cycle: Weight of the cycle-consistency (L1) loss.
            lambda_identity: Extra factor applied, on top of ``lambda_cycle``,
                to the identity (L1) loss.
        """
        super().__init__()
        self.generator_t = generator_t
        self.generator_s = generator_s
        self.discriminator_t = discriminator_t
        self.discriminator_s = discriminator_s
        self.lambda_cycle = lambda_cycle
        self.lambda_identity = lambda_identity

    def compile(self, g_t_optimizer, g_s_optimizer, d_t_optimizer, d_s_optimizer, loss_fn):
        """Register one optimizer per network and the adversarial loss.

        Args:
            g_t_optimizer: Optimizer for ``generator_t``.
            g_s_optimizer: Optimizer for ``generator_s``.
            d_t_optimizer: Optimizer for ``discriminator_t``.
            d_s_optimizer: Optimizer for ``discriminator_s``.
            loss_fn: Callable ``loss_fn(y_true, y_pred)`` applied to
                discriminator outputs against all-ones / all-zeros targets.
        """
        super().compile()
        self.g_t_optimizer = g_t_optimizer
        self.g_s_optimizer = g_s_optimizer
        self.d_t_optimizer = d_t_optimizer
        self.d_s_optimizer = d_s_optimizer
        self.loss_fn = loss_fn

    @tf.function
    def train_step(self, batch_data):
        """Run one update of both discriminators, then of both generators.

        Args:
            batch_data: Tuple ``(real_x, real_y)`` of batches from the two domains.

        Returns:
            Dict with the discriminator losses (``D_T_Loss``, ``D_S_Loss``) and
            the total generator losses (``G_T_Loss``, ``G_S_Loss``).
        """
        real_x, real_y = batch_data

        # ---- Discriminator update -------------------------------------------
        # Only discriminator variables are differentiated here, so the generator
        # forward passes do not need to be recorded on the tape.
        fake_y = self.generator_t(real_x, training=True)
        fake_x = self.generator_s(real_y, training=True)

        # persistent=True: the tape is queried twice (once per discriminator).
        with tf.GradientTape(persistent=True) as tape_d:
            disc_real_x = self.discriminator_s(real_x, training=True)
            disc_fake_x = self.discriminator_s(fake_x, training=True)
            disc_real_y = self.discriminator_t(real_y, training=True)
            disc_fake_y = self.discriminator_t(fake_y, training=True)

            # Real samples are labelled 1, generated samples 0.
            # Use the loss registered in compile(), not a notebook global.
            disc_t_loss = (self.loss_fn(tf.ones_like(disc_real_y), disc_real_y)
                           + self.loss_fn(tf.zeros_like(disc_fake_y), disc_fake_y))
            disc_s_loss = (self.loss_fn(tf.ones_like(disc_real_x), disc_real_x)
                           + self.loss_fn(tf.zeros_like(disc_fake_x), disc_fake_x))

        grads_dt = tape_d.gradient(disc_t_loss, self.discriminator_t.trainable_variables)
        grads_ds = tape_d.gradient(disc_s_loss, self.discriminator_s.trainable_variables)
        self.d_t_optimizer.apply_gradients(zip(grads_dt, self.discriminator_t.trainable_variables))
        self.d_s_optimizer.apply_gradients(zip(grads_ds, self.discriminator_s.trainable_variables))
        del tape_d  # release the persistent tape's resources

        # ---- Generator update -----------------------------------------------
        with tf.GradientTape(persistent=True) as tape_g:
            fake_y = self.generator_t(real_x, training=True)
            fake_x = self.generator_s(real_y, training=True)

            # x -> y -> x and y -> x -> y round trips.
            cycled_x = self.generator_s(fake_y, training=True)
            cycled_y = self.generator_t(fake_x, training=True)

            # Feeding a generator an image already in its target domain.
            same_x = self.generator_s(real_x, training=True)
            same_y = self.generator_t(real_y, training=True)

            # Discriminators run in inference mode while training the generators.
            disc_fake_y = self.discriminator_t(fake_y, training=False)
            disc_fake_x = self.discriminator_s(fake_x, training=False)

            # Adversarial loss: generators want their fakes labelled 1.
            g_target_loss = self.loss_fn(tf.ones_like(disc_fake_y), disc_fake_y)
            g_source_loss = self.loss_fn(tf.ones_like(disc_fake_x), disc_fake_x)

            cycle_loss_t = tf.reduce_mean(tf.abs(real_y - cycled_y)) * self.lambda_cycle
            cycle_loss_s = tf.reduce_mean(tf.abs(real_x - cycled_x)) * self.lambda_cycle

            id_loss_t = tf.reduce_mean(tf.abs(real_y - same_y)) * self.lambda_cycle * self.lambda_identity
            id_loss_s = tf.reduce_mean(tf.abs(real_x - same_x)) * self.lambda_cycle * self.lambda_identity

            total_loss_t = g_target_loss + cycle_loss_t + id_loss_t
            total_loss_s = g_source_loss + cycle_loss_s + id_loss_s

        grads_gt = tape_g.gradient(total_loss_t, self.generator_t.trainable_variables)
        grads_gs = tape_g.gradient(total_loss_s, self.generator_s.trainable_variables)
        self.g_t_optimizer.apply_gradients(zip(grads_gt, self.generator_t.trainable_variables))
        self.g_s_optimizer.apply_gradients(zip(grads_gs, self.generator_s.trainable_variables))
        del tape_g  # release the persistent tape's resources

        return {
            "D_T_Loss": disc_t_loss,
            "D_S_Loss": disc_s_loss,
            "G_T_Loss": total_loss_t,
            "G_S_Loss": total_loss_s,
        }



In [None]:
# Every network consumes (and the generators produce) 272x272 3-channel tensors.
INPUT_SHAPE = (272, 272, 3)

g_t = build_generator(INPUT_SHAPE, INPUT_SHAPE)  # image -> mask
g_s = build_generator(INPUT_SHAPE, INPUT_SHAPE)  # mask -> image
d_t = build_discriminator(INPUT_SHAPE, 3)
d_s = build_discriminator(INPUT_SHAPE, 3)

# The name `cycle_gan` is rebound from the class to its instance below (later
# cells use the instance under that name). Resolve the class first so this cell
# can be re-run without trying to call the previous instance.
cycle_gan_cls = cycle_gan if isinstance(cycle_gan, type) else type(cycle_gan)
cycle_gan = cycle_gan_cls(g_t, g_s, d_t, d_s)


# Loading weights to continue training

In [None]:
# Restore the previously trained generators.
g_s = tf.keras.models.load_model("/kaggle/input/color-weights-v1/grey to color last3 (1).h5")
g_t = tf.keras.models.load_model("/kaggle/input/color-weights-v1/colored to grey last3.h5")
# The discriminators are not restored; d_t and d_s keep their current weights.

# Rebinding g_s / g_t alone does not change the models held by the existing
# cycle_gan instance, so training would ignore the loaded weights. Rebuild the
# wrapper around the loaded generators. `cycle_gan` may currently name either
# the class or an instance of it, so resolve the class first.
cycle_gan_cls = cycle_gan if isinstance(cycle_gan, type) else type(cycle_gan)
cycle_gan = cycle_gan_cls(g_t, g_s, d_t, d_s)

In [None]:
# Discriminators learn at half the generators' rate (1e-4 vs 2e-4).
optimizer_dt = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.999)
optimizer_ds = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.999)
optimizer_gt = Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)
optimizer_gs = Adam(learning_rate=2e-4, beta_1=0.5, beta_2=0.999)

# The discriminators end in a Conv2D without activation, i.e. they emit raw
# scores. from_logits=True makes the loss apply the sigmoid itself; with the
# default (from_logits=False) those scores would be treated as probabilities
# and clipped into (0, 1).
loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=True)
cycle_gan.compile(optimizer_gt, optimizer_gs, optimizer_dt, optimizer_ds, loss_fn)

# Train

In [None]:
# Number of passes over the training batches.
EPOCHS = 50
cycle_gan.fit(train_dataset, epochs=EPOCHS)

In [None]:
# Save the networks held by the trained cycle_gan instance. The notebook-level
# names g_s / g_t may have been rebound (e.g. by the weight-loading cell) to
# models that were never part of training.
cycle_gan.generator_s.save("mask to img last4.h5")
cycle_gan.generator_t.save("img to  mask.h5")
cycle_gan.discriminator_t.save("disc_tlast4.h5")
cycle_gan.discriminator_s.save("disc_s last4.h5")

In [None]:

def visualize_random_samples(val_dataset, g_s=None, g_t=None, num_samples=8):
    """Plot inputs and generator outputs for samples of the first validation batch.

    Each row shows: the original image, the mask, ``g_s(mask)`` and
    ``g_t(image)``. Despite the name, the samples are the first
    ``num_samples`` items of the first batch, not a random draw.

    Args:
        val_dataset: Dataset yielding ``(image_batch, mask_batch)`` pairs.
        g_s: Model used for the mask -> image column.
        g_t: Model used for the image -> mask column.
        num_samples: Maximum number of rows; clamped to the batch size.

    Raises:
        ValueError: If ``g_s`` or ``g_t`` is None.
    """
    if g_s is None or g_t is None:
        raise ValueError("visualize_random_samples requires both g_s and g_t")

    for real_x, real_y in val_dataset.take(1):
        # A batch can hold fewer items than requested (e.g. a short final
        # batch); clamp so indexing below cannot run past the end.
        count = min(num_samples, int(real_x.shape[0]), int(real_y.shape[0]))
        if count <= 0:
            return
        test_img, test_masked = real_x[:count], real_y[:count]

        # One forward pass per generator instead of one predict() call per sample.
        reconstructed_imgs = g_s.predict(test_masked, verbose=0)
        reconstructed_masks = g_t.predict(test_img, verbose=0)

        plt.figure(figsize=(20, 5 * count))

        for i in range(count):
            # (title, picture, colormap) for the four columns of row i; every
            # picture is mapped from [-1, 1] back to [0, 1] for display.
            panels = (
                (f'Original Image {i+1}', denormalize(test_img[i]), None),
                (f'Masked Image {i+1}', denormalize(test_masked[i]), 'gray'),
                (f'Mask → Image {i+1}', denormalize(reconstructed_imgs[i]), None),
                (f'Image → Mask {i+1}', denormalize(reconstructed_masks[i]), 'gray'),
            )
            for col, (title, picture, cmap) in enumerate(panels, start=1):
                plt.subplot(count, 4, i * 4 + col)
                plt.title(title)
                plt.imshow(picture, cmap=cmap)
                plt.axis('off')

        plt.tight_layout()
        plt.show()  # Show all plots at once

# Show up to 15 rows from the first validation batch (adjust num_samples as needed).
visualize_random_samples(val_dataset, g_s=g_s, g_t=g_t, num_samples=15)

In [None]:
TEST_IMAGE_PATH = "/kaggle/input/img-test/black clover test.png"

# cv2.imread returns None (it does not raise) when the file cannot be read;
# fail here with a clear message instead of inside cvtColor.
image_bgr = cv2.imread(TEST_IMAGE_PATH)
if image_bgr is None:
    raise FileNotFoundError(f"Could not read test image: {TEST_IMAGE_PATH}")
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

# Resize to the generator's own input size (falling back to 272 when a
# dimension is unspecified). cv2.resize expects (width, height).
_, in_height, in_width, _ = g_t.input_shape
image_rgb = cv2.resize(image_rgb, (in_width or 272, in_height or 272))
image_rgb = image_rgb / 127.5 - 1.0  # Normalize to [-1, 1]

# Add the batch dimension expected by the model.
image_rgb = np.expand_dims(image_rgb, axis=0)

# Predict
output = g_t.predict(image_rgb)

# Denormalize and display input and output side by side.
plt.subplot(1, 2, 1)
plt.imshow(denormalize(image_rgb.squeeze()))
plt.title("Input")
plt.subplot(1, 2, 2)
plt.imshow(denormalize(output.squeeze()))
plt.title("Generated")
plt.show()