In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import cv2
import numpy as np
from shutil import copy2

# Split a BGR image into its three normalized LAB planes
def convert_to_lab_channels(color_img_bgr):
    """Convert a BGR image to LAB and return the scaled L, a and b planes.

    The image is converted with OpenCV's ``COLOR_BGR2LAB`` and cast to
    float32. The L plane is divided by 255; the a and b planes are shifted
    by 128 and divided by 127.

    Args:
        color_img_bgr: Image array in BGR channel order, as returned by
            ``cv2.imread`` (presumably uint8 — TODO confirm for other dtypes,
            since OpenCV's LAB value ranges depend on the input dtype).

    Returns:
        tuple: ``(L, a, b)`` float32 arrays, one per LAB plane.
    """
    lab = cv2.cvtColor(color_img_bgr, cv2.COLOR_BGR2LAB).astype(np.float32)
    lightness, chroma_a, chroma_b = lab[:, :, 0], lab[:, :, 1], lab[:, :, 2]
    scaled_planes = (
        lightness / 255.0,
        (chroma_a - 128.0) / 127.0,
        (chroma_b - 128.0) / 127.0,
    )
    return tuple(plane.astype(np.float32) for plane in scaled_planes)

# --- Input locations -------------------------------------------------------
# Dataset 1: landscape images, one folder of color files and one of gray files.
dataset1_color = '/kaggle/input/landscape-image-colorization/landscape Images/color'
dataset1_gray  = '/kaggle/input/landscape-image-colorization/landscape Images/gray'

# Dataset 2: ships with its own train/test folders for color and gray images.
dataset2_base = '/kaggle/input/image-colorization-dataset/data'
dataset2_train_color, dataset2_train_gray, dataset2_test_color, dataset2_test_gray = (
    os.path.join(dataset2_base, folder)
    for folder in ("train_color", "train_black", "test_color", "test_black")
)

# --- Output location -------------------------------------------------------
output_base = '/kaggle/working/landscape_data'

# Build <output_base>/<split>/<kind> for every split and every artifact kind.
# exist_ok=True keeps the cell re-runnable.
splits = ['train', 'valid', 'test']
for split_name in splits:
    split_root = os.path.join(output_base, split_name)
    for kind in ('color', 'grayscale', 'lab_L', 'lab_ab'):
        os.makedirs(os.path.join(split_root, kind), exist_ok=True)

# Process one image pair: copy images and save L/ab channels as .npy
def process_pair(color_path, gray_path, filename, split):
    """Copy one color/grayscale pair into ``split`` and cache its LAB planes.

    The color image is decoded *before* anything is written, so an unreadable
    file never leaves a half-written sample (copied images without the
    matching ``.npy`` files) in the output tree.

    Args:
        color_path: Path of the source color image.
        gray_path: Path of the matching grayscale image.
        filename: File name used for the copies inside the output folders.
        split: Output split folder name ('train', 'valid' or 'test').

    Returns:
        bool: True when the pair was written, False when the color image
        could not be decoded and the pair was skipped.
    """
    img_color_bgr = cv2.imread(color_path)
    if img_color_bgr is None:
        # cv2.imread signals a missing/corrupt/unsupported file with None
        # instead of raising; report it and skip instead of crashing later.
        print(f"Skipping {color_path}: cv2.imread could not decode it")
        return False

    split_dir = os.path.join(output_base, split)
    copy2(color_path, os.path.join(split_dir, 'color', filename))
    copy2(gray_path, os.path.join(split_dir, 'grayscale', filename))

    L, a, b = convert_to_lab_channels(img_color_bgr)

    # Swap only the real extension for '.npy'. For 'name.jpg' this yields the
    # same 'name.npy' as before, but it also handles '.png', '.jpeg', '.JPG'
    # and names that merely contain '.jpg' somewhere in the middle.
    npy_name = os.path.splitext(filename)[0] + '.npy'

    np.save(os.path.join(split_dir, 'lab_L', npy_name), L)        # (H, W)
    ab = np.stack([a, b], axis=0)                                  # (2, H, W)
    np.save(os.path.join(split_dir, 'lab_ab', npy_name), ab)
    return True

