In [3]:
pip install segmentation_models_pytorch-*.whl

In [None]:
import numpy as np
import pandas as pd 
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import os
import sys
sys.path.append('/bohr/train-4gug/v2')
from dataloader import load_data
import torch.nn.functional as F
from sklearn.model_selection import StratifiedShuffleSplit, train_test_split
from torch.utils.data import Dataset, DataLoader, Subset
import random
import torchvision.transforms.functional as TF


class MyModel(nn.Module):
    def __init__(self, in_channels=6, out_channels=2):
        super(MyModel, self).__init__()
        self.encoder1 = self.conv_block(in_channels, 32)
        self.encoder2 = self.conv_block(32, 64)
        self.encoder3 = self.conv_block(64, 128)
        self.encoder4 = self.conv_block(128, 256)
        self.pool = nn.MaxPool2d(2)

        # Bottleneck
        self.bottleneck = self.conv_block(256, 512)

        # Decoder
        self.up4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder4 = self.conv_block(512, 256)

        self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder3 = self.conv_block(256, 128)

        self.up2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder2 = self.conv_block(128, 64)

        self.up1 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.decoder1 = self.conv_block(64, 32)

        self.final = nn.Conv2d(32, out_channels, kernel_size=1)

    def conv_block(self, in_ch, out_ch):
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool(enc4))

        # Decoder path
        dec4 = self.up4(bottleneck)
        enc4_cropped = enc4[:, :, :dec4.size(2), :dec4.size(3)]
        dec4 = self.decoder4(torch.cat([dec4, enc4_cropped], dim=1))

        dec3 = self.up3(dec4)
        enc3_cropped = enc3[:, :, :dec3.size(2), :dec3.size(3)]
        dec3 = self.decoder3(torch.cat([dec3, enc3_cropped], dim=1))

        dec2 = self.up2(dec3)
        enc2_cropped = enc2[:, :, :dec2.size(2), :dec2.size(3)]
        dec2 = self.decoder2(torch.cat([dec2, enc2_cropped], dim=1))

        dec1 = self.up1(dec2)
        enc1_cropped = enc1[:, :, :dec1.size(2), :dec1.size(3)]
        dec1 = self.decoder1(torch.cat([dec1, enc1_cropped], dim=1))

        out = self.final(dec1)
        out = F.pad(out, (0, 181 - out.shape[-1], 0, 50 - out.shape[-2]))
        return out


    
def train(model, train_loader, test_loader, optimizer, criterion, num_epochs=100):
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        train_weighted_acc = 0.0
        batch_count = 0
        
        for images, labels in train_loader:
            images = images.float()
            labels = labels.float()
            images = images.cuda() if torch.cuda.is_available() else images
            labels = labels.cuda() if torch.cuda.is_available() else labels
            
            outputs = model(images)
            acc = compute_weighted_accuracy(outputs, labels, threshold=0.5)
            outputs = outputs.view(outputs.size(0), outputs.size(1), -1)  # [B, C, H*W]
            labels = labels.view(labels.size(0), -1)  # [B, H*W]
            labels = labels.long() 
            loss = criterion(outputs, labels)
            train_weighted_acc += acc
            
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_loss += loss.item()
            batch_count += 1
        
        avg_train_loss = epoch_loss / batch_count
        avg_train_acc = train_weighted_acc / batch_count
        train_losses.append(avg_train_loss)
        
        model.eval()
        val_loss = 0.0
        val_weighted_acc = 0.0
        val_batch_count = 0
        
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.cuda() if torch.cuda.is_available() else images
                labels = labels.cuda() if torch.cuda.is_available() else labels
                
                outputs = model(images)
                acc = compute_weighted_accuracy(outputs, labels, threshold=0.5)
                outputs = outputs.view(outputs.size(0), outputs.size(1), -1)
                labels = labels.view(labels.size(0), -1)
                labels = labels.long() 
                loss = criterion(outputs, labels)
                val_weighted_acc += acc
                
                val_loss += loss.item()
                val_batch_count += 1
        
        avg_val_loss = val_loss / val_batch_count
        avg_val_acc = val_weighted_acc / val_batch_count
        val_losses.append(avg_val_loss)
        
        if (epoch+1) % 2 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Train Loss: {avg_train_loss:.4f}, '
                  f'Val Loss: {avg_val_loss:.4f}')
            print('weighted accuracy train:', avg_train_acc)
            print('weighted accuracy val:', avg_val_acc)
    
    return train_losses, val_losses
    



class RadarDataset(Dataset):
    def __init__(self, files, transform=None):
        self.files = files
        self.transform = transform
        

    def __getitem__(self, idx):
        tensor = torch.load(self.files[idx])  # [7, 50, 181]
        inputs = tensor[:6].float()  # [6, 50, 181]
        label = tensor[6].float()    # [50, 181]
        sample = {'image': inputs, 'mask': label}

        return sample['image'], sample['mask']

    def __len__(self):
        return len(self.files)
        


        

