In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset directories configured
# later in this notebook are located under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
# Clone the YOLOv7 repository into ./yolov7 (its requirements.txt is installed in the next cell).
# NOTE(review): no cell visible here imports anything from yolov7 — confirm this clone is still needed.
!git clone https://github.com/WongKinYiu/yolov7.git

In [None]:
# Install the Python dependencies listed by the cloned YOLOv7 repository.
!pip install -r yolov7/requirements.txt

In [None]:
# Install torchvision (used for transforms and pretrained backbones) and transformers.
# NOTE(review): no cell visible here imports transformers — confirm it is needed.
!pip install torchvision transformers

In [None]:
import os
import scipy.io
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

class KeypointVHS_Dataset(Dataset):
    """Image dataset whose labels (key points and a VHS class) come from ``.mat`` files.

    For every image ``<name>.<ext>`` in ``image_dir`` a label file
    ``<name>.mat`` is read from ``label_dir``. The label file must contain
    ``six_points`` (flattened to 12 float32 values) and ``VHS`` (a scalar that
    is bucketed into three classes).

    Args:
        image_dir: Directory holding the image files.
        label_dir: Directory holding the matching ``.mat`` label files.
        transform: Optional callable applied to the PIL image.

    NOTE(review): the key points are returned exactly as stored in the label
    file. If ``transform`` resizes the image (the notebook's transform resizes
    to 224x224) the coordinates are not rescaled to match — confirm they are
    already expressed in the resized frame.
    """

    # Extensions (lower-case) that count as images when scanning ``image_dir``.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff')

    def __init__(self, image_dir, label_dir, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        # os.listdir returns every entry, including sub-directories such as
        # ``.ipynb_checkpoints`` and stray non-image files; keep only regular
        # image files so __getitem__ never tries to open something else.
        self.image_files = sorted(
            name for name in os.listdir(image_dir)
            if name.lower().endswith(self.IMAGE_EXTENSIONS)
            and os.path.isfile(os.path.join(image_dir, name))
        )

    def __len__(self):
        """Return the number of image files found in ``image_dir``."""
        return len(self.image_files)

    @staticmethod
    def _vhs_to_class(vhs_value):
        """Map a VHS value to a class index: <8.2 -> 0, 8.2..10 -> 1, >10 -> 2."""
        if vhs_value < 8.2:
            return 0
        if vhs_value <= 10:
            return 1
        return 2

    def __getitem__(self, idx):
        """Return ``(image, labels)`` for sample ``idx``.

        ``labels`` is a dict with ``'keypoints_output'`` (float32 tensor of 12
        values) and ``'vhs_class_output'`` (long scalar tensor in {0, 1, 2}).
        """
        file_name = self.image_files[idx]

        # Load image
        image_path = os.path.join(self.image_dir, file_name)
        image = Image.open(image_path).convert("RGB")

        # Load label: same base name as the image, with a .mat extension
        label_path = os.path.join(self.label_dir, os.path.splitext(file_name)[0] + '.mat')
        mat = scipy.io.loadmat(label_path)
        keypoints = mat['six_points'].flatten().astype('float32')  # 6 (x, y) pairs -> 12 values

        # loadmat returns scalars as 2-D arrays, hence the [0][0] access
        vhs_class = self._vhs_to_class(mat['VHS'][0][0])

        # Apply transformations to the image, if specified
        if self.transform:
            image = self.transform(image)

        # Prepare label dictionary
        labels = {
            'keypoints_output': torch.tensor(keypoints, dtype=torch.float32),
            'vhs_class_output': torch.tensor(vhs_class, dtype=torch.long)
        }

        return image, labels

# Define transformations
# Every image is resized to 224x224, converted to a tensor and normalized per channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Define paths
# Images and their .mat labels live in parallel folders on the mounted Google Drive.
train_image_dir = '/content/drive/MyDrive/Train/Images'
train_label_dir = '/content/drive/MyDrive/Train/Labels'
valid_image_dir = '/content/drive/MyDrive/Valid/Images'
valid_label_dir = '/content/drive/MyDrive/Valid/Labels'

# Create datasets
# The same transform is used for training and validation (no augmentation).
train_dataset = KeypointVHS_Dataset(train_image_dir, train_label_dir, transform=transform)
valid_dataset = KeypointVHS_Dataset(valid_image_dir, valid_label_dir, transform=transform)

# Create DataLoaders
# Only the training loader shuffles; validation order stays fixed.
batch_size = 3
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)


