In [1]:
import numpy as np 
import pandas as pd 
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2 
import timm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

  data = fetch_version_info()


In [2]:
# Location of the test CSV. `infer` reads it twice: once (via `data_preparator`)
# to build the image loader and once as the template for the submission file.
test_set_path = "/kaggle/input/csiro-biomass/test.csv"

class Config:
    """Plain container for the notebook's hyper-parameters and dataset paths.

    The constructor takes no arguments; every setting is stored as an instance
    attribute so callers read them as ``Config().<name>``.
    """

    def __init__(self):
        # ---- model ----------------------------------------------------
        # presumably the number of prediction heads; not read in the visible code
        self.num_header = 3
        # timm model identifier handed to `timm.create_model`
        self.model_name = "vit_base_patch14_dinov2"
        # side length (pixels) that each image half is resized to
        self.img_size = 518

        # ---- optimisation / scoring -----------------------------------
        self.lr = 1e-4
        self.loss_weights = {
            'total_loss': 0.5,
            'gdm_loss': 0.2,
            'green_loss': 0.1,
        }
        # presumably per-target metric weights -- TODO confirm target order
        self.scoring_weights = [0.5, 0.2, 0.1, 0.1, 0.1]

        # ---- data locations -------------------------------------------
        self.train_path = "/kaggle/input/csiro-biomass/train.csv"
        self.parent_image_path = "/kaggle/input/csiro-biomass/"

        # ---- cross-validation -----------------------------------------
        self.n_folds = 5
        self.random_state = 42

        # ---- loader / schedule ----------------------------------------
        self.batch_size = 4
        self.num_workers = 2
        self.n_epochs = 25


class VisionDataTransformer:
    """Cut an image into left/right halves and preprocess one half for the model.

    Preprocessing is: resize to ``img_size`` x ``img_size``, normalise with the
    mean/std listed in `data_reshape_only_pipeline`, then convert with
    ``ToTensorV2``. The composed pipeline is cached and reused across calls
    instead of being rebuilt for every image half.
    """

    def __init__(self):
        self.img_size = Config().img_size
        # (img_size, pipeline) pair, filled in on first use by `_get_pipeline`.
        self._pipeline_cache = None

    def get_left_right_input(self, img):
        """Split ``img`` down the middle of its second axis.

        Args:
            img: array-like image indexed as ``img[row, col, ...]``.

        Returns:
            ``(img_left, img_right)``. For an odd width the extra column goes
            to the right half.

        Raises:
            ValueError: if ``img`` is ``None`` (e.g. the image failed to load).
        """
        if img is None:
            raise ValueError("img error")

        # Only the width is needed; the unpacking still requires >= 2 dims.
        _, w = img.shape[:2]
        mid = w // 2

        return img[:, :mid], img[:, mid:]

    def data_reshape_only_pipeline(self):
        """Build a fresh resize -> normalise -> to-tensor albumentations pipeline."""
        return A.Compose([
            A.Resize(self.img_size, self.img_size),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ])

    def _get_pipeline(self):
        """Return the cached pipeline, rebuilding it only if ``img_size`` changed."""
        # getattr keeps this working for instances created without __init__.
        cached = getattr(self, "_pipeline_cache", None)
        if cached is None or cached[0] != self.img_size:
            cached = (self.img_size, self.data_reshape_only_pipeline())
            self._pipeline_cache = cached
        return cached[1]

    def reshape(self, img, part):
        """Preprocess the requested half of ``img``.

        Args:
            img: full image as accepted by `get_left_right_input`.
            part: ``"left"`` or ``"right"``.

        Returns:
            The ``"image"`` entry produced by the preprocessing pipeline.

        Raises:
            AssertionError: if ``part`` is neither ``"left"`` nor ``"right"``.
            ValueError: if ``img`` is ``None``.
        """
        assert part == "right" or part == "left", "part must be 'right' or 'left'"

        # Split once and pick the half, rather than splitting per branch.
        img_left, img_right = self.get_left_right_input(img)
        img_cut = img_right if part == "right" else img_left

        pipeline = self._get_pipeline()
        return pipeline(image=img_cut)["image"]

