In [None]:
import kagglehub
# Fetch the paired Sentinel-1/Sentinel-2 dataset through kagglehub. The returned value is
# kept as the dataset's local location; a later cell joins 'v_2/agri/s2' onto it.
requiemonk_sentinel12_image_pairs_segregated_by_terrain_path = kagglehub.dataset_download('requiemonk/sentinel12-image-pairs-segregated-by-terrain')

print('Data source import complete.')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image
from sklearn.model_selection import train_test_split

from PIL import Image
import os
import re
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

!pip install torchmetrics
from torchmetrics.image import PeakSignalNoiseRatio, StructuralSimilarityIndexMeasure
from torchmetrics.image.fid import FrechetInceptionDistance
from torchmetrics.image.lpip import LearnedPerceptualImagePatchSimilarity



# Use the GPU when PyTorch can see one, otherwise run on the CPU. `device` is a plain
# string that later cells pass to `.to(...)` and to `torch.amp.autocast(device_type=...)`.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image
from sklearn.model_selection import train_test_split

from PIL import Image
import os
import re
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm


from torchmetrics.image import PeakSignalNoiseRatio, StructuralSimilarityIndexMeasure
from torchmetrics.image.fid import FrechetInceptionDistance
from torchmetrics.image.lpip import LearnedPerceptualImagePatchSimilarity



# NOTE(review): this repeats the device selection from the previous cell verbatim.
# Use the GPU when PyTorch can see one, otherwise run on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

class SentinelDataset(Dataset):
    """Paired Sentinel-1 / Sentinel-2 PNG tiles for image-to-image translation.

    For every terrain folder under ``root_dir`` the ``s1`` and ``s2`` subfolders are
    scanned for ``.png`` files. Files are paired by removing the ``_s1_`` / ``_s2_``
    marker from their names, and only names present in both folders are kept. The
    sorted pairs of each terrain are split with ``train_test_split`` using a fixed
    ``random_state`` of 42, so the 'train' and 'val' instances see disjoint pairs.

    Args:
        root_dir: Directory that contains one subfolder per terrain.
        image_size: Side length both images are resized to.
        mode: ``'train'`` selects the training split; any other value selects the
            validation split.
        split_ratio: Value passed to ``train_test_split`` as ``train_size``.
        terrains: Terrain folder names to load. Defaults to ``['agri']``, which is
            what this class loaded before the parameter existed. A single string is
            treated as one terrain name.

    Each item is a ``(condition_img, target_img)`` tuple built from the S1 and S2
    file respectively, both converted to RGB, resized, and normalised with mean 0.5
    and std 0.5 per channel.
    """

    def __init__(self, root_dir, image_size=128, mode='train', split_ratio=0.8, terrains=None):
        self.root_dir = root_dir
        self.image_pairs = []

        # Normalise the terrain argument without aliasing a caller-owned list.
        if terrains is None:
            terrains = ['agri']
        elif isinstance(terrains, str):
            terrains = [terrains]
        else:
            terrains = list(terrains)

        self.transform = transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

        def get_base_name(fname):
            # Strip the sensor marker so matching S1/S2 files share one key.
            return re.sub(r'_s[12]_', '_', fname)

        print(f"Loading data for {mode} set from folder: {', '.join(terrains)}")

        for terrain in terrains:
            terrain_path = os.path.join(root_dir, terrain)
            if not os.path.isdir(terrain_path):
                print(f"Warning: Folder '{terrain}' not found. Skipping.")
                continue

            s1_path = os.path.join(terrain_path, "s1")
            s2_path = os.path.join(terrain_path, "s2")
            if not (os.path.isdir(s1_path) and os.path.isdir(s2_path)):
                # Previously this case was skipped without any message.
                print(f"Warning: Folder '{terrain}' is missing its 's1' or 's2' subfolder. Skipping.")
                continue

            s1_files = {get_base_name(f): os.path.join(s1_path, f) for f in os.listdir(s1_path) if f.endswith(".png")}
            s2_files = {get_base_name(f): os.path.join(s2_path, f) for f in os.listdir(s2_path) if f.endswith(".png")}

            # Sorting makes the split reproducible regardless of directory listing order.
            common_base_names = sorted(s1_files.keys() & s2_files.keys())

            if len(common_base_names) < 2:
                # train_test_split raises on fewer than two samples; report and move on instead.
                print(
                    f"Warning: Folder '{terrain}' has {len(common_base_names)} matched pair(s); "
                    "at least 2 are needed to split. Skipping."
                )
                continue

            # Split the data into training and validation sets
            train_base_names, val_base_names = train_test_split(
                common_base_names, train_size=split_ratio, random_state=42
            )
            selected_base_names = train_base_names if mode == 'train' else val_base_names

            for base_name in selected_base_names:
                self.image_pairs.append((s1_files[base_name], s2_files[base_name]))

    def __len__(self):
        """Return the number of (S1, S2) file pairs in the selected split."""
        return len(self.image_pairs)

    def __getitem__(self, idx):
        """Load, convert to RGB and transform the pair at ``idx``."""
        s1_path, s2_path = self.image_pairs[idx]
        # Context managers close the underlying files promptly; convert() returns an
        # independent in-memory image, so it stays usable after the file is closed.
        with Image.open(s1_path) as s1_file:
            condition_img = s1_file.convert("RGB")
        with Image.open(s2_path) as s2_file:
            target_img = s2_file.convert("RGB")
        condition_img = self.transform(condition_img)
        target_img = self.transform(target_img)
        return condition_img, target_img

