In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

import torch
import torch.nn as nn
from torchvision import models
import torch.nn.functional as F

class densetFPN_121(nn.Module):
    """ DenseNet121-based Feature Pyramid Network (FPN) for feature extraction. 
        Total number of parameters:  8232320 (8.23 million) """ 
    def __init__(self, weights='DEFAULT', common_channel_size=256, output_channel_size=256):
        super(densetFPN_121, self).__init__()
        original_densenet = models.densenet121(weights=weights)
        
        # Initial layers: extract features without modification
        self.encoder = nn.ModuleList([
            nn.Sequential(*list(original_densenet.features.children())[:6], nn.Dropout(0.4)),   # 128x12x12
            nn.Sequential(*list(original_densenet.features.children())[6:8], nn.Dropout(0.4)),  # 256x6x6
            nn.Sequential(*list(original_densenet.features.children())[8:10], nn.Dropout(0.4)), # 896x3x3
            nn.Sequential(*list(original_densenet.features.children())[10:-1], nn.Dropout(0.4)) # 1920x3x3
        ])
        
        # Define convolutional layers for adapting channel sizes
        fpn_channels = [128, 256, 512, 1024]
        self.adaptation_layers = nn.ModuleDict({
            f'adapt{i+1}': nn.Conv2d(fpn_channels[i], common_channel_size, kernel_size=1)
            for i in range(4)
        })

        # Define FPN layers
        self.fpn = nn.ModuleDict({
            f'fpn{i+1}': nn.Conv2d(common_channel_size, common_channel_size, kernel_size=1)
            for i in range(3)
        })

        self.merge_layers = nn.Sequential(
            nn.Conv2d(common_channel_size, output_channel_size, kernel_size=3), # kernel size 1 or 3
            nn.BatchNorm2d(output_channel_size),
            nn.ReLU(),
            nn.Dropout(0.4) # 0.2
        )

    def forward(self, x):
        # Encoder
        features = []
        for encoder in self.encoder:
            x = encoder(x)
            features.append(x)
        
        # Merge channels using 1x1 convolutions
        adapted_features = [self.adaptation_layers[f'adapt{i+1}'](features[i]) for i in range(4)]
        
        # FPN integration using top-down pathway
        fpn_output = adapted_features.pop()  # Start with the deepest features
        for i in reversed(range(3)):
            upsampled = F.interpolate(fpn_output, size=adapted_features[i].shape[-2:], mode='nearest')
            fpn_output = self.fpn[f'fpn{i+1}'](upsampled + adapted_features[i])
        
        # Merge features
        merged_features = self.merge_layers(fpn_output)
        
        return merged_features


model = densetFPN_121(weights=None)

# Print total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print("Total number of parameters: ", total_params)

# Create a dummy input tensor of size [50, 3, 100, 100]
dummy_input = torch.randn(50, 3, 100, 100)

# Forward pass through the model with dummy input
features = model(dummy_input)

# Print output shapes to verify
print("Features shape:", features.shape)

