In [60]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.optimizers import Adam
!pip install mlflow dagshub
import mlflow
import mlflow.tensorflow as mltf




In [83]:
from huggingface_hub import login
access_token_read = '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'
login(token = access_token_read)

The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `huggingface-cli` if you want to set the git credential as well.
Token is valid (permission: read).
Your token has been saved to /root/.cache/huggingface/token
Login successful


In [84]:
import dagshub

dagshub.init("pycolorizer", "realrohilbansal", mlflow=True)

In [85]:
experiment_name = "pycolorizer" 
artifact_location = "pycolorizer_artifacts"
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(experiment_name, artifact_location)
else:
    experiment_id = experiment.experiment_id

In [86]:
mlflow.set_tracking_uri('https://dagshub.com/realrohilbansal/pycolorizer.mlflow')

mlflow.set_experiment(experiment_name)

<Experiment: artifact_location='/app/pycolorizer_artifacts', creation_time=1719298699628, experiment_id='0', last_update_time=1719298699628, lifecycle_stage='active', name='pycolorizer', tags={}>

In [87]:
def getexperiment(experiment_id: str=None, experiment_name:str=None):
    if experiment_id is not None:
        experiment = mlflow.get_experiment(experiment_id)
    elif experiment_name is not None:
        experiment = mlflow.get_experiment_by_name(experiment_name)
    else:
        raise ValueError("Either Experiment ID or Experiment Name must be provided.")
    
    print("Name: ", experiment.name)
    print("ID: ", experiment.experiment_id)
    print("Artifact location: ", experiment.artifact_location)
    # print("Tags: ", experiment.tags)
    print("Creation timestamp: ", experiment.creation_time)
    print("Lifecycle stage: ", experiment.lifecycle_stage)
    
    return experiment

In [88]:
def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    if apply_batchnorm:
        result.add(layers.BatchNormalization())
    result.add(layers.LeakyReLU())
    return result

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    result.add(layers.BatchNormalization())
    if apply_dropout:
        result.add(layers.Dropout(0.5))
    result.add(layers.ReLU())
    return result

def Generator():
    inputs = tf.keras.layers.Input(shape=[256, 256, 1])
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),
        downsample(128, 4),
        downsample(256, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
    ]
    up_stack = [
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4),
        upsample(256, 4),
        upsample(128, 4),
        upsample(64, 4),
    ]
    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(3, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh')
    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)
    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])
    x = last(x)
    return tf.keras.Model(inputs=inputs, outputs=x)

def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    inp = layers.Input(shape=[256, 256, 1], name='input_image')
    tar = layers.Input(shape=[256, 256, 3], name='target_image')
    x = layers.concatenate([inp, tar])
    down1 = downsample(64, 4, False)(x)
    down2 = downsample(128, 4)(down1)
    down3 = downsample(256, 4)(down2)
    down4 = downsample(512, 4)(down3)
    zero_pad1 = layers.ZeroPadding2D()(down4)
    conv = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(zero_pad1)
    batchnorm1 = layers.BatchNormalization()(conv)
    leaky_relu = layers.LeakyReLU()(batchnorm1)
    zero_pad2 = layers.ZeroPadding2D()(leaky_relu)
    last = layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(zero_pad2)
    return tf.keras.Model(inputs=[inp, tar], outputs=last)


In [89]:
def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(disc_generated_output), disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (100 * l1_loss)
    return total_gen_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.zeros_like(disc_generated_output), disc_generated_output)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss


In [90]:
@tf.function
def train_step(input_image, target, epoch):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)
        gen_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)
    generator_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))
    return gen_loss, disc_loss


In [91]:
from datasets import load_dataset

dataset = load_dataset('ILSVRC/imagenet-1k', split="test", streaming=True, trust_remote_code=True) # Using test dataset for training

In [111]:
# Preprocess image function
def preprocess_image(example):
    image = np.array(example['image'])
    if image.shape[-1] == 3:  # Check for RGB image
        image = cv2.resize(image, (256, 256))
        grayscale_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        grayscale_image = grayscale_image[..., np.newaxis]  # Add channel dimension
        return grayscale_image, image
    return None, None

# Fit function with MLflow logging
def fit(dataset, epochs):
    batch_size = 32
    log_interval = 100
    save_interval = 500

    
    # Start an MLflow run
#     mlflow.start_run(experiment_id = experiment_id, run_name=f'Run{mlflow.search_runs(experiment_id).shape[0] + 1}')
    mlflow.log_params({
        "batch_size": batch_size,
        "epochs": epochs,
#         "generator_lr": g_lr,
#         "discriminator_lr": d_lr,
        "beta": beta,
        "log_interval": log_interval,
        "save_interval": save_interval
    })
    
    for epoch in range(epochs):
        batch_counter = 0
        batch_input = []
        batch_labels = []
        for i, example in enumerate(dataset):
            grayscale_image, image = preprocess_image(example)
            if grayscale_image is not None:
                batch_input.append(grayscale_image)
                batch_labels.append(image)
            if len(batch_input) == batch_size:
                batch_input = tf.convert_to_tensor(np.array(batch_input, dtype=np.float32) / 255.0)
                batch_labels = tf.convert_to_tensor(np.array(batch_labels, dtype=np.float32) / 127.5 - 1)
                gen_loss, disc_loss = train_step(batch_input, batch_labels, epoch)
                batch_input = []
                batch_labels = []
                batch_counter += 1

                if batch_counter % log_interval == 0:
                    # Log metrics to MLflow
                    mlflow.log_metric('gen_loss', gen_loss.numpy(), step=batch_counter)
                    mlflow.log_metric('disc_loss', disc_loss.numpy(), step=batch_counter)
                    print(f'Batch {batch_counter} done')
                    if batch_counter % save_interval == 0:
                        generator.save(f'generator_weights_batch_{batch_counter:04d}.h5')
                        discriminator.save(f'discriminator_weights_batch_{batch_counter:04d}.h5')
                        print("Saved")

        # Log epoch-end models and metrics
        mlflow.log_metric('gen_loss_epoch', gen_loss.numpy(), step=epoch)
        mlflow.log_metric('disc_loss_epoch', disc_loss.numpy(), step=epoch)
        generator.save(f'generator_weights_epoch_{epoch+1:04d}.h5')
        discriminator.save(f'discriminator_weights_epoch_{epoch+1:04d}.h5')
        print(f"Saved epoch {epoch+1}")

        # Log model artifacts