In [None]:
class Block(nn.Module):
    """Resampling stage: 4x4 stride-2 conv (or transposed conv), BatchNorm, activation.

    ``down=True`` halves the spatial size with a reflect-padded ``Conv2d``;
    ``down=False`` doubles it with a ``ConvTranspose2d``. ``act="relu"`` selects
    ``ReLU``; any other value selects ``LeakyReLU(0.2)``. When ``use_dropout`` is
    true, a 0.5 dropout is applied to the stage output.
    """

    def __init__(self, in_channels, out_channels, down=True, act="relu", use_dropout=False):
        super().__init__()

        if down:
            resample = nn.Conv2d(
                in_channels, out_channels,
                kernel_size=4, stride=2, padding=1,
                bias=False, padding_mode="reflect",
            )
        else:
            resample = nn.ConvTranspose2d(
                in_channels, out_channels,
                kernel_size=4, stride=2, padding=1,
                bias=False,
            )
        normalise = nn.BatchNorm2d(out_channels)
        activation = nn.ReLU() if act == "relu" else nn.LeakyReLU(0.2)

        self.conv = nn.Sequential(resample, normalise, activation)
        self.use_dropout = use_dropout
        # Always constructed; only applied when use_dropout is set.
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        out = self.conv(x)
        if not self.use_dropout:
            return out
        return self.dropout(out)

class Generator128(nn.Module):
    """U-Net style generator with skip connections between mirrored stages.

    The encoder is seven stride-2 stages (``initial_down``, ``down1``..``down5``,
    ``bottleneck``); the decoder is seven stride-2 upsampling stages (``up1``,
    ``up3``..``up7``, ``final_up``). Every decoder stage after ``up1`` receives the
    previous decoder output concatenated on the channel axis with the encoder
    activation of matching depth. The output has ``in_channels`` channels and passes
    through ``Tanh``.
    """

    def __init__(self, in_channels=3, features=64):
        super().__init__()
        f = features  # base channel width; deeper stages use multiples of it

        # ---- encoder ----
        self.initial_down = nn.Sequential(
            nn.Conv2d(in_channels, f, kernel_size=4, stride=2, padding=1, padding_mode="reflect"),
            nn.LeakyReLU(0.2),
        )
        self.down1 = Block(f, f * 2, down=True, act="leaky")
        self.down2 = Block(f * 2, f * 4, down=True, act="leaky")
        self.down3 = Block(f * 4, f * 8, down=True, act="leaky")
        self.down4 = Block(f * 8, f * 8, down=True, act="leaky")
        self.down5 = Block(f * 8, f * 8, down=True, act="leaky")
        self.bottleneck = nn.Sequential(
            nn.Conv2d(f * 8, f * 8, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
        )

        # ---- decoder (input widths include the concatenated skip tensor) ----
        self.up1 = Block(f * 8, f * 8, down=False, act="relu", use_dropout=True)
        self.up3 = Block(f * 8 * 2, f * 8, down=False, act="relu", use_dropout=True)
        self.up4 = Block(f * 8 * 2, f * 4, down=False, act="relu")
        self.up5 = Block(f * 4 + f * 8, f * 2, down=False, act="relu")
        self.up6 = Block(f * 2 + f * 4, f, down=False, act="relu")
        self.up7 = Block(f + f * 2, f, down=False, act="relu")
        self.final_up = nn.Sequential(
            nn.ConvTranspose2d(f * 2, in_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        # Encoder pass; each activation is kept for the matching decoder stage.
        skip1 = self.initial_down(x)
        skip2 = self.down1(skip1)
        skip3 = self.down2(skip2)
        skip4 = self.down3(skip3)
        skip5 = self.down4(skip4)
        skip6 = self.down5(skip5)

        out = self.up1(self.bottleneck(skip6))

        # Decoder pass: concatenate with the encoder skip, then upsample.
        out = self.up3(torch.cat([out, skip6], 1))
        out = self.up4(torch.cat([out, skip5], 1))
        out = self.up5(torch.cat([out, skip4], 1))
        out = self.up6(torch.cat([out, skip3], 1))
        out = self.up7(torch.cat([out, skip2], 1))
        return self.final_up(torch.cat([out, skip1], 1))

In [None]:
class CNNBlock(nn.Module):
    """Discriminator stage: 4x4 convolution, BatchNorm, LeakyReLU(0.2).

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        stride: Convolution stride.
        padding: Convolution padding. Defaults to 0, which is what this block has
            always used. With ``padding=0`` the ``padding_mode="reflect"`` setting
            has nothing to act on; pass ``padding=1`` to actually reflect-pad.
            NOTE(review): the generator's ``Block`` uses padding 1 with the same
            kernel size — confirm whether 0 is intended here.
    """

    def __init__(self, in_channels, out_channels, stride=2, padding=0):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 4, stride, padding, bias=False, padding_mode="reflect"),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2),
        )

    def forward(self, x):
        return self.conv(x)

class Discriminator(nn.Module):
    """Conditional patch discriminator scoring an (input, candidate) image pair.

    The two images are concatenated on the channel axis, passed through an initial
    stride-2 convolution and a stack of ``CNNBlock`` stages, and reduced to a
    single-channel map of raw logits by a final convolution (no sigmoid is applied).

    Args:
        in_channels: Channels of *each* image; the first convolution therefore takes
            ``in_channels * 2`` channels.
        features: Channel widths. ``features[0]`` is the width of the initial
            convolution; each later entry adds one ``CNNBlock``. The last block uses
            stride 1, all earlier ones stride 2.
    """

    def __init__(self, in_channels=3, features=(64, 128, 256, 512)):
        super().__init__()
        # Immutable default plus a private copy: nothing here can alias caller state.
        features = list(features)

        self.initial = nn.Sequential(
            nn.Conv2d(in_channels * 2, features[0], kernel_size=4, stride=2, padding=1, padding_mode="reflect"),
            nn.LeakyReLU(0.2),
        )

        layers = []
        prev_channels = features[0]
        last_index = len(features) - 1
        for index, feature in enumerate(features[1:], start=1):
            # Choose the stride-1 stage by position. Comparing widths (feature ==
            # features[-1]) would also hit earlier stages that share the last width.
            stride = 1 if index == last_index else 2
            layers.append(CNNBlock(prev_channels, feature, stride=stride))
            prev_channels = feature
        layers.append(nn.Conv2d(prev_channels, 1, kernel_size=4, stride=1, padding=1, padding_mode="reflect"))
        self.model = nn.Sequential(*layers)

    def forward(self, x, y):
        """Return the logit map for conditioning image ``x`` and candidate ``y``."""
        x = torch.cat([x, y], dim=1)
        x = self.initial(x)
        return self.model(x)