# print(features.conv_info())

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class efficientDecoder_v2_s(nn.Module):
    def __init__(self, num_channels=[24, 48, 64, 128, 160, 256], output_channel_size=256, output_feature_size=25):
        super(efficientDecoder_v2_s, self).__init__()
        # Load EfficientNet V2 Small features
        efficientnet_v2_s = models.efficientnet_v2_s(weights='DEFAULT').features[:-1]

        # Modularize encoders
        self.encoders = nn.ModuleList([
            nn.Sequential(*list(efficientnet_v2_s.children())[:2], nn.Dropout(0.1)),    # 24x50x50
            nn.Sequential(*list(efficientnet_v2_s.children())[2:3], nn.Dropout(0.1)),   # 48x25x25
            nn.Sequential(*list(efficientnet_v2_s.children())[3:4], nn.Dropout(0.2)),   # 64x13x13
            nn.Sequential(*list(efficientnet_v2_s.children())[4:5], nn.Dropout(0.2)),   # 128x7x7
            nn.Sequential(*list(efficientnet_v2_s.children())[5:6], nn.Dropout(0.3)),   # 160x7x7 # TODO: Check whether to skip 128x7x7
            nn.Sequential(*list(efficientnet_v2_s.children())[6:7], nn.Dropout(0.3))    # 256x4x4
        ])
        
        # Modularize upconvolutions
        self.upconvs = nn.ModuleList([
            nn.ConvTranspose2d(in_channels=256, out_channels=160, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.Conv2d(in_channels=160, out_channels=128, kernel_size=1, stride=1),
            nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(in_channels=64, out_channels=48, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(in_channels=48, out_channels=24, kernel_size=2, stride=2)
        ])
        
        # Modularize decoders
        self.decoders = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(num_channels[i] * 2, num_channels[i], kernel_size=3, padding=1),
                nn.BatchNorm2d(num_channels[i]),
                nn.SiLU(),
                nn.Dropout(0.1 + i * 0.05)
            ) for i in range(len(num_channels)-1)
        ])

        # Optional, merge layers to increase the number of channels
        self.merge_layers = nn.Sequential(
            nn.Conv2d(24, output_channel_size, kernel_size=1),
            nn.BatchNorm2d(output_channel_size),
            nn.SiLU(),
            nn.Dropout(0.3)
        )
        
        self.global_avg_pool = nn.AdaptiveAvgPool2d(output_feature_size) # to reduce noise and overfitting

    def forward(self, x):
        # Encoder
        features = []
        for encoder in self.encoders:
            x = encoder(x)
            features.append(x)
        
        # Decoder
        x = features.pop()
        for upconv, decoder, feature in zip(self.upconvs, reversed(self.decoders), reversed(features)):
            x = upconv(x)
            x = torch.cat((x, feature), dim=1)
            x = decoder(x)
        
        x = self.merge_layers(x) # Introduced to increase the number of channels
        pooled_features = self.global_avg_pool(x) # Introduced to reduce noise and overfitting
        
        return pooled_features

model = efficientDecoder_v2_s()

# Print total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print("Total number of parameters: ", total_params)

# Create a dummy input tensor of size [50, 3, 100, 100]
dummy_input = torch.randn(50, 3, 100, 100)

# Forward pass through the model with dummy input
features = model(dummy_input)

# Print output shapes to verify
print("Features shape:", features.shape)

In [None]:
for i in reversed(range(1,6)):
    print(i)

In [None]:
import torch
import torch.nn as nn
from torchvision import models
import torch.nn.functional as F