def load_data(base_path, batch_size=4, test_size=0.2, transform=None):
    from glob import glob
    all_files = glob(os.path.join(base_path, "*.pt"))
    train_files, test_files = train_test_split(all_files, test_size=test_size, random_state=42)

    train_dataset = RadarDataset(train_files, transform=transform)
    test_dataset = RadarDataset(test_files, transform=None)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader


def compute_weighted_accuracy(preds, targets, threshold=0.5, target_weight=1500):
    """
    preds: raw logits from the model (B, C, H, W)
    targets: ground truth labels (B, H, W)
    """
    with torch.no_grad():
        # Get predicted class (argmax for multi-class, softmax + threshold for binary)
        probs = torch.softmax(preds, dim=1)
        pred_class = (probs[:, 1, :, :] > threshold).long()
        
        # Ensure targets is binary
        true_class = (targets == 1).long()

        # Calculate correct predictions
        correct_target = ((pred_class == 1) & (true_class == 1)).sum()
        correct_background = ((pred_class == 0) & (true_class == 0)).sum()

        # Total number of target and background points
        total_target = (true_class == 1).sum()
        total_background = (true_class == 0).sum()

        # Weighted score
        correct_score = correct_background + target_weight * correct_target
        total_score = total_background + target_weight * total_target

        return (correct_score / total_score).item() if total_score > 0 else 0.0
    
data_path = '/bohr/train-4gug/v2/training_set'

train_loader, test_loader = load_data(
    base_path=data_path,
    batch_size=4,  
    test_size=0.2,
    transform=None
)

model = MyModel()



if torch.cuda.is_available():
    model = model.cuda()

class_weights = [1., 1500.]
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).cuda()
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
pos_weight = torch.tensor([1500.0]).cuda()
optimizer = optim.Adam(model.parameters(), lr=0.0005) 

train_losses, val_losses = train(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    optimizer=optimizer,
    criterion=criterion,
    num_epochs=40
)

In [None]:
# Please write your model code (including the necessary imported modules, such as torch and torch.nn) below to generate a model structure file that can be easily loaded by the grading platform
model_code ='''
import torch
import torch.nn as nn
import torch.nn.functional as F

class MyModel(nn.Module):
    def __init__(self, in_channels=6, out_channels=2):
        super(MyModel, self).__init__()
        self.encoder1 = self.conv_block(in_channels, 32)
        self.encoder2 = self.conv_block(32, 64)
        self.encoder3 = self.conv_block(64, 128)
        self.encoder4 = self.conv_block(128, 256)
        self.pool = nn.MaxPool2d(2)

        # Bottleneck
        self.bottleneck = self.conv_block(256, 512)

        # Decoder
        self.up4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder4 = self.conv_block(512, 256)

        self.up3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder3 = self.conv_block(256, 128)

        self.up2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder2 = self.conv_block(128, 64)

        self.up1 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.decoder1 = self.conv_block(64, 32)

        self.final = nn.Conv2d(32, out_channels, kernel_size=1)

    def conv_block(self, in_ch, out_ch):
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool(enc4))

        # Decoder path
        dec4 = self.up4(bottleneck)
        enc4_cropped = enc4[:, :, :dec4.size(2), :dec4.size(3)]
        dec4 = self.decoder4(torch.cat([dec4, enc4_cropped], dim=1))

        dec3 = self.up3(dec4)
        enc3_cropped = enc3[:, :, :dec3.size(2), :dec3.size(3)]
        dec3 = self.decoder3(torch.cat([dec3, enc3_cropped], dim=1))

        dec2 = self.up2(dec3)
        enc2_cropped = enc2[:, :, :dec2.size(2), :dec2.size(3)]
        dec2 = self.decoder2(torch.cat([dec2, enc2_cropped], dim=1))

        dec1 = self.up1(dec2)
        enc1_cropped = enc1[:, :, :dec1.size(2), :dec1.size(3)]
        dec1 = self.decoder1(torch.cat([dec1, enc1_cropped], dim=1))

        out = self.final(dec1)
        out = F.pad(out, (0, 181 - out.shape[-1], 0, 50 - out.shape[-2]))
        return out
'''
# Write code to file
with open('submission_model.py', 'w',encoding="utf-8") as f:
    f.write(model_code)
print("submission_model.py file has been generated.")

In [None]:
# Save the parameters of the model
torch.save(model.state_dict(), 'submission_dic.pth')
print("submission_dic.pth file has been saved.")

In [None]:
# This block mainly specifies the submission format of this question.
import zipfile
import os

# Define the files to zip and the zip file name.
files_to_zip = ['submission_model.py', 'submission_dic.pth']
zip_filename = 'submission.zip'

# Create a zip file
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for file in files_to_zip:
        # Add the file to the zip fil
        zipf.write(file, os.path.basename(file))

print(f'{zip_filename} Created successfully!')