In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class VisualEncoder(nn.Module):
    def __init__(self, visual_dim, hidden_dim, num_layers):
        super(VisualEncoder, self).__init__()
        # Define the learnable visual token with the specified dimension
        self.visual_token = nn.Parameter(torch.randn(1, 1, visual_dim))
        
        # Transformer-based visual encoding blocks
        self.transformer_blocks = nn.ModuleList(
            [nn.TransformerEncoderLayer(d_model=visual_dim, nhead=8, dim_feedforward=hidden_dim)
             for _ in range(num_layers)]
        )

    def forward(self, V):
        # Batch size and visual feature dimensions
        batch_size = V.size(0)
        
        # Concatenate the learnable visual token with the visual feature sequence
        f_img_v = self.visual_token.expand(batch_size, -1, -1)
        V_concat = torch.cat((f_img_v, V), dim=1)
        
        # Pass through stacked transformer encoder blocks
        for layer in self.transformer_blocks:
            V_concat = layer(V_concat)
        
        # Extract the global visual feature
        F_v = V_concat[:, 0, :]  # Take the output corresponding to the visual token
        
        return F_v, V_concat  # Return both global and contextual features

# Example usage
visual_dim = 1024  # Dimensionality of visual features
hidden_dim = 2048  # Hidden dimension for FFN
num_layers = 5     # Number of stacked transformer blocks (NV)

# Initialize the encoder
visual_encoder = VisualEncoder(visual_dim=visual_dim, hidden_dim=hidden_dim, num_layers=num_layers)

# Dummy visual features with batch size of 4 and 10 objects per image (just for example)
V = torch.randn(4, 10, visual_dim)
F_v, V_concat = visual_encoder(V)

print("Global visual feature (F_v) shape:", F_v.shape)
print("Concatenated visual feature shape:", V_concat.shape)


Global visual feature (F_v) shape: torch.Size([4, 1024])
Concatenated visual feature shape: torch.Size([4, 11, 1024])


In [13]:
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torch.utils.data import DataLoader
from pycocotools.coco import COCO
from PIL import Image
import os

In [14]:
# Visual Encoder Definition
class VisualEncoder(nn.Module):
    def __init__(self, visual_dim, hidden_dim, num_layers):
        super(VisualEncoder, self).__init__()
        self.visual_token = nn.Parameter(torch.randn(1, 1, visual_dim))
        self.transformer_blocks = nn.ModuleList(
            [nn.TransformerEncoderLayer(d_model=visual_dim, nhead=8, dim_feedforward=hidden_dim)
             for _ in range(num_layers)]
        )

    def forward(self, V):
        batch_size = V.size(0)
        f_img_v = self.visual_token.expand(batch_size, -1, -1)
        V_concat = torch.cat((f_img_v, V), dim=1)
        for layer in self.transformer_blocks:
            V_concat = layer(V_concat)
        F_v = V_concat[:, 0, :]  # Global feature
        return F_v, V_concat

In [15]:
# Parameters
visual_dim = 1024
hidden_dim = 2048
num_layers = 5
batch_size = 4  
image_size = 224
K_v = 36  # Number of object features per image


In [16]:
# Initialize Encoder
visual_encoder = VisualEncoder(visual_dim=visual_dim, hidden_dim=hidden_dim, num_layers=num_layers)

In [17]:
# Faster R-CNN Model for Feature Extraction
faster_rcnn = fasterrcnn_resnet50_fpn(pretrained=True)
faster_rcnn.eval()


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [18]:
# Image Transformations
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [19]:
# Load COCO Dataset
data_dir = 'D:/Project_phase_1/new_ds/train2014/train2014'  
ann_file = 'D:/Project_phase_1/new_ds/annotations_trainval2014/annotations/instances_train2014.json'  
coco = COCO(ann_file)
img_ids = list(coco.imgs.keys())

loading annotations into memory...
Done (t=20.73s)
creating index...
index created!


In [20]:
class COCODataset(torch.utils.data.Dataset):
    def __init__(self, coco, img_dir, transform=None):
        self.coco = coco
        self.img_dir = img_dir
        self.transform = transform
        self.img_ids = list(coco.imgs.keys())

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        path = self.coco.loadImgs(img_id)[0]['file_name']
        image = Image.open(os.path.join(self.img_dir, path)).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return img_id, image

In [21]:
# Dataloader
coco_dataset = COCODataset(coco, data_dir, transform=transform)
data_loader = DataLoader(coco_dataset, batch_size=batch_size, shuffle=True)

In [22]:
# Encoding and Saving Process
output_dir = 'D:/Project_phase_1/image modality/im_encod'  
os.makedirs(output_dir, exist_ok=True)

