In [14]:
# from google.colab import drive
# drive.mount('/content/drive')

# import gdown
# gdown.download_folder("https://drive.google.com/drive/folders/1vtLNqk0N2GriYKcv6FUwiPQHSCBN69E-", quiet=False, use_cookies=False)


# import zipfile
# with zipfile.ZipFile("/content/Phase_2_data/images_val.zip", 'r') as zip_ref:
#     zip_ref.extractall("dataset")
# with zipfile.ZipFile("/content/Phase_2_data/images_train.zip", 'r') as zip_ref:
#     zip_ref.extractall("dataset")



In [15]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
import timm
from tqdm import tqdm
import torch.cuda.amp as amp

# Check for GPU and enable mixed precision
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cudnn.benchmark = True
scaler = amp.GradScaler()

## Enhanced Data Augmentation
def get_train_transform():
    return A.Compose([
        A.Resize(256, 256),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1),
        A.Rotate(limit=30, p=0.7),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=20, val_shift_limit=20, p=0.5),
        A.CoarseDropout(max_holes=8, max_height=32, max_width=32, fill_value=0, p=0.3),
        A.GaussNoise(var_limit=(10.0, 50.0)),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])

def get_val_transform():
    return A.Compose([
        A.Resize(256, 256),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])

# Optimized Dataset Class
class AngleDataset(Dataset):
    def __init__(self, image_dir, labels_csv, transform=None):
        self.image_dir = image_dir
        self.labels_df = pd.read_csv(labels_csv)
        self.labels_df = self.labels_df[self.labels_df['angle'] <= 360]
        self.transform = transform
        self.samples = [(row['filename'], float(row['angle']))
                       for _, row in self.labels_df.iterrows()]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        filename, angle = self.samples[idx]
        img_path = os.path.join(self.image_dir, filename)

        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            raise FileNotFoundError(f"Image not found at {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Convert angle to sin and cos
        sin = np.sin(np.radians(angle))
        cos = np.cos(np.radians(angle))

        if self.transform:
            image = self.transform(image=image)["image"]

        return image, torch.tensor([sin, cos], dtype=torch.float32)

# Advanced Model Architecture with Multi-Task Learning
class AdvancedAngleRegressor(nn.Module):
    def __init__(self, backbone='convnext_large'):
        super(AdvancedAngleRegressor, self).__init__()

        # Load pretrained backbone from timm
        self.backbone = timm.create_model(
            backbone,
            pretrained=True,
            num_classes=0,
            features_only=False
        )
        in_features = self.backbone.num_features

        # Freeze first 60% of layers
        total_layers = len(list(self.backbone.parameters()))
        for i, param in enumerate(self.backbone.parameters()):
            if i < total_layers * 0.6:
                param.requires_grad = False

        # Multi-task learning heads
        self.angle_head = nn.Sequential(
            nn.Linear(in_features, 2048),
            nn.BatchNorm1d(2048),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 2)  # sin and cos
        )

        # Auxiliary classification head (cardinal directions)
        self.aux_head = nn.Sequential(
            nn.Linear(in_features, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Linear(1024, 8)  # 8 cardinal directions
        )

    def forward(self, x):
        features = self.backbone(x)
        angle_output = self.angle_head(features)
        aux_output = self.aux_head(features)
        return angle_output, aux_output

# Enhanced Loss Function
class CombinedLoss(nn.Module):
    def __init__(self, alpha=0.2):
        super(CombinedLoss, self).__init__()
        self.alpha = alpha

    def forward(self, angle_output, aux_output, angle_target, aux_target):
        # Circular regression loss
        circ_loss = 1 - torch.mean(torch.sum(angle_output * angle_target, dim=1))

        # Auxiliary classification loss
        aux_loss = nn.CrossEntropyLoss()(aux_output, aux_target)

        return circ_loss + self.alpha * aux_loss

# Mean Absolute Angular Error
def mean_angular_error(preds, targets):
    pred_angles = torch.atan2(preds[:, 0], preds[:, 1]) * 180 / np.pi
    target_angles = torch.atan2(targets[:, 0], targets[:, 1]) * 180 / np.pi
    diff = torch.abs(pred_angles - target_angles)
    return torch.mean(torch.min(diff, 360 - diff))

# Training function with mixed precision
def train_model(model, train_loader, val_loader, epochs=30):
    optimizer = optim.AdamW(model.parameters(), lr=5e-5, weight_decay=1e-5)
    scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1, eta_min=1e-6)
    criterion = CombinedLoss(alpha=0.2)
    best_maae = float('inf')

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}')
        for images, angle_targets in pbar:
            images = images.to(device, non_blocking=True)
            angle_targets = angle_targets.to(device, non_blocking=True)

            # Create auxiliary targets (cardinal directions)
            angles = torch.atan2(angle_targets[:, 0], angle_targets[:, 1]) * 180 / np.pi
            aux_targets = ((angles + 360) % 360 / 45).long() % 8
            aux_targets = aux_targets.to(device)

            optimizer.zero_grad()

            # Mixed precision training
            with amp.autocast():
                angle_output, aux_output = model(images)
                loss = criterion(angle_output, aux_output, angle_targets, aux_targets)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()

            running_loss += loss.item()
            pbar.set_postfix({'loss': f"{loss.item():.4f}"})

        # Validation with TTA
        model.eval()
        val_preds = []
        val_targets = []

        with torch.no_grad():
            for images, targets in val_loader:
                images = images.to(device, non_blocking=True)
                targets = targets.to(device, non_blocking=True)

                # Base prediction
                angle_output, _ = model(images)

                # TTA - horizontal flip
                flipped_images = torch.flip(images, dims=[3])
                flipped_output, _ = model(flipped_images)

                # Average predictions
                combined_output = (angle_output + flipped_output) / 2

                val_preds.append(combined_output.cpu())
                val_targets.append(targets.cpu())

        val_preds = torch.cat(val_preds)
        val_targets = torch.cat(val_targets)

        maae = mean_angular_error(val_preds, val_targets).item()
        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {running_loss/len(train_loader):.4f} | Val MAAE: {maae:.2f}")

        # Save best model
        if maae < best_maae:
            best_maae = maae
            torch.save(model.state_dict(), 'best_model_Angle.pth')
            print(f"New best MAAE: {best_maae:.2f}")