In [None]:
# --- Hyperparameters ---
LEARNING_RATE = 2e-4
BATCH_SIZE = 16
NUM_WORKERS = 4
IMAGE_SIZE = 128
CHANNELS_IMG = 3
L1_LAMBDA = 100  # multiplier applied to the L1 term of the generator loss
NUM_EPOCHS = 150

# Derive the dataset root from the kagglehub download made in the first cell, so this
# cell agrees with the later NDVI cell that builds its path the same way. If that
# location has no 'v_2' folder, fall back to the original hard-coded Kaggle path.
_KAGGLE_INPUT_PATH = "/kaggle/input/sentinel12-image-pairs-segregated-by-terrain/v_2/"
DATASET_PATH = os.path.join(requiemonk_sentinel12_image_pairs_segregated_by_terrain_path, "v_2")
if not os.path.isdir(DATASET_PATH):
    DATASET_PATH = _KAGGLE_INPUT_PATH
os.makedirs("saved_images", exist_ok=True)

# --- Models, optimisers and losses ---
disc = Discriminator(in_channels=CHANNELS_IMG).to(device)
gen = Generator128(in_channels=CHANNELS_IMG).to(device)
opt_disc = optim.Adam(disc.parameters(), lr=LEARNING_RATE, betas=(0.5, 0.999))
opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.5, 0.999))

BCE = nn.BCEWithLogitsLoss()
L1_LOSS = nn.L1Loss()

# Pinned host memory only helps host-to-GPU copies, so request it only on CUDA.
_PIN_MEMORY = device == "cuda"

# Create the training dataset and dataloader
train_dataset = SentinelDataset(
    root_dir=DATASET_PATH,
    image_size=IMAGE_SIZE,
    mode='train'
)
if len(train_dataset) == 0:
    # SentinelDataset only prints a warning for missing folders; without this check
    # the shuffling DataLoader below fails with an unrelated-looking sampler error.
    raise RuntimeError(f"No training image pairs found under {DATASET_PATH!r}.")
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=_PIN_MEMORY
)

# Create the validation dataset and dataloader
val_dataset = SentinelDataset(
    root_dir=DATASET_PATH,
    image_size=IMAGE_SIZE,
    mode='val'
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=_PIN_MEMORY
)

print(f"Found {len(train_dataset)} image pairs for training.")
print(f"Found {len(val_dataset)} image pairs for validation.")

In [None]:
def _make_grad_scaler(enabled):
    """Create a gradient scaler, using the ``torch.amp`` class when this torch build has it."""
    if hasattr(torch.amp, "GradScaler"):
        return torch.amp.GradScaler("cuda", enabled=enabled)
    return torch.cuda.amp.GradScaler(enabled=enabled)


