##Model 1

edit the data directory in block 4

run code blocks in order to train model1

In [None]:
import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torchvision
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms.v2 as transforms
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score

import matplotlib.pyplot as plt

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
output_col = ['X4_mean', 'X11_mean', 'X18_mean', 'X26_mean', 'X50_mean', 'X3112_mean']
output_col_test = ['X4', 'X11', 'X18', 'X26', 'X50', 'X3112']

In [None]:
#load data
datapath = '/kaggle/input/cs-480-2024-spring/data/' # edit to directory with data
train = pd.read_csv(datapath + 'train.csv')
train['path'] = [datapath + 'train_images/'+str(int(e))+'.jpeg' for e in train['id'].values]

test = pd.read_csv(datapath + 'test.csv')
test['path'] = [datapath + 'test_images/'+str(int(e))+'.jpeg' for e in test['id'].values]

input_col = test.columns.values[1:-1]

train, val = train_test_split(train, test_size=4000, shuffle=True)

In [None]:
n_epochs = 6
n_batch_size = 24
n_display_step = 10
n_learning_rate = 0.0001

In [None]:
# Outlier handling

for column in output_col:
    lower_quantile = train[column].quantile(0.001)
    upper_quantile = train[column].quantile(0.999)
    train_f = train[(train[column] > lower_quantile) & (train[column] < upper_quantile)]
    val_f = val[(val[column] > lower_quantile) & (val[column] < upper_quantile)]


print(train_f.shape)
print(val_f.shape)

log_feat = ['X11_mean', 'X18_mean', 'X26_mean', 'X50_mean', 'X3112_mean']

def norm(y, df):
    for i, target in enumerate(output_col):
        col = df[target]
        if target in log_feat:
            col = np.log10(col)
        shift[i] = np.mean(col)
        col = col - shift[i]
        scale[i] = np.std(col)
        col = col / scale[i]
        y[:,i] = col
    return y

def denormalize(output, target):
    # Scale Back
    output = (output * scale) + shift
    # Log Scale
    output = torch.where(torch.tensor(np.isin(output_col, log_feat)).to(device), 10**output, output)
    target = (target * scale) + shift
    target = torch.where(torch.tensor(np.isin(output_col, log_feat)).to(device), 10**target, target)
    return output, target

shift = np.zeros(6)
scale = np.zeros(6)

# Normalize the target columns
y_train_norm = norm(np.zeros_like(train_f[output_col]), train_f)
y_val_norm = norm(np.zeros_like(val_f[output_col]), val_f)

scale = torch.from_numpy(scale).to(device)
shift = torch.from_numpy(shift).to(device)

In [None]:
# scale
scaler = StandardScaler()
x_train = scaler.fit_transform(train_f[input_col].values.astype(np.float32))
# Transform val/test features using scaler fitted on train data
x_val = scaler.transform(val_f[input_col].values.astype(np.float32))
x_test = scaler.transform(test[input_col].values.astype(np.float32))
# Convert Features to Torch Tensors
x_train = torch.tensor(x_train)
x_val = torch.tensor(x_val)
x_test = torch.tensor(x_test)


