In [1]:
import numpy as np
import pandas as pd
import random
import math
import os
from tqdm import tqdm
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import timm
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
class CONFIG:
    """Static settings for this evaluation notebook, used as a plain namespace."""

    is_debug = False
    seed = 308
    n_folds = 5

    # The data evaluated here is the 5-fold training CSV and the training images.
    test_csv = "CSIRO/CSIRO_my_5fold_train_csv.csv"
    test_img_path = "CSIRO/train"  # (1000, 2000) -- presumably the raw image size; TODO confirm
    # os.cpu_count() may return None when the CPU count cannot be determined;
    # fall back to 1 so the integer division still works (yielding 0 workers).
    n_workers = (os.cpu_count() or 1) // 2

    test_batch_size = 16

    model_path = "CSIRO/output/2025-12-27_00:55:16_vit_base_patch16_dinov3.lvd1689m_output"
    model_name = "vit_base_patch16_dinov3.lvd1689m"
    # Input size as [height, width]; every option has width == 2 * height.
    if "dinov2" in model_name:
        img_size = [518, 1036]
    elif "eva02" in model_name:
        img_size = [448, 896]
    else:
        img_size = [512, 1024]
    # Model names listed by the original author (presumably the candidates tried):
    #   tf_efficientnet_b0.ns_jft_in1k
    #   edgenext_base.in21k_ft_in1k
    #   convnextv2_tiny.fcmae_ft_in22k_in1k
    #   vit_base_patch14_dinov2.lvd142m
    #   vit_base_patch16_dinov3.lvd1689m

    head_out = 5  # number of regression outputs of the model head
    DataParallel = False
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