Model variant 1: U-Net encoder with an attention gate

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# U-Net Encoder with Attention Gates
class AttentionGate(nn.Module):
    """Additive attention gate that rescales ``x`` with a mask computed from ``g`` and ``x``.

    Both inputs are projected to ``F_int`` channels with 1x1 convolutions,
    summed, passed through ReLU and reduced to a single-channel sigmoid mask
    that multiplies ``x``.

    Args:
        F_g: Number of channels of the gating signal ``g``.
        F_l: Number of channels of the gated features ``x``.
        F_int: Number of channels of the intermediate projection.
    """

    def __init__(self, F_g, F_l, F_int):
        super(AttentionGate, self).__init__()
        # 1x1 projection of the gating signal
        self.W_g = nn.Sequential(
            nn.Conv2d(F_g, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )

        # 1x1 projection of the features being gated
        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, F_int, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(F_int)
        )

        # Collapse to one channel and squash to (0, 1)
        self.psi = nn.Sequential(
            nn.Conv2d(F_int, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, g, x):
        """Return ``x`` multiplied by the attention mask; the output has the shape of ``x``."""
        # Only the spatial size has to agree. Comparing the full size would
        # also compare channel counts (F_g vs F_l), which normally differ, and
        # would trigger a pointless interpolation on every call.
        if g.shape[2:] != x.shape[2:]:
            g = F.interpolate(g, size=x.shape[2:], mode='bilinear', align_corners=True)

        g1 = self.W_g(g)
        x1 = self.W_x(x)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        return x * psi

class UNetEncoderWithAttention(nn.Module):
    """Four-stage convolutional encoder with an attention gate on the third stage.

    Stages 2-4 each start with a 2x2 max-pool; every stage applies two
    3x3 conv + ReLU pairs. Channel widths are 64, 128, 256 and 512.

    Args:
        in_channels: Number of channels of the input image.
    """

    def __init__(self, in_channels=3):
        super().__init__()
        self.enc1 = self._stage(in_channels, 64, downsample=False)
        self.enc2 = self._stage(64, 128, downsample=True)
        self.enc3 = self._stage(128, 256, downsample=True)
        self.enc4 = self._stage(256, 512, downsample=True)
        # Gating signal: 512-channel stage-4 output; gated features: 256-channel stage-3 output.
        self.attention4 = AttentionGate(512, 256, 256)

    @staticmethod
    def _stage(in_ch, out_ch, downsample):
        """Build one encoder stage: optional 2x2 max-pool, then two 3x3 conv + ReLU pairs."""
        layers = [nn.MaxPool2d(2)] if downsample else []
        layers += [
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return ``(flow, fhigh)``.

        ``flow`` is the stage-1 output (64 channels). ``fhigh`` is the stage-3
        output (256 channels) rescaled by the attention gate, which uses the
        stage-4 output as its gating signal.
        """
        flow = self.enc1(x)
        stage3 = self.enc3(self.enc2(flow))
        stage4 = self.enc4(stage3)

        # The gate returns its second argument scaled by the attention mask.
        fhigh = self.attention4(stage4, stage3)
        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Project both feature maps to 16 channels and concatenate them.

    ``flow`` (64 channels) and ``fhigh`` (256 channels) are each reduced with
    a 1x1 convolution; the projected ``flow`` is bilinearly resized to the
    spatial size of the projected ``fhigh`` before channel-wise concatenation,
    giving a 32-channel result.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(64, 16, kernel_size=1)
        self.conv2 = nn.Conv2d(256, 16, kernel_size=1)

    def forward(self, flow, fhigh):
        """Return the fused map with ``flow`` channels first, ``fhigh`` channels second."""
        high_proj = self.conv2(fhigh)
        low_proj = F.interpolate(
            self.conv1(flow),
            size=high_proj.shape[2:],
            mode='bilinear',
            align_corners=True,
        )
        return torch.cat((low_proj, high_proj), dim=1)

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Recompute y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` holds interleaved coordinates ``x1, y1, x2, y2, x3, y3, x4, y4, ...``
    along dimension 1. The perpendicular slope ``s = -(x1 - x2) / (y1 - y2)``
    is used to place point 4 on the line through point 3:
    ``y4 = s * (x4 - x3) + y3``. Only column 7 changes; the input tensor is
    not modified.

    Args:
        eps: Magnitude of the offset that keeps the denominator away from zero.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        """Return a copy of ``points`` with column 7 (y4) replaced."""
        x1, y1 = points[:, 0], points[:, 1]
        x2, y2 = points[:, 2], points[:, 3]
        x3, y3 = points[:, 4], points[:, 5]
        x4 = points[:, 6]

        # Push the denominator away from zero in the direction of its sign.
        # Always adding +eps would cancel a denominator of exactly -eps and
        # produce a division by zero.
        dy = y1 - y2
        safe_dy = torch.where(dy >= 0, dy + self.eps, dy - self.eps)
        s = -(x1 - x2) / safe_dy

        y4_corrected = points.clone()
        y4_corrected[:, 7] = s * (x4 - x3) + y3
        return y4_corrected

# Complete Model
class RVTModelWithUNetAttention(nn.Module):
    """Key-point regressor and 3-way classifier on top of the U-Net attention encoder.

    Pipeline: encoder -> feature fusion (32 channels) -> 8x8 adaptive average
    pooling -> linear layer producing 12 key-point values -> orthogonality
    correction -> linear classifier over the corrected key points.
    """

    def __init__(self):
        super().__init__()
        self.encoder = UNetEncoderWithAttention()
        self.ffm = FeatureFusionModule()
        # Fused features are pooled to a fixed (32, 8, 8) volume.
        self.pool = nn.AdaptiveAvgPool2d((8, 8))
        self.orthogonal_layer = OrthogonalLayer()
        # 12 outputs = 6 (x, y) key points; they also feed the classifier.
        self.fc = nn.Linear(32 * 8 * 8, 12)
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        """Return ``(corrected_keypoints, class_logits)`` for an image batch ``x``."""
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_points = self.fc(flat).view(-1, 12)
        points = self.orthogonal_layer(raw_points)
        return points, self.classifier(points)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times MSE on the key points.

    Args:
        gamma: Weight of the key-point regression term.
    """

    def __init__(self, gamma=1.0):
        super().__init__()
        self.gamma = gamma
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        """Return the scalar combined loss for one batch."""
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RVTModelWithUNetAttention().to(device)
# gamma weights the key-point MSE term against the classification cross-entropy.
criterion = RVTLoss(gamma=1.0).to(device)
# NOTE(review): lr=0.01 is larger than the Adam default (1e-3) — confirm this is intentional.
optimizer = optim.Adam(model.parameters(), lr=0.01)


Model variant 2: MobileNetV2 and EfficientNet-B0 for feature extraction

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import mobilenet_v2, efficientnet_b0

# Combined MobileNet + EfficientNet Encoder
class MobileNetEfficientNetEncoder(nn.Module):
    """Encoder that chains early MobileNetV2 layers into late EfficientNet-B0 layers.

    The first seven MobileNetV2 feature layers produce the low-level map, a
    1x1 convolution widens it from 32 to 80 channels, and EfficientNet-B0
    feature layers from index 5 onward turn that into the high-level map,
    which is then rescaled by a single-channel sigmoid attention mask.
    """

    def __init__(self):
        super().__init__()

        # Low-level branch: the first seven MobileNetV2 feature layers.
        mobilenet_backbone = mobilenet_v2(pretrained=True)
        self.low_level_features = nn.Sequential(*mobilenet_backbone.features[:7])

        # Bridge: 32 -> 80 channels so the EfficientNet layers accept the map.
        self.channel_adjust = nn.Conv2d(32, 80, kernel_size=1)

        # High-level branch: EfficientNet-B0 feature layers from index 5 to the end.
        efficientnet_backbone = efficientnet_b0(pretrained=True)
        self.high_level_features = nn.Sequential(*efficientnet_backbone.features[5:])

        # Attention mask: 1280 -> 320 -> 1 channel, squashed to (0, 1).
        mask_layers = [
            nn.Conv2d(1280, 320, kernel_size=1),
            nn.BatchNorm2d(320),
            nn.ReLU(inplace=True),
            nn.Conv2d(320, 1, kernel_size=1),
            nn.BatchNorm2d(1),
            nn.Sigmoid(),
        ]
        self.attention = nn.Sequential(*mask_layers)

    def forward(self, x):
        """Return ``(flow, fhigh)``: the 80-channel bridged map and the attended 1280-channel map."""
        flow = self.channel_adjust(self.low_level_features(x))
        deep = self.high_level_features(flow)
        fhigh = deep * self.attention(deep)
        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Project both feature maps to 16 channels and concatenate them.

    ``flow`` (80 channels) and ``fhigh`` (1280 channels) are each reduced with
    a 1x1 convolution; the projected ``flow`` is bilinearly resized to the
    spatial size of the projected ``fhigh`` before channel-wise concatenation,
    giving a 32-channel result.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(80, 16, kernel_size=1)    # low-level projection
        self.conv2 = nn.Conv2d(1280, 16, kernel_size=1)  # high-level projection

    def forward(self, flow, fhigh):
        """Return the fused map with ``flow`` channels first, ``fhigh`` channels second."""
        high_proj = self.conv2(fhigh)
        low_proj = F.interpolate(
            self.conv1(flow),
            size=high_proj.shape[2:],
            mode='bilinear',
            align_corners=True,
        )
        return torch.cat((low_proj, high_proj), dim=1)

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Recompute y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` holds interleaved coordinates ``x1, y1, x2, y2, x3, y3, x4, y4, ...``
    along dimension 1. The perpendicular slope ``s = -(x1 - x2) / (y1 - y2)``
    is used to place point 4 on the line through point 3:
    ``y4 = s * (x4 - x3) + y3``. Only column 7 changes; the input tensor is
    not modified.

    Args:
        eps: Magnitude of the offset that keeps the denominator away from zero.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        """Return a copy of ``points`` with column 7 (y4) replaced."""
        x1, y1 = points[:, 0], points[:, 1]
        x2, y2 = points[:, 2], points[:, 3]
        x3, y3 = points[:, 4], points[:, 5]
        x4 = points[:, 6]

        # Push the denominator away from zero in the direction of its sign.
        # Always adding +eps would cancel a denominator of exactly -eps and
        # produce a division by zero.
        dy = y1 - y2
        safe_dy = torch.where(dy >= 0, dy + self.eps, dy - self.eps)
        s = -(x1 - x2) / safe_dy

        y4_corrected = points.clone()
        y4_corrected[:, 7] = s * (x4 - x3) + y3
        return y4_corrected

# Complete Model
class RVTModelWithMobileEfficientNet(nn.Module):
    """Key-point regressor and 3-way classifier on top of the MobileNet/EfficientNet encoder.

    Pipeline: encoder -> feature fusion (32 channels) -> 8x8 adaptive average
    pooling -> linear layer producing 12 key-point values -> orthogonality
    correction -> linear classifier over the corrected key points.
    """

    def __init__(self):
        super().__init__()
        self.encoder = MobileNetEfficientNetEncoder()
        self.ffm = FeatureFusionModule()
        # Fused features are pooled to a fixed (32, 8, 8) volume.
        self.pool = nn.AdaptiveAvgPool2d((8, 8))
        self.orthogonal_layer = OrthogonalLayer()
        # 12 outputs = 6 (x, y) key points; they also feed the classifier.
        self.fc = nn.Linear(32 * 8 * 8, 12)
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        """Return ``(corrected_keypoints, class_logits)`` for an image batch ``x``."""
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_points = self.fc(flat).view(-1, 12)
        points = self.orthogonal_layer(raw_points)
        return points, self.classifier(points)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times MSE on the key points.

    Args:
        gamma: Weight of the key-point regression term.
    """

    def __init__(self, gamma=1.0):
        super().__init__()
        self.gamma = gamma
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        """Return the scalar combined loss for one batch."""
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Rebinds the notebook-level `model`, `criterion` and `optimizer` names to this variant.
model = RVTModelWithMobileEfficientNet().to(device)
# gamma weights the key-point MSE term against the classification cross-entropy.
criterion = RVTLoss(gamma=1.0).to(device)
# NOTE(review): lr=0.01 is larger than the Adam default (1e-3) — confirm this is intentional.
optimizer = optim.Adam(model.parameters(), lr=0.01)


Model variant 3: EfficientNet-B7 backbone

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import efficientnet_b7  # EfficientNet-B7 backbone for deeper feature extraction

# EfficientNet encoder (EfficientNet-B7 backbone)
class EfficientNetEncoder(nn.Module):
    """Two feature taps taken from one pretrained EfficientNet-B7 backbone.

    Naming caveat: the attribute names are kept for compatibility, but they
    are inverted relative to network depth. ``low_level_features`` holds
    ``features[:8]`` (the deeper stack, consumed by the 640-channel input of
    the fusion module) and ``high_level_features`` holds ``features[:2]``
    (the shallow stack, consumed by the 32-channel input).

    The two stacks share their first layers: ``features[:2]`` is a prefix of
    ``features[:8]`` and both wrap the same module objects.
    """

    def __init__(self):
        super(EfficientNetEncoder, self).__init__()

        # Load EfficientNet-B7 for feature extraction
        efficientnet = efficientnet_b7(pretrained=True)

        # Deeper stack (first eight feature layers); returned as ``flow``.
        self.low_level_features = nn.Sequential(*list(efficientnet.features[:8]))

        # Shallow stack (first two feature layers); returned as ``fhigh``.
        self.high_level_features = nn.Sequential(*list(efficientnet.features[:2]))

    def forward(self, x):
        """Return ``(flow, fhigh)``: the deep-stack output and the shallow-stack output."""
        # Run the shared shallow layers once.
        fhigh = self.high_level_features(x)

        # Continue through the remaining deep layers from the shallow output
        # instead of restarting from ``x``. Re-running the shared layers would
        # double their cost and, in training mode, update their BatchNorm
        # statistics twice and take the two outputs from separate stochastic passes.
        shared_depth = len(self.high_level_features)
        flow = fhigh
        for layer in list(self.low_level_features)[shared_depth:]:
            flow = layer(flow)

        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Project both feature maps to 16 channels and concatenate them.

    ``flow`` (640 channels) and ``fhigh`` (32 channels) are each reduced with
    a 1x1 convolution; the projected ``flow`` is bilinearly resized to the
    spatial size of the projected ``fhigh`` before channel-wise concatenation,
    giving a 32-channel result.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(640, 16, kernel_size=1)  # projection of ``flow``
        self.conv2 = nn.Conv2d(32, 16, kernel_size=1)   # projection of ``fhigh``

    def forward(self, flow, fhigh):
        """Return the fused map with ``flow`` channels first, ``fhigh`` channels second."""
        high_proj = self.conv2(fhigh)
        low_proj = F.interpolate(
            self.conv1(flow),
            size=high_proj.shape[2:],
            mode='bilinear',
            align_corners=True,
        )
        return torch.cat((low_proj, high_proj), dim=1)

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Recompute y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` holds interleaved coordinates ``x1, y1, x2, y2, x3, y3, x4, y4, ...``
    along dimension 1. The perpendicular slope ``s = -(x1 - x2) / (y1 - y2)``
    is used to place point 4 on the line through point 3:
    ``y4 = s * (x4 - x3) + y3``. Only column 7 changes; the input tensor is
    not modified.

    Args:
        eps: Magnitude of the offset that keeps the denominator away from zero.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        """Return a copy of ``points`` with column 7 (y4) replaced."""
        x1, y1 = points[:, 0], points[:, 1]
        x2, y2 = points[:, 2], points[:, 3]
        x3, y3 = points[:, 4], points[:, 5]
        x4 = points[:, 6]

        # Push the denominator away from zero in the direction of its sign.
        # Always adding +eps would cancel a denominator of exactly -eps and
        # produce a division by zero.
        dy = y1 - y2
        safe_dy = torch.where(dy >= 0, dy + self.eps, dy - self.eps)
        s = -(x1 - x2) / safe_dy

        y4_corrected = points.clone()
        y4_corrected[:, 7] = s * (x4 - x3) + y3
        return y4_corrected

# Complete Model
class RVTModelWithEfficientNet(nn.Module):
    """Key-point regressor and 3-way classifier on top of the EfficientNet encoder.

    Pipeline: encoder -> feature fusion (32 channels) -> 8x8 adaptive average
    pooling -> linear layer producing 12 key-point values -> orthogonality
    correction -> linear classifier over the corrected key points.
    """

    def __init__(self):
        super().__init__()
        self.encoder = EfficientNetEncoder()
        self.ffm = FeatureFusionModule()
        # Fused features are pooled to a fixed (32, 8, 8) volume.
        self.pool = nn.AdaptiveAvgPool2d((8, 8))
        self.orthogonal_layer = OrthogonalLayer()
        # 12 outputs = 6 (x, y) key points; they also feed the classifier.
        self.fc = nn.Linear(32 * 8 * 8, 12)
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        """Return ``(corrected_keypoints, class_logits)`` for an image batch ``x``."""
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_points = self.fc(flat).view(-1, 12)
        points = self.orthogonal_layer(raw_points)
        return points, self.classifier(points)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times MSE on the key points.

    Args:
        gamma: Weight of the key-point regression term.
    """

    def __init__(self, gamma=1.0):
        super().__init__()
        self.gamma = gamma
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        """Return the scalar combined loss for one batch."""
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Rebinds the notebook-level `model`, `criterion` and `optimizer` names to this variant.
model = RVTModelWithEfficientNet().to(device)
# gamma weights the key-point MSE term against the classification cross-entropy.
criterion = RVTLoss(gamma=1.0).to(device)
# NOTE(review): lr=0.01 is larger than the Adam default (1e-3) — confirm this is intentional.
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one epoch and return the mean per-sample loss.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        train_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.

    Returns:
        Sum of batch losses weighted by batch size, divided by the number of
        samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0

    for images, labels in train_loader:
        images = images.to(device)
        keypoints_target = labels['keypoints_output'].to(device)
        vhs_class_target = labels['vhs_class_output'].to(device)

        # Forward pass
        keypoints_pred, vhs_class_pred = model(images)

        # Calculate loss
        loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_in_batch = images.size(0)
        running_loss += loss.item() * n_in_batch
        samples_seen += n_in_batch

    # Normalize by the samples actually seen: len(loader.dataset) overstates the
    # count whenever the loader uses a sampler or drops the last batch.
    return running_loss / samples_seen

def evaluate(model, val_loader, criterion, device):
    """Evaluate ``model`` without gradient tracking; return ``(mean_loss, accuracy)``.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        val_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        device: Device the batches are moved to.

    Returns:
        Tuple of the mean per-sample loss and the fraction of samples whose
        highest-scoring class equals the target class. Both are normalized by
        the number of samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            keypoints_target = labels['keypoints_output'].to(device)
            vhs_class_target = labels['vhs_class_output'].to(device)

            # Forward pass
            keypoints_pred, vhs_class_pred = model(images)

            # Calculate loss
            loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)
            running_loss += loss.item() * images.size(0)

            # Calculate accuracy for VHS class
            predicted_classes = vhs_class_pred.argmax(dim=1)
            correct += (predicted_classes == vhs_class_target).sum().item()
            total += vhs_class_target.size(0)

    # Use the same denominator for loss and accuracy: the samples actually seen.
    # len(loader.dataset) overstates it when a sampler or drop_last is in use.
    epoch_loss = running_loss / total
    accuracy = correct / total
    return epoch_loss, accuracy

# Training Loop
# Runs a fixed number of epochs on whichever `model`, `criterion` and `optimizer`
# are currently bound in the notebook; nothing is checkpointed or stopped early.
num_epochs = 200
# Same device-selection expression as used when the model was created.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(num_epochs):
    # One optimization pass over the training data, then one evaluation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")


In [None]:
# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one epoch and return the mean per-sample loss.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        train_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.

    Returns:
        Sum of batch losses weighted by batch size, divided by the number of
        samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0

    for images, labels in train_loader:
        images = images.to(device)
        keypoints_target = labels['keypoints_output'].to(device)
        vhs_class_target = labels['vhs_class_output'].to(device)

        # Forward pass
        keypoints_pred, vhs_class_pred = model(images)

        # Calculate loss
        loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_in_batch = images.size(0)
        running_loss += loss.item() * n_in_batch
        samples_seen += n_in_batch

    # Normalize by the samples actually seen: len(loader.dataset) overstates the
    # count whenever the loader uses a sampler or drops the last batch.
    return running_loss / samples_seen

def evaluate(model, val_loader, criterion, device):
    """Evaluate ``model`` without gradient tracking; return ``(mean_loss, accuracy)``.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        val_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        device: Device the batches are moved to.

    Returns:
        Tuple of the mean per-sample loss and the fraction of samples whose
        highest-scoring class equals the target class. Both are normalized by
        the number of samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            keypoints_target = labels['keypoints_output'].to(device)
            vhs_class_target = labels['vhs_class_output'].to(device)

            # Forward pass
            keypoints_pred, vhs_class_pred = model(images)

            # Calculate loss
            loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)
            running_loss += loss.item() * images.size(0)

            # Calculate accuracy for VHS class
            predicted_classes = vhs_class_pred.argmax(dim=1)
            correct += (predicted_classes == vhs_class_target).sum().item()
            total += vhs_class_target.size(0)

    # Use the same denominator for loss and accuracy: the samples actually seen.
    # len(loader.dataset) overstates it when a sampler or drop_last is in use.
    epoch_loss = running_loss / total
    accuracy = correct / total
    return epoch_loss, accuracy

# Training Loop
# Runs a fixed number of epochs on whichever `model`, `criterion` and `optimizer`
# are currently bound in the notebook; nothing is checkpointed or stopped early.
num_epochs = 200
# Same device-selection expression as used when the model was created.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(num_epochs):
    # One optimization pass over the training data, then one evaluation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")

Model variant 4: EfficientNet-B7 backbone with a Feature Pyramid Network (FPN)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import efficientnet_b7  # EfficientNet-B7 backbone for deeper feature extraction
from torchvision.ops import FeaturePyramidNetwork

class EfficientNetWithFPN(nn.Module):
    """EfficientNet-B7 backbone split into three stages feeding a Feature Pyramid Network.

    The backbone's feature layers are split into ``[:3]``, ``[3:6]`` and
    ``[6:]``; the three stage outputs are passed through an FPN with 256
    output channels per level. ``forward`` returns the FPN outputs of the
    first and last levels.
    """

    def __init__(self):
        super(EfficientNetWithFPN, self).__init__()

        # Load EfficientNet-B7 as the backbone
        efficientnet = efficientnet_b7(pretrained=True)

        # Define layers for different scales
        self.low_level_features = nn.Sequential(*list(efficientnet.features[:3]))  # Fine details
        self.mid_level_features = nn.Sequential(*list(efficientnet.features[3:6])) # Mid-level features
        self.high_level_features = nn.Sequential(*list(efficientnet.features[6:])) # Broad context

        # Derive each stage's output width from the backbone itself instead of
        # hard-coding it. The previous [40, 112, 1280] does not match B7; for
        # torchvision's EfficientNet-B7 the widths are expected to be 48, 224 and 2560.
        in_channels_list = [
            self._stage_out_channels(stage)
            for stage in (self.low_level_features, self.mid_level_features, self.high_level_features)
        ]

        # Create FPN on top of extracted features
        self.fpn = FeaturePyramidNetwork(in_channels_list=in_channels_list, out_channels=256)

    @staticmethod
    def _stage_out_channels(stage):
        """Return the ``out_channels`` of the last ``nn.Conv2d`` registered inside ``stage``."""
        convs = [module for module in stage.modules() if isinstance(module, nn.Conv2d)]
        if not convs:
            raise ValueError("stage contains no nn.Conv2d layer to infer channels from")
        return convs[-1].out_channels

    def forward(self, x):
        """Return ``(low_level_output, high_level_output)``, both with 256 channels."""
        # Extract features at multiple levels
        low = self.low_level_features(x)
        mid = self.mid_level_features(low)
        high = self.high_level_features(mid)

        # Pass features through the FPN
        fpn_out = self.fpn({"0": low, "1": mid, "2": high})

        # Extract low-level and high-level feature maps
        low_level_output = fpn_out["0"]  # Using the lowest level output from FPN
        high_level_output = fpn_out["2"] # Using the highest level output from FPN

        return low_level_output, high_level_output


# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Project both FPN feature maps to 16 channels and concatenate them.

    Each input is reduced with a 1x1 convolution; the projected ``flow`` is
    bilinearly resized to the spatial size of the projected ``fhigh`` before
    channel-wise concatenation, giving a 32-channel result.

    Args:
        low_channels: Channels of ``flow``. Defaults to 256, the ``out_channels``
            of the FPN built by this cell's encoder.
        high_channels: Channels of ``fhigh``. Defaults to 256 for the same reason.
    """

    def __init__(self, low_channels=256, high_channels=256):
        super(FeatureFusionModule, self).__init__()
        # The encoder's FPN emits 256-channel maps, so 64 input channels here
        # would fail with a channel mismatch on the first forward pass.
        self.conv1 = nn.Conv2d(low_channels, 16, kernel_size=1)
        self.conv2 = nn.Conv2d(high_channels, 16, kernel_size=1)

    def forward(self, flow, fhigh):
        """Return the fused map with ``flow`` channels first, ``fhigh`` channels second."""
        flow_feat = self.conv1(flow)
        fhigh_feat = self.conv2(fhigh)

        # Resize flow_feat to match the spatial dimensions of fhigh_feat
        flow_feat = F.interpolate(flow_feat, size=fhigh_feat.shape[2:], mode='bilinear', align_corners=True)
        fused_features = torch.cat((flow_feat, fhigh_feat), dim=1)
        return fused_features

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Recompute y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` holds interleaved coordinates ``x1, y1, x2, y2, x3, y3, x4, y4, ...``
    along dimension 1. The perpendicular slope ``s = -(x1 - x2) / (y1 - y2)``
    is used to place point 4 on the line through point 3:
    ``y4 = s * (x4 - x3) + y3``. Only column 7 changes; the input tensor is
    not modified.

    Args:
        eps: Magnitude of the offset that keeps the denominator away from zero.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        """Return a copy of ``points`` with column 7 (y4) replaced."""
        x1, y1 = points[:, 0], points[:, 1]
        x2, y2 = points[:, 2], points[:, 3]
        x3, y3 = points[:, 4], points[:, 5]
        x4 = points[:, 6]

        # Push the denominator away from zero in the direction of its sign.
        # Always adding +eps would cancel a denominator of exactly -eps and
        # produce a division by zero.
        dy = y1 - y2
        safe_dy = torch.where(dy >= 0, dy + self.eps, dy - self.eps)
        s = -(x1 - x2) / safe_dy

        y4_corrected = points.clone()
        y4_corrected[:, 7] = s * (x4 - x3) + y3
        return y4_corrected

# Complete Model
class RVTModelWithEfficientNet(nn.Module):
    """Key-point regressor and 3-way classifier on top of the EfficientNet + FPN encoder.

    Pipeline: encoder -> feature fusion (16 + 16 = 32 channels) -> 8x8 adaptive
    average pooling -> linear layer producing 12 key-point values ->
    orthogonality correction -> linear classifier over the corrected key points.
    """

    def __init__(self):
        super(RVTModelWithEfficientNet, self).__init__()
        self.encoder = EfficientNetWithFPN()
        self.ffm = FeatureFusionModule()
        self.pool = nn.AdaptiveAvgPool2d((8, 8))  # Output shape (32, 8, 8)
        self.orthogonal_layer = OrthogonalLayer()
        # The fusion module concatenates two 16-channel maps, so the pooled
        # volume flattens to 32 * 8 * 8 values. Sizing this layer for
        # 256 * 8 * 8 inputs would fail with a shape mismatch in forward().
        self.fc = nn.Linear(32 * 8 * 8, 12)
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        """Return ``(corrected_keypoints, class_logits)`` for an image batch ``x``."""
        flow, fhigh = self.encoder(x)
        fused_features = self.ffm(flow, fhigh)
        pooled_features = self.pool(fused_features)
        pooled_features = pooled_features.view(pooled_features.size(0), -1)
        keypoints = self.fc(pooled_features).view(-1, 12)  # 6 (x, y) pairs per sample
        keypoints_corrected = self.orthogonal_layer(keypoints)
        vhs_class = self.classifier(keypoints_corrected)
        return keypoints_corrected, vhs_class

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times MSE on the key points.

    Args:
        gamma: Weight of the key-point regression term.
    """

    def __init__(self, gamma=1.0):
        super().__init__()
        self.gamma = gamma
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        """Return the scalar combined loss for one batch."""
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Rebinds the notebook-level `model`, `criterion` and `optimizer` names to this variant.
model = RVTModelWithEfficientNet().to(device)
# gamma weights the key-point MSE term against the classification cross-entropy.
criterion = RVTLoss(gamma=1.0).to(device)
# NOTE(review): lr=0.01 is larger than the Adam default (1e-3) — confirm this is intentional.
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [None]:
# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one epoch and return the mean per-sample loss.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        train_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.

    Returns:
        Sum of batch losses weighted by batch size, divided by the number of
        samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0

    for images, labels in train_loader:
        images = images.to(device)
        keypoints_target = labels['keypoints_output'].to(device)
        vhs_class_target = labels['vhs_class_output'].to(device)

        # Forward pass
        keypoints_pred, vhs_class_pred = model(images)

        # Calculate loss
        loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_in_batch = images.size(0)
        running_loss += loss.item() * n_in_batch
        samples_seen += n_in_batch

    # Normalize by the samples actually seen: len(loader.dataset) overstates the
    # count whenever the loader uses a sampler or drops the last batch.
    return running_loss / samples_seen

def evaluate(model, val_loader, criterion, device):
    """Evaluate ``model`` without gradient tracking; return ``(mean_loss, accuracy)``.

    Args:
        model: Module returning ``(keypoints_pred, vhs_class_pred)`` for an image batch.
        val_loader: Iterable of ``(images, labels)`` where ``labels`` is a dict
            with ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        criterion: Callable ``(keypoints_pred, keypoints_target, class_pred, class_target) -> loss``.
        device: Device the batches are moved to.

    Returns:
        Tuple of the mean per-sample loss and the fraction of samples whose
        highest-scoring class equals the target class. Both are normalized by
        the number of samples actually processed.

    Raises:
        ZeroDivisionError: If the loader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            keypoints_target = labels['keypoints_output'].to(device)
            vhs_class_target = labels['vhs_class_output'].to(device)

            # Forward pass
            keypoints_pred, vhs_class_pred = model(images)

            # Calculate loss
            loss = criterion(keypoints_pred, keypoints_target, vhs_class_pred, vhs_class_target)
            running_loss += loss.item() * images.size(0)

            # Calculate accuracy for VHS class
            predicted_classes = vhs_class_pred.argmax(dim=1)
            correct += (predicted_classes == vhs_class_target).sum().item()
            total += vhs_class_target.size(0)

    # Use the same denominator for loss and accuracy: the samples actually seen.
    # len(loader.dataset) overstates it when a sampler or drop_last is in use.
    epoch_loss = running_loss / total
    accuracy = correct / total
    return epoch_loss, accuracy

# Training Loop
# Runs a fixed number of epochs on whichever `model`, `criterion` and `optimizer`
# are currently bound in the notebook; nothing is checkpointed or stopped early.
num_epochs = 200
# Same device-selection expression as used when the model was created.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(num_epochs):
    # One optimization pass over the training data, then one evaluation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import efficientnet_b7  # EfficientNet-B7 backbone for deeper feature extraction

# EfficientNet encoder (EfficientNet-B7 backbone)
class EfficientNetEncoder(nn.Module):
    """Two consecutive slices of a pretrained EfficientNet-B7 backbone.

    ``low_level_features`` wraps feature layers ``[:2]`` and
    ``high_level_features`` wraps layers ``[2:4]``; the second slice consumes
    the output of the first.
    """

    def __init__(self):
        super(EfficientNetEncoder, self).__init__()

        # Load EfficientNet-B7 for feature extraction
        efficientnet = efficientnet_b7(pretrained=True)

        # Use early layers for low-level features
        self.low_level_features = nn.Sequential(*list(efficientnet.features[:2]))

        # Use the following layers for high-level features
        self.high_level_features = nn.Sequential(*list(efficientnet.features[2:4]))

    def forward(self, x):
        """Return ``(flow, fhigh)``: the output of the first slice and of the second slice."""
        # No shape printing here: forward runs once per batch, so debug prints
        # would flood the cell output during training.
        flow = self.low_level_features(x)
        fhigh = self.high_level_features(flow)
        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Fuse a 32-channel low-level map with an 80-channel high-level map.

    Each input is projected to 16 channels with a 1x1 convolution, the
    low-level projection is bilinearly resized to the high-level spatial size,
    and the two are concatenated on the channel axis (32 channels out).
    Intermediate shapes are printed on every call.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(32, 16, kernel_size=1)  # low-level projection
        self.conv2 = nn.Conv2d(80, 16, kernel_size=1)  # high-level projection

    def forward(self, flow, fhigh):
        print("Shape of flow before conv1:", flow.shape)
        low = self.conv1(flow)
        print("Shape of flow_feat after conv1:", low.shape)

        print("Shape of fhigh before conv2:", fhigh.shape)
        high = self.conv2(fhigh)
        print("Shape of fhigh_feat after conv2:", high.shape)

        # Bring the low-level map onto the high-level spatial grid before concatenating.
        target_hw = high.shape[2:]
        low = F.interpolate(low, size=target_hw, mode='bilinear', align_corners=True)
        fused = torch.cat((low, high), dim=1)
        print("Shape after concatenation (fused_features):", fused.shape)
        return fused

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Overwrite y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` is indexed as ``[x1, y1, x2, y2, x3, y3, x4, y4, ...]`` along
    dim 1. Only column 7 (y4) is replaced; the input tensor is not modified.
    """

    def forward(self, points):
        dx = points[:, 0] - points[:, 2]
        dy = points[:, 1] - points[:, 3]
        # Negative reciprocal of the 1->2 slope; the epsilon avoids dividing by 0.
        slope = -dx / (dy + 1e-6)
        corrected = points.clone()
        # Place point 4 on the line through point 3 with that perpendicular slope.
        corrected[:, 7] = slope * (points[:, 6] - points[:, 4]) + points[:, 5]
        return corrected

# Complete Model
class RVTModelWithEfficientNet(nn.Module):
    """Encoder -> feature fusion -> pooled regression of 12 keypoint values.

    ``forward(x)`` returns ``(keypoints_corrected, vhs_class)``: the 12
    regressed values after the orthogonality correction, and 3 class logits
    computed from those corrected values.
    """

    def __init__(self):
        super().__init__()
        self.encoder = EfficientNetEncoder()
        self.ffm = FeatureFusionModule()
        self.pool = nn.AdaptiveAvgPool2d((8, 8))  # fused map -> (32, 8, 8)
        self.orthogonal_layer = OrthogonalLayer()
        self.fc = nn.Linear(32 * 8 * 8, 12)  # 12 outputs feed the classifier
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_keypoints = self.fc(flat).view(-1, 12)
        keypoints = self.orthogonal_layer(raw_keypoints)
        # The class head sees only the corrected keypoints, not the image features.
        return keypoints, self.classifier(keypoints)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times keypoint MSE."""

    def __init__(self, gamma=1.0):
        super().__init__()
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()
        self.gamma = gamma  # weight of the regression term

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Note: this rebinds the notebook-wide names `device`, `model`, `criterion`
# and `optimizer`; later cells see whichever cell ran last.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Building the model loads pretrained EfficientNet weights inside the encoder.
model = RVTModelWithEfficientNet().to(device)
criterion = RVTLoss(gamma=1.0).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)



# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-sample loss.

    Each batch is ``(images, labels)`` where ``labels`` is a dict holding
    ``'keypoints_output'`` and ``'vhs_class_output'``. Batch losses are
    weighted by batch size and divided by ``len(train_loader.dataset)``.
    """
    model.train()
    weighted_loss_sum = 0.0

    for batch_images, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        kp_true = batch_labels['keypoints_output'].to(device)
        cls_true = batch_labels['vhs_class_output'].to(device)

        kp_out, cls_out = model(batch_images)
        batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)

        # Standard step: clear stale gradients, backpropagate, update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        weighted_loss_sum += batch_loss.item() * batch_images.size(0)

    return weighted_loss_sum / len(train_loader.dataset)

def evaluate(model, val_loader, criterion, device):
    """Evaluate without gradients; return ``(mean_loss, class_accuracy)``.

    The loss is averaged per sample over ``len(val_loader.dataset)``; accuracy
    is the fraction of samples whose highest class logit matches the target.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images = batch_images.to(device)
            kp_true = batch_labels['keypoints_output'].to(device)
            cls_true = batch_labels['vhs_class_output'].to(device)

            kp_out, cls_out = model(batch_images)

            batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)
            loss_sum += batch_loss.item() * batch_images.size(0)

            # Predicted class = index of the largest logit in each row.
            cls_guess = torch.max(cls_out, 1)[1]
            n_correct += (cls_guess == cls_true).sum().item()
            n_seen += cls_true.size(0)

    return loss_sum / len(val_loader.dataset), n_correct / n_seen

# Training Loop
# Trains the `model` built above for a fixed number of epochs.
# `train_loader` and `val_loader` are expected from an earlier cell.
num_epochs = 200
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(num_epochs):
    # One optimisation pass over the training data, then one validation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    # One summary line per epoch (epoch number is shown 1-based).
    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")

Combination of RVT and EfficientNet

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import efficientnet_b3  # Use EfficientNet-B3 for deeper feature extraction

# EfficientNet Encoder (using EfficientNet-B3 for added complexity)
class EfficientNetEncoder(nn.Module):
    """Pretrained EfficientNet-B3 stem plus a small untrained conv branch.

    ``forward(x)`` returns ``(flow, fhigh)``: ``flow`` is the output of the
    first EfficientNet-B3 feature stage, and ``fhigh`` is the raw input passed
    through ``layer1`` and ``layer2`` (two stride-2 convolutions ending at 128
    channels). ``layer3`` and ``layer4`` are created but not used in
    ``forward``.
    """

    def __init__(self):
        super().__init__()
        backbone = efficientnet_b3(pretrained=True)
        # Only the first feature stage of the backbone is kept.
        self.low_level_features = nn.Sequential(*backbone.features[:1])

        # Freshly initialised conv stack applied to the raw image.
        self.layer1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1)
        self.layer2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.layer3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.layer4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        flow = self.low_level_features(x)
        print("Shape of flow (low-level features):", flow.shape)

        # The high-level branch starts again from the input image.
        fhigh = self.layer2(self.layer1(x))

        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Fuse the encoder's low- and high-level maps into one 32-channel map.

    Each input is projected to 16 channels with a 1x1 convolution, the
    low-level projection is bilinearly resized to the high-level spatial size,
    and the two are concatenated on the channel axis. Intermediate shapes are
    printed on every call.

    Args:
        low_channels: channels of ``flow``. Defaults to 40, the width of the
            EfficientNet-B3 stem that this cell's encoder returns.
        high_channels: channels of ``fhigh``. Defaults to 128, the width of
            the encoder's ``layer2`` output.
    """

    def __init__(self, low_channels=40, high_channels=128):
        super(FeatureFusionModule, self).__init__()
        # The previous hard-coded 32/24 input widths did not match what the
        # encoder defined in this cell produces, so conv1/conv2 raised a
        # channel-mismatch error on the first forward pass.
        self.conv1 = nn.Conv2d(low_channels, 16, kernel_size=1)
        self.conv2 = nn.Conv2d(high_channels, 16, kernel_size=1)

    def forward(self, flow, fhigh):
        print("Shape of flow before conv1:", flow.shape)
        flow_feat = self.conv1(flow)
        print("Shape of flow_feat after conv1:", flow_feat.shape)

        print("Shape of fhigh before conv2:", fhigh.shape)
        fhigh_feat = self.conv2(fhigh)
        print("Shape of fhigh_feat after conv2:", fhigh_feat.shape)

        # Resize flow_feat to match the spatial dimensions of fhigh_feat
        flow_feat = F.interpolate(flow_feat, size=fhigh_feat.shape[2:], mode='bilinear', align_corners=True)
        fused_features = torch.cat((flow_feat, fhigh_feat), dim=1)
        print("Shape after concatenation (fused_features):", fused_features.shape)
        return fused_features

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Overwrite y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` is indexed as ``[x1, y1, x2, y2, x3, y3, x4, y4, ...]`` along
    dim 1. Only column 7 (y4) is replaced; the input tensor is not modified.
    """

    def forward(self, points):
        dx = points[:, 0] - points[:, 2]
        dy = points[:, 1] - points[:, 3]
        # Negative reciprocal of the 1->2 slope; the epsilon avoids dividing by 0.
        slope = -dx / (dy + 1e-6)
        corrected = points.clone()
        # Place point 4 on the line through point 3 with that perpendicular slope.
        corrected[:, 7] = slope * (points[:, 6] - points[:, 4]) + points[:, 5]
        return corrected

# Complete Model
class RVTModelWithEfficientNet(nn.Module):
    """Encoder -> feature fusion -> pooled regression of 12 keypoint values.

    ``forward(x)`` returns ``(keypoints_corrected, vhs_class)``: the 12
    regressed values after the orthogonality correction, and 3 class logits
    computed from those corrected values.
    """

    def __init__(self):
        super().__init__()
        self.encoder = EfficientNetEncoder()
        self.ffm = FeatureFusionModule()
        self.pool = nn.AdaptiveAvgPool2d((8, 8))  # fused map -> (32, 8, 8)
        self.orthogonal_layer = OrthogonalLayer()
        self.fc = nn.Linear(32 * 8 * 8, 12)  # 12 outputs feed the classifier
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_keypoints = self.fc(flat).view(-1, 12)
        keypoints = self.orthogonal_layer(raw_keypoints)
        # The class head sees only the corrected keypoints, not the image features.
        return keypoints, self.classifier(keypoints)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times keypoint MSE."""

    def __init__(self, gamma=1.0):
        super().__init__()
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()
        self.gamma = gamma  # weight of the regression term

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Note: this rebinds the notebook-wide names `device`, `model`, `criterion`
# and `optimizer`; later cells see whichever cell ran last.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Building the model loads pretrained EfficientNet weights inside the encoder.
model = RVTModelWithEfficientNet().to(device)
criterion = RVTLoss(gamma=1.0).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [None]:
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.models import efficientnet_b7

class EfficientNetEncoder(nn.Module):
    """Feature extractor sliced out of a pretrained EfficientNet-B7.

    ``forward(x)`` returns ``(low_level_features, high_level_features)``:
    the output of the backbone's first feature stage, and the output of
    stages 1-3 applied on top of it.
    """

    def __init__(self):
        super(EfficientNetEncoder, self).__init__()

        # Load EfficientNet-B7 for feature extraction
        efficientnet = efficientnet_b7(pretrained=True)

        # Use the first stage for low-level features
        self.low_level_features = nn.Sequential(*list(efficientnet.features[:1]))  # Adjust as needed

        # Use deeper layers for high-level features. The slice must start at
        # stage 1: the stages are chained, so skipping stage 1 (the previous
        # `features[2:4]`) fed the stem output into a block built for a
        # different channel count and failed at the first convolution.
        self.high_level_features = nn.Sequential(*list(efficientnet.features[1:4]))  # Adjust as needed

    def forward(self, x):
        # Extract low-level features
        low_level_features = self.low_level_features(x)

        # Extract high-level features from the low-level output
        high_level_features = self.high_level_features(low_level_features)

        return low_level_features, high_level_features

# Function to visualize feature maps
def visualize_feature_maps(features, title="Feature Maps"):
    """Show up to the first 8 channels of the first sample as a row of images.

    Args:
        features: tensor indexed as ``features[0, i]`` for channel ``i`` of
            the first sample; ``features.shape[1]`` is the channel count.
        title: figure title.
    """
    num_features = min(8, features.shape[1])  # Choose up to 8 feature maps for display
    # squeeze=False keeps `axs` a 2-D array even when only one channel is
    # shown; without it a single Axes object is returned and indexing fails.
    fig, axs = plt.subplots(1, num_features, figsize=(15, 15), squeeze=False)
    for i in range(num_features):
        axs[0, i].imshow(features[0, i].cpu().detach().numpy(), cmap='viridis')
        axs[0, i].axis('off')
    plt.suptitle(title)
    plt.show()

# Load and preprocess a sample MRI image
def load_image(img_path):
    """Open ``img_path`` as RGB and return a normalised ``(1, 3, 224, 224)`` tensor.

    The image is resized to 224x224, converted to a tensor, normalised with
    the mean/std constants below, and given a leading batch dimension.
    """
    rgb = Image.open(img_path).convert('RGB')
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    # unsqueeze(0) adds the batch dimension
    return preprocess(rgb).unsqueeze(0)

# Load sample image
image_path = '/content/drive/MyDrive/Train/Images/159.png'  # Update this path
image = load_image(image_path)

# Instantiate model and move to device.
# Note: this rebinds the notebook-wide name `model` to the bare encoder.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EfficientNetEncoder().to(device)
# Inference mode: without this the backbone's BatchNorm layers normalise with
# the statistics of this single image instead of the pretrained running
# statistics, which distorts the visualised activations.
model.eval()

# Pass the sample image through the model
image = image.to(device)
with torch.no_grad():
    low_level_features, high_level_features = model(image)

# Visualize low-level features
#visualize_feature_maps(low_level_features, title="Low-Level Feature Maps")

# Visualize high-level features
visualize_feature_maps(high_level_features, title="High-Level Feature Maps")

Visualizing low-level and high-level feature maps

In [None]:
import matplotlib.pyplot as plt
from PIL import Image

class PVTEncoder(nn.Module):
    """Stack of stride-2 3x3 convolutions used as a simple encoder.

    ``forward(x)`` returns ``(flow, fhigh)``: the ``layer1`` output (64
    channels) and the ``layer3`` output (256 channels). ``layer4`` is created
    but currently unused.

    Args:
        in_channels: number of channels of the input image.
    """

    def __init__(self, in_channels=3):
        super(PVTEncoder, self).__init__()
        self.layer1 = nn.Conv2d(in_channels, 64, kernel_size=3, stride=2, padding=1)
        self.layer2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.layer3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.layer4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        flow = self.layer1(x)  # Low-level feature
        fhigh = self.layer2(flow)
        # layer3 must consume the layer2 output; it was previously given the
        # raw input `x`, whose channel count does not match layer3's 128 inputs.
        fhigh = self.layer3(fhigh)
        #fhigh = self.layer4(fhigh)  # Optional deeper high-level feature
        return flow, fhigh

# Function to visualize feature maps
def visualize_feature_maps(features, title="Feature Maps"):
    """Show up to the first 8 channels of the first sample as a row of images.

    Args:
        features: tensor indexed as ``features[0, i]`` for channel ``i`` of
            the first sample; ``features.shape[1]`` is the channel count.
        title: figure title.
    """
    num_features = min(8, features.shape[1])  # Choose up to 8 feature maps for display
    # squeeze=False keeps `axs` a 2-D array even when only one channel is
    # shown; without it a single Axes object is returned and indexing fails.
    fig, axs = plt.subplots(1, num_features, figsize=(15, 15), squeeze=False)
    for i in range(num_features):
        axs[0, i].imshow(features[0, i].cpu().detach().numpy(), cmap='viridis')
        axs[0, i].axis('off')
    plt.suptitle(title)
    plt.show()

# Load and preprocess a sample MRI image
def load_image(img_path):
    """Open ``img_path`` as RGB and return a normalised ``(1, 3, 224, 224)`` tensor.

    The image is resized to 224x224, converted to a tensor, normalised with
    the mean/std constants below, and given a leading batch dimension.
    """
    rgb = Image.open(img_path).convert('RGB')
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    # unsqueeze(0) adds the batch dimension
    return preprocess(rgb).unsqueeze(0)

# Load sample image
image_path = '/content/drive/MyDrive/Train/Images/159.png'  # Update this path
image = load_image(image_path)

# Instantiate model and move to device
# Note: this rebinds the notebook-wide name `model` to a freshly initialised
# (untrained) PVTEncoder, so the maps below come from random weights.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PVTEncoder().to(device)

# Pass the sample image through the model
image = image.to(device)
with torch.no_grad():
    low_level_features, high_level_features = model(image)

# Visualize low-level features
#visualize_feature_maps(low_level_features, title="Low-Level Feature Maps")

# Visualize high-level features
visualize_feature_maps(high_level_features, title="High-Level Feature Maps")

Combination of RVT and EfficientNet-B7

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision.models import efficientnet_b3  # Use EfficientNet-B3 for deeper feature extraction

# EfficientNet Encoder (EfficientNet-B7 stem plus a small conv branch)
class EfficientNetEncoder(nn.Module):
    """Pretrained EfficientNet-B7 stem plus a small untrained conv branch.

    ``forward(x)`` returns ``(flow, fhigh)``: ``flow`` is the output of the
    first EfficientNet-B7 feature stage, and ``fhigh`` is the raw input passed
    through ``layer1`` and ``layer2`` (two stride-2 convolutions ending at 128
    channels). ``layer3`` and ``layer4`` are created but not used in
    ``forward``.
    """

    def __init__(self):
        super(EfficientNetEncoder, self).__init__()

        # This cell's import line only brings in efficientnet_b3, so import the
        # B7 constructor here; otherwise the class works only if an earlier
        # cell happened to import it.
        from torchvision.models import efficientnet_b7

        efficientnet = efficientnet_b7(pretrained=True)
        # Use the first stage for low-level features
        self.low_level_features = nn.Sequential(*list(efficientnet.features[:1]))  # Adjust as necessary for low-level features

        # Freshly initialised conv stack used for the high-level branch
        self.layer1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1)
        self.layer2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.layer3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.layer4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        # Extract low-level features
        flow = self.low_level_features(x)
        #print("Shape of flow (low-level features):", flow.shape)

        # The high-level branch starts again from the input image
        x = self.layer1(x)
        fhigh = self.layer2(x)

        return flow, fhigh

# Feature Fusion Module
class FeatureFusionModule(nn.Module):
    """Fuse a 64-channel low-level map with a 128-channel high-level map.

    Each input is projected to 16 channels with a 1x1 convolution, the
    low-level projection is bilinearly resized to the high-level spatial size,
    and the two are concatenated on the channel axis (32 channels out).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(64, 16, kernel_size=1)   # low-level projection
        self.conv2 = nn.Conv2d(128, 16, kernel_size=1)  # high-level projection

    def forward(self, flow, fhigh):
        low = self.conv1(flow)
        high = self.conv2(fhigh)

        # Bring the low-level map onto the high-level spatial grid before concatenating.
        target_hw = high.shape[2:]
        low = F.interpolate(low, size=target_hw, mode='bilinear', align_corners=True)
        return torch.cat((low, high), dim=1)

# Orthogonal Layer
class OrthogonalLayer(nn.Module):
    """Overwrite y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` is indexed as ``[x1, y1, x2, y2, x3, y3, x4, y4, ...]`` along
    dim 1. Only column 7 (y4) is replaced; the input tensor is not modified.
    """

    def forward(self, points):
        dx = points[:, 0] - points[:, 2]
        dy = points[:, 1] - points[:, 3]
        # Negative reciprocal of the 1->2 slope; the epsilon avoids dividing by 0.
        slope = -dx / (dy + 1e-6)
        corrected = points.clone()
        # Place point 4 on the line through point 3 with that perpendicular slope.
        corrected[:, 7] = slope * (points[:, 6] - points[:, 4]) + points[:, 5]
        return corrected