In [None]:
with torch.no_grad():
    for img_ids, batch in data_loader:
        # Extract predictions (proposals, etc.) with Faster R-CNN
        output = faster_rcnn(batch)
        
        for i, img_id in enumerate(img_ids):
            # Extract the features for each region proposal
            object_features = output[i]['boxes']  # Use box coordinates for ROI pooling if needed
            proposals = len(object_features)

            # Here we would typically run features through an ROI pooling layer to get 1024-dim vectors for each proposal
            # Assuming we have `roi_pooled_features` as 1024-dim feature vectors for object proposals
            
            # Check the number of detected objects and adjust
            if proposals >= K_v:
                V = roi_pooled_features[:K_v]
            else:
                padding = torch.zeros(K_v - proposals, visual_dim)
                V = torch.cat([roi_pooled_features, padding], dim=0)

            V = V.unsqueeze(0)  # Add batch dimension to match encoder input shape
            
            # Pass features through the visual encoder
            F_v, V_concat = visual_encoder(V)
            
            # Save encoded features in .pt format
            save_path = os.path.join(output_dir, f"{img_id}_features.pt")
            torch.save(F_v, save_path)
            print(f"Saved visual features for image {img_id} at {save_path}")

KeyError: 'features'

In [28]:
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torch.utils.data import DataLoader
from torchvision.ops import roi_align
from pycocotools.coco import COCO
from PIL import Image
import os

# Visual Encoder Definition
class VisualEncoder(nn.Module):
    def __init__(self, visual_dim, hidden_dim, num_layers):
        super(VisualEncoder, self).__init__()
        self.visual_token = nn.Parameter(torch.randn(1, 1, visual_dim))
        self.transformer_blocks = nn.ModuleList(
            [nn.TransformerEncoderLayer(d_model=visual_dim, nhead=8, dim_feedforward=hidden_dim)
             for _ in range(num_layers)]
        )

    def forward(self, V):
        batch_size = V.size(0)
        f_img_v = self.visual_token.expand(batch_size, -1, -1)
        V_concat = torch.cat((f_img_v, V), dim=1)
        for layer in self.transformer_blocks:
            V_concat = layer(V_concat)
        F_v = V_concat[:, 0, :]  # Global feature
        return F_v, V_concat

# Parameters
visual_dim = 1024
hidden_dim = 2048
num_layers = 5
batch_size = 4  # Process images in batches of 4
image_size = 800  # Use 800 for resizing to match Faster R-CNN input size
K_v = 36  # Number of object features per image

# Initialize Encoder
visual_encoder = VisualEncoder(visual_dim=visual_dim, hidden_dim=hidden_dim, num_layers=num_layers)

# Faster R-CNN Model for Feature Extraction
faster_rcnn = fasterrcnn_resnet50_fpn(pretrained=True)
faster_rcnn.eval()

# Image Transformations
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load COCO Dataset
data_dir = 'D:/Project_phase_1/new_ds/train2014/train2014'  # COCO dataset path
ann_file = 'D:/Project_phase_1/new_ds/annotations_trainval2014/annotations/instances_train2014.json'  # Annotations path
coco = COCO(ann_file)
img_ids = list(coco.imgs.keys())

class COCODataset(torch.utils.data.Dataset):
    def __init__(self, coco, img_dir, transform=None):
        self.coco = coco
        self.img_dir = img_dir
        self.transform = transform
        self.img_ids = list(coco.imgs.keys())

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        path = self.coco.loadImgs(img_id)[0]['file_name']
        image = Image.open(os.path.join(self.img_dir, path)).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return img_id, image

# Dataloader
coco_dataset = COCODataset(coco, data_dir, transform=transform)
data_loader = DataLoader(coco_dataset, batch_size=batch_size, shuffle=True)

# Directory to save encoded features
output_dir = 'D:/Project_phase_1/image modality/im_encod'  # Output path for saved features
os.makedirs(output_dir, exist_ok=True)

# Encoding and Saving Process
with torch.no_grad():
    for img_ids, batch in data_loader:
        # Extract predictions and feature maps with Faster R-CNN
        output = faster_rcnn(batch)
        
        for i, img_id in enumerate(img_ids):
            # Get bounding boxes and apply ROI Align to get object features
            boxes = output[i]['boxes']  # Detected bounding boxes
            feature_maps = output[i]['features']  # Get the feature map associated with the predictions
            
            if boxes.shape[0] > 0:
                # Convert boxes to numpy for roi_align
                boxes = boxes.cpu().detach().numpy()  # Convert to numpy for roi_align
                object_features = roi_align(feature_maps, boxes, output_size=(1, 1))
                object_features = object_features.view(object_features.size(0), -1)  # Flatten to (num_objects, 1024)
            else:
                object_features = torch.zeros(0, visual_dim)  # Handle images with no detections

            num_objects = object_features.size(0)

            # Select top K_v features (either pad or truncate to 36 features)
            if num_objects >= K_v:
                V = object_features[:K_v]
            else:
                padding = torch.zeros(K_v - num_objects, visual_dim)
                V = torch.cat([object_features, padding], dim=0)

            V = V.unsqueeze(0)  # Add batch dimension to match encoder input shape

            # Pass features through the visual encoder
            F_v, V_concat = visual_encoder(V)

            # Save encoded features in .pt format
            save_path = os.path.join(output_dir, f"{img_id}_features.pt")
            torch.save(F_v, save_path)
            print(f"Saved visual features for image {img_id} at {save_path}")