class BiomassDataset:
    """Map-style dataset yielding the preprocessed right and left halves of an image.

    ``labels`` must expose an ``'image_path'`` column supporting ``.iloc``;
    one item is produced per row.
    """

    def __init__(self, labels):
        self.vision_transformer = VisionDataTransformer()
        self.labels = labels

    def __len__(self):
        """Number of rows in ``labels``."""
        n_rows = len(self.labels)
        return n_rows

    def __getitem__(self, idx):
        """Load image ``idx`` from disk and return ``(image_right, image_left)``."""
        path = self.labels['image_path'].iloc[idx]
        # NOTE(review): cv2.imread yields BGR channel order and no conversion is
        # applied before normalisation -- confirm this matches how the
        # checkpoints were trained.
        raw = cv2.imread(path)

        # Right half first, then left: the same order the model's forward expects.
        halves = tuple(
            self.vision_transformer.reshape(raw, side) for side in ('right', 'left')
        )
        return halves

# class CsiroModel(nn.Module):

#     def __init__(self, config):

#         super().__init__()
#         self.config = config
#         self.backbone_model = timm.create_model(config.model_name, pretrained=False, num_classes=0)

#         self.n_features = self.backbone_model.num_features
#         self.n_combined = self.n_features * 2 

#         self.head_total = self.create_head()
#         self.head_gdm = self.create_head()
#         self.head_green = self.create_head()
        
#     def create_head(self):
        
#         head = nn.Sequential(nn.Linear(self.n_combined, self.n_combined // 2),
#                             nn.ReLU(),
#                             nn.Dropout(0.3),
#                             nn.Linear(self.n_combined // 2 , 1)
#                 )
#         return head

#     def forward(self, img_right, img_left):

#         right_embedd = self.backbone_model(img_right)
#         left_embedd = self.backbone_model(img_left)

#         combined_embedd = torch.concat([right_embedd, left_embedd], dim = 1)

#         out_total = self.head_total(combined_embedd)
#         out_gdm = self.head_gdm(combined_embedd)
#         out_green = self.head_green(combined_embedd)
        
#         return out_total, out_gdm, out_green


class LocalMambaBlock(nn.Module):
    """Residual token mixer: LayerNorm -> sigmoid gate -> depthwise Conv1d -> Linear.

    Described by its author as a lightweight "Mamba-style" (gated CNN) block.
    The depthwise convolution runs along the token axis, so each channel is
    mixed only with its neighbours within ``kernel_size`` tokens.

    Args:
        dim: channel width of the token embeddings.
        kernel_size: width of the depthwise convolution window.
        dropout: dropout probability applied to the block's update.
    """

    def __init__(self, dim, kernel_size=5, dropout=0.0):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        # groups=dim -> one independent filter per channel (depthwise).
        self.dwconv = nn.Conv1d(
            dim,
            dim,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            groups=dim,
        )
        self.gate = nn.Linear(dim, dim)
        self.proj = nn.Linear(dim, dim)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the block to ``x`` of shape (batch, tokens, dim); shape is preserved."""
        normed = self.norm(x)
        # Element-wise gate computed from the normalised tokens themselves.
        gated = normed * torch.sigmoid(self.gate(normed))
        # Conv1d wants (batch, dim, tokens); swap axes in and back out.
        mixed = self.dwconv(gated.transpose(1, 2)).transpose(1, 2)
        update = self.drop(self.proj(mixed))
        return x + update



class CsiroModel(nn.Module):
    """Shared-backbone model with token fusion and three regression heads.

    Both image halves go through the same timm backbone. Their outputs are
    concatenated along dim 1, passed through two `LocalMambaBlock`s,
    average-pooled over that axis, and fed to three independent heads
    (total, gdm, green).

    Args:
        config: object exposing ``model_name`` (timm model identifier).
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        # num_classes=0 / global_pool='' : no classifier and no pooling in timm;
        # presumably this yields per-token features of shape (B, N, D).
        self.backbone_model = timm.create_model(
            config.model_name,
            pretrained=False,
            num_classes=0,
            global_pool='',
        )

        feat_dim = self.backbone_model.num_features
        self.n_features = feat_dim
        self.n_combined = feat_dim * 2  # kept for compatibility; unused below

        self.fusion = nn.Sequential(
            *(LocalMambaBlock(feat_dim, kernel_size=5, dropout=0.1) for _ in range(2))
        )

        self.pool = nn.AdaptiveAvgPool1d(1)

        self.head_total = self.create_head()
        self.head_gdm = self.create_head()
        self.head_green = self.create_head()

    def create_head(self):
        """Return a fresh MLP head: Linear -> GELU -> Dropout(0.2) -> Linear(1) -> Softplus."""
        width = self.n_features
        hidden = width // 2
        layers = [
            nn.Linear(width, hidden),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(hidden, 1),
            nn.Softplus(),  # keeps every prediction strictly positive
        ]
        return nn.Sequential(*layers)

    def forward(self, img_right, img_left):
        """Return ``(out_total, out_gdm, out_green)`` for a batch of image halves."""
        tokens_right = self.backbone_model(img_right)
        tokens_left = self.backbone_model(img_left)

        # Right-half tokens first, then left-half tokens, along dim 1.
        tokens = torch.concat([tokens_right, tokens_left], dim=1)

        fused = self.fusion(tokens)
        # AdaptiveAvgPool1d pools the last axis, so move tokens there first.
        pooled = self.pool(fused.transpose(1, 2)).flatten(1)

        return (
            self.head_total(pooled),
            self.head_gdm(pooled),
            self.head_green(pooled),
        )