# Complete Model
class RVTModelWithEfficientNet(nn.Module):
    """Encoder -> feature fusion -> pooled regression of 12 keypoint values.

    ``forward(x)`` returns ``(keypoints_corrected, vhs_class)``: the 12
    regressed values after the orthogonality correction, and 3 class logits
    computed from those corrected values.
    """

    def __init__(self):
        super().__init__()
        self.encoder = EfficientNetEncoder()
        self.ffm = FeatureFusionModule()
        self.pool = nn.AdaptiveAvgPool2d((8, 8))  # fused map -> (32, 8, 8)
        self.orthogonal_layer = OrthogonalLayer()
        self.fc = nn.Linear(32 * 8 * 8, 12)  # 12 outputs feed the classifier
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        low_feats, high_feats = self.encoder(x)
        pooled = self.pool(self.ffm(low_feats, high_feats))
        flat = pooled.view(pooled.size(0), -1)
        raw_keypoints = self.fc(flat).view(-1, 12)
        keypoints = self.orthogonal_layer(raw_keypoints)
        # The class head sees only the corrected keypoints, not the image features.
        return keypoints, self.classifier(keypoints)

# Custom Loss Function
class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times keypoint MSE."""

    def __init__(self, gamma=1.0):
        super().__init__()
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()
        self.gamma = gamma  # weight of the regression term

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss function
# Note: this rebinds the notebook-wide names `device`, `model`, `criterion`
# and `optimizer`; later cells see whichever cell ran last.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Building the model loads pretrained EfficientNet weights inside the encoder.
model = RVTModelWithEfficientNet().to(device)
criterion = RVTLoss(gamma=1.0).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)




# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-sample loss.

    Each batch is ``(images, labels)`` where ``labels`` is a dict holding
    ``'keypoints_output'`` and ``'vhs_class_output'``. Batch losses are
    weighted by batch size and divided by ``len(train_loader.dataset)``.
    """
    model.train()
    weighted_loss_sum = 0.0

    for batch_images, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        kp_true = batch_labels['keypoints_output'].to(device)
        cls_true = batch_labels['vhs_class_output'].to(device)

        kp_out, cls_out = model(batch_images)
        batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)

        # Standard step: clear stale gradients, backpropagate, update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        weighted_loss_sum += batch_loss.item() * batch_images.size(0)

    return weighted_loss_sum / len(train_loader.dataset)

def evaluate(model, val_loader, criterion, device):
    """Evaluate without gradients; return ``(mean_loss, class_accuracy)``.

    The loss is averaged per sample over ``len(val_loader.dataset)``; accuracy
    is the fraction of samples whose highest class logit matches the target.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images = batch_images.to(device)
            kp_true = batch_labels['keypoints_output'].to(device)
            cls_true = batch_labels['vhs_class_output'].to(device)

            kp_out, cls_out = model(batch_images)

            batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)
            loss_sum += batch_loss.item() * batch_images.size(0)

            # Predicted class = index of the largest logit in each row.
            cls_guess = torch.max(cls_out, 1)[1]
            n_correct += (cls_guess == cls_true).sum().item()
            n_seen += cls_true.size(0)

    return loss_sum / len(val_loader.dataset), n_correct / n_seen

# Training Loop
# Trains the `model` built above for a fixed number of epochs.
# `train_loader` and `val_loader` are expected from an earlier cell.
num_epochs = 200
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(num_epochs):
    # One optimisation pass over the training data, then one validation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    # One summary line per epoch (epoch number is shown 1-based).
    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")


Using the PVT encoder

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Custom Model Components
class PVTEncoder(nn.Module):
    """Four chained stride-2 3x3 convolutions (3 -> 64 -> 128 -> 256 -> 512).

    ``forward(x)`` returns ``(flow, fhigh)``: the first layer's output and the
    fourth layer's output.
    """

    def __init__(self, in_channels=3):
        super().__init__()
        self.layer1 = nn.Conv2d(in_channels, 64, kernel_size=3, stride=2, padding=1)
        self.layer2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.layer3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.layer4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        flow = self.layer1(x)  # low-level feature map
        # Layers 2-4 are applied in sequence on top of the low-level map.
        fhigh = self.layer4(self.layer3(self.layer2(flow)))
        return flow, fhigh

class FeatureFusionModule(nn.Module):
    """Fuse a 64-channel and a 512-channel map into a (64, 16, 16) map.

    Each input goes through a 3x3 convolution to 32 channels and is
    adaptively average-pooled to 16x16; the results are concatenated on the
    channel axis. Intermediate shapes are printed on every call.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(512, 32, kernel_size=3, padding=1)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((16, 16))

    def forward(self, flow, fhigh):
        print("Shape of flow before conv1:", flow.shape)
        low = self.conv1(flow)
        print("Shape of flow_feat after conv1:", low.shape)

        print("Shape of fhigh before conv2:", fhigh.shape)
        high = self.conv2(fhigh)
        print("Shape of fhigh_feat after conv2:", high.shape)

        low = self.adaptive_pool(low)
        high = self.adaptive_pool(high)

        # Fallback resize, used only if the pooled shapes still disagree.
        if low.size() != high.size():
            high = F.interpolate(high, size=low.shape[2:], mode='bilinear', align_corners=False)

        return torch.cat((low, high), dim=1)

class OrthogonalLayer(nn.Module):
    """Overwrite y4 so that segment 3->4 is perpendicular to segment 1->2.

    ``points`` is indexed as ``[x1, y1, x2, y2, x3, y3, x4, y4, ...]`` along
    dim 1. Only column 7 (y4) is replaced; the input tensor is not modified.
    """

    def forward(self, points):
        dx = points[:, 0] - points[:, 2]
        dy = points[:, 1] - points[:, 3]
        # Negative reciprocal of the 1->2 slope; the epsilon avoids dividing by 0.
        slope = -dx / (dy + 1e-6)
        corrected = points.clone()
        # Place point 4 on the line through point 3 with that perpendicular slope.
        corrected[:, 7] = slope * (points[:, 6] - points[:, 4]) + points[:, 5]
        return corrected

class RVTModel(nn.Module):
    """PVTEncoder -> feature fusion -> regression of 12 keypoint values.

    ``forward(x)`` returns ``(keypoints_corrected, vhs_class)``: the 12
    regressed values (one row per sample) after the orthogonality correction,
    and 3 class logits computed from those corrected values. The fused
    feature shapes are printed on every call.
    """

    def __init__(self):
        super(RVTModel, self).__init__()
        self.encoder = PVTEncoder()
        self.ffm = FeatureFusionModule()
        self.orthogonal_layer = OrthogonalLayer()
        self.fc = nn.Linear(64 * 16 * 16, 12)
        # The classifier consumes the 12 regressed values. It was previously
        # Linear(8, 3), paired with a `view(-1, 8)` that re-chunked the 12
        # outputs per sample across rows and changed the batch dimension.
        self.classifier = nn.Linear(12, 3)

    def forward(self, x):
        flow, fhigh = self.encoder(x)
        fused_features = self.ffm(flow, fhigh)
        print("Shape of fusedfeatures1:", fused_features.shape)
        fused_features = fused_features.view(fused_features.size(0), -1)
        print("Shape of fusedfeatures2:", fused_features.shape)
        # Keep one row of 12 values per sample so predictions line up with the
        # 12-value keypoint targets and the per-sample class targets.
        keypoints = self.fc(fused_features).view(-1, 12)
        keypoints_corrected = self.orthogonal_layer(keypoints)
        vhs_class = self.classifier(keypoints_corrected)
        return keypoints_corrected, vhs_class

class RVTLoss(nn.Module):
    """Cross-entropy on the class logits plus ``gamma`` times keypoint MSE.

    The shapes of all four inputs are printed on every call.
    """

    def __init__(self, gamma=1.0):
        super().__init__()
        self.ce_loss = nn.CrossEntropyLoss()
        self.mse_loss = nn.MSELoss()
        self.gamma = gamma  # weight of the regression term

    def forward(self, predicted_keypoints, true_keypoints, predicted_class, true_class):
        print(f"Shape of predicted_class: {predicted_class.shape}, Shape of true_class: {true_class.shape}")
        print(f"Shape of predicted_keypoints: {predicted_keypoints.shape}, Shape of true_keypoints: {true_keypoints.shape}")

        classification_term = self.ce_loss(predicted_class, true_class)
        regression_term = self.mse_loss(predicted_keypoints, true_keypoints)
        return classification_term + self.gamma * regression_term

# Instantiate model and loss
# Note: this rebinds the notebook-wide names `model`, `criterion` and
# `optimizer`. The model is created on the default device here and is moved
# to `device` just before the training loop below.
model = RVTModel()
criterion = RVTLoss(gamma=1.0)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training and Evaluation Functions
def train(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-sample loss.

    Each batch is ``(images, labels)`` where ``labels`` is a dict holding
    ``'keypoints_output'`` and ``'vhs_class_output'``. Batch losses are
    weighted by batch size and divided by ``len(train_loader.dataset)``.
    """
    model.train()
    weighted_loss_sum = 0.0

    for batch_images, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        kp_true = batch_labels['keypoints_output'].to(device)
        cls_true = batch_labels['vhs_class_output'].to(device)

        kp_out, cls_out = model(batch_images)
        batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)

        # Standard step: clear stale gradients, backpropagate, update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        weighted_loss_sum += batch_loss.item() * batch_images.size(0)

    return weighted_loss_sum / len(train_loader.dataset)

def evaluate(model, val_loader, criterion, device):
    """Evaluate without gradients; return ``(mean_loss, class_accuracy)``.

    The loss is averaged per sample over ``len(val_loader.dataset)``; accuracy
    is the fraction of samples whose highest class logit matches the target.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        for batch_images, batch_labels in val_loader:
            batch_images = batch_images.to(device)
            kp_true = batch_labels['keypoints_output'].to(device)
            cls_true = batch_labels['vhs_class_output'].to(device)

            kp_out, cls_out = model(batch_images)

            batch_loss = criterion(kp_out, kp_true, cls_out, cls_true)
            loss_sum += batch_loss.item() * batch_images.size(0)

            # Predicted class = index of the largest logit in each row.
            cls_guess = torch.max(cls_out, 1)[1]
            n_correct += (cls_guess == cls_true).sum().item()
            n_seen += cls_true.size(0)

    return loss_sum / len(val_loader.dataset), n_correct / n_seen

# Training Loop
# Trains the `model` built above for a fixed number of epochs.
# `train_loader` and `val_loader` are expected from an earlier cell.
num_epochs = 200
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The model was created without a device above; move it before training.
model.to(device)

for epoch in range(num_epochs):
    # One optimisation pass over the training data, then one validation pass.
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

    # One summary line per epoch (epoch number is shown 1-based).
    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f} - "
          f"Validation Loss: {val_loss:.4f} - "
          f"Validation Accuracy: {val_accuracy:.4f}")


In [None]:
# Sanity-check the first batch yielded by train_loader.
# The loop body runs at most once: it reports on the first batch and stops.
for i, (images, labels) in enumerate(train_loader):
    print("Batch #1:")

    # Tensor shapes: images, keypoint targets, class targets
    print(f"Images shape: {images.shape}")
    print(f"Keypoints shape: {labels['keypoints_output'].shape}")
    print(f"VHS class shape: {labels['vhs_class_output'].shape}")

    # Concrete values for the first sample of the batch
    print("First Image Keypoints:", labels['keypoints_output'][0].numpy())
    print("First Image VHS Class:", labels['vhs_class_output'][0].item())
    break


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

# Step 1: Load a pre-trained ResNet model (ResNet18 here for simplicity)
resnet = models.resnet18(pretrained=True)

# Drop the final classification layer by replacing it with an identity.
# NOTE(review): only `fc` is replaced, so torchvision's ResNet forward pass
# still applies its global average pool and flatten first; the output should
# therefore be a pooled (batch, 512) feature vector, not a 512x7x7 feature
# map -- confirm against the torchvision ResNet implementation.
resnet.fc = nn.Identity()  # We don't need the final classification layer, just the features

# Example of how the CNN backbone processes an image
class CNNBackbone(nn.Module):
    """Thin wrapper that forwards its input through a wrapped CNN."""

    # The default argument is bound once, when the class is defined: every
    # CNNBackbone() created without an argument shares the module-level
    # `resnet` object that exists at that moment.
    def __init__(self, pretrained_model=resnet):
        super(CNNBackbone, self).__init__()
        self.cnn = pretrained_model

    def forward(self, x):
        # Forward pass through the CNN (ResNet in our case)
        return self.cnn(x)

# Initialize the CNN model
cnn_backbone = CNNBackbone()

In [None]:
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class FeatureTransformer(nn.Module):
    """Transformer encoder over CNN features, applied per sample.

    Args:
        feature_size: embedding width (``d_model``) of the transformer.
        num_heads: number of attention heads.
        num_layers: number of stacked encoder layers.

    ``forward(x)`` accepts either:
      * ``(batch, feature_size)`` pooled vectors -> returns
        ``(batch, feature_size)``; each sample is its own one-token sequence.
      * ``(batch, feature_size, H, W)`` feature maps -> returns
        ``(batch, H*W, feature_size)``; each spatial position is a token.

    Raises:
        ValueError: if ``x`` is neither 2-D nor 4-D.
    """

    def __init__(self, feature_size=512, num_heads=8, num_layers=6):
        super(FeatureTransformer, self).__init__()

        # Flattens the spatial axes of a (batch, C, H, W) map into one token axis.
        self.flatten = nn.Flatten(start_dim=2)
        self.transformer_layer = TransformerEncoderLayer(d_model=feature_size, nhead=num_heads)
        self.transformer = TransformerEncoder(self.transformer_layer, num_layers=num_layers)

    def forward(self, x):
        if x.dim() == 2:
            # A bare 2-D tensor is interpreted by the encoder as ONE unbatched
            # sequence, which would let the samples of a batch attend to each
            # other. Add an explicit length-1 sequence axis instead:
            # (batch, E) -> (seq=1, batch, E) -> (batch, E).
            return self.transformer(x.unsqueeze(0)).squeeze(0)

        if x.dim() == 4:
            # (batch, C, H, W) -> (H*W, batch, C): the encoder layers are
            # sequence-first because batch_first is left at its default.
            tokens = self.flatten(x).permute(2, 0, 1)
            return self.transformer(tokens).permute(1, 0, 2)

        raise ValueError(
            f"FeatureTransformer expects a 2-D or 4-D input, got {x.dim()}-D"
        )

# Example: Initialize the transformer block
# Uses the constructor defaults (feature_size=512, num_heads=8, num_layers=6).
transformer_block = FeatureTransformer()


In [None]:
class CNNTransformerModel(nn.Module):
    """CNN backbone -> transformer block -> keypoint and class heads.

    Args:
        cnn_backbone: module producing the features fed to the transformer.
        transformer_block: module returning either ``(batch, 512)`` or
            ``(batch, seq_len, 512)``.
        num_keypoints: number of (x, y) keypoints to regress.

    ``forward(x)`` returns ``(keypoints, class_prediction)`` with shapes
    ``(batch, num_keypoints * 2)`` and ``(batch, 3)``.
    """

    def __init__(self, cnn_backbone, transformer_block, num_keypoints=6):
        super(CNNTransformerModel, self).__init__()
        self.cnn_backbone = cnn_backbone
        self.transformer_block = transformer_block

        # Fully connected layers for keypoint prediction (12 coordinates)
        self.keypoint_fc = nn.Linear(512, num_keypoints * 2)  # 6 keypoints, 2 coords each

        # Fully connected layer for classification (VHS class)
        self.classification_fc = nn.Linear(512, 3)  # 3 classes for VHS

    def forward(self, x):
        # Step 1: Get features from the CNN backbone
        cnn_features = self.cnn_backbone(x)

        # Step 2: Pass through the transformer for global context
        transformer_output = self.transformer_block(cnn_features)

        # Step 3: Reduce to one 512-d vector per sample. Only a token axis is
        # averaged; for an already pooled (batch, 512) output, averaging dim 1
        # would collapse the feature axis and break both heads.
        if transformer_output.dim() == 3:
            pooled = transformer_output.mean(dim=1)
        else:
            pooled = transformer_output

        # Step 4: Predict the keypoints (12 coordinates) -> (batch, 12)
        keypoints = self.keypoint_fc(pooled)

        # Step 5: Predict the VHS class -> (batch, 3)
        class_prediction = self.classification_fc(pooled)

        return keypoints, class_prediction


In [None]:
import torch.optim as optim
import torch.nn.functional as F

def vhs_loss(keypoints_pred, keypoints_true, class_pred, class_true):
    """Return keypoint MSE plus class cross-entropy, equally weighted."""
    regression_term = F.mse_loss(keypoints_pred, keypoints_true)
    classification_term = F.cross_entropy(class_pred, class_true)
    return regression_term + classification_term

# Example: Optimizer
# Note: this rebinds the notebook-wide names `model` and `optimizer`.
# `cnn_backbone` and `transformer_block` come from the cells above.
model = CNNTransformerModel(cnn_backbone, transformer_block)
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
import torch

def compute_vhs(keypoints):
    """Compute the VHS value of every sample in a batch of keypoints.

    Args:
        keypoints: tensor of shape (batch, 12) holding six flattened (x, y)
            pairs in the order A, B, C, D, E, F, or the same data already
            shaped (batch, 6, 2).

    Returns:
        Tensor of shape (batch,) with ``6 * (|AB| + |CD|) / |EF|`` per sample.
    """
    # Regroup the flat coordinates into points so that each distance is taken
    # between two (x, y) pairs of the same sample, not across the batch.
    points = keypoints.reshape(keypoints.shape[0], 6, 2)

    def segment_length(i, j):
        # Euclidean distance between point i and point j, per sample
        return (points[:, i] - points[:, j]).pow(2).sum(dim=-1).sqrt()

    AB = segment_length(0, 1)  # AB distance
    CD = segment_length(2, 3)  # CD distance
    EF = segment_length(4, 5)  # EF distance

    # Compute VHS
    vhs = 6 * (AB + CD) / EF
    return vhs

def compute_class(vhs_values):
    """Map VHS values to class ids: 0 below 8.2, 1 up to and including 10, else 2.

    Returns an int64 tensor with the same shape as ``vhs_values``.
    """
    # Start from the top class and step down once for each threshold satisfied.
    below_lower = (vhs_values < 8.2).long()
    within_upper = (vhs_values <= 10).long()
    return 2 - within_upper - below_lower


In [None]:
def train(model, train_loader, optimizer):
    """Run one pass over ``train_loader`` and return the mean per-batch loss.

    Each batch is ``(images, labels)`` where ``labels`` is a dictionary with
    the keys 'keypoints_output' and 'vhs_class_output'. The loss comes from
    the module-level ``vhs_loss``.
    """
    model.train()
    loss_sum = 0.0
    for images, labels in train_loader:
        # Targets are read from the labels dictionary
        target_keypoints = labels['keypoints_output']
        target_class = labels['vhs_class_output']

        optimizer.zero_grad()

        predicted_keypoints, class_logits = model(images)
        batch_loss = vhs_loss(predicted_keypoints, target_keypoints, class_logits, target_class)

        # Backpropagate, then apply the parameter update
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()

    return loss_sum / len(train_loader)