loading annotations into memory...
Done (t=17.11s)
creating index...
index created!


KeyError: 'features'

In [None]:
# %%
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.datasets import CocoDetection
from PIL import Image
import os

# %%
# Define preprocessing for the images
image_size = 800  # Resize to the specified size
preprocess = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
])

# %%
# Load pretrained Faster R-CNN model
faster_rcnn = fasterrcnn_resnet50_fpn(pretrained=True)
faster_rcnn.eval()  # Set the model to evaluation mode

# %%
# Position-Aware Representation Module
class PositionAwareRepresentation(nn.Module):
    def __init__(self, feature_dim, bbox_dim, output_dim, hidden_dim=2048, num_layers=5):
        super(PositionAwareRepresentation, self).__init__()
        
        # Fully connected layers with the specified hidden dimensions and layers
        self.fc_f = nn.Linear(feature_dim, hidden_dim)
        self.fc_b = nn.Linear(bbox_dim, hidden_dim)
        
        self.hidden_layers = nn.Sequential(
            *[nn.Linear(hidden_dim, hidden_dim) for _ in range(num_layers)]
        )
        
        self.output_layer = nn.Linear(hidden_dim, output_dim)
        self.layer_norm_f = nn.LayerNorm(hidden_dim)
        self.layer_norm_b = nn.LayerNorm(hidden_dim)

    def forward(self, features, bboxes):
        # Normalize features and bounding boxes
        normalized_features = self.layer_norm_f(self.fc_f(features))
        normalized_bboxes = self.layer_norm_b(self.fc_b(bboxes))
        
        # Sum and average
        vj = (normalized_features + normalized_bboxes) / 2
        
        # Pass through hidden layers and final output layer
        V = self.output_layer(self.hidden_layers(vj).sum(dim=0))  # Sum over KV (36 objects)
        return V

# %%
# Function to extract features and bounding boxes from Faster R-CNN
def extract_faster_rcnn_features(image_tensor, kv=36):
    with torch.no_grad():
        predictions = faster_rcnn(image_tensor)

    objects = predictions[0]
    bboxes = objects['boxes']  # Shape: [N, 4]
    features = objects['scores']  # Shape: [N] confidence scores

    if len(features) > kv:
        features = features[:kv]
        bboxes = bboxes[:kv]
    elif len(features) < kv:
        padding_size = kv - len(features)
        features = torch.cat((features, torch.zeros(padding_size)))
        bboxes = torch.cat((bboxes, torch.zeros((padding_size, 4))))

    return features, bboxes

# %%
def process_dataset(coco_dataset, batch_size=4, kv=36, output_file='output_embeddings.pt'):
    results = []
    num_images = len(coco_dataset)

    # Set model dimensions
    dv = 1024  # visual_dim output dimension
    model = PositionAwareRepresentation(feature_dim=kv, bbox_dim=4, output_dim=dv)

    for i in range(len(coco_dataset.ids)):
        image, _ = coco_dataset[i]
        image_tensor = preprocess(image).unsqueeze(0)
        
        features, bboxes = extract_faster_rcnn_features(image_tensor, kv)

        V = model(features.unsqueeze(0), bboxes.unsqueeze(0))

        annotations = coco_dataset.coco.imgToAnns.get(coco_dataset.ids[i], [])
        if annotations:
            image_id = annotations[0]['image_id']
            results.append({'image_id': image_id, 'embedding': V})

        if (i + 1) % batch_size == 0 or (i + 1) == num_images:
            torch.save(results, output_file)
            results = []

        if (i + 1) % 1000 == 0:
            print(f'Processed {i + 1}/{num_images} images')

    print(f'Results saved to {output_file}')

# %%
# MS COCO dataset paths
coco_root = 'D:/Project_phase_1/new_ds/'
train_annotations = os.path.join(coco_root, 'annotations_trainval2014/annotations/instances_train2014.json')
train_images_dir = os.path.join(coco_root, 'train2014/train2014/')

# %%
# Load the MS COCO dataset for training
coco_dataset = CocoDetection(root=train_images_dir, annFile=train_annotations)

# %%
# Process the dataset and save results
process_dataset(coco_dataset, batch_size=4, kv=36, output_file='output_img.pt')




loading annotations into memory...
Done (t=17.03s)
creating index...
index created!