#         mltf.log_model(generator, f"generator_epoch_{epoch+1}.keras")
#         mltf.log_model(discriminator, f"discriminator_epoch_{epoch+1}.keras")
        

    mlflow.end_run()


In [113]:
# generator = Generator()
# discriminator = Discriminator()

generator = tf.keras.models.load_model("/kaggle/working/generator_weights_batch_3000.h5")
discriminator = tf.keras.models.load_model("/kaggle/working/discriminator_weights_batch_3000.h5")

g_lr = 2e-4
d_lr = 2e-3
beta = 0.5
epochs = 5

# Optimizers
generator_optimizer = tf.keras.optimizers.Adam(g_lr, beta)
discriminator_optimizer = tf.keras.optimizers.Adam(d_lr, beta)

# Start training
fit(dataset, epochs = epochs)

# Save final weights
generator.save('final_generator_weights.h5')
discriminator.save('final_discriminator_weights.h5')


Batch 100 done
Batch 200 done
Batch 300 done
Batch 400 done
Batch 500 done
Saved
Batch 600 done
Batch 700 done
Batch 800 done
Batch 900 done
Batch 1000 done
Saved
Batch 1100 done
Batch 1200 done
Batch 1300 done
Batch 1400 done
Batch 1500 done
Saved
Batch 1600 done
Batch 1700 done
Batch 1800 done
Batch 1900 done
Batch 2000 done
Saved
Batch 2100 done
Batch 2200 done
Batch 2300 done
Batch 2400 done
Batch 2500 done
Saved
Batch 2600 done
Batch 2700 done
Batch 2800 done
Batch 2900 done
Batch 3000 done
Saved
Saved epoch 1


ValueError: in user code:

    File "/tmp/ipykernel_34/3535119356.py", line 11, in train_step  *
        generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/base_optimizer.py", line 282, in apply_gradients  **
        self.apply(grads, trainable_variables)
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/base_optimizer.py", line 321, in apply
        self.build(trainable_variables)
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/adam.py", line 97, in build
        self.add_variable_from_reference(
    File "/opt/conda/lib/python3.10/site-packages/keras/src/backend/tensorflow/optimizer.py", line 36, in add_variable_from_reference
        return super().add_variable_from_reference(
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/base_optimizer.py", line 218, in add_variable_from_reference
        return self.add_variable(
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/base_optimizer.py", line 192, in add_variable
        variable = backend.Variable(
    File "/opt/conda/lib/python3.10/site-packages/keras/src/backend/common/variables.py", line 165, in __init__
        self._initialize(value)
    File "/opt/conda/lib/python3.10/site-packages/keras/src/backend/tensorflow/core.py", line 31, in _initialize
        self._value = tf.Variable(

    ValueError: tf.function only supports singleton tf.Variables created on the first call. Make sure the tf.Variable is only created once or created outside tf.function. See https://www.tensorflow.org/guide/function#creating_tfvariables for more information.


In [101]:
mlflow.end_run()

In [None]:
import numpy as np
import tensorflow as tf
import cv2

def preprocess_image_for_inference(image_path):
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError("Image not found or unable to read")
    if image.shape[-1] == 3:  # Check for RGB image
        image = cv2.resize(image, (256, 256))
        grayscale_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        grayscale_image = grayscale_image[..., np.newaxis]  # Add channel dimension
        grayscale_image = tf.convert_to_tensor(np.array(grayscale_image, dtype=np.float32) / 255.0)
        grayscale_image = tf.expand_dims(grayscale_image, axis=0)  # Add batch dimension
        return grayscale_image
    else:
        raise ValueError("Image is not in RGB format")

def postprocess_output_image(output_tensor):
    output_image = output_tensor[0]  # Remove batch dimension
    output_image = (output_image + 1) * 127.5  # Scale from [-1, 1] to [0, 255]
    output_image = np.array(output_image, dtype=np.uint8)
    return output_image

# Load the trained generator model
generator = tf.keras.models.load_model('models/generator_weights_batch_3000.h5')

# Path to the input image
input_image_path = 'image.jpeg'

# Preprocess the input image
preprocessed_image = preprocess_image_for_inference(input_image_path)

# Run the generator model to get the output
generated_output = generator(preprocessed_image, training=False)

# Post-process the output image
output_image = postprocess_output_image(generated_output)

# Save or display the output image
output_image_path = 'path_to_your_output_image.jpg'
cv2.imwrite(output_image_path, output_image)