def data_preparator(path):
    """Build the inference ``DataLoader`` from the long-format CSV at ``path``.

    The CSV is expected to carry ``image_path`` and ``target_name`` columns.
    It is pivoted down to one row per ``image_path``; downstream, only that
    column is read (the pivoted target columns merely echo the target names).

    Returns:
        A non-shuffling ``DataLoader`` over a `BiomassDataset`, batch size 4.
    """
    image_root = "/kaggle/input/csiro-biomass/"
    long_df = pd.read_csv(path)

    per_image = long_df.pivot(
        index=["image_path"],
        columns="target_name",
        values="target_name",
    ).reset_index()
    # Turn the CSV's relative paths into absolute paths under the dataset root.
    per_image['image_path'] = image_root + per_image['image_path']

    return DataLoader(
        BiomassDataset(per_image),
        batch_size=4,
        shuffle=False,  # order must stay stable: predictions are aligned by position
        num_workers=1,
        pin_memory=True,
    )

def load_models():
    """Instantiate the fold models, load their checkpoints and return them.

    Four checkpoints (``best_model_fold_0`` .. ``best_model_fold_3``) are read
    from the attached Kaggle model directory. Each model is put in eval mode
    and moved to CUDA when available, otherwise left on CPU.

    Returns:
        list of `CsiroModel` instances, in fold order.
    """
    config = Config()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    models = []
    for i in range(4):
        model = CsiroModel(config)
        # map_location="cpu": without it, a checkpoint saved from a CUDA device
        # cannot be deserialised on a CPU-only host, defeating the CPU fallback
        # above. The model is moved to `device` afterwards.
        state = torch.load(
            f"/kaggle/input/csiro-10/pytorch/default/1/best_model_fold_{i}.pth",
            map_location="cpu",
            weights_only=True,
        )
        model.load_state_dict(state)
        model.eval()
        model.to(device)
        models.append(model)

    return models

