# Necessary Installations

In [None]:
! pip install wandb
! pip install scikit-image

---

# Importing Required Libraries

In [None]:
import os
import torch
import wandb
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
from PIL import Image
import numpy as np
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import pandas as pd
from kaggle_secrets import UserSecretsClient
from huggingface_hub import HfApi, HfFolder, Repository, create_repo, upload_file
from huggingface_hub import login
import torch.nn.functional as F
from timm.models.layers import trunc_normal_, DropPath
from timm.models.registry import register_model
from huggingface_hub import hf_hub_download
from skimage.metrics import structural_similarity as ssim

---

# Model Setup and Loading Checkpoints

In [None]:
import torch
import torch.nn as nn

class AOHead(nn.Module):
    """Convolutional head that maps a feature map to an ambient-occlusion (AO) map.

    The head applies four 3x3 convolutions (each followed by ReLU) that reduce the
    channel count in_channels -> 512 -> 256 -> 128 -> 64, a final 3x3 convolution
    to ``out_channels``, a sigmoid, and a bilinear upsampling to ``output_size``.
    All convolutions use stride 1 and padding 1, so the spatial size only changes
    in the final upsampling step.

    Args:
        in_channels (int): Number of channels of the incoming feature map. Default: 768
        out_channels (int): Number of channels of the predicted map. Default: 1
        output_size (tuple(int, int)): Spatial size (H, W) of the returned map.
            Default: (600, 600), the previously hard-coded resolution.
    """

    def __init__(self, in_channels=768, out_channels=1, output_size=(600, 600)):
        super(AOHead, self).__init__()

        # Convolutional layers for further refinement of features
        self.conv1 = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(512, 256, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)

        # Output layer: one channel per predicted map
        self.conv_out = nn.Conv2d(64, out_channels, kernel_size=3, stride=1, padding=1)

        # Activation functions
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()  # For output normalization to range [0, 1]

        # Upsample to the requested resolution. nn.Upsample has no parameters, so
        # making the size configurable does not change the state_dict layout.
        self.output_size = tuple(output_size)
        self.upsample = nn.Upsample(size=self.output_size, mode='bilinear', align_corners=True)

    def forward(self, x):
        """
        Forward pass through the AO head.

        Parameters:
            x (torch.Tensor): Input feature map of shape (N, in_channels, H, W).

        Returns:
            torch.Tensor: Predicted map of shape (N, out_channels, *output_size).
                The sigmoid is applied before upsampling, and bilinear
                interpolation only blends neighbouring values, so the output
                stays in [0, 1].
        """
        # Pass through the convolutional layers with ReLU activations
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.relu(self.conv3(x))
        x = self.relu(self.conv4(x))

        # Final output (predicting AO map)
        x = self.conv_out(x)

        # Apply sigmoid to get values in the range [0, 1]
        x = self.sigmoid(x)

        # Upsample to the configured output size
        x = self.upsample(x)

        return x

In [None]:
class Block(nn.Module):
    r""" ConvNeXt residual block.

    Pipeline: 7x7 depthwise conv -> permute to channels-last -> LayerNorm ->
    Linear (dim -> 4*dim) -> GELU -> Linear (4*dim -> dim) -> optional learnable
    per-channel scale (gamma) -> permute back to channels-first -> drop path ->
    residual addition with the block input.

    Args:
        dim (int): Number of input channels.
        drop_path (float): Stochastic depth rate. Default: 0.0
        layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
            A value <= 0 disables the layer scale (gamma is None).
    """
    def __init__(self, dim, drop_path=0., layer_scale_init_value=1e-6):
        super().__init__()
        expanded_dim = 4 * dim

        # depthwise conv: one 7x7 filter per channel (groups == channels)
        self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)
        self.norm = LayerNorm(dim, eps=1e-6)
        # pointwise/1x1 convs, implemented with linear layers on channels-last data
        self.pwconv1 = nn.Linear(dim, expanded_dim)
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(expanded_dim, dim)

        if layer_scale_init_value > 0:
            self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((dim)),
                                      requires_grad=True)
        else:
            self.gamma = None

        if drop_path > 0.:
            self.drop_path = DropPath(drop_path)
        else:
            self.drop_path = nn.Identity()

    def forward(self, x):
        shortcut = x

        out = self.dwconv(x)
        out = out.permute(0, 2, 3, 1)  # (N, C, H, W) -> (N, H, W, C)
        out = self.pwconv2(self.act(self.pwconv1(self.norm(out))))
        if self.gamma is not None:
            out = self.gamma * out
        out = out.permute(0, 3, 1, 2)  # (N, H, W, C) -> (N, C, H, W)

        return shortcut + self.drop_path(out)