In [None]:
def evaluate(model, val_loader):
    """Return the VHS-class accuracy (in percent) of ``model`` on ``val_loader``.

    Each batch is ``(images, labels)`` where ``labels`` is a dictionary with
    the key 'vhs_class_output'. The predicted class is derived from the
    predicted keypoints through ``compute_vhs`` and ``compute_class``.

    Returns:
        Accuracy as a float in [0, 100]; 0.0 when the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            # The loader yields (images, labels_dict), not a 3-tuple
            class_true = labels['vhs_class_output']

            keypoints_pred, _ = model(images)

            # Compute VHS and class prediction
            vhs_values = compute_vhs(keypoints_pred)
            predicted_class = compute_class(vhs_values)

            # Compute accuracy
            correct += (predicted_class == class_true).sum().item()
            total += class_true.size(0)

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero
        return 0.0
    return 100 * correct / total


In [None]:
# Train and evaluate for a number of epochs
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    # Train the model
    train_loss = train(model, train_loader, optimizer)
    print(f"Training Loss: {train_loss:.4f}")

    # Evaluate the model; evaluate() returns a single accuracy percentage,
    # so it must not be unpacked into two names.
    val_accuracy = evaluate(model, val_loader)
    print(f"Validation Accuracy: {val_accuracy:.2f}%")


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

# Step 1: Load a pre-trained ResNet18 through the weights-enum API
resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)  # Updated for the latest API

# Replace the final fully connected layer with a pass-through so the network
# no longer emits class logits.
# NOTE(review): only `fc` is replaced; if torchvision's ResNet still pools and
# flattens before `fc`, the output is presumably a (batch, 512) vector rather
# than a (batch, 512, 7, 7) feature map - confirm against the shapes assumed downstream.
resnet.fc = nn.Identity()  # We don't need the final classification layer

# Example of how the CNN backbone processes an image
class CNNBackbone(nn.Module):
    """Thin ``nn.Module`` wrapper that delegates to a wrapped CNN."""

    def __init__(self, pretrained_model=resnet):
        super().__init__()
        # The default is the module-level `resnet` object captured when this
        # class statement is executed.
        self.cnn = pretrained_model

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        features = self.cnn(x)
        return features

# Instantiate the backbone wrapper; with no argument it wraps the module-level `resnet`
cnn_backbone = CNNBackbone()

from torch.nn import TransformerEncoder, TransformerEncoderLayer

class FeatureTransformer(nn.Module):
    """Transformer encoder applied to CNN features treated as a token sequence.

    Args:
        feature_size: channel width of the incoming features (the model dimension).
        num_heads: number of attention heads per encoder layer.
        num_layers: number of stacked encoder layers.

    ``forward`` accepts a tensor whose first axis is the batch and whose second
    axis has ``feature_size`` channels; all remaining axes are flattened into
    tokens. It returns a (batch, tokens, feature_size) tensor.
    """

    def __init__(self, feature_size=512, num_heads=8, num_layers=6):
        super(FeatureTransformer, self).__init__()

        self.feature_size = feature_size
        # Kept so existing attribute access keeps working; forward() does not use it.
        self.flatten = nn.Flatten(start_dim=2)
        # batch_first=True because forward() builds (batch, tokens, features).
        # With the default layout the first axis is read as the sequence, so
        # attention would mix different samples of the batch.
        self.transformer_layer = TransformerEncoderLayer(
            d_model=feature_size, nhead=num_heads, batch_first=True
        )
        self.transformer = TransformerEncoder(self.transformer_layer, num_layers=num_layers)

    def forward(self, x):
        # (batch, feature_size, ...) -> (batch, tokens, feature_size)
        x = x.reshape(x.size(0), self.feature_size, -1).transpose(1, 2)

        # Pass through transformer
        x = self.transformer(x)
        return x

# Instantiate the transformer block with its default hyperparameters
# (feature_size=512, num_heads=8, num_layers=6)
transformer_block = FeatureTransformer()

class CNNTransformerModel(nn.Module):
    """CNN backbone, transformer block and two linear heads on mean-pooled tokens.

    ``forward`` returns ``(keypoints, class_prediction)`` with shapes
    (batch, num_keypoints * 2) and (batch, 3).
    """

    def __init__(self, cnn_backbone, transformer_block, num_keypoints=6):
        super().__init__()
        self.cnn_backbone = cnn_backbone
        self.transformer_block = transformer_block

        # Regression head: two coordinates per keypoint
        self.keypoint_fc = nn.Linear(512, num_keypoints * 2)
        # Classification head: three VHS classes
        self.classification_fc = nn.Linear(512, 3)

    def forward(self, x):
        # Backbone features, then transformer tokens (batch, seq_len, feature_size)
        tokens = self.transformer_block(self.cnn_backbone(x))

        # Both heads consume the same token average, so compute it once
        pooled = tokens.mean(dim=1)

        return self.keypoint_fc(pooled), self.classification_fc(pooled)

import torch.optim as optim
import torch.nn.functional as F

def vhs_loss(keypoints_pred, keypoints_true, class_pred, class_true):
    """Combine keypoint regression and VHS classification into one loss.

    The result is ``mse(keypoints_pred, keypoints_true)`` plus
    ``cross_entropy(class_pred, class_true)``, with equal weight on both terms.
    """
    regression_term = F.mse_loss(keypoints_pred, keypoints_true)
    classification_term = F.cross_entropy(class_pred, class_true)
    return regression_term + classification_term

# Build the combined model from the `cnn_backbone` and `transformer_block`
# instances created above and optimise all of its parameters with Adam.
model = CNNTransformerModel(cnn_backbone, transformer_block)
optimizer = optim.Adam(model.parameters(), lr=0.001)

import torch

def compute_vhs(keypoints):
    """Compute the VHS value of every sample in a batch of keypoints.

    Args:
        keypoints: tensor of shape (batch, 12) holding six flattened (x, y)
            pairs in the order A, B, C, D, E, F, or the same data already
            shaped (batch, 6, 2).

    Returns:
        Tensor of shape (batch,) with ``6 * (|AB| + |CD|) / |EF|`` per sample.
    """
    # Regroup the flat coordinates into points so that each distance is taken
    # between two (x, y) pairs of the same sample, not across the batch.
    points = keypoints.reshape(keypoints.shape[0], 6, 2)

    def segment_length(i, j):
        # Euclidean distance between point i and point j, per sample
        return (points[:, i] - points[:, j]).pow(2).sum(dim=-1).sqrt()

    AB = segment_length(0, 1)  # AB distance
    CD = segment_length(2, 3)  # CD distance
    EF = segment_length(4, 5)  # EF distance

    # Compute VHS
    vhs = 6 * (AB + CD) / EF
    return vhs

def compute_class(vhs_values):
    """Map VHS values to class ids: 0 below 8.2, 1 up to and including 10, else 2.

    Returns an int64 tensor with the same shape as ``vhs_values``.
    """
    # Start from the top class and step down once for each threshold satisfied.
    below_lower = (vhs_values < 8.2).long()
    within_upper = (vhs_values <= 10).long()
    return 2 - within_upper - below_lower

def train(model, train_loader, optimizer):
    """Run one pass over ``train_loader`` and return the mean per-batch loss.

    Each batch is ``(images, labels)`` where ``labels`` is a dictionary with
    the keys 'keypoints_output' and 'vhs_class_output'. The loss comes from
    the module-level ``vhs_loss``.
    """
    model.train()
    loss_sum = 0.0
    for images, labels in train_loader:
        # Targets are read from the labels dictionary
        target_keypoints = labels['keypoints_output']
        target_class = labels['vhs_class_output']

        optimizer.zero_grad()

        predicted_keypoints, class_logits = model(images)
        batch_loss = vhs_loss(predicted_keypoints, target_keypoints, class_logits, target_class)

        # Backpropagate, then apply the parameter update
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()

    return loss_sum / len(train_loader)

def evaluate(model, val_loader):
    """Return the VHS-class accuracy (in percent) of ``model`` on ``val_loader``.

    Each batch is ``(images, labels)`` where ``labels`` is a dictionary with
    the key 'vhs_class_output'. The predicted class is derived from the
    predicted keypoints through ``compute_vhs`` and ``compute_class``; the
    model's own class logits are not used here.

    Returns:
        Accuracy as a float in [0, 100]; 0.0 when the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            class_true = labels['vhs_class_output']

            keypoints_pred, _ = model(images)

            # Compute VHS and class prediction
            predicted_class = compute_class(compute_vhs(keypoints_pred))

            # Compute accuracy
            correct += (predicted_class == class_true).sum().item()
            total += class_true.size(0)

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero
        return 0.0
    return 100 * correct / total

# Train and evaluate for a fixed number of epochs, printing the mean training
# loss and the validation accuracy after each one.
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    # One optimisation pass over the training data
    train_loss = train(model, train_loader, optimizer)
    print(f"Training Loss: {train_loss:.4f}")

    # evaluate() returns a single accuracy percentage
    val_accuracy = evaluate(model, val_loader)
    print(f"Validation Accuracy: {val_accuracy:.2f}%")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SRAAttention(nn.Module):
    """Scaled dot-product attention with a mean-based reduction on keys and values.

    Args:
        Ci: output width of the final projection ``WO``.
        Ri: group count used by ``spatial_reduction``.
        dhead: input width of the final projection ``WO``.

    NOTE(review): the Q/K/V projections are hard-coded to 10752 -> 256 and do
    not depend on ``Ci`` or ``dhead``; confirm these sizes against the real input.
    """
    def __init__(self, Ci, Ri, dhead):
        super(SRAAttention, self).__init__()
        self.Ri = Ri
        self.WQ = nn.Linear(10752, 256)
        self.WK = nn.Linear(10752, 256)
        self.WV = nn.Linear(10752, 256)
        self.WO = nn.Linear(dhead, Ci)

    def forward(self, x):
        # Debug output: printed on every forward call
        print("Shape before WQ:", x.shape)
        Q = self.WQ(x)

        K = self.WK(x)
        V = self.WV(x)

        # Apply Spatial Reduction
        # NOTE(review): spatial_reduction returns 2-D (batch, features) tensors, so
        # the transpose below swaps the batch axis with the feature axis - confirm
        # this is the intended attention layout.
        K = self.spatial_reduction(K)
        V = self.spatial_reduction(V)

        # Attention calculation: softmax(Q K^T / sqrt(d)) with d = Q's last dimension
        attention = torch.matmul(Q, K.transpose(-2, -1)) / (Q.size(-1) ** 0.5)
        attention = F.softmax(attention, dim=-1)

        head = torch.matmul(attention, V)
        # NOTE(review): `head`'s last dimension comes from the reduced V, not from
        # `dhead`; verify it matches WO's input width.
        output = self.WO(head)
        return output

    def spatial_reduction(self, x):
        # Reshape to (batch, Ri, -1) and average over the Ri axis; the result is
        # 2-D with shape (batch, elements_per_sample / Ri).
        return x.view(x.size(0), self.Ri, -1).mean(dim=1)


In [None]:
class FeatureFusionModule(nn.Module):
    """Fuse two 256-channel feature maps by multiplying their 16-channel projections."""

    def __init__(self):
        super().__init__()
        # Both branches share channels, kernel and padding; they differ only in
        # stride and dilation.
        shared = dict(in_channels=256, out_channels=16, kernel_size=(1, 16), padding=1)
        self.Conv1 = nn.Conv2d(stride=96, **shared)
        self.Conv2 = nn.Conv2d(stride=16, dilation=9, **shared)

    def forward(self, low_level_features, high_level_features):
        """Project each input with its own convolution and multiply element-wise."""
        low_projection = self.Conv1(low_level_features)
        high_projection = self.Conv2(high_level_features)
        return low_projection * high_projection