# NOTE(review): both datasets are written into the same
# <output_base>/<split>/<kind>/ folders under their original file names. If the
# two datasets ever share a file name, the later copy overwrites the earlier
# one — confirm the names are disjoint.

# ---------------------------------------------------------------------------
# Dataset 1 (landscape): shuffled 80% / 10% / remainder split
# ---------------------------------------------------------------------------
all_files_1 = sorted(os.listdir(dataset1_color))  # sort first so the shuffle is reproducible
np.random.seed(42)
np.random.shuffle(all_files_1)

n1 = len(all_files_1)
n1_train = int(0.8 * n1)
n1_valid = int(0.1 * n1)
valid_end_1 = n1_train + n1_valid

splits1 = {
    'train': all_files_1[:n1_train],
    'valid': all_files_1[n1_train:valid_end_1],
    'test':  all_files_1[valid_end_1:],
}

for split, files in splits1.items():
    for fname in files:
        color_file = os.path.join(dataset1_color, fname)
        gray_file = os.path.join(dataset1_gray, fname)
        process_pair(color_file, gray_file, fname, split)

# ---------------------------------------------------------------------------
# Dataset 2 (custom dataset): its train folder is split 80% / 20% into
# train / valid; its test folder is used as-is.
# ---------------------------------------------------------------------------
all_files_2_train = sorted(os.listdir(dataset2_train_color))
np.random.seed(42)
np.random.shuffle(all_files_2_train)

n2 = len(all_files_2_train)
n2_train = int(0.8 * n2)

files2_train = all_files_2_train[:n2_train]
files2_valid = all_files_2_train[n2_train:]

# Train and valid samples both come from the dataset's train folders.
for split, files in (('train', files2_train), ('valid', files2_valid)):
    for fname in files:
        process_pair(
            os.path.join(dataset2_train_color, fname),
            os.path.join(dataset2_train_gray, fname),
            fname, split
        )

# Test samples come from the dataset's dedicated test folders.
files2_test = sorted(os.listdir(dataset2_test_color))
for fname in files2_test:
    process_pair(
        os.path.join(dataset2_test_color, fname),
        os.path.join(dataset2_test_gray, fname),
        fname, 'test'
    )


In [None]:
import os
from collections import defaultdict
import pandas as pd
# Inspect the structure of the generated dataset: count the files in every split/subfolder

base_path = '/kaggle/working/landscape_data'

splits = ['train', 'valid', 'test']
subfolders = ['color', 'grayscale', 'lab_L', 'lab_ab']


def _count_regular_files(folder_path):
    """Return how many regular files sit directly in ``folder_path`` (0 if it is missing)."""
    if not os.path.exists(folder_path):
        return 0
    return sum(
        1
        for entry in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, entry))
    )


# file_counts[split][subfolder] -> number of files
file_counts = defaultdict(dict)
for split in splits:
    for sub in subfolders:
        file_counts[split][sub] = _count_regular_files(os.path.join(base_path, split, sub))

# Transpose so that rows are splits and columns are subfolders.
df = pd.DataFrame(file_counts).T
print(df)


In [1]:
# needed libraries
import os
import cv2
import numpy as np
import torch
import torchvision
from torchvision import transforms
from torchvision.models.segmentation import deeplabv3_resnet50
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt


In [None]:
!pip install transformers timm

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Import the DeepLabV3 model with a ResNet-101 backbone from torchvision
from torchvision.models.segmentation import deeplabv3_resnet101

# Load the pretrained segmentation weights. According to the torchvision
# documentation the default DeepLabV3-ResNet101 weights are trained on a COCO
# subset restricted to the Pascal VOC categories (background + 20 classes).
try:
    # torchvision >= 0.13: `pretrained=True` is deprecated in favour of `weights=`.
    from torchvision.models.segmentation import DeepLabV3_ResNet101_Weights
    deeplab = deeplabv3_resnet101(weights=DeepLabV3_ResNet101_Weights.DEFAULT)