def predict_with_tta(model, img_right, img_left):
    """Average the model's three outputs over flip-based test-time augmentation.

    The model is evaluated on the untouched inputs, on both inputs flipped
    along axis 3 (horizontal), and on both flipped along axis 2 (vertical).
    Inputs are expected as (B, C, H, W).

    Returns:
        ``(total, gdm, green)`` -- each the element-wise mean over the three views.
    """
    # None -> original view; 3 -> flip width axis; 2 -> flip height axis.
    flip_axes = (None, 3, 2)

    totals, gdms, greens = [], [], []
    for axis in flip_axes:
        if axis is None:
            right_view, left_view = img_right, img_left
        else:
            right_view = torch.flip(img_right, [axis])
            left_view = torch.flip(img_left, [axis])

        view_total, view_gdm, view_green = model(right_view, left_view)
        totals.append(view_total)
        gdms.append(view_gdm)
        greens.append(view_green)

    def _view_mean(outputs):
        # Stack the views on a new leading axis and average it away.
        return torch.stack(outputs).mean(dim=0)

    return _view_mean(totals), _view_mean(gdms), _view_mean(greens)







def infer(tta = False):
    """Run the model ensemble over the test set and write ``submission.csv``.

    For every batch, each loaded model predicts (total, gdm, green); clover and
    dead are derived as ``gdm - green`` and ``total - gdm`` (clipped at 0). The
    per-model predictions are averaged and written to the ``target`` column.

    Args:
        tta: when True, each model is evaluated through `predict_with_tta`
            instead of a single forward pass.

    Returns:
        The submission DataFrame with columns ``sample_id`` and ``target``
        (also written to ``submission.csv`` in the working directory).

    Raises:
        ValueError: if the number of predictions does not match the number of
            rows in the test CSV.

    NOTE(review): predictions are aligned to the CSV rows purely by position.
    This assumes the CSV lists images in the loader's order and, within each
    image, targets in the order clover, dead, green, total, gdm -- confirm
    against the test file.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    models = load_models()
    test_loader = data_preparator(test_set_path)

    # One list of per-batch prediction blocks per model, sized from `models`
    # so the ensemble size is not hard-coded here.
    per_model_batches = [[] for _ in models]

    with torch.no_grad():
        for img_right, img_left in test_loader:
            img_left = img_left.to(device)
            img_right = img_right.to(device)

            for batches, model in zip(per_model_batches, models):
                if tta:
                    pred_total, pred_gdm, pred_green = predict_with_tta(model, img_right, img_left)
                else:
                    pred_total, pred_gdm, pred_green = model(img_right, img_left)

                pred_total = np.maximum(0, pred_total.cpu().numpy())
                pred_gdm = np.maximum(0, pred_gdm.cpu().numpy())
                pred_green = np.maximum(0, pred_green.cpu().numpy())

                # Derived components; clipped because the heads are independent
                # and the differences can come out negative.
                pred_clover = np.maximum(0, pred_gdm - pred_green)
                pred_dead = np.maximum(0, pred_total - pred_gdm)

                # Each head output is (batch, 1), so this is (5, batch, 1) and
                # its transpose is (1, batch, 5): one row of 5 targets per image.
                batches.append(
                    np.array([pred_clover, pred_dead, pred_green, pred_total, pred_gdm]).T
                )

    # Flatten each model's output image-major / target-minor, then average
    # across models.
    per_model_flat = [
        np.concatenate(batches, axis=1).flatten() for batches in per_model_batches
    ]
    ensemble = np.mean(np.stack(per_model_flat, axis=1), axis=1)

    submission = pd.read_csv(test_set_path)
    if len(ensemble) != len(submission):
        raise ValueError(
            f"got {len(ensemble)} predictions for {len(submission)} rows in {test_set_path}"
        )
    submission['target'] = ensemble

    submission = submission[['sample_id','target']]

    submission.to_csv('submission.csv', index = False)

    return submission

                
            
        

    
        

        



In [3]:
# Run the ensemble with flip test-time augmentation. `infer` also writes
# submission.csv; despite its name, `predictions` holds the returned
# submission DataFrame (sample_id, target).
predictions = infer(tta = True)

In [4]:
# Mean of four scores -- presumably the per-fold validation scores of the four
# checkpoints loaded above; TODO record where these numbers come from.
np.mean([0.6766994394646929, 0.8166043147154585, 0.7340455502715365, 0.6514164163460323])

0.7196914301994299