# Load datasets
train_dataset = AngleDataset("/content/dataset/images_train", "/content/Phase_2_data/labels_train.csv", get_train_transform())
val_dataset = AngleDataset("/content/dataset/images_val", "/content/Phase_2_data/labels_val.csv", get_val_transform())

# Optimized DataLoader settings
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True,
                         num_workers=4, pin_memory=True, persistent_workers=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False,
                       num_workers=4, pin_memory=True, persistent_workers=True)

# Initialize and train model
model = AdvancedAngleRegressor(backbone='convnext_large').to(device)
train_model(model, train_loader, val_loader, epochs=23)

#### Works for only Val data the below for 17 epochs got best MAAE 36.69

# # Generate predictions with TTA
# def predict_with_tta(model, loader):
#     model.eval()
#     angles = []

#     with torch.no_grad():
#         for images, _ in loader:
#             images = images.to(device, non_blocking=True)

#             # Base prediction
#             angle_output, _ = model(images)

#             # TTA - horizontal flip
#             flipped_images = torch.flip(images, dims=[3])
#             flipped_output, _ = model(flipped_images)

#             # Average predictions
#             combined_output = (angle_output + flipped_output) / 2

#             # Convert to angles
#             batch_angles = torch.atan2(combined_output[:, 0], combined_output[:, 1]) * 180 / np.pi
#             batch_angles = (batch_angles + 360) % 360
#             angles.extend(batch_angles.cpu().numpy())
#             torch.cuda.empty_cache()


#     return angles

# val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# predicted_angles = predict_with_tta(model, val_loader)

# # Create submission
# submission = pd.DataFrame({
#     'id': range(len(predicted_angles)),
#     'angle': predicted_angles
# })

# # Add dummy test predictions
# dummy_test = pd.DataFrame({
#     'id': range(len(predicted_angles), 738),
#     'angle': [0] * (738 - len(predicted_angles))
# })

# submission = pd.concat([submission, dummy_test], ignore_index=True)
# submission.to_csv("2022101002_best_angle.csv", index=False)
# print("Best submission file saved: 2022101002_best_angle.csv")

In [None]:

#### works fine for both Test + Val and correct Output Format


# Generate predictions with TTA
def predict_with_tta(model, loader):
    model.eval()
    angles = []

    with torch.no_grad():
        for images, _ in loader:
            images = images.to(device, non_blocking=True)

            # Base prediction
            angle_output, _ = model(images)

            # TTA - horizontal flip
            flipped_images = torch.flip(images, dims=[3])
            flipped_output, _ = model(flipped_images)

            # Average predictions
            combined_output = (angle_output + flipped_output) / 2

            # Convert to angles
            batch_angles = torch.atan2(combined_output[:, 0], combined_output[:, 1]) * 180 / np.pi
            batch_angles = (batch_angles + 360) % 360
            angles.extend(batch_angles.cpu().numpy())
            torch.cuda.empty_cache()

    return angles

# Create test dataset class (without labels)
class TestAngleDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(image_dir))

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        filename = self.image_files[idx]
        img_path = os.path.join(self.image_dir, filename)

        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            raise FileNotFoundError(f"Image not found at {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image=image)["image"]

        return image, filename  # Return filename for identification

# Load best model
model.load_state_dict(torch.load('/content/best_model_Angle.pth'))
model.to(device)

# Predict on validation set (369 images)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
val_predicted_angles = predict_with_tta(model, val_loader)

# Predict on test set (369 images)
test_dataset = TestAngleDataset("/content/drive/MyDrive/images_test", get_val_transform())
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
test_predicted_angles = predict_with_tta(model, test_loader)

# Combine predictions (val first, then test)
all_predictions = val_predicted_angles + test_predicted_angles

# Create submission DataFrame with IDs 0-737
submission = pd.DataFrame({
    'id': range(738),  # 0 to 737
    'angle': all_predictions
})

# Save to CSV
submission.to_csv("2022101002_best_angle_Test_Finalll.csv", index=False)
print("Best submission file saved: 2022101002_best_angle.csv")