In [3]:
def set_seed(seed=308):
    """Seed the Python, NumPy and PyTorch random generators.

    Also requests deterministic cuDNN kernels and disables the cuDNN
    auto-tuner, so convolution algorithm selection does not vary between runs.

    seed: integer seed applied to every generator (default 308).
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # subprocesses); the hash seed of the running process is already fixed.
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible CUDA device, not just the current one.
    # This is silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
# Seed all random generators once, up front, with the configured seed.
set_seed(CONFIG.seed)

In [14]:
# Load the long-format CSV (one row per image/target pair).
test_all = pd.read_csv(CONFIG.test_csv)

# Collapse it to one entry per image: the image id is the part of `sample_id`
# before the first "_", and the fold of the first row seen for that id is kept.
id_and_fold = {}
for full_sample_id, fold_value in zip(test_all["sample_id"].to_numpy(),
                                      test_all["fold"].to_numpy()):
    image_id = full_sample_id.split("_")[0]
    if image_id in id_and_fold:
        continue
    id_and_fold[image_id] = fold_value.item()

test = pd.DataFrame(list(id_and_fold.items()), columns=['sample_id', 'fold'])
test

Unnamed: 0,sample_id,fold
0,ID1011485656,0
1,ID1012260530,1
2,ID1025234388,2
3,ID1028611175,3
4,ID1035947949,4
...,...,...
352,ID975115267,3
353,ID978026131,0
354,ID980538882,4
355,ID980878870,2


In [5]:
def Calculate_Weighted_R2(y_true, y_pred):
    """
    Compute the weighted R2 score of the Kaggle CSIRO Image2Biomass competition.

    Parameters:
    y_true: ground-truth values, shape [n_samples, 5]
    y_pred: predicted values, shape [n_samples, 5]

    Assumed column order:
    0: Dry_Clover_g (w=0.1)
    1: Dry_Dead_g   (w=0.1)
    2: Dry_Green_g  (w=0.1)
    3: Dry_Total_g  (w=0.5)
    4: GDM_g        (w=0.2)

    Returns:
    The score as a NumPy float scalar on every path, so callers can always use
    `.item()`. When the weighted total sum of squares is exactly zero the
    score is defined as 0.0.
    """

    # 1. Per-column weights.
    weights = np.array([0.1, 0.1, 0.1, 0.5, 0.2])

    # 2. Accept any array-like input. No dtype is forced, so the arithmetic
    #    below runs in the precision of the inputs, as before.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # 3. Global weighted mean: one scalar over all samples and all columns.
    #    The weights are broadcast across rows; the denominator is
    #    sum(weights) * n_samples.
    weighted_sum = np.sum(y_true * weights)
    total_weight = np.sum(weights) * y_true.shape[0]
    y_bar_w = weighted_sum / total_weight

    # 4. Weighted residual sum of squares: sum(w_j * (y_j - y_hat_j)^2).
    ss_res = np.sum(weights * (y_true - y_pred) ** 2)

    # 5. Weighted total sum of squares around the *global* weighted mean:
    #    sum(w_j * (y_j - y_bar_w)^2).
    ss_tot = np.sum(weights * (y_true - y_bar_w) ** 2)

    # 6. R2. Guard against a zero denominator, and return a NumPy scalar here
    #    too so the return type matches the normal path.
    if ss_tot == 0:
        return np.float64(0.0)

    r2 = 1 - (ss_res / ss_tot)

    return r2

# --- Sanity checks for Calculate_Weighted_R2 ---

# Case 1: two hand-written samples, with every prediction off by a constant 0.5.
dummy_true = np.array([[5, 16, 36, 54, 42],
                       [3, 8, 10, 18, 13]])
dummy_pred = 0.5 + dummy_true

score = Calculate_Weighted_R2(y_true=dummy_true, y_pred=dummy_pred)
print(f"验证集得分: {score:.5f}")

# Case 2: usage example on a simulated validation set whose targets and
# predictions are independent uniform random draws (targets drawn first).
n_valid_samples = 16
y_valid_true = np.random.rand(n_valid_samples, 5)
y_valid_pred = np.random.rand(n_valid_samples, 5)
score = Calculate_Weighted_R2(y_true=y_valid_true, y_pred=y_valid_pred).item()
print(f"score: {score:.5f}")

验证集得分: 0.99926
score: -0.57594


In [6]:
# Step 1: fetch the pretrained-config object timm has for the chosen model.
cfg = timm.get_pretrained_cfg(CONFIG.model_name)

# Step 2: hand resolve_data_config a plain dict rather than the config object
# (original author's note: this lets it use .get() on the config).
cfg_dict = cfg.to_dict()
data_config = timm.data.resolve_data_config(pretrained_cfg=cfg_dict)

# Step 3: keep the normalisation statistics for the transform pipeline.
_mean, _std = data_config['mean'], data_config['std']

print(
    f"模型: {CONFIG.model_name}",
    f"自动获取 Mean: {_mean}",
    f"自动获取 Std:  {_std}",
    sep="\n",
)

模型: vit_base_patch16_dinov3.lvd1689m
自动获取 Mean: (0.485, 0.456, 0.406)
自动获取 Std:  (0.229, 0.224, 0.225)


In [7]:
def transform(img):
    """Resize, normalise and tensor-convert one image array.

    img: image array passed to albumentations as `image`.
    Returns the "image" entry of the pipeline result, i.e. the output of
    ToTensorV2.

    The image is resized to CONFIG.img_size (height, width) and normalised
    with the module-level `_mean` / `_std`.
    """
    target_height, target_width = CONFIG.img_size[0], CONFIG.img_size[1]
    steps = [
        A.Resize(target_height, target_width),
        A.Normalize(mean=_mean, std=_std),
        ToTensorV2(),
    ]
    augmented = A.Compose(steps)(image=img)
    return augmented["image"]

class CSIRODataset(Dataset):
    """Dataset yielding `(image, label)` pairs, one per row of `df`.

    df: frame with a `sample_id` column holding the image id; the image is
        read from `CONFIG.test_img_path/<sample_id>.jpg`.
    original_train: long-format frame with a `sample_id` column of the form
        `<image id><target suffix>` and a `target` column with the value.
    transform: callable applied to the RGB image array, or None to skip it.

    The `sample_id -> target` lookup is built once in `__init__`, so replacing
    `self.original_train` afterwards does not change the labels returned.
    """

    # Suffixes appended to the image id to address each target row.
    # Their order defines the order of the 5 values in every label tensor.
    TARGET_SUFFIXES = ("__Dry_Clover_g", "__Dry_Dead_g", "__Dry_Green_g", "__Dry_Total_g", "__GDM_g")

    def __init__(self, df, original_train=test_all, transform=transform):
        super().__init__()
        self.df = df
        self.original_train = original_train
        self.transform = transform

        # Build the lookup once instead of scanning the whole frame five times
        # per item. Duplicated ids are tracked so a lookup still fails unless
        # exactly one row matches, as the previous `.item()` call required.
        row_ids = original_train["sample_id"]
        self._target_by_row_id = dict(zip(row_ids, original_train["target"]))
        self._ambiguous_row_ids = set(row_ids[row_ids.duplicated()])

    def __len__(self):
        return len(self.df)

    def _lookup_target(self, row_id):
        """Return the single target value stored for `row_id`.

        Raises ValueError when there is no row, or more than one row, for it.
        """
        if row_id not in self._target_by_row_id or row_id in self._ambiguous_row_ids:
            raise ValueError(f"expected exactly one target row for sample_id {row_id!r}")
        return self._target_by_row_id[row_id]

    def __getitem__(self, idx):
        row = self.df.iloc[idx, :]
        img_id = row.sample_id

        img_path = os.path.join(CONFIG.test_img_path, img_id + ".jpg")

        # Close the file handle deterministically and force 3 channels, so a
        # grayscale / RGBA / CMYK file cannot break the 3-channel normalisation.
        with Image.open(img_path) as pil_img:
            img = np.array(pil_img.convert("RGB"))
        if self.transform is not None:
            img = self.transform(img)

        label = torch.tensor(
            [self._lookup_target(f"{img_id}{suffix}") for suffix in self.TARGET_SUFFIXES],
            dtype=torch.float32,
        )

        return img, label
    
def prepare_loaders(df, fold=0):
    """Build the evaluation DataLoader for one fold.

    df: frame with `sample_id` and `fold` columns.
    fold: only rows whose `fold` equals this value are used.
    Returns a non-shuffled DataLoader over a CSIRODataset of those rows, using
    the batch size and worker count from CONFIG.
    """
    in_fold = df["fold"] == fold
    fold_dataset = CSIRODataset(df=df[in_fold], transform=transform)
    return DataLoader(
        fold_dataset,
        batch_size=CONFIG.test_batch_size,
        shuffle=False,
        num_workers=CONFIG.n_workers,
        pin_memory=True,
    )

In [8]:
class CSIROModel(nn.Module):
    """timm backbone followed by a two-layer MLP regression head.

    The backbone is created from `CONFIG.model_name` without pretrained
    weights (they are expected to come from a checkpoint). The head maps the
    backbone features to `CONFIG.head_out` outputs.

    Raises ValueError when `CONFIG.model_name` matches no supported family.
    """

    def __init__(self):
        super().__init__()
        self.backbone = timm.create_model(model_name=CONFIG.model_name,
                                          pretrained=False)

        # Pick the feature width fed to the head and neutralise the backbone's
        # own classifier, per model family (matched by substring of the name).
        if "efficientnet" in CONFIG.model_name:
            in_features = self.backbone.classifier.in_features
            self.backbone.classifier = nn.Identity()
        elif "edgenext" in CONFIG.model_name:
            in_features = self.backbone.head.fc.in_features
            self.backbone.head.fc = nn.Identity()
        elif "convnext" in CONFIG.model_name:
            if "dino" in CONFIG.model_name:
                if "tiny" in CONFIG.model_name:
                    in_features = 768
                elif "base" in CONFIG.model_name:
                    in_features = 1024
                else:
                    # Previously fell through with `in_features` unbound.
                    raise ValueError(
                        f"Error model! Unsupported convnext dino variant: {CONFIG.model_name!r}"
                    )
            else:
                in_features = self.backbone.head.fc.in_features
                self.backbone.head.fc = nn.Identity()
        elif "vit" in CONFIG.model_name:
            # forward() concatenates the features of the two image halves, so
            # the head input is twice the backbone width.
            if "dino" in CONFIG.model_name:
                # Hard-coded; presumably 2 * 768 for the base-size backbone -- TODO confirm
                in_features = 1536
            else:
                in_features = self.backbone.head.fc.in_features * 2
            # NOTE(review): this sets an `fc` attribute on `backbone.head`;
            # whether it replaces a classifier depends on the head's structure
            # in timm -- confirm. Kept unchanged for checkpoint compatibility.
            self.backbone.head.fc = nn.Identity()
        elif "eva02" in CONFIG.model_name:
            in_features = self.backbone.head.in_features * 2
            self.backbone.head = nn.Identity()
        else:
            # Must be a real exception: raising a bare string is a TypeError.
            raise ValueError(f"Error model! Unsupported model_name: {CONFIG.model_name!r}")

        self.head = nn.Sequential(
            nn.Linear(in_features, in_features // 2),
            nn.LeakyReLU(),
            nn.Linear(in_features // 2, CONFIG.head_out),
        )

    def forward(self, x):
        """Return the head output for a batch `x` indexed as [B, C, H, W].

        For "vit" / "eva02" models the input is cut along its last axis at
        column `CONFIG.img_size[0]`. Every configured size has
        width == 2 * height, so that column is the horizontal midpoint. Both
        halves go through the same backbone and their features are
        concatenated. Other models see the whole image.
        """
        if "vit" in CONFIG.model_name or "eva02" in CONFIG.model_name:
            mid = CONFIG.img_size[0]
            x_left = x[:, :, :, :mid]
            x_right = x[:, :, :, mid:]
            _tmp1 = self.backbone(x_left)
            _tmp2 = self.backbone(x_right)
            _tmp = torch.cat([_tmp1, _tmp2], dim=1)  # shape: [B, 1536]
        else:
            _tmp = self.backbone(x)
        output = self.head(_tmp)
        return output

In [9]:
# Checkpoint file names look like "<fold>_CV_<score>_Loss<loss>_epoch<n>.pth".
all_paths = os.listdir(CONFIG.model_path)
paths = []

# For each fold number 1..n_folds, keep the file whose third "_"-separated
# field (the CV score, parsed as a float) is the highest.
for fold_no in range(1, CONFIG.n_folds + 1):
    fold_prefix = str(fold_no)
    candidates = [name for name in all_paths if name.split("_")[0] == fold_prefix]
    ranked = sorted(candidates, key=lambda name: float(name.split("_")[2]))
    best_fold_path = ranked[-1]
    print(f"Fold {fold_no} best path : {best_fold_path}")
    paths.append(best_fold_path)
paths

Fold 1 best path : 1_CV_0.8875_Loss70.0591_epoch20.pth
Fold 2 best path : 2_CV_0.8195_Loss159.2138_epoch12.pth
Fold 3 best path : 3_CV_0.7754_Loss175.1131_epoch27.pth
Fold 4 best path : 4_CV_0.8683_Loss121.2981_epoch22.pth
Fold 5 best path : 5_CV_0.8117_Loss149.2619_epoch22.pth


['1_CV_0.8875_Loss70.0591_epoch20.pth',
 '2_CV_0.8195_Loss159.2138_epoch12.pth',
 '3_CV_0.7754_Loss175.1131_epoch27.pth',
 '4_CV_0.8683_Loss121.2981_epoch22.pth',
 '5_CV_0.8117_Loss149.2619_epoch22.pth']

In [10]:
# Build one model per fold and load the selected checkpoint into it.
models = []

for i in range(CONFIG.n_folds):
    checkpoint_path = os.path.join(CONFIG.model_path, paths[i])

    model = CSIROModel()
    if CONFIG.DataParallel:
        # NOTE(review): device_ids [0, 1] needs two visible GPUs -- confirm
        # this matches the CUDA_VISIBLE_DEVICES setting before enabling.
        device_ids = [0, 1]
        model = torch.nn.DataParallel(model, device_ids=device_ids)
        model = model.cuda()
    else:
        model = model.to(CONFIG.device)

    # Deserialize on the CPU so loading does not depend on the device the
    # checkpoint was saved from. load_state_dict then copies the values into
    # the model's parameters on whatever device they live.
    # NOTE: torch.load unpickles the file; only load trusted checkpoints.
    state_dict = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(state_dict)
    model.eval()
    models.append(model)
    print(f"Fold_{paths[i]} Load Success!")

Fold_1_CV_0.8875_Loss70.0591_epoch20.pth Load Success!
Fold_2_CV_0.8195_Loss159.2138_epoch12.pth Load Success!
Fold_3_CV_0.7754_Loss175.1131_epoch27.pth Load Success!
Fold_4_CV_0.8683_Loss121.2981_epoch22.pth Load Success!
Fold_5_CV_0.8117_Loss149.2619_epoch22.pth Load Success!


In [11]:
def Infer(model, test_loader):
    """Run `model` over `test_loader` with gradients disabled.

    model: module called as `model(images)`. It is not switched to eval mode
        here, so the caller is responsible for that.
    test_loader: iterable of `(images, labels)` batches that supports `len()`.

    Returns `(y_preds, y_trues)`: NumPy arrays with the model outputs and the
    float32 labels, each concatenated over all batches in loader order.
    """
    # Same device choice as before: the current CUDA device under
    # DataParallel, otherwise the configured device.
    device = torch.device("cuda") if CONFIG.DataParallel else CONFIG.device

    y_preds = []
    y_trues = []
    with torch.no_grad():
        for images, labels in tqdm(test_loader, total=len(test_loader)):
            images = images.to(device, dtype=torch.float)
            output = model(images)
            y_preds.append(output.detach().cpu().numpy())
            # Labels are only collected, never fed to the model, so cast them
            # on the host instead of moving them to the device and back.
            y_trues.append(labels.detach().to(dtype=torch.float).cpu().numpy())

    y_preds = np.concatenate(y_preds)
    y_trues = np.concatenate(y_trues)
    return y_preds, y_trues

In [None]:
# Out-of-fold evaluation: fold k's rows are scored by models[k], whose
# checkpoint is the file with prefix k+1 (presumably the model trained with
# fold k held out -- TODO confirm against the training notebook).
y_preds, y_trues = [], []
for fold in range(CONFIG.n_folds):
    print(f"==================== infer on Fold {fold+1} ====================")

    test_loader = prepare_loaders(test, fold=fold)
    y_pred, y_true = Infer(model=models[fold], test_loader=test_loader)
    y_preds += [y_pred]
    y_trues += [y_true]

# Stack the per-fold arrays in fold order; predictions and labels stay aligned.
oof = np.concatenate(y_preds, axis=0)
true = np.concatenate(y_trues, axis=0)



100%|██████████| 5/5 [00:08<00:00,  1.73s/it]




100%|██████████| 5/5 [00:02<00:00,  2.02it/s]




100%|██████████| 5/5 [00:02<00:00,  2.14it/s]




100%|██████████| 5/5 [00:02<00:00,  1.94it/s]




100%|██████████| 5/5 [00:02<00:00,  1.95it/s]


In [13]:
# Score the pooled out-of-fold predictions against the pooled labels
# (all folds concatenated, so the metric's global mean spans every fold).
local_cv = Calculate_Weighted_R2(true, oof)
print("Local CV : ", local_cv)

Local CV :  0.9579038769353937
