In [1]:
import json
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Conv2DTranspose, Resizing
from keras.optimizers import Adam
import tensorflow as tf

In [2]:
def split_image(image, chunk_size):
    h, w = image.shape[:2]
    cw, ch = chunk_size
    if h % ch != 0 or w % cw != 0:
        raise ValueError(f"Image size {w}x{h} not divisible by chunk {cw}x{ch}")

    chunks = []
    for y in range(0, h, ch):
        for x in range(0, w, cw):
            chunk = image[y:y + ch, x:x + cw] / 255.0
            chunks.append(chunk)
    return chunks


def merge_chunks(chunks, image_size, chunk_size):
    h, w = image_size
    cw, ch = chunk_size
    full_image = np.zeros((h, w, 3))
    idx = 0

    for y in range(0, h, ch):
        for x in range(0, w, cw):
            full_image[y:y + ch, x:x + cw] = chunks[idx]
            idx += 1
    return full_image


In [3]:
def build_autoencoder(input_shape):
    input_img = Input(shape=input_shape)
    h, w = input_shape[:2]

    # Encoder
    x = Conv2D(64, (3, 3), activation=None, padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(32, (3, 3), activation=None, padding='same')(x)
    encoded = MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = Conv2D(32, (3, 3), activation=None, padding='same')(encoded)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(64, (3, 3), activation=None, padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    x = Resizing(h, w)(x)

    autoencoder = Model(input_img, x)
    autoencoder.compile(optimizer=Adam(), loss="mse")
    return autoencoder

In [4]:
def data_generator(coco_data, clean_root_prefix, corruption_root,
                   corruption_type, severity_level,
                   chunk_size, full_size, batch_size=32):

    corrupted_batch = []
    clean_batch = []

    for img_info in coco_data["images"]:

        relative_path = img_info["file_name"]
        clean_path = os.path.join(clean_root_prefix, relative_path)

        base_name = os.path.splitext(os.path.basename(relative_path))[0]
        camera_dir = os.path.basename(os.path.dirname(relative_path))

        corrupted_name = f"{base_name}_{corruption_type}_{severity_level}.jpg"
        corrupted_path = os.path.join(corruption_root, camera_dir, corrupted_name)

        clean = cv2.imread(clean_path)
        corrupted = cv2.imread(corrupted_path)

        if clean is None or corrupted is None:
            continue

        clean = cv2.resize(clean, full_size)
        corrupted = cv2.resize(corrupted, full_size)

        clean_chunks = split_image(clean, chunk_size)
        corrupted_chunks = split_image(corrupted, chunk_size)

        # Multi-chunk batching
        for c, k in zip(corrupted_chunks, clean_chunks):
            corrupted_batch.append(c)
            clean_batch.append(k)

            if len(corrupted_batch) == batch_size:
                yield np.array(corrupted_batch), np.array(clean_batch)
                corrupted_batch = []
                clean_batch = []

In [5]:
def train_autoencoder(
        coco_json_path,
        clean_root_prefix,
        corruption_root,
        corruption_type="fog",
        severity_level=2,
        chunk_size=(200, 150),
        full_size=(1600, 900),
        epochs=10,
        batch_size=32,
        weights_dir="weights",
):

    os.makedirs(weights_dir, exist_ok=True)

    # Load coco json metadata only (small)
    with open(coco_json_path, "r") as f:
        coco_data = json.load(f)

    # Compute input shape for model
    sample_h = chunk_size[1]
    sample_w = chunk_size[0]
    input_shape = (sample_h, sample_w, 3)

    # Build model
    autoencoder = build_autoencoder(input_shape)

    # Steps per epoch (48 chunks per full image)
    chunks_per_image = (full_size[0] // chunk_size[0]) * (full_size[1] // chunk_size[1])
    steps = (len(coco_data["images"]) * chunks_per_image) // batch_size

    print(f"Total images: {len(coco_data['images'])}")
    print(f"Chunks per image: {chunks_per_image}")
    print(f"Batch size: {batch_size}")
    print(f"Steps per epoch: {steps}")

    # Data generator (memory efficient)
    train_gen = data_generator(
        coco_data=coco_data,
        clean_root_prefix=clean_root_prefix,
        corruption_root=corruption_root,
        corruption_type=corruption_type,
        severity_level=severity_level,
        chunk_size=chunk_size,
        full_size=full_size,
        batch_size=batch_size
    )

    # Train
    autoencoder.fit(
        train_gen,
        steps_per_epoch=steps,
        epochs=epochs
    )

    weight_path = os.path.join(weights_dir, f"autoencoder_{corruption_type}_{severity_level}.h5")
    autoencoder.save_weights(weight_path)
    print(f"Model weights saved to {weight_path}")

    return autoencoder

In [7]:

if __name__ == "__main__":
    coco_json_path = "../data/sets/nuimages/nuimages_1k.json"
    clean_root_prefix = "../data/sets/nuimages"
    corruption_type = "fog"
    corruption_root = f"../data/sets/generated/{corruption_type}"
    severity_level = 3

    model = train_autoencoder(
        coco_json_path=coco_json_path,
        clean_root_prefix=clean_root_prefix,
        corruption_root=corruption_root,
        corruption_type=corruption_type,
        severity_level=severity_level,
        full_size=(1600, 900),
        chunk_size=(200, 150),
        epochs=8,
        batch_size=32
    )


Total images: 1000
Chunks per image: 48
Batch size: 32
Steps per epoch: 1500


IndexError: list index out of range

## Configuration