In [None]:
class OrthogonalLayer(nn.Module):
    """Adjust point D so that segment CD is perpendicular to segment AB.

    The input holds flattened coordinates along its last dimension in the
    order (x1, y1, x2, y2, x3, y3, x4, y4, ...) for points A, B, C, D; any
    further coordinates are passed through untouched. Only ``y4`` is replaced,
    so the output has the same shape as the input.

    Args:
        eps: threshold below which AB is treated as horizontal. In that case no
            finite ``y4`` makes CD perpendicular, so ``y4`` is left unchanged.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        # Split coordinates along the last axis so batched input is supported
        x1, y1, x2, y2, x3, y3, x4, y4 = points[..., :8].unbind(dim=-1)

        ab_dx = x2 - x1
        ab_dy = y2 - y1

        # AB . CD = 0  =>  y4 = y3 - ab_dx * (x4 - x3) / ab_dy
        degenerate = ab_dy.abs() < self.eps
        safe_dy = torch.where(degenerate, torch.ones_like(ab_dy), ab_dy)
        y4_new = torch.where(degenerate, y4, y3 - ab_dx * (x4 - x3) / safe_dy)

        # Rebuild the tensor instead of writing in place so autograd stays valid
        return torch.cat([points[..., :7], y4_new.unsqueeze(-1), points[..., 8:]], dim=-1)


In [None]:
class RVTModel(nn.Module):
    """Keypoint regressor chaining SRAAttention, FeatureFusionModule, a linear head and OrthogonalLayer.

    NOTE(review): the forward pass looks inconsistent with the sibling modules
    defined in this notebook - see the inline notes and confirm the intended
    data flow before relying on it.
    """
    def __init__(self):
        super(RVTModel, self).__init__()
        # Define layers (e.g., PVT transformer, FFM, Orthogonal layer)
        self.pvt_transformer = SRAAttention(Ci=256, Ri=8, dhead=64)  # Example configuration
        self.ffm = FeatureFusionModule()
        self.orthogonal_layer = OrthogonalLayer()
        self.fc = nn.Linear(256, 12)  # Final fully connected layer: 12 outputs (6 keypoints x 2 coords)

    def forward(self, x):
        # Extract low-level and high-level features using PVT
        # NOTE(review): SRAAttention.forward returns a single tensor, so this
        # two-name unpacking iterates over that tensor's first dimension and only
        # succeeds when it has length 2.
        low_level_features, high_level_features = self.pvt_transformer(x)

        # Fuse features using FFM
        fused_features = self.ffm(low_level_features, high_level_features)

        # Predict keypoints
        # NOTE(review): `fc` acts on the last dimension, which must be 256 here;
        # the fused tensor comes out of 16-channel convolutions - verify the shape.
        keypoints = self.fc(fused_features)

        # Apply orthogonal layer
        keypoints = self.orthogonal_layer(keypoints)
        return keypoints


In [None]:
def loss_function(pred_keypoints, true_keypoints, vhs_class_pred, vhs_class_true, gamma=1.0):
    """Return cross-entropy on the VHS class plus ``gamma`` times keypoint MSE.

    Args:
        pred_keypoints: predicted keypoint coordinates.
        true_keypoints: target keypoint coordinates.
        vhs_class_pred: class logits.
        vhs_class_true: integer class targets.
        gamma: weight applied to the keypoint regression term.
    """
    classification_term = F.cross_entropy(vhs_class_pred, vhs_class_true)
    regression_term = F.mse_loss(pred_keypoints, true_keypoints)
    return classification_term + gamma * regression_term


In [None]:
def train(model, train_loader, optimizer, gamma=1.0):
    """Run one training pass and return the mean per-batch loss.

    Args:
        model: network returning either ``(keypoints, class_logits)`` or a
            keypoint tensor only.
        train_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
        optimizer: optimizer stepping ``model``'s parameters.
        gamma: weight of the keypoint regression term.
    """
    model.train()
    total_loss = 0
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    for images, labels in train_loader:
        images = images.to(device)
        keypoints_true = labels['keypoints_output'].to(device)
        vhs_class_true = labels['vhs_class_output'].to(device)

        optimizer.zero_grad()
        outputs = model(images)

        if isinstance(outputs, (tuple, list)):
            # Two-headed model: use the combined loss
            keypoints_pred, vhs_class_pred = outputs
            loss = loss_function(keypoints_pred, keypoints_true, vhs_class_pred, vhs_class_true, gamma)
        else:
            # Keypoint-only model: there are no class logits, so only the
            # weighted regression term of the combined loss applies
            loss = gamma * F.mse_loss(outputs, keypoints_true)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(train_loader)

def _vhs_classes_from_keypoints(keypoints):
    """Derive VHS class ids (0, 1 or 2) from flattened A..F keypoints of shape (batch, 12)."""
    points = keypoints.reshape(keypoints.size(0), -1, 2)

    def segment_length(i, j):
        return (points[:, i] - points[:, j]).pow(2).sum(dim=-1).sqrt()

    vhs = 6 * (segment_length(0, 1) + segment_length(2, 3)) / segment_length(4, 5)
    # Class 0 below 8.2, class 1 up to and including 10, class 2 above
    return (vhs >= 8.2).long() + (vhs > 10).long()


def evaluate(model, val_loader):
    """Return ``(average loss, VHS-class accuracy in percent)`` on ``val_loader``.

    Args:
        model: network returning either ``(keypoints, class_logits)`` or a
            keypoint tensor only. For a keypoint-only model the loss is the
            keypoint MSE and the class is derived from the predicted keypoints.
        val_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
    """
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    total_correct = 0
    total_samples = 0
    # Resolve the device locally instead of relying on a global defined elsewhere
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():  # Disable gradient computation
        for images, labels in val_loader:
            images = images.to(device)
            keypoints_true = labels['keypoints_output'].to(device)
            vhs_class_true = labels['vhs_class_output'].to(device)

            # Make predictions
            outputs = model(images)

            if isinstance(outputs, (tuple, list)):
                keypoints_pred, vhs_class_pred = outputs
                loss = loss_function(keypoints_pred, keypoints_true, vhs_class_pred, vhs_class_true)
                predicted_class = vhs_class_pred.argmax(dim=1)
            else:
                keypoints_pred = outputs
                loss = F.mse_loss(keypoints_pred, keypoints_true)
                predicted_class = _vhs_classes_from_keypoints(keypoints_pred)

            total_loss += loss.item()
            total_correct += (predicted_class == vhs_class_true).sum().item()
            total_samples += vhs_class_true.size(0)

    # Calculate average loss
    avg_loss = total_loss / len(val_loader)
    # Calculate accuracy for VHS classification
    accuracy = 100 * total_correct / total_samples

    return avg_loss, accuracy


In [None]:
# Check if CUDA (GPU) is available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model on that device and optimise all of its parameters with Adam
model = RVTModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Alternate one training pass and one validation pass per epoch
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode (train() also does this itself)
    train_loss = train(model, train_loader, optimizer)
    val_loss, val_accuracy = evaluate(model, val_loader)  # Evaluate on validation set

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SRAAttention(nn.Module):
    """Scaled dot-product attention across the channels of a 4-D feature map.

    Each channel's flattened spatial map (length ``H * W``) is treated as one
    token, so ``flattened_size`` must equal ``H * W`` of the input.

    Args:
        flattened_size: input width of the Q/K/V projections and output width of ``WO``.
        dhead: width of the projected queries, keys and values.
    """

    def __init__(self, flattened_size, dhead):
        super().__init__()
        self.WQ = nn.Linear(flattened_size, dhead)
        self.WK = nn.Linear(flattened_size, dhead)
        self.WV = nn.Linear(flattened_size, dhead)
        self.WO = nn.Linear(dhead, flattened_size)

    def forward(self, x):
        batch, channels, height, width = x.size()
        # (B, C, H, W) -> (B, C, H*W): one token per channel
        tokens = x.view(batch, channels, height * width)

        queries = self.WQ(tokens)
        keys = self.WK(tokens)
        values = self.WV(tokens)

        scale = queries.size(-1) ** 0.5
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / scale
        weights = F.softmax(scores, dim=-1)

        projected = self.WO(torch.matmul(weights, values))
        # Restore the original spatial layout
        return projected.view(batch, channels, height, width)

class FeatureFusionModule(nn.Module):
    """Fuse two 256-channel feature maps by multiplying their 16-channel projections."""

    def __init__(self):
        super().__init__()
        # The two branches are configured identically but hold separate weights
        conv_config = dict(in_channels=256, out_channels=16, kernel_size=(1, 16), stride=1, padding=1)
        self.Conv1 = nn.Conv2d(**conv_config)
        self.Conv2 = nn.Conv2d(**conv_config)

    def forward(self, low_level_features, high_level_features):
        """Project each input with its own convolution and multiply element-wise."""
        low_projection = self.Conv1(low_level_features)
        high_projection = self.Conv2(high_level_features)
        return low_projection * high_projection

class OrthogonalLayer(nn.Module):
    """Adjust point D so that segment CD is perpendicular to segment AB.

    The input holds flattened coordinates along its last dimension in the
    order (x1, y1, x2, y2, x3, y3, x4, y4, ...) for points A, B, C, D; any
    further coordinates are passed through untouched. Only ``y4`` is replaced,
    so the output has the same shape as the input.

    Args:
        eps: threshold below which AB is treated as horizontal. In that case no
            finite ``y4`` makes CD perpendicular, so ``y4`` is left unchanged.
    """

    def __init__(self, eps=1e-6):
        super(OrthogonalLayer, self).__init__()
        self.eps = eps

    def forward(self, points):
        # Split coordinates along the last axis so batched input is supported
        x1, y1, x2, y2, x3, y3, x4, y4 = points[..., :8].unbind(dim=-1)

        ab_dx = x2 - x1
        ab_dy = y2 - y1

        # AB . CD = 0  =>  y4 = y3 - ab_dx * (x4 - x3) / ab_dy
        degenerate = ab_dy.abs() < self.eps
        safe_dy = torch.where(degenerate, torch.ones_like(ab_dy), ab_dy)
        y4_new = torch.where(degenerate, y4, y3 - ab_dx * (x4 - x3) / safe_dy)

        # Rebuild the tensor instead of writing in place so autograd stays valid
        return torch.cat([points[..., :7], y4_new.unsqueeze(-1), points[..., 8:]], dim=-1)

class RVTModel(nn.Module):
    """Keypoint regressor: conv stem, average pooling, SRAAttention, feature fusion, linear head, OrthogonalLayer.

    NOTE(review): several shapes in this class look inconsistent with the
    sibling modules in this cell - see the inline notes and confirm before use.
    """
    def __init__(self):
        super(RVTModel, self).__init__()
        self.input_layer = nn.Conv2d(in_channels=3, out_channels=256, kernel_size=3, stride=1, padding=1)
        self.pool = nn.AvgPool2d(kernel_size=8, stride=8)  # Reduce dimensions before attention
        # 256 channels times a 28x28 map; presumably assumes 224x224 input (224 / 8 = 28) - TODO confirm
        self.flattened_size = 256 * 28 * 28
        # NOTE(review): SRAAttention.forward flattens its input to (B, C, H*W) and
        # applies its linear layers on the last axis (size H*W), whereas the value
        # passed here also includes the channel factor - verify.
        self.pvt_transformer = SRAAttention(flattened_size=self.flattened_size, dhead=256)
        self.ffm = FeatureFusionModule()
        self.orthogonal_layer = OrthogonalLayer()

        # Linear head mapping the flattened features to 12 outputs
        self.fc = nn.Linear(self.flattened_size, 12)  # Output layer size

    def forward(self, x):
        x = self.input_layer(x)
        print("Shape after input_layer:", x.shape)  # Debug print
        x = self.pool(x)
        print("Shape after pooling:", x.shape)  # Debug print

        high_level_features = self.pvt_transformer(x)
        print("Shape after attention:", high_level_features.shape)  # Debug print
        # NOTE(review): scale_factor=0.5 shrinks the spatial size, while the
        # attention output keeps the size of `x`; confirm the two inputs can be
        # multiplied element-wise inside the fusion module.
        low_level_features = F.interpolate(x, scale_factor=0.5)
        print("Shape of low_level_features:", low_level_features.shape)  # Debug print

        fused_features = self.ffm(low_level_features, high_level_features)
        print("Shape after feature fusion:", fused_features.shape)  # Debug print
        fused_features = fused_features.view(fused_features.size(0), -1)  # Flatten for fc layer
        print("Shape after flattening for fc:", fused_features.shape)  # Debug print
        # NOTE(review): `fc` expects `flattened_size` inputs; the fused tensor comes
        # from 16-channel convolutions - verify the flattened width matches.
        keypoints = self.fc(fused_features)

        keypoints = self.orthogonal_layer(keypoints)
        return keypoints

def loss_function(pred_keypoints, true_keypoints, vhs_class_pred, vhs_class_true, gamma=1.0):
    """Return cross-entropy on the VHS class plus ``gamma`` times keypoint MSE."""
    classification_term = F.cross_entropy(vhs_class_pred, vhs_class_true)
    regression_term = F.mse_loss(pred_keypoints, true_keypoints)
    return classification_term + gamma * regression_term

def train(model, train_loader, optimizer, gamma=1.0):
    """Run one training pass and return the mean per-batch loss.

    Args:
        model: network returning either ``(keypoints, class_logits)`` or a
            keypoint tensor only.
        train_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
        optimizer: optimizer stepping ``model``'s parameters.
        gamma: weight of the keypoint regression term.
    """
    model.train()
    total_loss = 0
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    for images, labels in train_loader:
        images = images.to(device)
        keypoints_true = labels['keypoints_output'].to(device)
        vhs_class_true = labels['vhs_class_output'].to(device)

        optimizer.zero_grad()
        outputs = model(images)

        if isinstance(outputs, (tuple, list)):
            # Two-headed model: use the combined loss
            keypoints_pred, vhs_class_pred = outputs
            loss = loss_function(keypoints_pred, keypoints_true, vhs_class_pred, vhs_class_true, gamma)
        else:
            # Keypoint-only model: there are no class logits, so only the
            # weighted regression term of the combined loss applies
            loss = gamma * F.mse_loss(outputs, keypoints_true)

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(train_loader)

def _vhs_classes_from_keypoints(keypoints):
    """Derive VHS class ids (0, 1 or 2) from flattened A..F keypoints of shape (batch, 12)."""
    points = keypoints.reshape(keypoints.size(0), -1, 2)

    def segment_length(i, j):
        return (points[:, i] - points[:, j]).pow(2).sum(dim=-1).sqrt()

    vhs = 6 * (segment_length(0, 1) + segment_length(2, 3)) / segment_length(4, 5)
    # Class 0 below 8.2, class 1 up to and including 10, class 2 above
    return (vhs >= 8.2).long() + (vhs > 10).long()


def evaluate(model, val_loader):
    """Return ``(average loss, VHS-class accuracy in percent)`` on ``val_loader``.

    Args:
        model: network returning either ``(keypoints, class_logits)`` or a
            keypoint tensor only. For a keypoint-only model the loss is the
            keypoint MSE and the class is derived from the predicted keypoints.
        val_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
    """
    model.eval()
    total_loss = 0
    total_correct = 0
    total_samples = 0
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            keypoints_true = labels['keypoints_output'].to(device)
            vhs_class_true = labels['vhs_class_output'].to(device)

            outputs = model(images)

            if isinstance(outputs, (tuple, list)):
                keypoints_pred, vhs_class_pred = outputs
                loss = loss_function(keypoints_pred, keypoints_true, vhs_class_pred, vhs_class_true)
                predicted_class = vhs_class_pred.argmax(dim=1)
            else:
                keypoints_pred = outputs
                loss = F.mse_loss(keypoints_pred, keypoints_true)
                predicted_class = _vhs_classes_from_keypoints(keypoints_pred)

            total_loss += loss.item()
            total_correct += (predicted_class == vhs_class_true).sum().item()
            total_samples += vhs_class_true.size(0)

    avg_loss = total_loss / len(val_loader)
    accuracy = 100 * total_correct / total_samples
    return avg_loss, accuracy

# Pick the GPU when available, build the model there and optimise it with Adam
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RVTModel().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Alternate one training pass and one validation pass per epoch
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # train() also switches the model to training mode itself
    train_loss = train(model, train_loader, optimizer)
    val_loss, val_accuracy = evaluate(model, val_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.cuda.amp as amp  # For mixed precision
from pathlib import Path
import sys
sys.path.append('./yolov7')  # Add yolov7 directory to the path

# Import YOLOv7 model components
from models.experimental import attempt_load  # YOLOv7 model loader

# Define the model class
class KeypointVHS_Model(nn.Module):
    """Fixed YOLOv7 feature extractor followed by keypoint and VHS-class heads.

    Args:
        num_keypoints: number of (x, y) keypoints to regress.
        num_classes: number of VHS classes.
        device: device the checkpoint is mapped to and the heads are created on.

    Raises:
        FileNotFoundError: if the YOLOv7 checkpoint file does not exist.
    """

    def __init__(self, num_keypoints=6, num_classes=3, device='cuda'):
        super(KeypointVHS_Model, self).__init__()

        # Load YOLOv7 as the first stage (feature extractor)
        yolov7_weights_path = '/content/drive/MyDrive/YOLOv7/yolov7.pt'

        if not Path(yolov7_weights_path).exists():
            raise FileNotFoundError(f"YOLOv7 weights file not found at {yolov7_weights_path}")

        # NOTE: torch.load unpickles arbitrary objects; only load checkpoints
        # from a trusted source.
        # NOTE(review): recent PyTorch releases may refuse to unpickle a full
        # model unless weights_only=False is passed - confirm for the installed version.
        checkpoint = torch.load(yolov7_weights_path, map_location=device)

        # Test for a dict first: `'model' in obj` raises TypeError when the
        # checkpoint is itself a module.
        if isinstance(checkpoint, dict) and 'model' in checkpoint:
            backbone = checkpoint['model']
        else:
            backbone = checkpoint

        # forward() feeds float32 input, so make sure the weights are float32
        # too (the stored model is presumably half precision - a no-op otherwise).
        self.yolov7 = backbone.float()
        self.yolov7.eval()  # Feature extraction only

        # Freeze YOLOv7 weights initially
        for param in self.yolov7.parameters():
            param.requires_grad = False

        # Output heads for keypoints and VHS classification
        self.keypoint_head = nn.Linear(1024, num_keypoints * 2).to(device)
        self.class_head = nn.Linear(1024, num_classes).to(device)

    def train(self, mode=True):
        """Set the heads' mode while keeping the YOLOv7 backbone in eval mode.

        forward() always runs the backbone under ``torch.no_grad()``, so it is a
        fixed feature extractor and must not switch to training behaviour when
        ``model.train()`` is called.
        """
        super(KeypointVHS_Model, self).train(mode)
        self.yolov7.eval()
        return self

    def forward(self, x):
        x = x.to(torch.float32)  # Convert input to float32

        with torch.no_grad():  # Forward pass through YOLOv7 (first stage)
            yolov7_features = self.yolov7(x)[0]  # First element of the YOLOv7 output

        # NOTE(review): both heads expect 1024 input features; confirm that the
        # tensor averaged over dim 1 really has a last dimension of 1024.
        pooled_features = yolov7_features.mean(dim=1)

        keypoints = self.keypoint_head(pooled_features)
        vhs_class = self.class_head(pooled_features)

        return keypoints, vhs_class


# Loss functions
criterion_keypoints = nn.MSELoss()  # For keypoint regression
criterion_class = nn.CrossEntropyLoss()  # For VHS class classification

# Create the model on the selected device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = KeypointVHS_Model(device=device)  # Ensure to initialize with device
model.to(device)

# The optimizer is created after the model so that it is bound to this model's parameters
optimizer = optim.Adam(model.parameters(), lr=0.0001)

scaler = torch.cuda.amp.GradScaler()  # Gradient scaler for mixed precision

def train_model(model, train_loader, val_loader, epochs=150, unfreeze_epoch=100):
    """Train ``model`` with mixed precision and print loss/accuracy per epoch.

    Relies on the module-level ``device``, ``optimizer``, ``scaler``,
    ``criterion_keypoints`` and ``criterion_class`` objects.

    Args:
        model: network returning ``(keypoints, class_logits)`` and exposing a
            ``yolov7`` attribute.
        train_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
        val_loader: accepted for interface compatibility; not used.
        epochs: number of passes over ``train_loader``.
        unfreeze_epoch: 1-based epoch after which ``requires_grad`` is enabled
            on the ``model.yolov7`` parameters.
    """
    model.train()

    # torch.autocast takes the device type explicitly; mixed precision is only
    # enabled when running on CUDA.
    device_type = torch.device(device).type
    use_amp = device_type == 'cuda'

    for epoch in range(epochs):
        running_loss = 0.0
        correct_class = 0
        total_samples = 0

        for images, labels in train_loader:
            images = images.to(device)
            keypoints = labels['keypoints_output'].to(device)
            vhs_classes = labels['vhs_class_output'].to(device)

            optimizer.zero_grad()

            # Forward pass with mixed precision
            with torch.autocast(device_type=device_type, enabled=use_amp):
                pred_keypoints, pred_class = model(images)

                # Calculate losses
                loss_keypoints = criterion_keypoints(pred_keypoints, keypoints)
                loss_class = criterion_class(pred_class, vhs_classes)
                loss = loss_keypoints + loss_class  # Total loss

            # Backward pass and optimization with gradient scaling
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Weight the batch loss by batch size so the epoch loss is per sample
            running_loss += loss.item() * images.size(0)
            _, predicted_classes = torch.max(pred_class, 1)
            correct_class += (predicted_classes == vhs_classes).sum().item()
            total_samples += vhs_classes.size(0)

        # Calculate epoch loss and accuracy
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = correct_class / total_samples
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        if epoch == unfreeze_epoch - 1:
            # NOTE(review): KeypointVHS_Model.forward in this cell runs the
            # backbone under torch.no_grad(), so these parameters still receive
            # no gradients after being unfrozen - confirm the intent.
            for param in model.yolov7.parameters():
                param.requires_grad = True  # Unfreeze YOLOv7 layers
            print("Unfreezing YOLOv7 backbone layers.")

    print("Training Complete")

# Start training with the default schedule (epochs=150, unfreeze_epoch=100)
train_model(model, train_loader, val_loader)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.cuda.amp as amp  # For mixed precision
from pathlib import Path
import sys
sys.path.append('./yolov7')  # Add yolov7 directory to the path

# Import YOLOv7 model components
from models.experimental import attempt_load  # YOLOv7 model loader

# Define the model class
class KeypointVHS_Model(nn.Module):
    """Fixed YOLOv7 feature extractor followed by keypoint and VHS-class heads.

    Args:
        num_keypoints: number of (x, y) keypoints to regress.
        num_classes: number of VHS classes.
        device: device the checkpoint is mapped to and the heads are created on.

    Raises:
        FileNotFoundError: if the YOLOv7 checkpoint file does not exist.
    """

    def __init__(self, num_keypoints=6, num_classes=3, device='cuda'):
        super(KeypointVHS_Model, self).__init__()

        # Load YOLOv7 as the first stage (feature extractor)
        yolov7_weights_path = '/content/drive/MyDrive/YOLOv7/yolov7.pt'

        if not Path(yolov7_weights_path).exists():
            raise FileNotFoundError(f"YOLOv7 weights file not found at {yolov7_weights_path}")

        # NOTE: torch.load unpickles arbitrary objects; only load checkpoints
        # from a trusted source.
        # NOTE(review): recent PyTorch releases may refuse to unpickle a full
        # model unless weights_only=False is passed - confirm for the installed version.
        checkpoint = torch.load(yolov7_weights_path, map_location=device)

        # Test for a dict first: `'model' in obj` raises TypeError when the
        # checkpoint is itself a module.
        if isinstance(checkpoint, dict) and 'model' in checkpoint:
            backbone = checkpoint['model']
        else:
            backbone = checkpoint

        # forward() feeds float32 input, so make sure the weights are float32
        # too (the stored model is presumably half precision - a no-op otherwise).
        self.yolov7 = backbone.float()
        self.yolov7.eval()  # Feature extraction only

        # Freeze YOLOv7 weights initially
        for param in self.yolov7.parameters():
            param.requires_grad = False

        # Output heads for keypoints and VHS classification
        self.keypoint_head = nn.Linear(1024, num_keypoints * 2).to(device)
        self.class_head = nn.Linear(1024, num_classes).to(device)

    def train(self, mode=True):
        """Set the heads' mode while keeping the YOLOv7 backbone in eval mode.

        forward() always runs the backbone under ``torch.no_grad()``, so it is a
        fixed feature extractor and must not switch to training behaviour when
        ``model.train()`` is called.
        """
        super(KeypointVHS_Model, self).train(mode)
        self.yolov7.eval()
        return self

    def forward(self, x):
        x = x.to(torch.float32)  # Convert input to float32

        with torch.no_grad():  # Forward pass through YOLOv7 (first stage)
            yolov7_features = self.yolov7(x)[0]  # First element of the YOLOv7 output

        # NOTE(review): both heads expect 1024 input features; confirm that the
        # tensor averaged over dim 1 really has a last dimension of 1024.
        pooled_features = yolov7_features.mean(dim=1)

        keypoints = self.keypoint_head(pooled_features)
        vhs_class = self.class_head(pooled_features)

        return keypoints, vhs_class


# Loss functions
criterion_keypoints = nn.MSELoss()  # For keypoint regression
criterion_class = nn.CrossEntropyLoss()  # For VHS class classification

# Optimizer
# NOTE(review): this binds to whatever `model` exists when the line runs; the
# model for this cell is only created further down, so verify which model's
# parameters the optimizer actually holds.
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Training loop
def train_model(model, train_loader, val_loader, epochs=150, unfreeze_epoch=100):
    """Train ``model`` with mixed precision and print loss/accuracy per epoch.

    Relies on the module-level ``device``, ``optimizer``,
    ``criterion_keypoints`` and ``criterion_class`` objects; the gradient
    scaler is created locally.

    Args:
        model: network returning ``(keypoints, class_logits)`` and exposing a
            ``yolov7`` attribute.
        train_loader: yields ``(images, labels)`` where ``labels`` has the keys
            'keypoints_output' and 'vhs_class_output'.
        val_loader: accepted for interface compatibility; not used.
        epochs: number of passes over ``train_loader``.
        unfreeze_epoch: 1-based epoch after which ``requires_grad`` is enabled
            on the ``model.yolov7`` parameters.
    """
    model.train()
    scaler = amp.GradScaler()  # Gradient scaler for mixed precision

    # torch.autocast takes the device type explicitly (the CUDA-specific
    # autocast has no `device_type` argument); mixed precision is only enabled
    # when running on CUDA.
    device_type = torch.device(device).type
    use_amp = device_type == 'cuda'

    for epoch in range(epochs):
        running_loss = 0.0
        correct_class = 0
        total_samples = 0

        for images, labels in train_loader:
            images = images.to(device)
            keypoints = labels['keypoints_output'].to(device)
            vhs_classes = labels['vhs_class_output'].to(device)
            optimizer.zero_grad()

            # Forward pass with mixed precision
            with torch.autocast(device_type=device_type, enabled=use_amp):
                pred_keypoints, pred_class = model(images)

                # Calculate losses
                loss_keypoints = criterion_keypoints(pred_keypoints, keypoints)
                loss_class = criterion_class(pred_class, vhs_classes)
                loss = loss_keypoints + loss_class  # Total loss

            # Backward pass and optimization with gradient scaling
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Weight the batch loss by batch size so the epoch loss is per sample
            running_loss += loss.item() * images.size(0)
            _, predicted_classes = torch.max(pred_class, 1)
            correct_class += (predicted_classes == vhs_classes).sum().item()
            total_samples += vhs_classes.size(0)

        # Calculate epoch loss and accuracy
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = correct_class / total_samples
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        if epoch == unfreeze_epoch - 1:
            # NOTE(review): KeypointVHS_Model.forward in this cell runs the
            # backbone under torch.no_grad(), so these parameters still receive
            # no gradients after being unfrozen - confirm the intent.
            for param in model.yolov7.parameters():
                param.requires_grad = True  # Unfreeze YOLOv7 layers
            print("Unfreezing YOLOv7 backbone layers.")

    print("Training Complete")


# Train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = KeypointVHS_Model(device=device)  # Ensure to initialize with device
model.to(device)

# Bind the optimizer to the model created just above. train_model() reads the
# module-level `optimizer`, and an optimizer built before this point would hold
# the parameters of some earlier model and never update this one.
optimizer = optim.Adam(model.parameters(), lr=0.0001)
scaler = torch.cuda.amp.GradScaler()  # Gradient scaler for mixed precision

train_model(model, train_loader, val_loader)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from pathlib import Path
import sys
sys.path.append('./yolov7')  # Add yolov7 directory to the path

# Import YOLOv7 model components
from models.experimental import attempt_load  # YOLOv7 model loader

class MultiStageTransformerModel(nn.Module):
    """Two-stage model: frozen YOLOv7 output fed to Faster R-CNN, then keypoint and class heads.

    Args:
        num_keypoints: number of (x, y) keypoints to regress.
        num_classes: number of VHS classes.

    Raises:
        FileNotFoundError: if the YOLOv7 checkpoint file does not exist.

    NOTE(review): the way the two detectors are chained in forward() looks
    questionable - see the inline notes and confirm before use.
    """
    def __init__(self, num_keypoints=6, num_classes=3):
        super(MultiStageTransformerModel, self).__init__()

        # Load YOLOv7 as the first stage (feature extractor)
        yolov7_weights_path = '/content/drive/MyDrive/YOLOv7/yolov7.pt'

        # Directly load the YOLOv7 model using torch.load to avoid downloading
        if Path(yolov7_weights_path).exists():
            # The checkpoint is mapped to the CPU regardless of the training device
            yolov7_model = torch.load(yolov7_weights_path, map_location='cpu')  # Load the model weights directly
            # NOTE(review): `'model' in obj` presumably assumes a dict-like checkpoint - confirm
            self.yolov7 = yolov7_model['model'] if 'model' in yolov7_model else yolov7_model  # Handle model structure
        else:
            raise FileNotFoundError(f"YOLOv7 weights file not found at {yolov7_weights_path}")

        self.yolov7.eval()  # Set YOLOv7 to eval mode for feature extraction only

        # Freeze YOLOv7 weights initially
        for param in self.yolov7.parameters():
            param.requires_grad = False

        # Load a pre-trained Faster R-CNN as the second stage
        self.faster_rcnn = fasterrcnn_resnet50_fpn(pretrained=True)

        # Input width of both heads
        self.hidden_size = 1024  # NOTE(review): assumed to match the pooled Faster R-CNN output - verify

        # Define output heads for keypoints and VHS classification
        self.keypoint_head = nn.Linear(self.hidden_size, num_keypoints * 2)  # 12 values for 6 points
        self.class_head = nn.Linear(self.hidden_size, num_classes)

    def forward(self, x):
        # Ensure input is in float32 for compatibility with mixed precision
        x = x.to(torch.float32)  # Convert input to float32

        with torch.no_grad():  # Forward pass through YOLOv7 (first stage)
            yolov7_features = self.yolov7(x)[0]  # First element of the YOLOv7 output

        # Forward pass through Faster R-CNN (second stage) using the YOLOv7 output as input
        # NOTE(review): the whole batch tensor is wrapped as a single list element;
        # confirm this matches the input format the detector expects.
        yolov7_features = [yolov7_features]  # Convert to list
        faster_rcnn_features = self.faster_rcnn(yolov7_features)  # Pass through Faster R-CNN

        # Use pooled features for final predictions
        # NOTE(review): assumes element 0 of the detector output is a tensor that
        # can be averaged over dim 1 - verify what the detector actually returns.
        pooled_features = faster_rcnn_features[0].mean(dim=1)

        # Keypoint and class predictions
        keypoints = self.keypoint_head(pooled_features)
        vhs_class = self.class_head(pooled_features)

        return keypoints, vhs_class

# Instantiate the model (it is not moved to a device here)
model = MultiStageTransformerModel(num_keypoints=6, num_classes=3)

# Define loss functions for both outputs
criterion_keypoints = nn.MSELoss()  # Mean Squared Error for keypoint regression
criterion_class = nn.CrossEntropyLoss()  # Cross-Entropy for classification

# Adam over all model parameters
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Image-only data augmentation: random horizontal flip, rotation of up to 15
# degrees and colour jitter.
# NOTE(review): the flip and rotation move image content but the keypoint
# labels are not transformed alongside - confirm this is acceptable.
data_augmentation = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
])

# Define training loop
def train_model(model, train_loader, val_loader, epochs=150, unfreeze_epoch=100):
    """Train ``model`` jointly on keypoint regression and VHS classification.

    Uses mixed precision (autocast plus the global ``scaler``). Relies on the
    notebook globals ``device``, ``optimizer``, ``scaler``,
    ``criterion_keypoints``, ``criterion_class`` and ``data_augmentation``.

    Args:
        model: Module returning ``(pred_keypoints, pred_class)`` for a batch.
            ``model.yolov7`` is accessed when the unfreeze epoch is reached.
        train_loader: Iterable of ``(images, labels)`` batches, where ``labels``
            holds ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        val_loader: Accepted for interface compatibility; currently unused.
        epochs: Number of passes over ``train_loader``.
        unfreeze_epoch: After this many epochs every ``model.yolov7`` parameter
            gets ``requires_grad=True``.

    Raises:
        ValueError: If an epoch yields no samples.
    """
    model.train()

    for epoch in range(epochs):
        running_loss = 0.0
        correct_class = 0
        total_samples = 0

        for images, labels in train_loader:
            # The augmentation runs on the batched tensor, so one random draw
            # is shared by every image in the batch.
            if isinstance(images, torch.Tensor):
                images = data_augmentation(images)

            images = images.to(device)
            keypoints = labels['keypoints_output'].to(device)
            vhs_classes = labels['vhs_class_output'].to(device)

            optimizer.zero_grad()

            # Forward pass with autocast for mixed precision
            with torch.cuda.amp.autocast():
                pred_keypoints, pred_class = model(images)

                # Both task losses are summed with equal weight
                loss_keypoints = criterion_keypoints(pred_keypoints, keypoints)
                loss_class = criterion_class(pred_class, vhs_classes)
                loss = loss_keypoints + loss_class

            # Backward pass and optimization with gradient scaling
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Track metrics, weighting the batch-mean loss by the batch size
            batch_size = vhs_classes.size(0)
            running_loss += loss.item() * batch_size
            _, predicted_classes = torch.max(pred_class, 1)
            correct_class += (predicted_classes == vhs_classes).sum().item()
            total_samples += batch_size

        if total_samples == 0:
            raise ValueError("train_loader produced no samples")

        # Average over the samples actually seen, so the figure stays correct
        # when the loader drops or subsamples part of the dataset.
        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_class / total_samples
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        # Unfreeze YOLOv7 layers after the specified epoch
        if epoch == unfreeze_epoch - 1:
            # NOTE(review): the forward() defined earlier in this cell calls
            # self.yolov7 under torch.no_grad(), so these parameters still
            # receive no gradients; the flag alone does not start fine-tuning.
            for param in model.yolov7.parameters():
                param.requires_grad = True
            print("Unfreezing YOLOv7 backbone layers.")

    print("Training Complete")

# Train the model
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# train_model() reads `scaler` as a global, so it must exist before the call below.
scaler = torch.cuda.amp.GradScaler()  # Gradient scaler for mixed precision

# train_loader and val_loader are not defined in this cell; they must already
# exist in the notebook namespace.
train_model(model, train_loader, val_loader)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from transformers import SwinForImageClassification

class MultiOutputModel(nn.Module):
    """Swin backbone with two linear heads: keypoint regression and VHS class.

    The backbone is loaded with ``num_labels=0`` (the original intent: no
    classification head) and starts fully frozen. ``forward`` returns
    ``(keypoints, vhs_class)`` with ``num_keypoints * 2`` and ``num_classes``
    values per sample respectively.
    """

    def __init__(self, num_keypoints=6, num_classes=3):
        super().__init__()

        swin = SwinForImageClassification.from_pretrained(
            "microsoft/swin-base-patch4-window7-224",
            num_labels=0,
        )
        # forward() reads outputs.hidden_states, so the backbone must return them.
        swin.config.output_hidden_states = True
        # Start with every backbone weight frozen; only the heads train at first.
        swin.requires_grad_(False)
        self.backbone = swin

        # Both heads consume a vector of the backbone's configured hidden size.
        self.hidden_size = swin.config.hidden_size
        self.keypoint_head = nn.Linear(self.hidden_size, num_keypoints * 2)
        self.class_head = nn.Linear(self.hidden_size, num_classes)

    def forward(self, x):
        """Return ``(keypoints, vhs_class)`` predictions for the batch ``x``."""
        last_hidden = self.backbone(x).hidden_states[-1]
        # Average over dim 1 (presumably the token axis -- TODO confirm shape).
        features = last_hidden.mean(dim=1)
        return self.keypoint_head(features), self.class_head(features)

# Instantiate the model
model = MultiOutputModel(num_keypoints=6, num_classes=3)

# Define loss functions for both outputs
criterion_keypoints = nn.MSELoss()  # Mean Squared Error for keypoint regression
criterion_class = nn.CrossEntropyLoss()  # Cross-Entropy for classification

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Batch-level augmentation applied by train_model().
# Only photometric jitter is used: train_model() transforms the images but
# leaves the keypoint targets untouched, so a random flip or rotation would move
# the anatomy away from its labelled coordinates and corrupt the regression
# targets.
# NOTE(review): ColorJitter on float tensors assumes values in [0, 1]. If the
# loader already applies Normalize(...), move the jitter into the dataset
# transform ahead of the normalisation -- confirm against the loader's transform.
data_augmentation = transforms.Compose([
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
])

# Define training loop
def train_model(model, train_loader, val_loader, epochs=150, unfreeze_epoch=100):
    """Train ``model`` jointly on keypoint regression and VHS classification.

    Relies on the notebook globals ``device``, ``optimizer``,
    ``criterion_keypoints``, ``criterion_class`` and ``data_augmentation``.

    Args:
        model: Module returning ``(pred_keypoints, pred_class)`` for a batch.
            ``model.backbone`` is accessed when the unfreeze epoch is reached.
        train_loader: Iterable of ``(images, labels)`` batches, where ``labels``
            holds ``'keypoints_output'`` and ``'vhs_class_output'`` tensors.
        val_loader: Accepted for interface compatibility; currently unused.
        epochs: Number of passes over ``train_loader``.
        unfreeze_epoch: After this many epochs every ``model.backbone``
            parameter gets ``requires_grad=True``.

    Raises:
        ValueError: If an epoch yields no samples.
    """
    model.train()

    for epoch in range(epochs):
        running_loss = 0.0
        correct_class = 0
        total_samples = 0

        for images, labels in train_loader:
            # The augmentation runs on the batched tensor, so one random draw
            # is shared by every image in the batch.
            if isinstance(images, torch.Tensor):
                images = data_augmentation(images)

            images = images.to(device)
            keypoints = labels['keypoints_output'].to(device)
            vhs_classes = labels['vhs_class_output'].to(device)

            optimizer.zero_grad()

            # Forward pass
            pred_keypoints, pred_class = model(images)

            # Both task losses are summed with equal weight
            loss_keypoints = criterion_keypoints(pred_keypoints, keypoints)
            loss_class = criterion_class(pred_class, vhs_classes)
            loss = loss_keypoints + loss_class

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Track metrics, weighting the batch-mean loss by the batch size
            batch_size = vhs_classes.size(0)
            running_loss += loss.item() * batch_size
            _, predicted_classes = torch.max(pred_class, 1)
            correct_class += (predicted_classes == vhs_classes).sum().item()
            total_samples += batch_size

        if total_samples == 0:
            raise ValueError("train_loader produced no samples")

        # Average over the samples actually seen, so the figure stays correct
        # when the loader drops or subsamples part of the dataset.
        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_class / total_samples
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        # Unfreeze the whole backbone once the heads have trained for unfreeze_epoch epochs
        if epoch == unfreeze_epoch - 1:
            for param in model.backbone.parameters():
                param.requires_grad = True
            print("Unfreezing Swin Transformer backbone layers.")

    print("Training Complete")

# Train the model
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# train_loader and val_loader are not defined in this cell; they must already
# exist in the notebook namespace.
train_model(model, train_loader, val_loader)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from transformers import SwinForImageClassification

class MultiOutputModel(nn.Module):
    """Frozen Swin backbone feeding a keypoint-regression head and a class head.

    ``forward`` yields ``(keypoints, vhs_class)``: ``num_keypoints * 2``
    regression values and ``num_classes`` class scores per sample.
    """

    def __init__(self, num_keypoints=6, num_classes=3):
        super().__init__()

        # num_labels=0 is requested so the checkpoint loads without a classification head.
        checkpoint = "microsoft/swin-base-patch4-window7-224"
        self.backbone = SwinForImageClassification.from_pretrained(checkpoint, num_labels=0)
        # forward() needs the per-stage hidden states in the backbone output.
        self.backbone.config.output_hidden_states = True

        # Keep every backbone weight fixed; only the two heads are trainable.
        for weight in self.backbone.parameters():
            weight.requires_grad_(False)

        feature_dim = self.backbone.config.hidden_size
        self.hidden_size = feature_dim
        self.keypoint_head = nn.Linear(feature_dim, num_keypoints * 2)  # 12 values for 6 points
        self.class_head = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return ``(keypoints, vhs_class)`` predictions for the batch ``x``."""
        final_stage = self.backbone(x).hidden_states[-1]
        # Collapse dim 1 by averaging (presumably the token axis -- TODO confirm shape).
        features = torch.mean(final_stage, dim=1)
        return self.keypoint_head(features), self.class_head(features)