def train_fn(disc, gen, loader, opt_disc, opt_gen, l1_loss, bce, d_scaler=None, g_scaler=None):
    """Run one training epoch of the conditional GAN.

    For every batch the discriminator is updated first (real pair labelled 1,
    generated pair labelled 0, the two losses averaged), then the generator is
    updated with the adversarial loss plus ``L1_LAMBDA`` times the L1 distance to
    the target.

    Args:
        disc, gen: Discriminator and generator modules.
        loader: Iterable yielding ``(x, y)`` batches.
        opt_disc, opt_gen: Optimisers for the discriminator and the generator.
        l1_loss, bce: Loss callables.
        d_scaler, g_scaler: Optional gradient scalers for the two optimisers. When
            omitted, fresh ones are created for this call. Pass persistent scalers
            to keep the loss-scale state across epochs.

    Returns:
        ``(mean discriminator loss, mean generator loss)`` over the batches, or
        ``(nan, nan)`` if the loader yielded no batches.
    """
    # float16 autocast needs loss scaling so small gradients do not underflow to
    # zero in the backward pass. Mixed precision is only used on CUDA.
    use_amp = str(device).startswith("cuda")
    if d_scaler is None:
        d_scaler = _make_grad_scaler(use_amp)
    if g_scaler is None:
        g_scaler = _make_grad_scaler(use_amp)

    loop = tqdm(loader, leave=True)

    avg_disc_loss = 0.0
    avg_gen_loss = 0.0

    for idx, (x, y) in enumerate(loop):
        x, y = x.to(device), y.to(device)

        # Train Discriminator
        with torch.amp.autocast(device_type=device, dtype=torch.float16, enabled=use_amp):
            y_fake = gen(x)
            D_real = disc(x, y)
            D_real_loss = bce(D_real, torch.ones_like(D_real))
            # detach(): the discriminator step must not push gradients into the generator.
            D_fake = disc(x, y_fake.detach())
            D_fake_loss = bce(D_fake, torch.zeros_like(D_fake))
            D_loss = (D_real_loss + D_fake_loss) / 2

        disc.zero_grad()
        d_scaler.scale(D_loss).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        # Train Generator
        with torch.amp.autocast(device_type=device, dtype=torch.float16, enabled=use_amp):
            # Re-score the fake with the freshly updated discriminator, graph attached.
            D_fake = disc(x, y_fake)
            G_fake_loss = bce(D_fake, torch.ones_like(D_fake))
            L1 = l1_loss(y_fake, y) * L1_LAMBDA
            G_loss = G_fake_loss + L1

        opt_gen.zero_grad()
        g_scaler.scale(G_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()

        avg_disc_loss += D_loss.item()
        avg_gen_loss += G_loss.item()

        if idx % 10 == 0:
            loop.set_postfix(
                D_real=torch.sigmoid(D_real).mean().item(),
                D_fake=torch.sigmoid(D_fake).mean().item(),
            )

    num_batches = len(loader)
    if num_batches == 0:
        # Nothing was trained; report "no data" instead of dividing by zero.
        return float("nan"), float("nan")
    return avg_disc_loss / num_batches, avg_gen_loss / num_batches


def validate_fn(disc, gen, loader, l1_loss, bce):
    """Compute the mean discriminator and generator losses over ``loader`` without training.

    Both networks are put in eval mode for the pass and returned to train mode
    afterwards, even if a batch raises. No gradients are recorded.

    Returns:
        ``(mean discriminator loss, mean generator loss)`` over the batches, or
        ``(nan, nan)`` if the loader yielded no batches.
    """
    loop = tqdm(loader, leave=True, desc="Validating")

    avg_disc_loss_val = 0.0
    avg_gen_loss_val = 0.0

    gen.eval()
    disc.eval()

    try:
        with torch.no_grad():
            for x, y in loop:
                x, y = x.to(device), y.to(device)

                with torch.amp.autocast(device_type=device, dtype=torch.float16):
                    y_fake = gen(x)

                    D_real = disc(x, y)
                    # In eval mode under no_grad the discriminator output for the fake
                    # is the same whether or not y_fake is detached, so score it once
                    # and reuse the result for both losses.
                    D_fake = disc(x, y_fake)

                    # Discriminator Loss
                    D_real_loss = bce(D_real, torch.ones_like(D_real))
                    D_fake_loss = bce(D_fake, torch.zeros_like(D_fake))
                    D_loss = (D_real_loss + D_fake_loss) / 2

                    # Generator Loss
                    G_fake_loss = bce(D_fake, torch.ones_like(D_fake))
                    L1 = l1_loss(y_fake, y) * L1_LAMBDA
                    G_loss = G_fake_loss + L1

                avg_disc_loss_val += D_loss.item()
                avg_gen_loss_val += G_loss.item()
    finally:
        gen.train()
        disc.train()

    num_batches = len(loader)
    if num_batches == 0:
        # An empty validation loader is reported as "no data" rather than a crash.
        return float("nan"), float("nan")
    return avg_disc_loss_val / num_batches, avg_gen_loss_val / num_batches

def save_example_images(gen, val_loader, epoch, folder="saved_images"):
    """Save the first validation batch as input / label / generated image grids.

    Writes ``input_{epoch}.png``, ``label_{epoch}.png`` and ``fake_{epoch}.png``
    into ``folder`` (created if missing). Tensors are mapped back with
    ``t * 0.5 + 0.5`` before saving. The generator runs in eval mode and is put
    back in train mode afterwards, even if generation or saving fails.

    Raises:
        StopIteration: if ``val_loader`` yields no batches.
    """
    x, y = next(iter(val_loader))
    x, y = x.to(device), y.to(device)

    os.makedirs(folder, exist_ok=True)

    gen.eval()
    try:
        with torch.no_grad():
            y_fake = gen(x)
            save_image(x * 0.5 + 0.5, os.path.join(folder, f"input_{epoch}.png"))
            save_image(y * 0.5 + 0.5, os.path.join(folder, f"label_{epoch}.png"))
            save_image(y_fake * 0.5 + 0.5, os.path.join(folder, f"fake_{epoch}.png"))
    finally:
        gen.train()

In [None]:
NUM_EPOCHS = 150

# Per-epoch mean losses; the loss-curve cell further down plots these four lists.
d_loss_history, g_loss_history = [], []
d_loss_val_history, g_loss_val_history = [], []

for epoch in range(NUM_EPOCHS):
    print(f"\n--- Epoch {epoch+1}/{NUM_EPOCHS} ---")

    # Training pass first; record its losses before validation starts.
    avg_d_loss, avg_g_loss = train_fn(disc, gen, train_loader, opt_disc, opt_gen, L1_LOSS, BCE)
    d_loss_history.append(avg_d_loss)
    g_loss_history.append(avg_g_loss)

    # Validation pass on the held-out pairs.
    avg_d_loss_val, avg_g_loss_val = validate_fn(disc, gen, val_loader, L1_LOSS, BCE)
    d_loss_val_history.append(avg_d_loss_val)
    g_loss_val_history.append(avg_g_loss_val)

    print(f"Train D Loss: {avg_d_loss:.4f}, Train G Loss: {avg_g_loss:.4f}")
    print(f"Val D Loss: {avg_d_loss_val:.4f}, Val G Loss: {avg_g_loss_val:.4f}")

    # Sample images are written only on every fifth epoch.
    if (epoch + 1) % 5 != 0:
        continue

    print("📸 Saving example images...")
    try:
        save_example_images(gen, val_loader, epoch + 1, folder="saved_images")
    except StopIteration:
        print("Validation loader is empty. Cannot save examples.")

--- Epoch 1/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4007, Train G Loss: 35.4562
Val D Loss: 1.3556, Val G Loss: 31.8050

--- Epoch 2/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.1723, Train G Loss: 34.1759
Val D Loss: 0.9109, Val G Loss: 31.5191

--- Epoch 3/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.2373, Train G Loss: 33.7274
Val D Loss: 0.7800, Val G Loss: 32.2006

--- Epoch 4/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.3761, Train G Loss: 33.1971
Val D Loss: 0.9662, Val G Loss: 34.5757

--- Epoch 5/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.3046, Train G Loss: 33.4587
Val D Loss: 0.8548, Val G Loss: 30.1617
📸 Saving example images...

--- Epoch 6/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.3490, Train G Loss: 33.1442
Val D Loss: 0.8815, Val G Loss: 30.5158

--- Epoch 7/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.3253, Train G Loss: 32.8209
Val D Loss: 1.0587, Val G Loss: 29.7367

--- Epoch 8/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.2868, Train G Loss: 32.1666
Val D Loss: 0.7320, Val G Loss: 28.2574

--- Epoch 9/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4405, Train G Loss: 31.8761
Val D Loss: 0.7266, Val G Loss: 30.0740

--- Epoch 10/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4189, Train G Loss: 30.8434
Val D Loss: 0.6830, Val G Loss: 30.8702
📸 Saving example images...

--- Epoch 11/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4519, Train G Loss: 29.9121
Val D Loss: 0.8033, Val G Loss: 34.1098

--- Epoch 12/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4589, Train G Loss: 28.9301
Val D Loss: 0.6943, Val G Loss: 27.6314

--- Epoch 13/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4885, Train G Loss: 27.9270
Val D Loss: 0.7979, Val G Loss: 28.7089

--- Epoch 14/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4890, Train G Loss: 27.2097
Val D Loss: 0.7780, Val G Loss: 27.6046

--- Epoch 15/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

Validating:   0%|          | 0/50 [00:00<?, ?it/s]

Train D Loss: 0.4997, Train G Loss: 26.3623
Val D Loss: 0.7011, Val G Loss: 26.9554
📸 Saving example images...

--- Epoch 16/150 ---


  0%|          | 0/200 [00:00<?, ?it/s]

In [None]:
# Qualitative check: show input, generated and ground-truth images side by side
# for the first few samples of the first validation batch.
gen.eval()
try:
    val_iterator = iter(val_loader)
    x, y = next(val_iterator)
    x, y = x.to(device), y.to(device)

    with torch.no_grad():
        y_fake = gen(x)
        # Map tensors back with t * 0.5 + 0.5 so imshow receives displayable values.
        x = x * 0.5 + 0.5
        y = y * 0.5 + 0.5
        y_fake = y_fake * 0.5 + 0.5

    # Never index past the batch: the first batch can hold fewer than 3 samples.
    n_examples = min(3, x.size(0))
    plt.figure(figsize=(15, 15))

    for i in range(n_examples):
        plt.subplot(n_examples, 3, i*3 + 1)
        plt.imshow(x[i].cpu().permute(1, 2, 0))  # CHW -> HWC for matplotlib
        plt.title("Input (S1 Image)")
        plt.axis("off")

        plt.subplot(n_examples, 3, i*3 + 2)
        plt.imshow(y_fake[i].cpu().permute(1, 2, 0))
        plt.title("Generated (Fake S2)")
        plt.axis("off")

        plt.subplot(n_examples, 3, i*3 + 3)
        plt.imshow(y[i].cpu().permute(1, 2, 0))
        plt.title("Ground Truth (Real S2)")
        plt.axis("off")

    plt.tight_layout()
    plt.show()

except StopIteration:
    print("DataLoader exhausted. Please re-run the training cell if you wish to see new results.")

In [None]:
from torchmetrics.image import PeakSignalNoiseRatio, StructuralSimilarityIndexMeasure

def evaluate_model(gen, loader, device):
    """Score the generator on ``loader`` with L1, PSNR and SSIM.

    Generated and target images are mapped with ``t * 0.5 + 0.5`` before scoring,
    and the metrics are configured with ``data_range=1.0``. The generator is put in
    eval mode for the pass and returned to train mode afterwards, even on error.

    NOTE(review): a later cell defines ``evaluate_model`` again with a different
    return type; whichever definition ran last is the one in effect.

    Args:
        gen: Generator module.
        loader: Iterable yielding ``(x, y)`` batches.
        device: Device the batches and metric objects are moved to.

    Returns:
        ``(avg_l1, psnr, ssim)`` as Python floats.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    l1_loss_fn = nn.L1Loss()

    # Initialize metrics
    psnr = PeakSignalNoiseRatio(data_range=1.0).to(device)
    ssim = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)

    weighted_l1_sum = 0.0
    n_samples = 0

    gen.eval()  # Set the generator to evaluation mode
    loop = tqdm(loader, leave=True, desc="Evaluating")

    try:
        with torch.no_grad():
            for x, y in loop:
                x, y = x.to(device), y.to(device)
                y_fake = gen(x)

                # De-normalize images from [-1, 1] to [0, 1] as metrics expect this range
                y_de = (y * 0.5 + 0.5)
                y_fake_de = (y_fake * 0.5 + 0.5)

                # Weight each batch mean by its size so a smaller final batch does
                # not count as much as a full one.
                batch_size = y.size(0)
                weighted_l1_sum += l1_loss_fn(y_fake_de, y_de).item() * batch_size
                n_samples += batch_size

                psnr.update(y_fake_de, y_de)
                ssim.update(y_fake_de, y_de)

        if n_samples == 0:
            raise ValueError("evaluate_model received an empty loader; nothing to evaluate.")

        # Compute final scores
        avg_l1 = weighted_l1_sum / n_samples
        final_psnr = psnr.compute()
        final_ssim = ssim.compute()
    finally:
        gen.train()  # Set the generator back to training mode

    print("\n--- Evaluation Metrics ---")
    print(f"Average L1 Loss: {avg_l1:.4f}")
    print(f"Average PSNR: {final_psnr:.4f}")
    print(f"Average SSIM: {final_ssim:.4f}")
    print("--------------------------")

    return avg_l1, final_psnr.item(), final_ssim.item()

In [None]:
# Score the current generator on the validation loader using the evaluate_model
# defined in the previous cell; the call is the cell's last expression.
evaluate_model(gen, val_loader, device)

In [None]:
import torch
import torch.nn as nn
from tqdm.notebook import tqdm
from torchmetrics.image import PeakSignalNoiseRatio, StructuralSimilarityIndexMeasure
from torchmetrics.image.lpip import LearnedPerceptualImagePatchSimilarity

def evaluate_model(gen, loader, device):
    """Score the generator on ``loader`` with L1, PSNR, SSIM and LPIPS.

    Generated and target images are mapped with ``t * 0.5 + 0.5`` and clamped to
    [0, 1] before scoring. PSNR and SSIM use ``data_range=1.0`` and LPIPS is built
    with ``normalize=True``, so all metrics see the same [0, 1] inputs. The
    generator is put in eval mode for the pass and returned to train mode
    afterwards, even on error.

    NOTE(review): this redefines the ``evaluate_model`` from an earlier cell and
    returns a dict where that one returns a tuple.

    Args:
        gen: Generator module.
        loader: Iterable yielding ``(x, y)`` batches.
        device: Device the batches and metric objects are moved to.

    Returns:
        Dict with keys ``"L1"``, ``"PSNR"``, ``"SSIM"`` and ``"LPIPS"`` (Python floats).

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    l1_loss_fn = nn.L1Loss()

    # Initialize metrics (FID has been removed)
    psnr = PeakSignalNoiseRatio(data_range=1.0).to(device)
    ssim = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)
    # normalize=True tells LPIPS the inputs are in [0, 1]. The default (False)
    # expects [-1, 1], so feeding it de-normalized images gave wrong scores.
    lpips = LearnedPerceptualImagePatchSimilarity(net_type='alex', normalize=True).to(device)

    weighted_l1_sum = 0.0
    n_samples = 0

    gen.eval()
    loop = tqdm(loader, leave=True, desc="Evaluating")

    try:
        with torch.no_grad():
            for x, y in loop:
                x, y = x.to(device), y.to(device)
                y_fake = gen(x)

                # De-normalize images from [-1, 1] to [0, 1]; the clamp only removes
                # floating-point overshoot that LPIPS' range validation would reject.
                y_de = (y * 0.5 + 0.5).clamp(0.0, 1.0)
                y_fake_de = (y_fake * 0.5 + 0.5).clamp(0.0, 1.0)

                # Weight each batch mean by its size so a smaller final batch does
                # not count as much as a full one.
                batch_size = y.size(0)
                weighted_l1_sum += l1_loss_fn(y_fake_de, y_de).item() * batch_size
                n_samples += batch_size

                psnr.update(y_fake_de, y_de)
                ssim.update(y_fake_de, y_de)
                lpips.update(y_fake_de, y_de)

        if n_samples == 0:
            raise ValueError("evaluate_model received an empty loader; nothing to evaluate.")

        # Compute final scores
        avg_l1 = weighted_l1_sum / n_samples
        final_psnr = psnr.compute()
        final_ssim = ssim.compute()
        final_lpips = lpips.compute()
    finally:
        gen.train()

    print("\n--- Evaluation Metrics ---")
    print(f"Average L1 Loss: {avg_l1:.4f} (Lower is better)")
    print(f"Average PSNR:    {final_psnr:.4f} (Higher is better)")
    print(f"Average SSIM:    {final_ssim:.4f} (Higher is better)")
    print(f"LPIPS:           {final_lpips:.4f} (Lower is better)")
    print("--------------------------")

    return {
        "L1": avg_l1,
        "PSNR": final_psnr.item(),
        "SSIM": final_ssim.item(),
        "LPIPS": final_lpips.item()
    }

