***Image pre-processing (True Color, False Color, NDVI)***

In [None]:
# Creation of NDVI Image
import numpy as np
from PIL import Image

# Import images as numpy arrays
b04_img = Image.open('images/b04.jpg')
b08_img = Image.open('images/b08.jpg')

b04_array = np.array(b04_img)
b08_array = np.array(b08_img)

# Normalize the arrays to float32
b04_array = b04_array.astype(np.float32) / 255.0
b08_array = b08_array.astype(np.float32) / 255.0

# Calculate NDVI
ndvi = (b08_array - b04_array) / (b08_array + b04_array)

# Map NDVI values to 0-255 for visualization
ndvi_mapped = (((ndvi + 1) / 2)* 255).astype(np.uint8)

# Create RGB array (ndvi, ndvi, ndvi)
ndvi_rgb = np.stack((ndvi_mapped, ndvi_mapped, ndvi_mapped), axis=-1)

# Convert to PIL Image
ndvi_image = Image.fromarray(ndvi_rgb)

# Convert into 512x512
ndvi_image = ndvi_image.resize((512, 512), Image.LANCZOS)

# Save the NDVI image
ndvi_image.save('images/ndvi.jpg')


***Model import***

In [2]:
from diffusers import ControlNetModel, StableDiffusionControlNetPipeline

controlnet = ControlNetModel.from_pretrained("mespinosami/sen12mscr-sd-1_5")
base_model_id = "CompVis/stable-diffusion-v1-4"
pipe = StableDiffusionControlNetPipeline.from_pretrained(
	base_model_id, controlnet=controlnet
)

print(controlnet)


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