# Instantiate the model (rebinds the notebook-level `model`)
model = MultiOutputModel(num_keypoints=6, num_classes=3)

# Define loss functions for both outputs; train_model() reads these as globals
criterion_keypoints = nn.MSELoss()  # Mean Squared Error for keypoint regression
criterion_class = nn.CrossEntropyLoss()  # Cross-Entropy for classification

# Define optimizer. All parameters are registered, including the backbone
# weights that MultiOutputModel freezes in its constructor.
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Define training loop
def train_model(model, train_loader, val_loader, epochs=50):
    """Fit ``model`` on the summed keypoint and classification losses.

    Reads the notebook globals ``device``, ``optimizer``,
    ``criterion_keypoints`` and ``criterion_class``. ``val_loader`` is accepted
    but not used. One summary line is printed per epoch, with the loss
    averaged over ``len(train_loader.dataset)``.
    """
    model.train()

    for epoch_number in range(1, epochs + 1):
        loss_sum, hits, seen = 0.0, 0, 0

        for batch_images, batch_labels in train_loader:
            batch_images = batch_images.to(device)
            target_points = batch_labels['keypoints_output'].to(device)
            target_classes = batch_labels['vhs_class_output'].to(device)

            optimizer.zero_grad()

            point_preds, class_logits = model(batch_images)

            # Regression and classification terms carry equal weight.
            point_loss = criterion_keypoints(point_preds, target_points)
            class_loss = criterion_class(class_logits, target_classes)
            batch_loss = point_loss + class_loss

            batch_loss.backward()
            optimizer.step()

            # The criterion returns a batch mean; scale it back to a batch sum.
            loss_sum += batch_loss.item() * batch_images.size(0)
            top_classes = torch.max(class_logits, 1).indices
            hits += (top_classes == target_classes).sum().item()
            seen += target_classes.size(0)

        mean_loss = loss_sum / len(train_loader.dataset)
        accuracy = hits / seen
        print(f"Epoch [{epoch_number}/{epochs}], Loss: {mean_loss:.4f}, Accuracy: {accuracy:.4f}")

    print("Training Complete")

# Train the model
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# train_loader and val_loader are not defined in this cell; they must already
# exist in the notebook namespace.
train_model(model, train_loader, val_loader)


In [None]:
import os
import pandas as pd
import torch
from torchvision import transforms
from PIL import Image

# Define the path to your test images
test_images_dir = '/content/drive/MyDrive/Test_Images/Images'
output_csv_path = '/content/drive/MyDrive/vhs_results.csv'