In [None]:
# Re-run evaluation on the validation loader, now with the evaluate_model
# redefined in the previous cell (the variant that also reports LPIPS).
evaluate_model(gen, val_loader, device)

In [None]:
import matplotlib.pyplot as plt

# One row, two panels: generator losses on the left, discriminator losses on the right.
fig, (gen_ax, disc_ax) = plt.subplots(1, 2, figsize=(14, 6))

# (axes, train curve, validation curve, train label, validation label, title)
loss_panels = (
    (gen_ax, g_loss_history, g_loss_val_history,
     'Train Generator Loss', 'Validation Generator Loss',
     "Generator Loss Over Epochs"),
    (disc_ax, d_loss_history, d_loss_val_history,
     'Train Discriminator Loss', 'Validation Discriminator Loss',
     "Discriminator Loss Over Epochs"),
)

for ax, train_curve, val_curve, train_label, val_label, panel_title in loss_panels:
    ax.plot(train_curve, label=train_label)
    ax.plot(val_curve, label=val_label)
    ax.set_title(panel_title)
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Loss")
    ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Deforestation Detection Example using CSV NDVI Data
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Load NDVI data from CSV
csv_path = "ndvi_dataset.csv"  # Change path if needed
df = pd.read_csv(csv_path)

# Reshape NDVI arrays back to 2D. Each column is treated as a flattened square
# raster, so the row count must be a perfect square (for our data, the side is 64).
image_size = int(round(np.sqrt(len(df))))
if image_size * image_size != len(df):
    raise ValueError(
        f"{csv_path} has {len(df)} rows, which is not a perfect square; "
        "cannot reshape the NDVI columns into a square image."
    )
