In [1]:
DEVICE="cuda"

import numpy as np
import sklearn.metrics
from PIL import Image
import torch
from torch import nn
import torch.nn.functional as F
from transformers import AutoModel, AutoImageProcessor, PreTrainedModel, PretrainedConfig

In [2]:
class ImageMultiRegressionConfig(PretrainedConfig):
    def __init__(
        self,
        output_size=3,
        init_checkpoint=None,
        **kwargs,
    ):
        self.output_size=output_size
        self.init_checkpoint=init_checkpoint
        super().__init__(**kwargs)


class ImageMultiRegressionModel(PreTrainedModel):
    config_class=ImageMultiRegressionConfig
    def __init__(self, config, loss=nn.MSELoss()):
        super().__init__(config)
        
        self.inner_model = AutoModel.from_pretrained(config.init_checkpoint)
        self.classifier = nn.Linear(self.inner_model.config.hidden_size, config.output_size)
        self.loss=loss
    
    def forward(self, pixel_values, labels=None):
        outputs = self.inner_model(pixel_values=pixel_values)
        cls_output = outputs.last_hidden_state[:, 0, :]  # image embedding
        values = self.classifier(cls_output)
        loss = None
        if labels is not None:
            loss = self.loss(values.view(-1), labels.view(-1))
        return (loss, values) if loss is not None else values

In [3]:
from datasets import load_from_disk
dataset=load_from_disk("./data/dataset/")
dataset["train"].set_format("torch")
dataset["test"].set_format("torch")

from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
import torchvision.transforms.v2 as transforms
import torch

_train_transform = transforms.Compose([
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),  # Normalize expects float input
    transforms.RandomRotation(degrees=20),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(size=(256,256), scale=(.6,1.0), antialias=True),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def train_transform(ex):
    if "image" in ex:
        ex["pixel_values"]=[_train_transform(image) for image in ex["image"]]
    return ex
    
_test_transform = transforms.Compose([
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),  # Normalize expects float input
    transforms.Resize(size=(256,256)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def test_transform(ex):
    if "image" in ex:
        ex["pixel_values"]=[_test_transform(image) for image in ex["image"]]
    return ex

dataset["train"].set_transform(train_transform)
dataset["test"].set_transform(test_transform)

In [4]:
#gather training data statistics

In [5]:
t1=torch.tensor(dataset["train"]["light_level"])
t2=torch.tensor(dataset["train"]["fume_strength"])
t3=torch.tensor(dataset["train"]["explosion_strength"])
ty=torch.tensor(dataset["train"]["class"])
T=torch.stack([t1,t2,t3], dim=1)

In [6]:
#gather target statistics

In [7]:
y1=torch.tensor(dataset["test"]["light_level"])
y2=torch.tensor(dataset["test"]["fume_strength"])
y3=torch.tensor(dataset["test"]["explosion_strength"])
yy=torch.tensor(dataset["test"]["class"])
Y=torch.stack([y1,y2,y3], dim=1)

In [8]:
# out of sample R score
# https://stats.stackexchange.com/questions/228540/how-to-calculate-out-of-sample-r-squared/492581#492581
# https://arxiv.org/pdf/2302.05131
def oosR(MST, MSE): 
    return 1-MSE/MST

In [9]:
null_MSE=0

In [10]:
#calculate standard metrics for regression predictors
def compute_metrics_reg(x,y):
    MSE=sklearn.metrics.mean_squared_error(x, y)
    return {"MSE":MSE,
           "MAE":sklearn.metrics.mean_absolute_error(x,y),
           "R2_test":sklearn.metrics.r2_score(x,y),
           "oosR2":0 if null_MSE==0 else oosR(null_MSE,MSE),
           }

In [11]:
#calculate standard metrics for regression predictors
def compute_metrics_class(x,y):
    report=sklearn.metrics.classification_report(x,y, output_dict=True)
    matrix=sklearn.metrics.confusion_matrix(x,y)
    return report, matrix

In [12]:
# baseline: always predict the mean

In [13]:
T_mean=T.mean(dim=0)

In [14]:
T_mean_only = T_mean.repeat(Y.shape[0],1)

In [15]:
T_mean_only.shape, Y.shape

(torch.Size([1891, 3]), torch.Size([1891, 3]))

In [16]:
T_mean_only[1]

tensor([0.9057, 0.7516, 0.3702])

In [17]:
mean_metrics = compute_metrics_reg(T_mean_only, Y); mean_metrics

{'MSE': 0.14325829, 'MAE': 0.31039932, 'R2_test': 0.0, 'oosR2': 0}

In [18]:
null_MSE=mean_metrics["MSE"]

In [19]:
# baseline: always predict the median

In [20]:
T_median=T.median(dim=0).values

In [21]:
T_median_only = T_median.repeat(Y.shape[0],1)

In [22]:
T_median_only.shape, Y.shape

(torch.Size([1891, 3]), torch.Size([1891, 3]))

In [23]:
T_median_only[1]

tensor([1., 1., 0.])

In [24]:
median_metrics = compute_metrics_reg(T_median_only, Y); median_metrics

{'MSE': 0.21269357,
 'MAE': 0.23851593,
 'R2_test': 0.0,
 'oosR2': -0.484686017036438}

In [25]:
oosR(mean_metrics['MSE'], 0.052876099944114685) #swinv2-base

0.630903729921912

In [26]:
oosR(mean_metrics['MSE'], 0.07436350733041763) #dinov2

0.4809130549607282

In [27]:
oosR(mean_metrics['MSE'], 0.051811158657073975) #beit

0.6383374449144006

In [28]:
from fastprogress import progress_bar as pb

In [29]:
DEVICE="cpu"

In [30]:
def evaluate_reg(model):
    outputs=[]
    for batch in pb(dataset["test"]):
        output=model(batch["pixel_values"][None,:,:,:].to(DEVICE)).cpu()
        outputs.append(output)
    print(torch.stack(outputs))
    return torch.stack(outputs)

In [31]:
# results: regression models

reg_model_paths=["models/swinv2-base", ]
reg_models=[ImageMultiRegressionModel.from_pretrained(model).to(DEVICE) for model in reg_model_paths]
reg_results=[evaluate_reg(model) for model in reg_models]
reg_scores=[compute_metrics_reg() for results in reg_results]


KeyboardInterrupt



In [None]:
# results: regression models translated to classification

In [None]:
# results: (older) classification models