In [None]:
class ConvNeXt(nn.Module):
    r""" ConvNeXt backbone with an ambient-occlusion (AO) prediction head.

        Backbone after `A ConvNet for the 2020s` -
          https://arxiv.org/pdf/2201.03545.pdf
        This variant has no classification head; the last stage feeds an
        ``AOHead`` that predicts a dense map instead.

    Args:
        in_chans (int): Number of input image channels. Default: 3
        out_chans (int): Number of channels predicted by the AO head. Default: 1
        depths (tuple(int)): Number of blocks at each stage. Default: [3, 3, 9, 3]
        dims (int): Feature dimension at each stage. Default: [96, 192, 384, 768]
        drop_path_rate (float): Stochastic depth rate. Default: 0.
        layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
        head_init_scale (float): Accepted for signature compatibility; not used
            by this class. Default: 1.
        **kwargs: Accepted and ignored.
    """
    def __init__(self, in_chans=3, out_chans=1,
                 depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], drop_path_rate=0.,
                 layer_scale_init_value=1e-6, head_init_scale=1.,
                 **kwargs,):

        super().__init__()

        self.downsample_layers = nn.ModuleList() # stem and 3 intermediate downsampling conv layers
        stem = nn.Sequential(
            nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
            LayerNorm(dims[0], eps=1e-6, data_format="channels_first")
        )
        self.downsample_layers.append(stem)
        for i in range(3):
            downsample_layer = nn.Sequential(
                    LayerNorm(dims[i], eps=1e-6, data_format="channels_first"),
                    nn.Conv2d(dims[i], dims[i+1], kernel_size=2, stride=2),
            )
            self.downsample_layers.append(downsample_layer)

        self.stages = nn.ModuleList() # 4 feature resolution stages, each consisting of multiple residual blocks
        # Drop-path rate grows linearly from 0 to drop_path_rate over all blocks.
        dp_rates=[x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
        cur = 0
        for i in range(4):
            stage = nn.Sequential(
                *[Block(dim=dims[i], drop_path=dp_rates[cur + j],
                layer_scale_init_value=layer_scale_init_value) for j in range(depths[i])]
            )
            self.stages.append(stage)
            cur += depths[i]

        # Output head for ambient map prediction.
        # Honour `out_chans` (previously ignored: the head was always built with 1 channel).
        self.ao_head = AOHead(in_channels=dims[-1], out_channels=out_chans)

        # Re-initialise every Conv2d/Linear in the backbone and the head.
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Truncated-normal (std=0.02) weights and zero biases for Conv2d/Linear layers."""
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            trunc_normal_(m.weight, std=.02)
            # Layers built with bias=False have m.bias is None; skip them instead of crashing.
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward_features(self, x):
        """Run the four stages and the AO head.

        Returns:
            list[torch.Tensor]: Five tensors - the output of each of the four
            stages in order, followed by the AO head prediction as the last item.
        """
        features = []
        for i in range(4):
            x = self.downsample_layers[i](x)
            x = self.stages[i](x)
            features.append(x)

        # The AO head consumes the last (deepest) stage output.
        x = self.ao_head(x)
        features.append(x)

        return features

    def forward(self, x):
        """Return the same list as ``forward_features``; the AO map is ``features[-1]``."""
        features = self.forward_features(x)

        return features

In [None]:
# Download locations of the ConvNeXt-Tiny checkpoints, keyed by variant name.
model_urls = dict(
    convnext_tiny_1k="https://dl.fbaipublicfiles.com/convnext/convnext_tiny_1k_224_ema.pth",
    convnext_tiny_22k="https://dl.fbaipublicfiles.com/convnext/convnext_tiny_22k_224.pth",
)

In [None]:
class LayerNorm(nn.Module):
    r""" Layer normalisation over the channel axis for two memory layouts.

    ``channels_last`` (default) expects inputs shaped
    (batch_size, height, width, channels) and delegates to ``F.layer_norm``.
    ``channels_first`` expects inputs shaped (batch_size, channels, height, width)
    and normalises along dimension 1 by hand.
    Any other ``data_format`` raises ``NotImplementedError`` at construction time.
    """
    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.data_format = data_format
        if data_format not in ("channels_last", "channels_first"):
            raise NotImplementedError
        self.normalized_shape = (normalized_shape, )

    def forward(self, x):
        if self.data_format == "channels_first":
            # Mean and (biased) variance across the channel dimension.
            mean = x.mean(1, keepdim=True)
            variance = (x - mean).pow(2).mean(1, keepdim=True)
            normalized = (x - mean) / torch.sqrt(variance + self.eps)
            # Broadcast the per-channel affine parameters over H and W.
            return self.weight[:, None, None] * normalized + self.bias[:, None, None]

        if self.data_format == "channels_last":
            return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)

In [None]:
def convnext_tiny(pretrained=True,in_22k=False, **kwargs):
    """Build the ConvNeXt-Tiny based AO model, optionally loading pretrained backbone weights.

    Args:
        pretrained (bool): If True, download the ConvNeXt-Tiny checkpoint and copy
            every tensor whose name AND shape match a tensor of this model.
        in_22k (bool): Use the 'convnext_tiny_22k' checkpoint URL instead of
            'convnext_tiny_1k'.
        **kwargs: Forwarded to ``ConvNeXt``.

    Returns:
        ConvNeXt: The model, with all parameters trainable.
    """
    model = ConvNeXt(depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], **kwargs)
    if pretrained:
        url = model_urls['convnext_tiny_22k'] if in_22k else model_urls['convnext_tiny_1k']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", check_hash=True)
        model_dict = model.state_dict()
        pretrained_dict = {}
        unmatched_pretrained_dict = {}
        for k, v in checkpoint['model'].items():
            # Require matching shapes as well as names: a same-named tensor with a
            # different shape (e.g. the stem conv when in_chans != 3) would make
            # load_state_dict raise.
            if k in model_dict and v.shape == model_dict[k].shape:
                pretrained_dict[k] = v
            else:
                unmatched_pretrained_dict[k] = v
        model_dict.update(pretrained_dict)
        model.load_state_dict(model_dict)

        # Fine-tune everything: pretrained backbone tensors and the freshly
        # initialised ones are all left trainable.
        for param in model.parameters():
            param.requires_grad = True

        print(
            'Successfully loaded pretrained %d paras, and %d paras are unmatched.'
            %(len(pretrained_dict.keys()), len(unmatched_pretrained_dict.keys())))

        print('Unmatched pretrained paras are:', unmatched_pretrained_dict.keys())

    return model

---

# Dataset Definition

In [None]:
# Define a custom dataset class for loading texture and ambient data
class TextureDataset(Dataset):
    """Paired dataset of texture images (inputs) and ambient images (targets).

    Item ``i`` pairs ``texture_paths[i]`` with ``ambient_paths[i]``, so both lists
    must be aligned and of equal length.
    """

    def __init__(self, texture_paths, ambient_paths, transform=None):
        """
        Args:
            texture_paths (list): List of paths to texture images.
            ambient_paths (list): List of paths to ambient images.
            transform (callable, optional): Optional transform to be applied
                on both input and target images. It is called once per image, so
                a transform with random behaviour would NOT be synchronised
                between the input and its target.

        Raises:
            ValueError: If the two path lists differ in length (the pairing by
                index would otherwise be silently wrong or fail mid-epoch).
        """
        if len(texture_paths) != len(ambient_paths):
            raise ValueError(
                f"texture_paths and ambient_paths must have the same length, "
                f"got {len(texture_paths)} and {len(ambient_paths)}"
            )
        self.texture_paths = texture_paths
        self.ambient_paths = ambient_paths
        self.transform = transform

    def __len__(self):
        return len(self.texture_paths)

    def __getitem__(self, idx):
        """Return ``{"input": texture, "target": ambient}`` for sample ``idx``.

        The texture is converted to RGB and the ambient image to single-channel "L".
        """
        # `convert` returns a new, fully loaded image, so the underlying files can
        # be closed right away instead of staying open until garbage collection.
        with Image.open(self.texture_paths[idx]) as texture_file:
            texture = texture_file.convert("RGB")
        with Image.open(self.ambient_paths[idx]) as ambient_file:
            ambient = ambient_file.convert("L")

        # Apply transforms if defined
        if self.transform:
            texture = self.transform(texture)
            ambient = self.transform(ambient)

        return {"input": texture, "target": ambient}

---

# Extract Texture and Ambient Paths from CSV Files

In [None]:
import os
import pandas as pd
# Specify the directory containing your CSV files
csv_directory = "/kaggle/input/texture-allpathds"  # Replace with your directory path


def replace_path(path):
    """Return the path of the same file name inside the ambient-occlusion dataset directory."""
    # Extract the file name
    filename = os.path.basename(path)

    # Construct the new path
    new_path = f"/kaggle/input/ambinet-occlusion/{filename}"
    return new_path


# Initialize lists to store paths; index i of both lists always refers to the same CSV row
texture_paths = []
ambient_paths = []

# Iterate over all CSV files in the directory.
# os.listdir returns entries in arbitrary order; sorting keeps the collected
# lists (and therefore the seeded train/test split) reproducible across runs.
for file_name in sorted(os.listdir(csv_directory)):
    if not file_name.endswith(".csv"):  # Only CSV files are of interest
        continue

    file_path = os.path.join(csv_directory, file_name)

    # Read the CSV file
    df = pd.read_csv(file_path)

    # Both source columns are needed: "Texture" is the input path and the ambient
    # path is derived from "Depth". Check BEFORE touching them to avoid a KeyError.
    missing_columns = [column for column in ("Texture", "Depth") if column not in df.columns]
    if missing_columns:
        print(f"Warning: File {file_name} is missing required column(s) {missing_columns}; skipping.")
        continue

    # Drop incomplete rows as a unit so textures and ambient maps stay paired
    # (dropping NaNs per column could shift one list relative to the other).
    df = df.dropna(subset=["Texture", "Depth"])

    # Derive the ambient file name from the depth path ("/sa_" -> "/AOsa_") and
    # relocate it into the ambient-occlusion dataset directory.
    df = df.assign(
        Ambient=df["Depth"].apply(lambda depth_path: replace_path(depth_path.replace("/sa_", "/AOsa_")))
    )

    # Keep only rows whose ambient image actually exists on disk
    ambient_exists = df["Ambient"].apply(os.path.exists).astype(bool)
    df = df[ambient_exists]

    # Append the aligned columns to the lists
    texture_paths.extend(df["Texture"].tolist())
    ambient_paths.extend(df["Ambient"].tolist())

---

# Dataset Preparation

In [None]:
# Preprocessing applied to every image (inputs and targets alike):
# resize to 600x600, then convert to a tensor.
transform = transforms.Compose(
    [
        transforms.Resize((600, 600)),
        transforms.ToTensor(),
    ]
)

# Hold out 1% of the (texture, ambient) pairs for testing; the fixed seed keeps
# the split stable for a given ordering of the path lists.
(train_textures, test_textures, train_ambient, test_ambient) = train_test_split(
    texture_paths,
    ambient_paths,
    test_size=0.01,
    random_state=2,
)

# Wrap each split in a dataset
train_dataset = TextureDataset(
    texture_paths=train_textures,
    ambient_paths=train_ambient,
    transform=transform,
)
test_dataset = TextureDataset(
    texture_paths=test_textures,
    ambient_paths=test_ambient,
    transform=transform,
)

# Batch the datasets: the training loader shuffles and uses worker processes with
# pinned memory, the test loader iterates in order.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=2,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=2,
    shuffle=False,
)

---

# Model Instantiation

In [None]:
# Build the model with the ImageNet-1k pretrained backbone and move it to the GPU.
model = convnext_tiny(pretrained=True, in_22k=False).cuda()

---

# Training Configuration

In [None]:
# AdamW over the trainable parameters only (those with requires_grad=True)
optimizer = optim.AdamW(
    (param for param in model.parameters() if param.requires_grad),
    lr=1e-4,
)

---

# Combined Loss Function

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from skimage.metrics import structural_similarity as ssim

class CombinedLoss(nn.Module):
    """Weighted sum of an SSIM loss, an L1 loss and an image-gradient L1 loss.

    total = ssim_weight * (1 - SSIM) + l1_weight * L1 + grad_weight * gradient_L1

    The SSIM term is computed with torch operations so that it is differentiable
    and stays on the input's device. (Computing it on detached NumPy copies would
    turn it into a constant that contributes no gradient.) It uses a uniform
    ``win_size`` x ``win_size`` window, sample covariance and K1=0.01 / K2=0.03,
    evaluated only where the window fits entirely inside the image - the same
    formula as ``skimage.metrics.structural_similarity(..., win_size=7,
    data_range=1.0)``.

    Args:
        ssim_weight (float): Weight of the SSIM term. Default: 0.5
        l1_weight (float): Weight of the L1 term. Default: 0.25
        grad_weight (float): Weight of the gradient term. Default: 0.25
        win_size (int): Side length of the SSIM window. Default: 7
        data_range (float): Value range of the images for SSIM. Default: 1.0
    """

    # Stabilising constants of the SSIM formula
    _K1 = 0.01
    _K2 = 0.03

    def __init__(self, ssim_weight=0.5, l1_weight=0.25, grad_weight=0.25, win_size=7, data_range=1.0):
        super(CombinedLoss, self).__init__()
        self.ssim_weight = ssim_weight
        self.l1_weight = l1_weight
        self.grad_weight = grad_weight
        self.win_size = win_size
        self.data_range = data_range

    def forward(self, predicted, target):
        """Return the combined scalar loss.

        Raises:
            ValueError: If ``predicted`` and ``target`` differ in shape.
        """
        # Ensure shapes match
        if predicted.shape != target.shape:
            raise ValueError(f"Shape mismatch: predicted {predicted.shape}, target {target.shape}")

        # Calculate individual losses
        ssim_loss = self.ssim(predicted, target)
        l1_loss = self.l1(predicted, target)
        grad_loss = self.gradient_loss(predicted, target)

        # Combine the losses with the specified weights
        total_loss = self.ssim_weight * ssim_loss + self.l1_weight * l1_loss + self.grad_weight * grad_loss
        return total_loss

    def ssim(self, predicted, target):
        """
        Differentiable SSIM loss, ``mean over the batch of (1 - SSIM)``.

        Only channel 0 is used and both images are clamped to [0, 1] first.
        If an image is smaller than the window in either dimension, the SSIM
        cannot be evaluated and the maximum penalty of 1 is returned as a constant.

        Returns:
            torch.Tensor: 0-dim tensor on the device of ``predicted``.
        """
        pred = predicted[:, :1].clamp(0.0, 1.0)
        tgt = target[:, :1].clamp(0.0, 1.0)

        win = self.win_size
        if min(pred.shape[-2:]) < win:
            return pred.new_ones(())

        def window_mean(image):
            # Unpadded stride-1 average pooling = uniform filter restricted to the
            # positions where the whole window lies inside the image.
            return F.avg_pool2d(image, kernel_size=win, stride=1)

        # Sample (unbiased) covariance normalisation
        num_pixels = win * win
        cov_norm = num_pixels / (num_pixels - 1)

        mu_pred = window_mean(pred)
        mu_tgt = window_mean(tgt)
        var_pred = cov_norm * (window_mean(pred * pred) - mu_pred * mu_pred)
        var_tgt = cov_norm * (window_mean(tgt * tgt) - mu_tgt * mu_tgt)
        cov = cov_norm * (window_mean(pred * tgt) - mu_pred * mu_tgt)

        c1 = (self._K1 * self.data_range) ** 2
        c2 = (self._K2 * self.data_range) ** 2

        ssim_map = ((2 * mu_pred * mu_tgt + c1) * (2 * cov + c2)) / (
            (mu_pred * mu_pred + mu_tgt * mu_tgt + c1) * (var_pred + var_tgt + c2)
        )

        # Mean SSIM per image, then average (1 - SSIM) over the batch
        ssim_per_image = ssim_map.flatten(1).mean(dim=1)
        return (1.0 - ssim_per_image).mean()

    def l1(self, predicted, target):
        """Calculate the L1 Loss between predicted and target."""
        return F.l1_loss(predicted, target)

    def gradient_loss(self, predicted, target):
        """L1 distance between the finite differences of both images.

        Differences are taken between neighbouring elements along dimension 2 and
        along dimension 3 of the 4-D inputs; the two L1 terms are summed.
        """
        grad_pred_x = predicted[:, :, 1:, :] - predicted[:, :, :-1, :]
        grad_pred_y = predicted[:, :, :, 1:] - predicted[:, :, :, :-1]

        grad_target_x = target[:, :, 1:, :] - target[:, :, :-1, :]
        grad_target_y = target[:, :, :, 1:] - target[:, :, :, :-1]

        grad_loss_x = F.l1_loss(grad_pred_x, grad_target_x)
        grad_loss_y = F.l1_loss(grad_pred_y, grad_target_y)

        return grad_loss_x + grad_loss_y


# Initialize Model with Proper Weight Initialization
def initialize_weights(model):
    """
    Re-initialise every Conv2d and Linear layer of ``model`` in place:
    Xavier-uniform weights and, where a bias exists, an all-zero bias.
    Other module types are left untouched.
    """
    for layer in model.modules():
        if not isinstance(layer, (nn.Conv2d, nn.Linear)):
            continue
        nn.init.xavier_uniform_(layer.weight)
        if layer.bias is None:
            continue
        nn.init.zeros_(layer.bias)

In [None]:
# Loss used for training and evaluation; the weights are spelled out explicitly
# (they equal the CombinedLoss defaults).
criterion = CombinedLoss(ssim_weight=0.5, l1_weight=0.25, grad_weight=0.25)

---

# Logging into Weights & Biases (W&B) Using a User Secret API Key

In [None]:
# Create the Kaggle secrets client. `user_secrets` is also used by later cells
# to fetch the Hugging Face token.

user_secrets = UserSecretsClient()
# Read the W&B API key stored under the secret label "WANDB_API_KEY"
secret_value_0 = user_secrets.get_secret("WANDB_API_KEY")
 
# Log in to W&B using the retrieved API key
wandb.login(key=secret_value_0)

---

# Saving and Uploading a PyTorch Model to the Hugging Face Hub

In [None]:
# Fetch the Hugging Face token from the Kaggle secrets client (`user_secrets`,
# created in the W&B login cell), stored under the secret label "HF_TOKEN".
hf_token = user_secrets.get_secret("HF_TOKEN")


# Log in using the token
login(token=hf_token)

# Create the model repository; exist_ok=True is passed so that re-running the
# cell does not fail when the repository already exists.
# NOTE(review): the repository is created from the bare name, while uploads
# target "prakanda/<repo_name>" - confirm the token belongs to that account.
repo_name = "ConvNeXt_ambient_occlusion_model_1"
create_repo(repo_name, exist_ok=True)

def save_to_huggingface(model, save_path="ConvNeXt_ambient_occlusion_model_1.pth", repo_id=None):
    """Save the model's state_dict locally and upload the file to the Hugging Face Hub.

    Args:
        model (nn.Module): Model whose ``state_dict()`` is saved.
        save_path (str): Local file to write. Its base name is used as the file
            name inside the repository. Default: the checkpoint name used
            throughout this notebook.
        repo_id (str, optional): Target repository ("<user>/<name>"). Defaults to
            ``f"prakanda/{repo_name}"`` using the notebook-level ``repo_name``.

    Uses the notebook-level ``hf_token`` for authentication.
    """
    if repo_id is None:
        repo_id = f"prakanda/{repo_name}"  # Replace with your Hugging Face username

    # Save the model to a .pth file
    torch.save(model.state_dict(), save_path)
    print(f"Model saved locally to {save_path}")

    upload_file(
        path_or_fileobj=save_path,
        # Store the file at the repository root even if save_path has directories
        path_in_repo=os.path.basename(save_path),
        repo_id=repo_id,
        token=hf_token  # Token fetched from the Kaggle secrets client
    )
    print(f"Model uploaded to Hugging Face Hub: https://huggingface.co/{repo_id}")

---

# Training Loop

In [None]:
device0 = "cuda:0"
num_epochs = 10
max_grad_norm = 1.0  # Set the gradient clipping norm

# Record the hyperparameters that are actually in use instead of re-typing them,
# so the logged config cannot drift from the real training setup.
wandb.init(
    project="ambient-occlusion-ConvNeXt",
    config={
        "epochs": num_epochs,
        "batch_size": train_dataloader.batch_size,
        "learning_rate": optimizer.param_groups[0]["lr"],
    },
)

# Move the model to the GPU before the training loop
model.to(device0)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_idx, batch in enumerate(train_dataloader):
        # Move inputs and targets to the device
        inputs = batch["input"].to(device0, non_blocking=True)
        targets = batch["target"].to(device0, non_blocking=True)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass; the model returns a list whose last item is the AO prediction
        outputs = model(inputs)
        predicted_ambient = outputs[-1]

        # Use the criterion for ambient loss calculation
        loss = criterion(predicted_ambient, targets)

        # Backward pass and optimization
        loss.backward()

        # Gradient clipping
        nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()

        # Read the scalar once; every .item() call forces a GPU synchronisation
        batch_loss = loss.item()
        running_loss += batch_loss

        # Log progress every 10 batches
        if batch_idx % 10 == 0:
            print(f"Epoch {epoch + 1}, Batch {batch_idx}, Loss: {batch_loss:.4f}")

        wandb.log({"epoch": epoch + 1, "batch_loss": batch_loss})

        # Save the model every 1000 batches
        if batch_idx != 0 and batch_idx % 1000 == 0:
            print("Saving model")
            save_to_huggingface(model)

    # Log metrics to W&B (max(..., 1) guards against an empty dataloader)
    average_loss_per_epoch = running_loss / max(len(train_dataloader), 1)
    wandb.log({"epoch": epoch + 1, "average_loss_per_epoch": average_loss_per_epoch})

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {average_loss_per_epoch:.4f}")

    # Checkpoint at the end of every epoch. Without this, the weights trained
    # after the last multiple of 1000 batches are never uploaded, and the
    # checkpoint downloaded later would be stale.
    print("Saving model")
    save_to_huggingface(model)

print("Training completed!")

---

# Loading Model from Hugging Face

In [None]:
# Fetch the Hugging Face token from the Kaggle secrets client (`user_secrets`,
# created in the W&B login cell), stored under the secret label "HF_TOKEN".
hf_token = user_secrets.get_secret("HF_TOKEN")


# Log in using the token
login(token=hf_token)

In [None]:
# Download the model file from Hugging Face Hub
repo_name = "ConvNeXt_ambient_occlusion_model_1"
downloaded_file = hf_hub_download(
    repo_id=f"prakanda/{repo_name}",  # Replace with your Hugging Face username
    filename="ConvNeXt_ambient_occlusion_model_1.pth"
)
print(f"Model downloaded from Hugging Face Hub: {downloaded_file}")

# Load the weights into the existing `model` (created in the "Model Instantiation" cell).
# map_location="cpu" makes loading independent of the device the checkpoint was
# saved from; load_state_dict then copies the tensors into the model's own parameters.
# NOTE(review): torch.load unpickles the file - only load checkpoints from a
# repository you trust.
state_dict = torch.load(downloaded_file, map_location="cpu")

# strict=False tolerates key mismatches, so report them instead of letting a
# partially loaded model go unnoticed.
load_result = model.load_state_dict(state_dict, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(f"Warning: missing keys: {load_result.missing_keys}")
    print(f"Warning: unexpected keys: {load_result.unexpected_keys}")
load_result

---

# Evaluation Loop

In [None]:
# Device used by the evaluation below
device0="cuda:0"
# Start a W&B run for the evaluation metrics.
# NOTE(review): the config repeats training-style hyperparameters (epochs,
# learning_rate) that the evaluation code itself does not use - confirm intended.
wandb.init(project="ambient-occlusion-ConvNeXt", config={"epochs": 1, "batch_size": 2, "learning_rate": 1e-4})

def evaluate(model, test_dataloader, criterion, device=None):
    """Compute the sample-weighted average loss of ``model`` over ``test_dataloader``.

    Args:
        model (nn.Module): Model returning a sequence whose last element is the
            prediction compared against the target.
        test_dataloader (iterable): Yields dicts with "input" and "target" tensors.
        criterion (callable): ``criterion(prediction, target)`` returning a scalar tensor.
        device (torch.device or str, optional): Device the batches are moved to.
            Defaults to the device of the model's first parameter (CPU if the
            model has no parameters), so the function does not depend on a
            notebook-level device variable.

    Returns:
        float: Sum of per-batch losses weighted by batch size, divided by the
        total number of samples.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss = 0.0  # Accumulate loss over all batches
    total_samples = 0   # Track the number of processed samples

    with torch.no_grad():
        for batch in test_dataloader:
            # Move inputs and targets to the device
            inputs = batch["input"].to(device, non_blocking=True)
            targets = batch["target"].to(device, non_blocking=True)

            # Forward pass; the prediction is the last element of the model output
            outputs = model(inputs)
            predicted_ambient = outputs[-1]

            # Calculate loss
            loss = criterion(predicted_ambient, targets)

            # Weight by batch size so a smaller final batch is not over-represented
            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("Cannot evaluate: test_dataloader yielded no samples.")

    # Average loss over the dataset
    return running_loss / total_samples

# Perform evaluation on the test set; `evaluate` returns the average loss
# weighted by batch size.
test_loss = evaluate(model, test_dataloader, criterion)
print(f"Test Loss: {test_loss:.4f}")

# Log the average test loss to W&B
wandb.log({"average_test_loss": test_loss})

# Finish the W&B run started at the top of this cell
wandb.finish()

print("Evaluation completed!")