except ImportError:
    # Older torchvision releases only know the legacy flag.
    deeplab = deeplabv3_resnet101(pretrained=True)

# Move to the selected device and switch to evaluation mode (dropout off,
# batch-norm uses its running statistics). Assigning the result keeps the cell
# from printing the full model repr.
deeplab = deeplab.to(device).eval()




In [None]:
# Preprocessing applied to every image before it is fed to DeepLab. It is
# built once here instead of on every call, because the pipeline is constant.
_SEG_PREPROCESS = transforms.Compose([
    transforms.ToPILImage(),               # NumPy array -> PIL image
    transforms.Resize((256, 256)),         # Fixed 256x256 network input
    transforms.ToTensor(),                 # -> float tensor scaled to [0, 1]
    transforms.Normalize(                  # ImageNet mean / std
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


# Function to generate a segmentation mask using the DeepLab model
def get_segmentation_mask(image):
    """Run the global ``deeplab`` model on one image and return its class map.

    Args:
        image: Image array accepted by ``transforms.ToPILImage`` (the callers
            in this notebook pass an H x W x 3 RGB array).

    Returns:
        numpy.ndarray: 2-D array of per-pixel class indices (argmax over the
        class dimension of the logits). Because the input is always resized
        to 256x256 first, the mask is 256x256 regardless of the input size.
    """
    # Preprocess and add the batch dimension
    input_tensor = _SEG_PREPROCESS(image).unsqueeze(0).to(device)

    # Inference only: no gradients needed
    with torch.no_grad():
        logits = deeplab(input_tensor)['out'][0]  # (num_classes, 256, 256)

    # Most likely class per pixel, moved back to host memory
    return logits.argmax(0).cpu().numpy()



In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Custom dataset class for fast loading of colorization data with segmentation support
class ColorizationDatasetFast(Dataset):
    """Serve (L, ab, segmentation mask) triples for one split.

    L and ab planes are read from the cached ``.npy`` files in ``lab_L`` and
    ``lab_ab``; the segmentation mask is computed on the fly from the matching
    grayscale image with ``get_segmentation_mask``.

    Args:
        root_dir: Folder that contains one sub-folder per split.
        split: Split sub-folder name.
        target_size: Size passed to ``cv2.resize`` for L, ab and the grayscale
            image. NOTE(review): the segmentation helper always produces a
            256x256 mask, so other sizes presumably yield a mask that does not
            match L/ab — confirm before changing the default.
    """

    def __init__(self, root_dir, split='train', target_size=(256, 256)):
        # Folders for the cached L plane, the cached ab planes and the
        # grayscale images (used for the file list AND as segmentation input)
        self.l_path = os.path.join(root_dir, split, 'lab_L')
        self.ab_path = os.path.join(root_dir, split, 'lab_ab')
        self.gray_path = os.path.join(root_dir, split, 'grayscale')
        self.size = target_size
        self.files = sorted(os.listdir(self.gray_path))  # list of image filenames

    def __len__(self):
        """Return the number of samples (one per grayscale image)."""
        return len(self.files)

    @staticmethod
    def _npy_name(image_name):
        """Map an image file name to its cached array name ('x.jpg' -> 'x.npy').

        Uses the real extension instead of a literal '.jpg' replacement so that
        other extensions (or upper-case ones) are handled as well.
        """
        return os.path.splitext(image_name)[0] + '.npy'

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            tuple: ``(l, ab, seg_mask)`` where ``l`` is a float tensor of shape
            (1, H, W), ``ab`` a float tensor of shape (2, H, W) and
            ``seg_mask`` a long tensor of per-pixel class indices.

        Raises:
            OSError: If the grayscale image cannot be decoded.
        """
        image_name = self.files[idx]
        filename = self._npy_name(image_name)

        # Load L and ab channels from .npy files
        l = np.load(os.path.join(self.l_path, filename))    # shape: (H, W)
        ab = np.load(os.path.join(self.ab_path, filename))  # shape: (2, H, W)

        # Resize L and ab channels to target size
        l = cv2.resize(l, self.size, interpolation=cv2.INTER_LINEAR)
        ab_resized = np.stack([
            cv2.resize(ab[0], self.size, interpolation=cv2.INTER_LINEAR),
            cv2.resize(ab[1], self.size, interpolation=cv2.INTER_LINEAR)
        ], axis=0)  # shape: (2, H, W)

        # Convert to torch tensors
        l = torch.from_numpy(l).unsqueeze(0).float()  # shape: (1, H, W)
        ab = torch.from_numpy(ab_resized).float()     # shape: (2, H, W)

        # Load the grayscale image as a 3-channel array for the segmentation model
        gray_file = os.path.join(self.gray_path, image_name)
        rgb_image = cv2.imread(gray_file)  # BGR image, or None on failure
        if rgb_image is None:
            raise OSError(f"Could not read grayscale image: {gray_file}")
        rgb_image = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2RGB)
        rgb_image = cv2.resize(rgb_image, self.size)

        # Use the helper that is defined *before* this class in the notebook,
        # so the dataset also works on a fresh kernel with Run All.
        seg_mask = get_segmentation_mask(rgb_image)  # shape: (H, W)

        # Convert segmentation mask to tensor
        seg_mask_tensor = torch.from_numpy(seg_mask).long()

        # Return L channel, ab channels, and segmentation mask
        return l, ab, seg_mask_tensor


In [None]:
!pip install efficientnet_pytorch


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from efficientnet_pytorch import EfficientNet

# A U-Net style architecture with EfficientNet-B0 encoder and segmentation mask fusion
class EfficientUNetWithSeg(nn.Module):
    """Predict the two ab planes from an L plane plus a segmentation mask.

    The class-index mask is embedded to one value per pixel, concatenated with
    L, projected to 3 channels and passed through five EfficientNet-B0 encoder
    stages. Four transposed-convolution blocks upsample again, each followed
    by an additive skip connection, and a 1x1 convolution plus a final 2x
    bilinear upsample produce the 2-channel output.

    Attribute names are part of the saved checkpoint format (state_dict keys),
    so they must not be renamed or reordered.
    """

    def __init__(self, n_classes=313):
        """Build the network.

        Args:
            n_classes: Accepted for compatibility; it is not used anywhere in
                this class.
        """
        super().__init__()

        # Load pretrained EfficientNet-B0 as encoder
        self.encoder = EfficientNet.from_pretrained('efficientnet-b0')

        # Convert 2 input channels (L channel + segmentation embedding) into 3 for EfficientNet
        self.input_conv = nn.Conv2d(2, 3, kernel_size=1)

        # Encoder stages. These Sequentials wrap the encoder's own submodules
        # (no copies are made). Channel counts below are the ones the decoder's
        # additive skip connections require; the spatial scales are those of
        # the EfficientNet-B0 layout.
        self.enc1 = nn.Sequential(
            self.encoder._conv_stem,
            self.encoder._bn0,
            self.encoder._swish
        )  # Output: [B, 32, H/2, W/2]

        self.enc2 = nn.Sequential(*self.encoder._blocks[0:2])   # Output: [B, 24, H/4, W/4]
        self.enc3 = nn.Sequential(*self.encoder._blocks[2:4])   # Output: [B, 40, H/8, W/8]
        self.enc4 = nn.Sequential(*self.encoder._blocks[4:10])  # Output: [B, 112, H/16, W/16] (added to up4's 112-channel output)
        self.enc5 = nn.Sequential(*self.encoder._blocks[10:])   # Output: [B, 320, H/32, W/32] (fed to up4, which expects 320 channels)

        # U-Net style decoder blocks with upsampling (in_channels -> out_channels)
        self.up4 = self._up_block(320, 112)  # Skip connection with enc4
        self.up3 = self._up_block(112, 40)   # Skip connection with enc3
        self.up2 = self._up_block(40, 24)    # Skip connection with enc2
        self.up1 = self._up_block(24, 32)    # Skip connection with enc1

        # Segmentation mask embedding: maps each of 21 class indices to a single
        # learned value per pixel. Applied to a [B, H, W] mask it yields
        # [B, H, W, 1]; forward() permutes that to [B, 1, H, W].
        self.seg_embed = nn.Embedding(21, 1)

        # Final convolution: predict ab channels (2 outputs per pixel)
        self.final_conv = nn.Conv2d(32, 2, kernel_size=1)

        # Final upsampling to match input size
        self.upsample_final = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)

    def _up_block(self, in_ch, out_ch):
        """Return a 2x upsampling block: ConvTranspose2d(k=2, s=2) followed by ReLU."""
        # A basic upsampling block using transposed convolution followed by ReLU
        return nn.Sequential(
            nn.ConvTranspose2d(in_ch, out_ch, kernel_size=2, stride=2),
            nn.ReLU(inplace=True)
        )

    def forward(self, l, seg_mask):
        """
        Forward pass through the model.

        Args:
        l         -- Grayscale L channel input tensor, shape: [B, 1, H, W]
        seg_mask  -- Segmentation mask tensor with class IDs, shape: [B, H, W].
                     IDs index a 21-entry embedding table, so they must lie
                     in [0, 20].

        Returns:
        ab_pred   -- Predicted ab channels, shape: [B, 2, H, W]. No output
                     activation is applied, so values are not bounded.

        NOTE(review): the additive skip connections need every decoder output
        to match the corresponding encoder output exactly; presumably H and W
        must be multiples of 32 for that to hold — confirm before feeding
        other sizes.
        """

        # Embed segmentation mask into continuous values
        seg_emb = self.seg_embed(seg_mask.long())      # [B, H, W, 1]
        seg_emb = seg_emb.permute(0, 3, 1, 2)          # [B, 1, H, W]

        # Concatenate L channel and segmentation embedding
        x = torch.cat([l, seg_emb], dim=1)             # [B, 2, H, W]
        x = self.input_conv(x)                         # [B, 3, H, W]

        # Encoder path
        x1 = self.enc1(x)  # Output after stem
        x2 = self.enc2(x1)
        x3 = self.enc3(x2)
        x4 = self.enc4(x3)
        x5 = self.enc5(x4)

        # Decoder path with skip connections (element-wise addition, not concatenation)
        u4 = self.up4(x5) + x4
        u3 = self.up3(u4) + x3
        u2 = self.up2(u3) + x2
        u1 = self.up1(u2) + x1

        # Final convolution to get 2-channel ab prediction
        out = self.final_conv(u1)
        out = self.upsample_final(out)  # Upsample to original resolution

        return out


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm

# Initialize model and move to GPU/CPU
model = EfficientUNetWithSeg().to(device)

# Define optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.L1Loss()  # Mean Absolute Error (MAE) for regression

# Learning rate scheduler: halves the LR when the validation loss has not
# improved for 3 epochs. The `verbose` argument is intentionally not passed:
# it is deprecated in recent PyTorch releases, and the current LR is already
# printed in the per-epoch summary below.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=3
)

# Early stopping setup
early_stop_patience = 5
best_val_loss = float('inf')
early_stop_counter = 0

# Load training and validation datasets
train_dataset = ColorizationDatasetFast('/kaggle/working/landscape_data', split='train')
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

valid_dataset = ColorizationDatasetFast('/kaggle/working/landscape_data', split='valid')
valid_loader = DataLoader(valid_dataset, batch_size=8, shuffle=False)

# Training loop
for epoch in range(50):  # Max 50 epochs, early stopping will halt earlier if needed
    model.train()
    total_train_loss = 0.0

    # Training step
    for l, ab, seg in tqdm(train_loader, desc=f"[Epoch {epoch+1:02d}] Training"):
        l, ab, seg = l.to(device), ab.to(device), seg.to(device)
        output_ab = model(l, seg)  # Forward pass

        loss = criterion(output_ab, ab)  # Compute L1 loss
        optimizer.zero_grad()
        loss.backward()                 # Backpropagation
        optimizer.step()               # Update weights

        total_train_loss += loss.item()

    # Mean of the per-batch losses
    avg_train_loss = total_train_loss / len(train_loader)

    # Validation step
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for l_val, ab_val, seg_val in valid_loader:
            l_val, ab_val, seg_val = l_val.to(device), ab_val.to(device), seg_val.to(device)
            output_val = model(l_val, seg_val)  # Forward pass on validation set

            val_loss = criterion(output_val, ab_val)  # Compute validation loss
            total_val_loss += val_loss.item()

    avg_val_loss = total_val_loss / len(valid_loader)

    # Step the learning rate scheduler on the monitored metric
    scheduler.step(avg_val_loss)

    # Print epoch summary (includes the current learning rate)
    print(f"Epoch {epoch+1:02d} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | LR: {optimizer.param_groups[0]['lr']:.6f}")

    # Early stopping logic: an epoch counts as an improvement only when it
    # beats the best validation loss by more than 1e-4.
    if avg_val_loss < best_val_loss - 1e-4:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), '/kaggle/working/best_model_earlystop.pth')  # Save best model
        early_stop_counter = 0
    else:
        early_stop_counter += 1
        print(f"  🔸 Early Stop Counter: {early_stop_counter}/{early_stop_patience}")
        if early_stop_counter >= early_stop_patience:
            print("⛔️ Early stopping triggered.")
            break  # Stop training


In [None]:
# Test split: evaluated one image at a time and in a fixed order
test_dataset = ColorizationDatasetFast(
    '/kaggle/working/landscape_data',
    split='test',
)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

# Function to convert L and ab channels back to RGB image
def lab_to_rgb(L, ab):
    # If L has shape (1, H, W), squeeze the batch dimension
    if len(L.shape) == 3:
        L = L[0]

    # Denormalize L channel from [0, 1] to [0, 255]
    L = (L * 255.0).astype(np.uint8)

    # Denormalize a and b channels from [-1, 1] to [0, 255]
    a = (ab[0] * 127.0 + 128).astype(np.uint8)
    b = (ab[1] * 127.0 + 128).astype(np.uint8)

    # Stack the three LAB channels together
    lab = np.stack([L, a, b], axis=2)  # Shape: (H, W, 3)

    # Convert LAB image to RGB using OpenCV
    rgb = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
    return rgb


In [None]:
from skimage.metrics import structural_similarity as compare_ssim
from skimage.metrics import peak_signal_noise_ratio

# Initialize metric accumulators
total_ssim = 0.0
total_psnr = 0.0
count = 0

# Load the trained model and move it to the appropriate device.
# NOTE(review): this checkpoint comes from a Kaggle input dataset, not from the
# '/kaggle/working/best_model_earlystop.pth' file the training cell writes —
# confirm this is the intended model.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source (or pass weights_only=True where the installed PyTorch
# supports it).
model = EfficientUNetWithSeg()
model.load_state_dict(torch.load('/kaggle/input/bestt/pytorch/default/1/best_model_earlystop_BESTMODEL.pth', map_location=device))
model.to(device)
model.eval()  # Set model to evaluation mode

# Disable gradient computation for evaluation
with torch.no_grad():
    for l, ab, seg in test_loader:
        # Move data to device (GPU or CPU)
        l = l.to(device)
        ab = ab.to(device)
        seg = seg.to(device)

        # Predict ab channels using the model
        output_ab = model(l, seg)

        # Take the single sample of the batch as NumPy arrays
        l_np = l[0].cpu().numpy()
        ab_true_np = ab[0].cpu().numpy()
        ab_pred_np = output_ab[0].cpu().numpy()

        # Convert both ground truth and predicted LAB to RGB (same L for both)
        rgb_true = lab_to_rgb(l_np, ab_true_np)
        rgb_pred = lab_to_rgb(l_np, ab_pred_np)

        # Compute SSIM and PSNR between predicted and true RGB images
        ssim_score = compare_ssim(rgb_true, rgb_pred, channel_axis=2, data_range=255)
        psnr_score = peak_signal_noise_ratio(rgb_true, rgb_pred, data_range=255)

        # Accumulate metrics
        total_ssim += ssim_score
        total_psnr += psnr_score
        count += 1

if count == 0:
    # An empty test split would otherwise end in a ZeroDivisionError.
    print("No test samples were evaluated; skipping metric averages.")
else:
    # Compute average metrics over the entire test set
    avg_ssim = total_ssim / count
    avg_psnr = total_psnr / count

    # Print final evaluation results
    print(f"✅ Final Evaluation on Test Set:")
    print(f"→ Average SSIM : {avg_ssim:.4f}")
    print(f"→ Average PSNR : {avg_psnr:.2f} dB")


In [None]:
from torchvision import transforms

def get_segmentation_mask_from_np(rgb_np):
    """Return the DeepLab class-index mask for an RGB NumPy image.

    This used to be a line-for-line copy of ``get_segmentation_mask`` (defined
    in an earlier cell: same preprocessing, same model, same argmax). It now
    simply forwards to that function so the logic lives in one place; the
    name is kept for existing callers.

    Args:
        rgb_np: H x W x 3 RGB image array.

    Returns:
        numpy.ndarray: 2-D array of per-pixel class indices.
    """
    return get_segmentation_mask(rgb_np)

In [None]:
!pip install gradio

In [None]:
# 6. Restore the trained colorization network for the demo (inference only)
_demo_state_dict = torch.load(
    "/kaggle/input/bestt/pytorch/default/1/best_model_earlystop_BESTMODEL.pth",
    map_location=device,
)
model = EfficientUNetWithSeg().to(device)
model.load_state_dict(_demo_state_dict)
model.eval()

# 7. Main image colorization function
def colorize_image(gray_img_pil, mode, file_format):
    """Colorize one grayscale image and save the result to disk.

    The original cell stopped after the "# 5. Resize ab" comment, so the
    function returned None although its caller unpacks ``(gallery, path)``.
    Steps 5-7 below complete it.

    Args:
        gray_img_pil: Input image as a PIL image (any mode; converted to "L").
        mode: UI mode selector ("Basic" / "Advanced"). NOTE(review): nothing in
            the visible notebook defines what the modes should change, so the
            value is accepted but currently has no effect — confirm intent.
        file_format: Output format name, e.g. "PNG", "JPG", "WEBP", "TIFF".

    Returns:
        tuple: ``(gallery, path)`` where ``gallery`` is
        ``[original RGB PIL image, colorized PIL image]`` and ``path`` is the
        file the colorized image was written to.

    Raises:
        ValueError: If no image is supplied.
    """
    import tempfile  # only needed here, for the download file

    if gray_img_pil is None:
        raise ValueError("No input image was provided.")

    # Normalize file format names to what PIL's save() expects (jpg -> JPEG)
    file_format = file_format.upper()
    if file_format == "JPG":
        file_format = "JPEG"

    # 1. Convert grayscale image (PIL) to NumPy array (shape: H x W)
    gray_np_original = np.array(gray_img_pil.convert("L"))  # uint8 (H, W)
    orig_h, orig_w = gray_np_original.shape  # Store original resolution

    # 2. Resize to 256x256 and normalize for model input
    gray_resized = cv2.resize(gray_np_original, (256, 256)) / 255.0
    L_tensor = torch.tensor(gray_resized).unsqueeze(0).unsqueeze(0).float().to(device)  # (1, 1, 256, 256)

    # 3. Create fake RGB from grayscale for segmentation mask
    rgb_simulated = cv2.cvtColor(gray_np_original, cv2.COLOR_GRAY2RGB)
    rgb_resized = cv2.resize(rgb_simulated, (256, 256))
    seg_mask = get_segmentation_mask_from_np(rgb_resized)
    seg_tensor = torch.tensor(seg_mask).unsqueeze(0).to(device)  # (1, 256, 256)

    # 4. Predict ab color channels from model
    with torch.no_grad():
        ab_pred = model(L_tensor, seg_tensor)
    ab_pred_np = ab_pred[0].cpu().numpy()  # (2, 256, 256)

    # 5. Resize ab back to the original resolution (cv2 sizes are (width, height))
    ab_full = np.stack([
        cv2.resize(ab_pred_np[0], (orig_w, orig_h), interpolation=cv2.INTER_LINEAR),
        cv2.resize(ab_pred_np[1], (orig_w, orig_h), interpolation=cv2.INTER_LINEAR),
    ], axis=0)  # (2, H, W)

    # 6. Rebuild an 8-bit LAB image. The network was fed gray/255 as its L
    #    input, so the original gray values serve as the L plane; a/b are
    #    de-normalized (x * 127 + 128) and saturated so that out-of-range
    #    predictions do not wrap around in uint8.
    ab_uint8 = np.clip(np.rint(ab_full * 127.0 + 128.0), 0, 255).astype(np.uint8)
    lab = np.stack([gray_np_original, ab_uint8[0], ab_uint8[1]], axis=2)  # (H, W, 3)
    rgb_colorized = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)

    # 7. Save in the requested format and assemble the before/after gallery
    colorized_pil = transforms.ToPILImage()(rgb_colorized)
    out_dir = tempfile.mkdtemp(prefix="colorized_")
    out_path = os.path.join(out_dir, f"colorized.{file_format.lower()}")
    colorized_pil.save(out_path, format=file_format)

    gallery = [gray_img_pil.convert("RGB"), colorized_pil]
    return gallery, out_path


In [None]:
# `gr` is used throughout this cell but was never imported in the notebook;
# the package itself is installed by the `pip install gradio` cell above.
import gradio as gr

with gr.Blocks(theme="soft") as demo:
    gr.Markdown("## 🎨 AI-Powered Image Colorization")
    gr.Markdown("Colorize black-and-white images using a segmentation-assisted EfficientUNet model.")

    # 1. Image upload (delivered to the callback as a PIL image)
    input_image = gr.Image(label="🖼️ Upload Grayscale Image", type="pil")

    # 2. Settings (mode + format)
    with gr.Row():
        mode = gr.Radio(["Basic", "Advanced"], value="Basic", label="🧭 Mode")
        file_format = gr.Radio(["PNG", "JPG", "WEBP", "TIFF"], value="PNG", label="🗂️ Output Format")

    # 3. Button
    run_button = gr.Button("🚀 Colorize")

    # 4. Gallery
    output_gallery = gr.Gallery(label="🎬 Before and After", columns=2, height=300)

    # 5. Download
    download_button = gr.File(label="⬇ Download Colorized Image")

    # Callback: forwards the UI values to colorize_image and returns its
    # (gallery, path) pair to the gallery and download components.
    def process_wrapper(img, mode, fmt):
        """Run the colorization for the uploaded image."""
        if img is None:
            # Clicking the button without an upload passes None; show a
            # readable message in the UI instead of an attribute error.
            raise gr.Error("Please upload an image first.")
        gallery, path = colorize_image(img, mode, fmt)
        return gallery, path

    run_button.click(fn=process_wrapper,
                     inputs=[input_image, mode, file_format],
                     outputs=[output_gallery, download_button])

# share=True additionally requests a temporary public URL for the demo
demo.launch(share=True)