ndvi_past = df["ndvi_past"].values.reshape((image_size, image_size))
ndvi_recent = df["ndvi_recent"].values.reshape((image_size, image_size))

# Calculate NDVI difference
diff = ndvi_recent - ndvi_past

# Threshold for significant vegetation loss: a pixel is flagged when its NDVI
# dropped by more than 0.2 between the two rasters.
NDVI_LOSS_THRESHOLD = -0.2
deforestation_mask = diff < NDVI_LOSS_THRESHOLD

# Plot results
plt.figure(figsize=(12, 4))

plt.subplot(1, 3, 1)
plt.title('NDVI Past')
plt.imshow(ndvi_past, cmap='Greens')
plt.colorbar()

plt.subplot(1, 3, 2)
plt.title('NDVI Recent')
plt.imshow(ndvi_recent, cmap='Greens')
plt.colorbar()

plt.subplot(1, 3, 3)
plt.title('Deforestation Risk')
plt.imshow(deforestation_mask, cmap='Reds')
plt.colorbar()

plt.tight_layout()
plt.show()

print("Deforestation detection performed using CSV NDVI data.")


In [None]:
# Automated Deforestation Detection from Sentinel-2 Images
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

def load_s2_image(path, image_size=128):
    """Load an image file as a float32 array in [C, H, W] layout scaled to [0, 1].

    The image is converted to RGB and resized to ``image_size`` x ``image_size``
    before conversion to an array.
    """
    # The context manager closes the file handle once the pixels are loaded;
    # convert()/resize() return independent in-memory images.
    with Image.open(path) as img:
        resized = img.convert('RGB').resize((image_size, image_size))
    arr = np.asarray(resized).astype(np.float32) / 255.0  # Normalize to [0,1]
    arr = arr.transpose(2, 0, 1)  # [H, W, C] -> [C, H, W]
    return arr