In [None]:
test_transform = transforms.Compose(
    [
        transforms.Resize((288, 288)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)
# For training, we add some augmentation. Networks are too powerful and would overfit.
train_transform = transforms.Compose(
    [
        transforms.Resize((288, 288)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter((0.9,1.1), (0.9,1.1)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

In [None]:
class MyDataset(Dataset):
    def __init__(self, data_im, data_num, data_out, transform=None, mode='m1train'):
        self.data_im = data_im
        self.data_num = data_num
        self.data_out = data_out
        self.transform = transform
        self.mode = mode

    def __getitem__(self, index):
        if 'm2' in self.mode:
            x = self.transform(Image.open(self.data_im[index]))
        else:
            x = (self.transform(Image.open(self.data_im[index])).float(), self.data_num[index].float())
        if 'test' in self.mode:
            y = self.data_out[index]
        else:
            y = torch.from_numpy(self.data_out[index]).float()
        return x, y

    def __len__(self):
        return len(self.data_im)

In [None]:
train_dataset = MyDataset(train_f['path'].values, x_train, y_train_norm, train_transform)
val_dataset = MyDataset(val_f['path'].values, x_val, y_val_norm, test_transform)
test_dataset = MyDataset(test['path'].values, x_test, test['id'].values, test_transform, 'm1test')

train_dataloader = DataLoader(train_dataset, batch_size=n_batch_size, drop_last=True)

val_dataloader = DataLoader(val_dataset, batch_size=n_batch_size, drop_last=False)


In [None]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()

        # Backbone
        self.effvit = timm.create_model(
                'efficientvit_b1.r288_in1k',
                pretrained=True,
                num_classes=0
            )
        self.effvit.head = self.effvit.head.global_pool

        # Features
        self.nn = nn.Sequential(
            nn.Linear(163,256),
            nn.GELU(),
            nn.Linear(256,256),
        )

        # Label
        self.combnn = nn.Sequential(
            nn.Linear(512, 256),
            nn.GELU(),
            nn.Linear(256,256),
            nn.GELU(),
            nn.Linear(256,6, bias=False)
        )

        # Initialize Weights
        self.initialize_weights()

    def initialize_weights(self):
        nn.init.kaiming_uniform_(self.nn[0].weight)
        nn.init.zeros_(self.combnn[0].weight)

    def forward(self, input):
        x1 = self.effvit(input[0])
        x2 = self.nn(input[1])
        x = torch.cat((x1, x2), dim=1)
        x = self.combnn(x)
        return x
model1 = Model()
model1.to(device)

In [None]:
def r2loss(output, target):
    target_mean = torch.mean(target, dim=0)
    ss_tot = (target - target_mean) ** 2
    ss_res = (target - output) ** 2
    loss = torch.sum(ss_res, dim=0) / torch.maximum(torch.sum(ss_tot, dim=0), torch.tensor([0.000001]).to(device))
    return torch.mean(loss)

In [None]:
# Get the learning rate scheduler
def lr_sc(optimizer):
    return torch.optim.lr_scheduler.OneCycleLR(
        optimizer=optimizer,
        max_lr=1e-4,
        total_steps=len(train_dataset)//n_batch_size * n_epochs + 1,
        pct_start=0.10,
        anneal_strategy='cos',
        div_factor=1e3,
        final_div_factor=1e4,
    )

In [None]:
optimizer = optim.AdamW(model1.parameters(), lr=n_learning_rate, weight_decay=0.01)
criterion = r2loss
lrsc = lr_sc(optimizer)

In [None]:
def get_r2(model, loader, mode):
    model.eval()
    n_samples = 0
    n_correct = 0
    res = []
    tar = []

    with torch.no_grad():
        test_loss = []
        for step, (x, target) in enumerate(loader):
            x = (x[0].to(device), x[1].to(device))
            target = target.to(device)
            output = model(x)
            output, target = denormalize(output, target)
            res.extend(output.detach().cpu())
            tar.extend(target.detach().cpu())
            loss = criterion(output, target)
            if step % n_display_step == 0:
                print('\rloss: ', loss.item(), end='')
    tar = np.array(tar)
    res = np.array(res)
    print([r2_score(tar[:,i],res[:,i]) for i in range(6)])
    return [r2_score(tar[:,i],res[:,i]) for i in range(6)]

# function to train the net
def train_model(model, criterion, optimizer, epochs, train_loader, val_loader):
    for epoch in range(epochs):
        model.train()
        for step, (x, target) in enumerate(train_loader):
            x = (x[0].to(device), x[1].to(device))
            target = target.to(device)
            output = model(x)
            output, target = denormalize(output, target)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            lrsc.step()

            if step % n_display_step == 0:
                print("\rEpoch {:2d} Step {:4d} Loss {:.4f}".format(epoch, step, loss.item()), f'{lrsc.get_last_lr()[0]:.2e}', end='')
        val_acc = get_r2(model, val_loader, 'test')
        print("Epoch {:2d} Loss {:.4f} Accuracy (Train | Test) {:.4f} {:.4f}".format(epoch, loss.item(), val_acc))

train_model(model1, criterion, optimizer, n_epochs, train_dataloader, val_dataloader)

In [None]:
# torch.save(model1.state_dict(), 'model1.pth')

In [None]:
model1.eval()
test_output=[]
with torch.no_grad():
    for step, (x, target_id) in enumerate(test_dataset):
        x = (x[0].unsqueeze(0).to(device), x[1].unsqueeze(0).to(device))
        output = model1(x).squeeze(0)
        output, _ = denormalize(output, output)
        test_output.append(np.append(np.array(target_id),output.cpu()))

df = pd.DataFrame(test_output, columns=['id']+output_col_test)
df['id'] = df['id'].astype(int)
df.to_csv('submissionm1.csv', index=False)