class efficientDecoder_v2_s(nn.Module):
    def __init__(self, output_channel_size=256, output_feature_size=25):
        super(efficientDecoder_v2_s, self).__init__()
        # Load EfficientNet V2 Small features
        efficientnet_v2_s = models.efficientnet_v2_s(weights='DEFAULT').features[:-1]

        # Modularize encoders
        self.encoders = nn.ModuleList([
            nn.Sequential(*list(efficientnet_v2_s.children())[:2], nn.Dropout(0.1)),    # 24x50x50
            nn.Sequential(*list(efficientnet_v2_s.children())[2:3], nn.Dropout(0.1)),   # 48x25x25
            nn.Sequential(*list(efficientnet_v2_s.children())[3:4], nn.Dropout(0.2)),   # 64x13x13
            nn.Sequential(*list(efficientnet_v2_s.children())[4:5], nn.Dropout(0.2)),   # 128x7x7
            nn.Sequential(*list(efficientnet_v2_s.children())[5:6], nn.Dropout(0.3)),   # 160x7x7
            nn.Sequential(*list(efficientnet_v2_s.children())[6:7], nn.Dropout(0.3))    # 256x4x4
        ])
        
        # Modularize upconvolutions
        self.upconvs = nn.ModuleList([
            nn.ConvTranspose2d(in_channels=256, out_channels=160, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.Conv2d(in_channels=160, out_channels=128, kernel_size=1, stride=1),
            nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(in_channels=64, out_channels=48, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ConvTranspose2d(in_channels=48, out_channels=24, kernel_size=2, stride=2)
        ])
        
        # Modularize decoders
        self.decoders = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(160*2, 160, kernel_size=3, padding=1),
                nn.BatchNorm2d(160, eps=0.001, momentum=0.1, affine=True, track_running_stats=True),
                nn.SiLU(inplace=True),
                nn.Dropout(0.3, inplace=True)
            ),
            nn.Sequential(
                nn.Conv2d(128*2, 128, kernel_size=3, padding=1),
                nn.BatchNorm2d(128, eps=0.001, momentum=0.1, affine=True, track_running_stats=True),
                nn.SiLU(inplace=True),
                nn.Dropout(0.3, inplace=True)
            ),
            nn.Sequential(
                nn.Conv2d(64*2, 64, kernel_size=3, padding=1),
                nn.BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True),
                nn.SiLU(inplace=True),
                nn.Dropout(0.2, inplace=True)
            ),
            nn.Sequential(
                nn.Conv2d(48*2, 48, kernel_size=3, padding=1),
                nn.BatchNorm2d(48, eps=0.001, momentum=0.1, affine=True, track_running_stats=True),
                nn.SiLU(inplace=True),
                nn.Dropout(0.2, inplace=True)
            ),
            nn.Sequential(
                nn.Conv2d(24*2, 24, kernel_size=3, padding=1),
                nn.BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True),
                nn.SiLU(inplace=True),
                nn.Dropout(0.1, inplace=True)
            )
        ])

        # Optional, merge layers to increase the number of channels
        self.merge_layers = nn.Sequential(
            nn.Conv2d(24, output_channel_size, kernel_size=1),
            nn.BatchNorm2d(output_channel_size),
            nn.SiLU(),
            nn.Dropout(0.3)
        )
        
        self.global_avg_pool = nn.AdaptiveAvgPool2d(output_feature_size) # to reduce noise and overfitting

    def forward(self, x):
        # Encoder
        features = []
        for encoder in self.encoders:
            x = encoder(x)
            features.append(x)
        
        # Decoder
        x = features.pop()
        for upconv, decoder, feature in zip(self.upconvs, self.decoders, reversed(features)):
            x = upconv(x)
            x = torch.cat((x, feature), dim=1)
            x = decoder(x)
        
        x = self.merge_layers(x) # Introduced to increase the number of channels
        pooled_features = self.global_avg_pool(x) # Introduced to reduce noise and overfitting
        
        return pooled_features

model = efficientDecoder_v2_s()

# Print total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print("Total number of parameters: ", total_params)

# Example initialization and forward pass
# model = EfficientDecoder_v2_s()
dummy_input = torch.randn(10, 3, 100, 100)  # Adjust size according to your actual input
output = model(dummy_input)
print(f"Output shape: {output.shape}")

In [1]:
input_dim=(128,12,12)

input_channels, input_height, input_width = input_dim

print(input_channels, input_height, input_width)

128 12 12


In [9]:
import torch
from src.models.baseline_models import construct_baselineModel, construct_baseModel, BaseModel
from src.models.backbone_models import densetFPN_121, densetFPN_201

# model = densetFPN_121()
# Create the model instance
# model = BaseModel(backbone=densetFPN_121, weights='DEFAULT', input_dim=(256,12,12))
model = construct_baseModel(backbone_name='densetFPN_121', weights='DEFAULT', input_dim=(256,12,12))

# Print total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print("Total number of parameters: ", total_params)

# Create a dummy input tensor of size [50, 3, 100, 100]
dummy_input = torch.randn(50, 3, 100, 100)

# Forward pass through the model with dummy input
features = model(dummy_input)

# Print output shapes to verify
print("Features shape:", features.shape)

Total number of parameters:  45460865
torch.Size([50, 3, 100, 100])
torch.Size([50, 256, 12, 12])
Features shape: torch.Size([50, 1])


In [2]:
features