ControlNetModel(
  (conv_in): Conv2d(4, 320, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (time_proj): Timesteps()
  (time_embedding): TimestepEmbedding(
    (linear_1): Linear(in_features=320, out_features=1280, bias=True)
    (act): SiLU()
    (linear_2): Linear(in_features=1280, out_features=1280, bias=True)
  )
  (controlnet_cond_embedding): ControlNetConditioningEmbedding(
    (conv_in): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (blocks): ModuleList(
      (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): Conv2d(32, 96, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (4): Conv2d(96, 96, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (5): Conv2d(96, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    )
    (conv_out): Conv2d(256, 320, ker

***Input images*** 
Pre processing of all the three different images and input in the model without touching the layers

In [None]:
true_color = Image.open('images/true_color.jpg')
false_color = Image.open('images/false_color.jpg')
ndvi_image = Image.open('images/ndvi.jpg')

# Convert into 512x512
true_color = true_color.resize((512, 512), Image.LANCZOS)
false_color = false_color.resize((512, 512), Image.LANCZOS)


# Print the sizes of the images
print(f"True Color Image Size: {true_color.size}")
print(f"False Color Image Size: {false_color.size}")
print(f"NDVI Image Size: {ndvi_image.size}")

# Print mode checks
print(f"True Color Image Mode: {true_color.mode}")
print(f"False Color Image Mode: {false_color.mode}")
print(f"NDVI Image Mode: {ndvi_image.mode}")

# Pipeline execution
prompt = 'a satellite view of some agricultural fields, in high-resolution, top-down view, near Rome'
controlnet_image = 'true_color'  # Options: 'true_color', 'false_color', 'ndvi'

output = pipe(
    prompt=prompt,
    image=ndvi_image,
    num_inference_steps=50
)

output_image = output.images[0]
# Save the output image
output_image.save(f'images/output_image' + '_ndvi_image' + '.jpg')


***Phase 2***
Fine-Tuning on the last 2 model layers
Image pre-processing

In [None]:
# Pre processing of the data segmenting each image and mask into patches of size 512x512
# and saving them into the respective folders for training and validation
import os
import numpy as np
import PIL.Image as Image
import rasterio as rio
import cv2

IMAGE_PATH =  'dataset/images/not_converted'
MASK_PATH = 'dataset/masks/not_converted'

# Read the image
def read_image(image_path):
    with rio.open(image_path) as src:
        image = src.read()
        image = image.astype(np.uint8)
    return image

# Read the mask
def read_mask(mask_path):
    with rio.open(mask_path) as src:
        mask = src.read()
        mask = np.squeeze(mask, axis=0)
        # Upscale from 20m to 10m with nearest neighbor interpolation
        mask = cv2.resize(mask, (0, 0), fx=2, fy=2, interpolation=cv2.INTER_NEAREST)
        mask = mask.astype(np.uint8)
    return mask

image_files = sorted(f for f in os.listdir(IMAGE_PATH) if f.endswith('.jp2'))
mask_files = sorted(f for f in os.listdir(MASK_PATH) if f.endswith('.jp2'))
patch_size = 512
stride = 512

for image_file, mask_file in zip(image_files, mask_files):
    image_path = os.path.join(IMAGE_PATH, image_file)
    mask_path = os.path.join(MASK_PATH, mask_file)

    # Read the image and mask
    image = read_image(image_path)
    mask = read_mask(mask_path)
    
    height, witdth = image.shape[1:]
    
    # Slide windoew over the image to get windows of size 512x512
    for y in range(0, height - patch_size + 1, stride):
        for x in range(0, witdth - patch_size + 1, stride):
            image_patch = image[:, y:y+512, x:x+512]
            mask_patch = mask[y:y+512, x:x+512]
            # Save image and mask patches
            image_patch_path = os.path.join('dataset/images/train', f'{image_file[:-4]}_{y}_{x}.png')
            mask_patch_path = os.path.join('dataset/masks/val', f'{mask_file[:-4]}_{y}_{x}.png')
            image_patch_image = Image.fromarray(image_patch.transpose(1, 2, 0)) # Transpose from (C, H, W) to (H, W, C)
            mask_patch_image = Image.fromarray(mask_patch.astype(np.uint8), mode='L')
            image_patch_image.save(image_patch_path)
            mask_patch_image.save(mask_patch_path)

***Fine Tuning***

In [None]:
import torch

# Explore the ControlNetModel to choose which blocks to train
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Dummy inputs
sample = torch.randn(1, 4, 64, 64).to(device)                    # latent
timestep = torch.tensor([50]).to(device)                            # e.g. 50
encoder_hidden_states = torch.randn(1, 77, 768).to(device)           # from tokenizer/CLIP
controlnet_cond = torch.randn(1, 3, 512, 512).to(device)    # e.g. RGB/NDVI

controlnet = controlnet.to(device)

# Forward pass like:
controlnet(sample, timestep, encoder_hidden_states, controlnet_cond)


In [None]:
from diffusers import StableDiffusionControlNetPipeline, DDPMScheduler, AutoencoderKL, UNet2DConditionModel, PNDMScheduler

# DataLoader for training
from tqdm import tqdm
import torch
import os 
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import InterpolationMode as Im
from PIL import Image
import torch.nn.functional as F
from torch.optim import AdamW
from transformers import get_scheduler

# Warning handling
import warnings
warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for name, param in controlnet.named_parameters():
    if name.startswith("mid_block.") or name.startswith("down_blocks.3."):
        param.requires_grad = True
    else:
        param.requires_grad = False


class imageDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.image_files = sorted([f for f in os.listdir(image_dir) if f.endswith('.png')])
        self.mask_files = sorted([f for f in os.listdir(mask_dir) if f.endswith('.png')])
        
        # Transform images: resize to 512x512, normalize, and convert to tensor
        self.image_transform = transforms.Compose([
            transforms.Resize((256, 256), interpolation=Im.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
        
        # Transform masks: resize to 512x512 and convert to tensor
        self.mask_transform = transforms.Compose([
            transforms.Resize((256, 256), interpolation=Im.NEAREST),
            transforms.ToTensor()
        ])
        
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        image_path = os.path.join(self.image_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])
        
        # Open images and masks
        image = Image.open(image_path).convert('RGB')  # Ensure image is in RGB format
        mask = Image.open(mask_path).convert('RGB')  # Ensure mask is in grayscale format
        
        image = self.image_transform(image)
        mask = self.mask_transform(mask)
        # mask = (mask * 255).long()  # Convert mask to long tensor for compatibility with loss functions (integer class labels)
        
        return image, mask

train_dataset = imageDataset(
    image_dir='dataset/train/images',
    mask_dir='dataset/train/masks'
)

val_dataset = imageDataset(
    image_dir='dataset/val/images',
    mask_dir='dataset/val/masks'
)

train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

for x, y in train_loader:
    print(x.shape, y.shape)
    break

# Initialize the Stable Diffusion pipeline with ControlNet
base_model_id = "CompVis/stable-diffusion-v1-4"

pipe = StableDiffusionControlNetPipeline.from_pretrained(
    base_model_id,
    controlnet=controlnet,
)

pipe.to(device)
# pipe.enable_model_cpu_offload()


# Optimizer needs to select only the trainabe parameters
parmas_to_train =[]
for name, param in controlnet.named_parameters():
    if param.requires_grad:
        parmas_to_train.append(param)

optimizer = AdamW(parmas_to_train, lr=1e-5)  # 1e-5, 

vae = pipe.vae                      # Code/Decode Images
unet = pipe.unet                    # To train the model  
text_encoder = pipe.text_encoder    # Encode text prompts
tokenizer = pipe.tokenizer          # Tokenizer for text prompts

for param in vae.parameters():
    param.requires_grad = False  # Freeze VAE parameters
for param in text_encoder.parameters():
    param.requires_grad = False  # Freeze UNet parameters

noise_scheduler = PNDMScheduler(
    beta_start=0.00085,
    beta_end=0.012,
    beta_schedule="scaled_linear",
    num_train_timesteps=1000,
    skip_prk_steps=True,
    steps_offset=1,
    set_alpha_to_one=False
) # Noise scheduler for training

# On GPU
# vae.to(device)
# unet.to(device)
# text_encoder.to(device)
unet.enable_gradient_checkpointing()

num_epochs = 5
total_steps = num_epochs * len(train_loader)

lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)

for epoch in range(num_epochs):
    total_loss = 0.0
    controlnet.train()
    unet.train()
    
    for step, batch in enumerate(tqdm(train_loader)):
        images, masks = batch
        images = images.to(device)
        masks = masks.to(device)
        
        latents = vae.encode(images).latent_dist.sample()
        latents = latents * vae.config.scaling_factor
        
        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, noise_scheduler.num_train_timesteps, (latents.shape[0],), device=latents.device).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
        
        prompt = ["Satellite image"] * latents.shape[0] # * batch size
        input_ids = tokenizer(prompt, max_length=tokenizer.model_max_length, return_tensors="pt").input_ids.to(device)
        encoder_hidden_states = text_encoder(input_ids)[0]
        
        control_image = masks
        
        # Forward 
        down_block_res_samples, mid_block_res_sample = controlnet(
            noisy_latents, timesteps, encoder_hidden_states, controlnet_cond=control_image, return_dict=False
        )
        
        # Unet output with conditioning
        noise_pred = unet(
            sample=noisy_latents,
            timestep=timesteps,
            encoder_hidden_states=encoder_hidden_states,
            down_block_additional_residuals=down_block_res_samples,
            mid_block_additional_residual=mid_block_res_sample
        ).sample
        
        
        
        # Loss
        loss = F.mse_loss(noise_pred, noise)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        torch.cuda.empty_cache()
        
        running_loss += loss.item()
        if step % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{step+1}/{len(train_loader)}], Loss: {loss.item():.4f}")
            # Save weights every 10 epoch  
            torch.save(controlnet.state_dict(), f'controlnet_epoch{epoch+1}.pth')
        
pipe.eval()
val_loss = 0.0

with torch.no_grad():
    for step, batch in enumerate(tqdm(val_loader)):
        images, masks = batch
        images = images.to(device)
        masks = masks.to(device)

        # Genera latenti e noisy come nel training
        latents = vae.encode(images).latent_dist.sample() * vae.config.scaling_factor
        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, noise_scheduler.num_train_timesteps, (latents.shape[0],), device=latents.device).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

        prompt = ["Satellite image"] * images.shape[0]
        input_ids = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).input_ids.to(device)
        encoder_hidden_states = text_encoder(input_ids)[0]

        # Forward
        down_block_res_samples, mid_block_res_sample = controlnet(
            noisy_latents, timesteps, encoder_hidden_states, controlnet_cond=masks, return_dict=False
        )
        noise_pred = unet(
            sample=noisy_latents,
            timestep=timesteps,
            encoder_hidden_states=encoder_hidden_states,
            down_block_additional_residuals=down_block_res_samples,
            mid_block_additional_residual=mid_block_res_sample
        ).sample

        val_loss += F.mse_loss(noise_pred, noise).item()

avg_val_loss = val_loss / len(val_loader)
print(f"Validation Loss: {avg_val_loss:.4f}")

torch.Size([1, 3, 256, 256]) torch.Size([1, 3, 256, 256])


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

  0%|          | 0/5656 [00:00<?, ?it/s]Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
  0%|          | 0/5656 [00:20<?, ?it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacity of 4.00 GiB of which 0 bytes is free. Of the allocated memory 10.87 GiB is allocated by PyTorch, and 122.37 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

: 

In [None]:
# Load weights
controlnet.load_state_dict(torch.load('controlnet_epoch5.pth'))
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5",
    controlnet=controlnet
).to(device)

image = pipe(prompt="Satellite image", controlnet_conditioning_image=mask_input_image).images[0]
image.save("output.png")