def compute_ndvi(img, red_idx=0, nir_idx=2):
    """
    Compute NDVI = (nir - red) / (nir + red) from a channel-first image.

    ``img[red_idx]`` and ``img[nir_idx]`` are used as the red and near-infrared
    bands. Adjust red_idx and nir_idx according to your channel order.

    NOTE(review): the defaults use channel 2 as NIR. For an image loaded via
    ``convert('RGB')`` channel 2 is blue, not near-infrared — confirm the band
    layout before interpreting the result as a true NDVI.

    Integer NumPy bands (e.g. raw uint8 pixels) are promoted to float32 first so the
    subtraction and addition cannot wrap around; floating-point input is used as-is.
    """
    red = img[red_idx]
    nir = img[nir_idx]
    if isinstance(red, np.ndarray) and not np.issubdtype(red.dtype, np.floating):
        red = red.astype(np.float32)
    if isinstance(nir, np.ndarray) and not np.issubdtype(nir.dtype, np.floating):
        nir = nir.astype(np.float32)
    # Add a small epsilon to avoid division by zero and handle cases where nir + red is close to zero
    ndvi = (nir - red) / (nir + red + 1e-8)
    return ndvi

# Locate the Sentinel-2 tiles of the 'agri' terrain inside the downloaded dataset.
# Assumed layout: <dataset path>/v_2/<terrain>/s2
s2_dir_base = os.path.join(requiemonk_sentinel12_image_pairs_segregated_by_terrain_path, 'v_2', 'agri', 's2')

# A real change-detection run needs two acquisitions of the same area on different
# dates. None are available here, so the first two PNG files (in sorted name order)
# stand in for "past" and "recent" purely to exercise the NDVI pipeline.
s2_files_in_dir = sorted(name for name in os.listdir(s2_dir_base) if name.endswith('.png'))

if len(s2_files_in_dir) >= 2:
    file1, file2 = s2_files_in_dir[:2]

    print(f"Using files: {file1} and {file2} to simulate different dates for demonstration.")

    s2_past_path = os.path.join(s2_dir_base, file1)
    s2_recent_path = os.path.join(s2_dir_base, file2)

    s2_past = load_s2_image(s2_past_path)
    s2_recent = load_s2_image(s2_recent_path)

    ndvi_past = compute_ndvi(s2_past)
    ndvi_recent = compute_ndvi(s2_recent)

    # Per-pixel NDVI change; negative values mean the index dropped.
    diff = ndvi_recent - ndvi_past

    # Flag pixels whose NDVI fell by more than 0.2 (tune this threshold as needed).
    deforestation_mask = diff < -0.2

    fig, axes = plt.subplots(1, 3, figsize=(12, 4))
    ndvi_panels = (
        ('NDVI Past', ndvi_past, 'Greens'),
        ('NDVI Recent', ndvi_recent, 'Greens'),
        ('Deforestation Risk', deforestation_mask, 'Reds'),
    )
    for ax, (panel_title, panel_data, panel_cmap) in zip(axes, ndvi_panels):
        ax.set_title(panel_title)
        drawn = ax.imshow(panel_data, cmap=panel_cmap)
        fig.colorbar(drawn, ax=ax)
    fig.tight_layout()
    plt.show()

    print(f"Simulated Deforestation Detection using {file1} (Past) and {file2} (Recent)")
    print("Red channel: 0, NIR channel: 2 (Adjust if your S2 data has a different channel order)")
else:
    print("Not enough Sentinel-2 images found in the directory to demonstrate deforestation detection.")

# Reminder: a real application needs S2 images from two distinct dates covering the same area.

## Implement workflow 1 (s1 to s2 generation and comparison)



In [None]:
# Automated Deforestation Detection using Synthetic Sentinel-2 Images from Sentinel-1
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import torch

def load_s1_image(path, image_size=128):
    """
    Loads and preprocesses a Sentinel-1 image.

    The file is converted to 3-channel RGB and resized to ``image_size`` x
    ``image_size``. Returns a float32 array in [C, H, W] format, scaled to [0, 1].
    """
    # The context manager closes the file handle once the pixels are loaded;
    # convert()/resize() return independent in-memory images.
    with Image.open(path) as img:
        resized = img.convert('RGB').resize((image_size, image_size))
    arr = np.asarray(resized).astype(np.float32) / 255.0  # Normalize to [0,1]
    arr = arr.transpose(2, 0, 1)  # [H, W, C] -> [C, H, W]
    return arr