# Inference preprocessing must match training. The training transform defined
# at the top of the notebook resizes to 224x224, converts to a tensor AND
# normalises with the ImageNet statistics; leaving the normalisation out here
# would feed the model inputs on a different scale than it was trained on.
# NOTE(review): if the loader actually used for training was built with a
# different transform, mirror that one instead.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Adjust size based on your model
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Function to calculate VHS from keypoint coordinates
def calculate_vhs(keypoints):
    """Compute the vertebral heart score from six predicted keypoints.

    The score is ``6 * (|AB| + |CD|) / |EF|`` with the points taken in the
    order A, B, C, D, E, F.

    Args:
        keypoints: Tensor holding six (x, y) pairs, either flat with 12 values
            (the layout the model's keypoint head produces) or shaped (6, 2).

    Returns:
        The score as a Python float, or ``float('inf')`` when E and F coincide.

    Raises:
        ValueError: If ``keypoints`` does not hold exactly six points.
    """
    # The model emits a flat 12-vector; indexing it directly would pick single
    # coordinates instead of points, so always work on an (N, 2) view.
    points = keypoints.reshape(-1, 2)
    if points.shape[0] != 6:
        raise ValueError(f"expected 6 keypoints, got {points.shape[0]}")

    # Calculate distances
    AB = torch.norm(points[0] - points[1])  # Distance between points A and B
    CD = torch.norm(points[2] - points[3])  # Distance between points C and D
    EF = torch.norm(points[4] - points[5])  # Distance between points E and F

    # Prevent division by zero; a plain float has no .item(), so return it directly
    if EF == 0:
        return float('inf')

    return (6 * (AB + CD) / EF).item()  # Return as Python float

# Prepare a list to collect results
results = []

# Switch model to evaluation mode
model.eval()
with torch.no_grad():
    # Sorted so the CSV rows come out in a reproducible order; the order
    # returned by os.listdir is arbitrary.
    for image_name in sorted(os.listdir(test_images_dir)):
        image_path = os.path.join(test_images_dir, image_name)

        # Skip sub-directories (e.g. checkpoint folders), which Image.open cannot read
        if not os.path.isfile(image_path):
            continue

        # Open and preprocess the image; the context manager closes the file
        # handle once the pixels have been copied by convert()
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')  # Ensure image is in RGB format
        image = transform(image).unsqueeze(0).to(device)  # Add batch dimension and send to device

        # Get keypoints predictions
        pred_keypoints, _ = model(image)  # Only need keypoints
        keypoints = pred_keypoints.squeeze(0)  # Remove batch dimension

        # Calculate VHS
        vhs_value = calculate_vhs(keypoints)

        # Append result
        results.append({'image_name': image_name, 'vhs': vhs_value})

# Convert results to a DataFrame; explicit columns keep the header even when
# no image was processed
results_df = pd.DataFrame(results, columns=['image_name', 'vhs'])

# Save to CSV
results_df.to_csv(output_csv_path, index=False)

print("VHS calculation complete. Results saved to:", output_csv_path)


In [None]:
import pandas as pd

# Load the CSV file
csv_file_path = '/content/drive/MyDrive/vhs_results.csv'  # Change this to your file path
# `results` was the list of prediction dicts in the inference cell; from here
# on the name is rebound to the DataFrame read back from disk.
results = pd.read_csv(csv_file_path)

# Display the contents of the DataFrame
print(results)



In [None]:
import pandas as pd

# Specify the path to your CSV file
csv_file_path = '/content/drive/MyDrive/vhs_results.csv'  # Replace with your actual file path
new_csv_file_path = '/content/drive/MyDrive/vhs_results.csv'  # Output path; currently the same file, so it is overwritten in place

# Read every line as data (header=None) and keep the text untouched (dtype=str)
# so values are written back exactly as they were stored.
df = pd.read_csv(csv_file_path, header=None, dtype=str)

# Drop the first line only when it really is the header written by the
# inference cell. pd.read_csv() with its default settings already consumes the
# header, so slicing off row 0 of that frame discarded the first prediction
# instead, and did so again on every re-run of this cell.
expected_header = ['image_name', 'vhs']
has_header = len(df) > 0 and df.iloc[0].tolist() == expected_header
df_without_header = df.iloc[1:] if has_header else df

# Save the rows without the index and without a header line
df_without_header.to_csv(new_csv_file_path, index=False, header=False)

if has_header:
    print("The first row has been removed and the new CSV file is saved.")
else:
    print("No header row found; the CSV file was saved without changes to its rows.")


In [None]:
import pandas as pd

# Specify the path to your CSV file
# NOTE(review): this is a placeholder literal, so the cell presumably fails
# until it is replaced with a real path. It also rebinds the notebook-level
# `csv_file_path` used by earlier cells.
csv_file_path = '/path/to/your/file.csv'  # Replace with your actual file path

# Load the CSV file (the first line is treated as the header)
df = pd.read_csv(csv_file_path)

# Get the number of rows (data rows only; the header line is not counted)
num_rows = df.shape[0]

# Print the number of rows
print(f"The number of rows in the CSV file is: {num_rows}")


In [None]:
import os
import numpy as np
import cv2
import scipy.io
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

IMG_WIDTH, IMG_HEIGHT = 224, 224  # Target size (pixels) that load_dataset() resizes every image to

# Function to classify VHS value into classes
def classify_vhs(vhs_value):
    """Bucket a VHS measurement into a class index.

    Returns 0 for values below 8.2, 1 for values from 8.2 up to and including
    10, and 2 for everything else.
    """
    if vhs_value < 8.2:
        return 0
    return 1 if 8.2 <= vhs_value <= 10 else 2

# Function to load images and labels
def load_dataset(image_dir, label_dir):
    """Load images with their keypoint and VHS-class labels into arrays.

    For every ``.mat`` file in ``label_dir`` the image with the same base name
    (``.jpg`` first, then ``.png``) is read from ``image_dir``, resized to
    ``IMG_WIDTH`` x ``IMG_HEIGHT``, converted to RGB and divided by 255. The
    ``'six_points'`` entry is rescaled into the resized image's pixel grid and
    flattened; the ``'VHS'`` entry is mapped to a class with ``classify_vhs``.

    Because the targets live in resized coordinates, a model trained on them
    predicts resized coordinates too; map predictions back per axis before
    measuring distances on the original image.

    Args:
        image_dir: Directory holding the images.
        label_dir: Directory holding the ``.mat`` label files.

    Returns:
        Tuple ``(images, keypoints, vhs_classes)`` of NumPy arrays with one
        entry per sample that was loaded successfully.
    """
    images = []
    keypoints = []
    vhs_classes = []

    # Sorted so the sample order is reproducible; os.listdir order is arbitrary.
    for label_filename in sorted(os.listdir(label_dir)):
        if not label_filename.endswith('.mat'):
            continue

        label_path = os.path.join(label_dir, label_filename)
        label_data = scipy.io.loadmat(label_path)
        six_points = label_data['six_points']
        vhs_value = label_data['VHS'][0, 0]

        # Calculate VHS class
        vhs_class = classify_vhs(vhs_value)

        # Load corresponding image (check for .jpg or .png)
        base_filename = os.path.splitext(label_filename)[0]
        image_path = None
        for extension in ('.jpg', '.png'):
            candidate = os.path.join(image_dir, base_filename + extension)
            if os.path.exists(candidate):
                image_path = candidate
                break

        if image_path is None:
            print(f"Image {base_filename} not found in .jpg or .png format, skipping...")
            continue

        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None
            print(f"Image {base_filename} could not be decoded, skipping...")
            continue

        # Remember the source size BEFORE resizing; reading image.shape after
        # the resize always gives the target size and makes both scale
        # factors 1.0, which leaves the keypoints unscaled.
        orig_height, orig_width = image.shape[:2]

        # Resize and normalize the image
        image = cv2.resize(image, (IMG_WIDTH, IMG_HEIGHT))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) / 255.0

        # Scale keypoints to match resized image
        # (assumes six_points has one (x, y) pair per row -- TODO confirm)
        scale_x = IMG_WIDTH / orig_width
        scale_y = IMG_HEIGHT / orig_height
        scaled_kp = six_points * [scale_x, scale_y]

        # Append to lists
        images.append(image)
        keypoints.append(scaled_kp.flatten())
        vhs_classes.append(vhs_class)

    return np.array(images), np.array(keypoints), np.array(vhs_classes)

# Define paths for training and validation data (Google Drive mount)
train_image_dir = '/content/drive/MyDrive/Train/Images'
train_label_dir = '/content/drive/MyDrive/Train/Labels'
valid_image_dir = '/content/drive/MyDrive/Valid/Images'
valid_label_dir = '/content/drive/MyDrive/Valid/Labels'

# Load the training and validation data; every image is held in memory at once
train_images, train_keypoints, train_vhs_classes = load_dataset(train_image_dir, train_label_dir)
valid_images, valid_keypoints, valid_vhs_classes = load_dataset(valid_image_dir, valid_label_dir)

# Check the data: the three arrays of each split should agree on their first dimension
print(f"Training images: {train_images.shape}, Keypoints: {train_keypoints.shape}, VHS classes: {train_vhs_classes.shape}")
print(f"Validation images: {valid_images.shape}, Keypoints: {valid_keypoints.shape}, VHS classes: {valid_vhs_classes.shape}")


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, Input, GlobalAveragePooling1D, Lambda, Layer
from tensorflow.keras.models import Model
from transformers import TFAutoModelForImageClassification, AutoConfig

# Define input shape (channels-last, as used by the Keras Input layer in this cell)
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 224, 224, 3
NUM_CLASSES = 3  # VHS classification (0, 1, 2)
NUM_KEYPOINTS = 12  # Number of regression outputs: six points with x, y coordinates

# Load the Swin Transformer with a configuration whose label count is NUM_CLASSES
swin_config = AutoConfig.from_pretrained("microsoft/swin-tiny-patch4-window7-224", num_labels=NUM_CLASSES)
swin_backbone = TFAutoModelForImageClassification.from_pretrained(
    "microsoft/swin-tiny-patch4-window7-224",
    config=swin_config,
    ignore_mismatched_sizes=True  # Allow loading although the head size differs from the checkpoint's
)

# Custom layer to handle KerasTensor compatibility
class SwinTransformerLayer(Layer):
    """Keras layer that runs a Hugging Face TF Swin classifier and returns its logits.

    Args:
        swin_model: The loaded classification model; its ``.logits`` output is
            what ``call`` returns.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, swin_model, **kwargs):
        super().__init__(**kwargs)
        self.swin_model = swin_model

    def call(self, inputs, training=False):
        """Return the classifier logits for a batch of images.

        Channels-last input ``(batch, height, width, channels)`` is transposed
        to channels-first before the model is called; input that is already
        channels-first is passed through unchanged.
        """
        inputs = tf.convert_to_tensor(inputs)

        # Hugging Face TF vision models take pixel_values as
        # (batch, channels, height, width), while the Keras Input feeding this
        # layer is channels-last.
        num_channels = self.swin_model.config.num_channels
        if inputs.shape[-1] == num_channels and inputs.shape[1] != num_channels:
            inputs = tf.transpose(inputs, perm=[0, 3, 1, 2])

        return self.swin_model(inputs, training=training).logits

# Define inputs
inputs = Input(shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))

# Wrap the normalization function in a Lambda layer: cast to float32, then map
# x -> (x - 0.5) / 0.5 (assumes pixel values arrive in [0, 1] -- TODO confirm)
normalized_inputs = Lambda(lambda x: (tf.image.convert_image_dtype(x, tf.float32) - 0.5) / 0.5)(inputs)

# Pass normalized input through the custom SwinTransformerLayer.
# swin_output is that layer's return value, i.e. the classifier logits
# (presumably shape (batch, NUM_CLASSES)).
swin_output = SwinTransformerLayer(swin_backbone)(normalized_inputs)

# Global pooling layer for regression head.
# NOTE(review): the logits gain a trailing axis and are then averaged over
# their own axis, so the regression head below appears to receive a single
# value per sample rather than image features -- confirm this is intended.
global_avg_pool = GlobalAveragePooling1D()(tf.expand_dims(swin_output, -1))

# Regression head for keypoint prediction
keypoint_output = Dense(128, activation="relu")(global_avg_pool)
keypoint_output = Dropout(0.3)(keypoint_output)
keypoint_output = Dense(NUM_KEYPOINTS, activation="linear", name="keypoints")(keypoint_output)

# Classification head for VHS class: a softmax Dense layer stacked on top of the
# backbone's own logits
classification_output = Dense(NUM_CLASSES, activation="softmax", name="vhs_class")(swin_output)

# Define model with dual outputs (rebinds the notebook-level `model`)
model = Model(inputs=inputs, outputs=[keypoint_output, classification_output])

# Compile the model with weighted loss; the keys match the output layer names
losses = {
    "keypoints": "mean_squared_error",    # Regression loss
    "vhs_class": "sparse_categorical_crossentropy"  # Classification loss (integer class labels)
}
loss_weights = {"keypoints": 0.5, "vhs_class": 0.5}  # Equal weights; adjust to balance tasks

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss=losses, loss_weights=loss_weights, metrics={"vhs_class": "accuracy"})

# Model summary for verification
model.summary()


In [None]:
# Define the multi-output model function
def create_multi_output_model():
    """Build a frozen MobileNetV2 with a keypoint head and a VHS-class head.

    Returns:
        A Keras ``Model`` taking 224x224x3 images and producing two outputs:
        ``'keypoints_output'`` (12 linear values) and ``'vhs_class_output'``
        (3-way softmax).
    """
    # ImageNet-pretrained feature extractor without its classifier; frozen so
    # the pretrained weights are kept as they are.
    backbone = MobileNetV2(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
    backbone.trainable = False

    # Shared trunk: pooled backbone features followed by one dense layer
    pooled = GlobalAveragePooling2D()(backbone.output)
    shared_features = Dense(512, activation='relu')(pooled)

    # Two heads on the shared features: 6 points x 2 coordinates, and 3 classes
    heads = [
        Dense(12, activation='linear', name='keypoints_output')(shared_features),
        Dense(3, activation='softmax', name='vhs_class_output')(shared_features),
    ]

    return Model(inputs=backbone.input, outputs=heads)

# Create the multi-output model (rebinds the notebook-level `model`)
model = create_multi_output_model()

# Compile the model with appropriate loss functions and metrics for both outputs.
# The dictionary keys must match the output layer names set in
# create_multi_output_model(); no loss weights are given here.
model.compile(
    optimizer=Adam(learning_rate=0.0001),
    loss={'keypoints_output': 'mean_squared_error', 'vhs_class_output': 'sparse_categorical_crossentropy'},
    metrics={'keypoints_output': 'mae', 'vhs_class_output': 'accuracy'}
)

# Print model summary
model.summary()


In [None]:
# Targets are keyed by output-layer name so Keras can route each array to the
# matching head: keypoints for regression, VHS classes for classification.
train_labels = dict(keypoints_output=train_keypoints, vhs_class_output=train_vhs_classes)
valid_labels = dict(keypoints_output=valid_keypoints, vhs_class_output=valid_vhs_classes)

# Training schedule
epochs, batch_size = 50, 16

# Fit on the training arrays and evaluate on the validation arrays after each epoch
history = model.fit(
    x=train_images,
    y=train_labels,
    validation_data=(valid_images, valid_labels),
    batch_size=batch_size,
    epochs=epochs,
)


In [None]:
import numpy as np
import cv2
import os
import scipy.io

# Define the test image directory
test_image_dir = '/content/drive/MyDrive/Test_Images/Images'
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 224, 224, 3  # Size the images are resized to before prediction
# The predictions below use whatever `model` is currently bound in the notebook.
# If the model is saved, you can load it with:
# model = tf.keras.models.load_model('path_to_saved_model')

# Function to preprocess the test images
def preprocess_image(image_path):
    """Read an image and prepare it for the model.

    The image is resized to ``IMG_WIDTH`` x ``IMG_HEIGHT``, converted from
    OpenCV's BGR order to RGB and divided by 255.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The preprocessed image as a float array.

    Raises:
        ValueError: If the file cannot be read or decoded.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None instead of raising; fail here with the path
        # rather than with an opaque error inside cv2.resize.
        raise ValueError(f"Could not read image: {image_path}")

    image = cv2.resize(image, (IMG_WIDTH, IMG_HEIGHT))
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB) / 255.0

# Function to calculate the VHS based on predicted key points
def calculate_vhs(points):
    """Compute the vertebral heart score from six keypoints.

    The score is ``6 * (|AB| + |CD|) / |EF|`` with the points ordered
    A, B, C, D, E, F.

    Args:
        points: Six (x, y) pairs as an array or nested sequence, either shaped
            (6, 2) or flat with 12 values.

    Returns:
        The score as a float, or ``float('inf')`` when E and F coincide.

    Raises:
        ValueError: If ``points`` does not hold exactly six (x, y) pairs.
    """
    # Normalise to an (N, 2) float array so lists and flat vectors work too.
    pts = np.asarray(points, dtype=float).reshape(-1, 2)
    if pts.shape[0] != 6:
        raise ValueError(f"expected 6 keypoints, got {pts.shape[0]}")

    AB = np.linalg.norm(pts[0] - pts[1])
    CD = np.linalg.norm(pts[2] - pts[3])
    EF = np.linalg.norm(pts[4] - pts[5])

    # Avoid dividing by zero when the reference segment has no length
    if EF == 0:
        return float('inf')

    return float(6 * (AB + CD) / EF)

# Predict and calculate VHS for each image in the test set
test_predictions = []
# Sorted so the report order is reproducible; os.listdir order is arbitrary.
for filename in sorted(os.listdir(test_image_dir)):
    # Match the extension case-insensitively so files such as "IMG.JPG" are not skipped
    if not filename.lower().endswith(('.jpg', '.png')):
        continue

    # Load and preprocess the image
    image_path = os.path.join(test_image_dir, filename)
    image = preprocess_image(image_path)
    image = np.expand_dims(image, axis=0)  # Add batch dimension

    # Predict the key points and class; verbose=0 keeps Keras from printing a
    # progress bar for every single image
    keypoint_preds, class_preds = model.predict(image, verbose=0)

    # Reshape keypoint predictions into (6, 2)
    keypoints = keypoint_preds[0].reshape((6, 2))

    # NOTE(review): `image` is the already-resized batch here, so both factors
    # evaluate to 1.0 and the keypoints stay in whatever coordinate frame the
    # training labels used. Reporting VHS in original pixels needs the source
    # image size captured before resizing, and must mirror how the training
    # labels were scaled -- change both together.
    scale_x = image.shape[2] / IMG_WIDTH
    scale_y = image.shape[1] / IMG_HEIGHT
    scaled_keypoints = keypoints * [scale_x, scale_y]

    # Calculate VHS
    vhs_value = calculate_vhs(scaled_keypoints)

    # Save the predictions
    test_predictions.append({
        'filename': filename,
        'predicted_keypoints': scaled_keypoints,
        'predicted_vhs': vhs_value,
        'predicted_class': np.argmax(class_preds[0])  # The VHS class prediction
    })

# Display predictions
for prediction in test_predictions:
    print(f"Image: {prediction['filename']}")
    print(f"Predicted Keypoints:\n{prediction['predicted_keypoints']}")
    print(f"Predicted VHS: {prediction['predicted_vhs']:.2f}")
    print(f"Predicted VHS Class: {prediction['predicted_class']}")
    print("-" * 30)