(tensor([[0.5106],
         [0.5035],
         [0.5488],
         [0.7421],
         [0.5720],
         [0.3836],
         [0.5911],
         [0.4598],
         [0.5526],
         [0.5835],
         [0.4567],
         [0.5489],
         [0.5642],
         [0.4666],
         [0.4395],
         [0.6266],
         [0.4210],
         [0.5086],
         [0.5422],
         [0.5515],
         [0.5906],
         [0.6073],
         [0.4439],
         [0.5127],
         [0.3540],
         [0.6443],
         [0.3925],
         [0.5958],
         [0.4544],
         [0.4678],
         [0.4648],
         [0.5237],
         [0.6828],
         [0.5871],
         [0.5303],
         [0.5280],
         [0.5333],
         [0.6036],
         [0.6479],
         [0.5992],
         [0.5446],
         [0.5203],
         [0.4284],
         [0.5404],
         [0.3937],
         [0.5419],
         [0.5040],
         [0.5163],
         [0.6542],
         [0.6807]], grad_fn=<SigmoidBackward0>),
 [tensor([[0.3055],


In [None]:
# Define the dictionary outside the class
MODEL_DICT = {
    'densetFPN_121': densetFPN_121,
    'densetFPN_201': densetFPN_201,
    'efficientFPN_v2_s': efficientFPN_v2_s,
    'efficientDecoder_v2_s': efficientDecoder_v2_s
}

class BaselineModel(nn.Module):
    def __init__(self, backbone, num_tasks=5, feature_dim=(256, 25, 25)):
        super(BaselineModel, self).__init__()
        self.backbone = backbone()  # Instantiate the backbone passed as a class
        
        feature_channels, feature_width, feature_height = feature_dim
        
        self.adaptive_pool = nn.AdaptiveAvgPool2d((feature_width, feature_height))
        
        self.task_specific_layers = nn.ModuleList([
            nn.Sequential(
                nn.Flatten(),
                nn.Linear(feature_channels * feature_width * feature_height, 1024),
                nn.BatchNorm1d(1024),
                nn.ReLU(),
                nn.Dropout(0.2)
            ) for _ in range(num_tasks)
        ])
        
        self.task_specific_classifier = nn.ModuleList([
            nn.Linear(1024, 1) for _ in range(num_tasks)
        ])
        
        self.final_classifier = nn.Sequential(
            nn.Linear(1024 * num_tasks, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(1024, 1)
        )
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.adaptive_pool(x)
        
        intermediate_outputs = [layer(x) for layer in self.task_specific_layers]
        concatenated_outputs = torch.cat(intermediate_outputs, dim=1)
        
        task_outputs = [torch.sigmoid(classifier(io)) for io, classifier in zip(intermediate_outputs, self.task_specific_classifier)]
        
        final_output = torch.sigmoid(self.final_classifier(concatenated_outputs))
        
        return final_output, task_outputs

def construct_baselineModel(model_name, num_tasks=5, feature_dim=(256, 25, 25)):
    if model_name not in MODEL_DICT:
        raise ValueError(f"Unsupported model name {model_name}")
    backbone = MODEL_DICT[model_name]
    return BaselineModel(backbone, num_tasks, feature_dim)

In [4]:
prototype_shape=(2000, 512, 1, 1)
for i in range(0, len(prototype_shape)):
    print(f"prototype_shape[{i}]: {prototype_shape[i]}")

prototype_shape[0]: 2000
prototype_shape[1]: 512
prototype_shape[2]: 1
prototype_shape[3]: 1


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from src.models.backbone_models import denseFPN_121, denseFPN_201, efficientFPN_v2_s, efficientDecoder_v2_s

# Dictionary of supported backbone models
BACKBONE_DICT = {
    'denseFPN_121': denseFPN_121,
    'denseFPN_201': denseFPN_201,
    'efficientFPN_v2_s': efficientFPN_v2_s,
    'efficientDecoder_v2_s': efficientDecoder_v2_s
}

class HybridProtoPNet(nn.Module):
    def __init__(self, backbone, hidden_layers, num_tasks, num_prototypes_per_task, prototype_shape):
        super(HybridProtoPNet, self).__init__()
        # Initialize the backbone with specified configurations
        self.backbone = backbone
        
        # Prototype layers for each task
        self.prototype_vectors = nn.ParameterList([
            nn.Parameter(torch.rand(prototype_shape), requires_grad=True)
            for _ in range(num_tasks * num_prototypes_per_task)
        ])
        
        # Task-specific layers to process prototype information
        self.task_specific_layers = nn.ModuleList([
            nn.Sequential(
                nn.Flatten(),
                nn.Linear(num_prototypes_per_task * prototype_shape[1], hidden_layers),
                nn.BatchNorm1d(hidden_layers),
                nn.ReLU(),
                nn.Dropout(0.2)
            ) for _ in range(num_tasks)
        ])
        
        # Classifiers for each task based on processed prototype information
        self.task_specific_classifier = nn.ModuleList([
            nn.Linear(hidden_layers, 1) for _ in range(num_tasks)
        ])
        
        # Final classifier that takes concatenated task outputs to make a final prediction
        self.final_classifier = nn.Sequential(
            nn.Linear(hidden_layers * num_tasks, hidden_layers),
            nn.BatchNorm1d(hidden_layers),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_layers, 1)
        )
        
    def forward(self, x):
        x = self.backbone(x)  # Feature extraction using the backbone
        print(f"Backbone output shape: {x.shape}")
        
        # Computing prototype distances and processing them per task
        intermediate_outputs = []
        for i, prototype in enumerate(self.prototype_vectors):
            print(f"Prototype {i} shape: {prototype.shape}")
            task_index = i // len(self.prototype_vectors) * len(self.task_specific_layers)
            # print(f"Task index: {task_index}")
            print(f"Unsqueeze prototype shape: {prototype.unsqueeze(0).shape}, x shape: {x.shape}")
            distances = torch.cdist(x, prototype.unsqueeze(0), p=2)
            print(f"Distances shape: {distances.shape}")
            processed = self.task_specific_layers[task_index](distances)
            intermediate_outputs.append(processed)
        
        concatenated_outputs = torch.cat(intermediate_outputs, dim=1)  # Concatenating outputs for final prediction
        
        # Predicting binary outputs for each task
        task_outputs = [torch.sigmoid(self.task_specific_classifier[i](intermediate_outputs[i])) for i in range(len(self.task_specific_layers))]
        
        # Final malignancy or comprehensive prediction
        final_output = torch.sigmoid(self.final_classifier(concatenated_outputs))
        
        return final_output, task_outputs
    
def construct_ppnetModel(backbone_name='denseNet121', 
                            weights='DEFAULT', 
                            common_channel_size=256, 
                            output_channel_size=256, 
                            output_feature_size=25, 
                            hidden_layers=256, 
                            num_tasks=5,
                            num_prototypes_per_task=10,
                            prototype_shape=(2000, 512, 1, 1)):
    
    if backbone_name not in BACKBONE_DICT:
        raise ValueError(f"Unsupported model name {backbone_name}")
    backbone = BACKBONE_DICT[backbone_name](weights=weights, common_channel_size=common_channel_size, output_channel_size=output_channel_size, output_feature_size=output_feature_size)
    
    return HybridProtoPNet(backbone=backbone, 
                           weights=weights, 
                           common_channel_size=common_channel_size, 
                           output_channel_size=output_channel_size, 
                           output_feature_size=output_feature_size,
                           hidden_layers=hidden_layers, 
                           num_tasks=num_tasks,
                           num_prototypes_per_task=num_prototypes_per_task,
                           prototype_shape=prototype_shape)

model = construct_ppnetModel(backbone_name='denseFPN_121', weights='DEFAULT', common_channel_size=256, output_channel_size=512, output_feature_size=1, hidden_layers=1024, num_tasks=5, num_prototypes_per_task=10, prototype_shape=(2000, 512, 1, 1))


# Print total number of parameters
total_params = sum(p.numel() for p in model.parameters())
print("Total number of parameters: ", total_params)

# Create a dummy input tensor of size [50, 3, 100, 100]
dummy_input = torch.randn(50, 3, 100, 100)

# Forward pass through the model with dummy input
final_logits, task_logits = model(dummy_input)

# Print output shapes to verify
print("Final logits shape:", final_logits.shape)


Total number of parameters:  90456198
Backbone output shape: torch.Size([50, 512, 1, 1])
Prototype 0 shape: torch.Size([2000, 512, 1, 1])
Unsqueeze prototype shape: torch.Size([1, 2000, 512, 1, 1]), x shape: torch.Size([50, 512, 1, 1])


RuntimeError: The size of tensor a (50) must match the size of tensor b (2000) at non-singleton dimension 1