def compute_ndvi(img, red_idx=0, nir_idx=2):
    """
    Compute NDVI = (nir - red) / (nir + red) from a 3-channel image ([C, H, W]).

    ``img[red_idx]`` and ``img[nir_idx]`` are used as the red and near-infrared
    bands. Adjust red_idx and nir_idx according to your channel order.

    NOTE(review): the defaults use channel 2 as NIR. For RGB-style images channel 2
    is blue, not near-infrared — confirm the band layout before interpreting the
    result as a true NDVI.

    Integer NumPy bands (e.g. raw uint8 pixels) are promoted to float32 first so the
    subtraction and addition cannot wrap around; floating-point input is used as-is.
    """
    red = img[red_idx]
    nir = img[nir_idx]
    if isinstance(red, np.ndarray) and not np.issubdtype(red.dtype, np.floating):
        red = red.astype(np.float32)
    if isinstance(nir, np.ndarray) and not np.issubdtype(nir.dtype, np.floating):
        nir = nir.astype(np.float32)
    # Add a small epsilon to avoid division by zero and handle cases where nir + red is close to zero
    ndvi = (nir - red) / (nir + red + 1e-8)
    return ndvi

print("\n--- Deforestation Detection using Synthetic Sentinel-2 from Sentinel-1 ---")

# NOTE(review): input() blocks until the user types a path, so this cell cannot
# complete unattended (e.g. under "Restart & Run All").
past_s1_image_path = input("Please enter the path to your past Sentinel-1 image: ")
recent_s1_image_path = input("Please enter the path to your recent Sentinel-1 image: ")

# Stay None unless both synthetic images are generated successfully below.
synthetic_s2_past_denorm = None
synthetic_s2_recent_denorm = None

try:
    # Load past Sentinel-1 image and add a batch dimension for the generator
    past_s1_image = load_s1_image(past_s1_image_path)
    past_s1_image_tensor = torch.from_numpy((past_s1_image - 0.5) / 0.5).unsqueeze(0).to(device) # Normalize to [-1, 1]

    # Load recent Sentinel-1 image the same way
    recent_s1_image = load_s1_image(recent_s1_image_path)
    recent_s1_image_tensor = torch.from_numpy((recent_s1_image - 0.5) / 0.5).unsqueeze(0).to(device) # Normalize to [-1, 1]

    # Generate synthetic Sentinel-2 images
    gen.eval() # Set the generator to evaluation mode
    with torch.no_grad():
        synthetic_s2_past = gen(past_s1_image_tensor)
        synthetic_s2_recent = gen(recent_s1_image_tensor)

    # De-normalize, drop the batch dimension and move to NumPy for NDVI and plotting
    synthetic_s2_past_denorm = (synthetic_s2_past * 0.5 + 0.5).squeeze(0).cpu().numpy() # [C, H, W], [0, 1]
    synthetic_s2_recent_denorm = (synthetic_s2_recent * 0.5 + 0.5).squeeze(0).cpu().numpy() # [C, H, W], [0, 1]


    print("Successfully generated synthetic Sentinel-2 images from Sentinel-1 images.")

    # Perform deforestation detection using synthetic S2 images
    # NOTE(review): both variables were assigned just above, so this check is always true here.
    if synthetic_s2_past_denorm is not None and synthetic_s2_recent_denorm is not None:
        try:
            # Compute NDVI for both synthetic images (default channel indices: red=0, nir=2)
            ndvi_past_synthetic = compute_ndvi(synthetic_s2_past_denorm)
            ndvi_recent_synthetic = compute_ndvi(synthetic_s2_recent_denorm)

            # Calculate NDVI difference; negative values mean the index dropped
            diff = ndvi_recent_synthetic - ndvi_past_synthetic

            # Threshold for significant vegetation loss (you might need to adjust this)
            deforestation_mask = diff < -0.2

            print("Successfully performed deforestation detection using synthetic S2 images.")

            # Visualize the results in a single row of four panels
            plt.figure(figsize=(18, 5))

            plt.subplot(1, 4, 1)
            plt.title('Synthetic Past S2 Image')
            plt.imshow(synthetic_s2_past_denorm.transpose(1, 2, 0)) # Transpose back to [H, W, C] for plotting
            plt.axis("off")

            plt.subplot(1, 4, 2)
            plt.title('Synthetic Recent S2 Image')
            plt.imshow(synthetic_s2_recent_denorm.transpose(1, 2, 0)) # Transpose back to [H, W, C] for plotting
            plt.axis("off")

            plt.subplot(1, 4, 3)
            plt.title('NDVI Difference (Synthetic Recent - Synthetic Past)')
            # Fixed colour limits of -1..1 keep zero change at the centre of the diverging map
            plt.imshow(diff, cmap='RdYlGn', vmin=-1, vmax=1)
            plt.colorbar()
            plt.axis("off")

            plt.subplot(1, 4, 4)
            plt.title('Deforestation Risk Mask')
            plt.imshow(deforestation_mask, cmap='Reds')
            plt.colorbar()
            plt.axis("off")

            plt.tight_layout()
            plt.show()

        except Exception as e:
            # Errors in the NDVI / plotting stage are reported and not re-raised
            print(f"An error occurred during deforestation detection or visualization: {e}")


except FileNotFoundError as e:
    # One of the user-supplied paths does not exist; reset the outputs to None
    print(f"Error: File not found - {e}. Please check the path(s) and try again.")
    synthetic_s2_past_denorm = None
    synthetic_s2_recent_denorm = None
except Exception as e:
    # Any other failure while loading or generating is reported and not re-raised
    print(f"An error occurred during image processing: {e}")
    synthetic_s2_past_denorm = None
    synthetic_s2_recent_denorm = None
finally:
    gen.train() # Set the generator back to training mode