In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


Preprocessing and Training for train files


In [2]:
import json

def merge_vqa_data(train_cvqa_path, train_json_path, questions_definitions_path, output_path):
    # Load train_cvqa.json (Question-Answer pairs)
    with open(train_cvqa_path, "r", encoding="utf-8") as f:
        train_cvqa = json.load(f)

    # Load train.json (Text + Images)
    with open(train_json_path, "r", encoding="utf-8") as f:
        train_data = json.load(f)

    # Load closedquestions_definitions_imageclef2025.json (Question Bank)
    with open(questions_definitions_path, "r", encoding="utf-8") as f:
        question_definitions = json.load(f)

    # Create a mapping of encounter_id to train.json data for quick lookup
    train_data_dict = {entry["encounter_id"]: entry for entry in train_data}

    # Create a mapping of CQIDxxx to question details
    question_dict = {
        question["qid"]: {
            "question_text": question["question_en"],
            "options": question["options_en"]
        }
        for question in question_definitions
    }

    # Merge train_cvqa with train.json based on encounter_id
    merged_data = []
    for qa_entry in train_cvqa:
        encounter_id = qa_entry.get("encounter_id")

        if encounter_id in train_data_dict:
            train_entry = train_data_dict[encounter_id]

            merged_entry = {
                "encounter_id": encounter_id,
                "image_ids": train_entry.get("image_ids", []),
                "query_title_en": train_entry.get("query_title_en", ""),
                "query_content_en": train_entry.get("query_content_en", ""),

                "qa_pairs": {key: value for key, value in qa_entry.items() if key.startswith("CQID")},
                "questions": {}
            }

            # Map question IDs to actual questions and answers
            for qid, answer_index in merged_entry["qa_pairs"].items():
                if qid in question_dict:
                    try:
                        # Ensure answer_index is an integer and within bounds
                        index = int(answer_index)
                        options = question_dict[qid]["options"]
                        correct_answer = options[index] if 0 <= index < len(options) else None

                        merged_entry["questions"][qid] = {
                            "qid": qid,
                            "question_text": question_dict[qid]["question_text"],
                            "options": options,
                            "correct_answer": correct_answer
                        }
                    except Exception as e:
                        print(f"[ERROR] qid: {qid}, encounter_id: {encounter_id}, error: {str(e)}")
                else:
                    print(f"[WARN] Question ID {qid} not found in question definitions.")

            merged_data.append(merged_entry)
        else:
            print(f"[WARN] Encounter ID {encounter_id} not found in train_data_dict.")

    # Save processed data to JSON
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(merged_data, f, indent=4)

    # Log sample for verification
    print(f"\n✅ Processed data saved to: {output_path}")
    print("🔍 Sample entry:")
    print(json.dumps(merged_data[:2], indent=4))  # Display first 2 entries


# ==== File paths ====
train_cvqa_path = "/content/drive/MyDrive/MAGIC_Dataset/train_cvqa.json"
train_json_path = "/content/drive/MyDrive/MAGIC_Dataset/train.json"
questions_definitions_path = "/content/drive/MyDrive/MAGIC_Dataset/closedquestions_definitions_imageclef2025.json"
output_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data.json"

# ==== Run the processor ====
merge_vqa_data(train_cvqa_path, train_json_path, questions_definitions_path, output_path)



✅ Processed data saved to: /content/drive/MyDrive/MAGIC_Dataset/processed_data.json
🔍 Sample entry:
[
    {
        "encounter_id": "ENC00001",
        "image_ids": [
            "IMG_ENC00001_00001.jpg",
            "IMG_ENC00001_00002.jpg"
        ],
        "query_title_en": "Pleural effusion accompanied by rash",
        "query_content_en": "A patient with pleural effusion is accompanied by a systemic rash, as seen in the picture (currently only the back picture is available).",
        "qa_pairs": {
            "CQID010-001": 1,
            "CQID011-001": 5,
            "CQID011-002": 7,
            "CQID011-003": 7,
            "CQID011-004": 7,
            "CQID011-005": 7,
            "CQID011-006": 7,
            "CQID012-001": 1,
            "CQID012-002": 3,
            "CQID012-003": 3,
            "CQID012-004": 3,
            "CQID012-005": 3,
            "CQID012-006": 3,
            "CQID015-001": 6,
            "CQID020-001": 3,
            "CQID020-002": 9,
         

In [4]:
import torch
import os
import json
from PIL import Image
import numpy as np
from torchvision import models, transforms
from tqdm import tqdm
import glob
import cv2
from skimage.feature import local_binary_pattern
from skimage.transform import resize

# ✅ Setup VGG16
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg16 = models.vgg16(pretrained=True).to(device)
vgg16.eval()

# ✅ Preprocessing for VGG16
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ✅ Image directory
image_dir = "/content/drive/MyDrive/MAGIC_Dataset/images_final/images_train"

# ✅ LBP Feature Extraction
def extract_lbp_features(image_path, P=8, R=1):
    # Read image and convert to grayscale
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Compute LBP
    lbp = local_binary_pattern(image, P, R, method='uniform')

    # Resize to match VGG shape
    lbp_resized = resize(lbp, (224, 224), mode='reflect', anti_aliasing=True)

    # Flatten and normalize
    lbp_flat = lbp_resized.flatten()
    lbp_norm = (lbp_flat - np.mean(lbp_flat)) / (np.std(lbp_flat) + 1e-6)

    return lbp_norm.astype(np.float32)

# ✅ VGG16 + LBP Feature Extraction
def extract_combined_features(image_path):
    # VGG16 features
    image = preprocess(Image.open(image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        features = vgg16.features(image)
        features = features.view(features.size(0), -1)
        vgg_feat = features.squeeze(0).cpu().numpy()

    # LBP features
    lbp_feat = extract_lbp_features(image_path)

    # Combine VGG16 + LBP features
    combined_feat = np.concatenate((vgg_feat, lbp_feat))
    return combined_feat

# ✅ Process all training images
image_features = {}
image_paths = glob.glob(os.path.join(image_dir, "*.jpg"))  # Make sure your images are .jpg

for img_path in tqdm(image_paths):
    img_name = os.path.basename(img_path)
    try:
        image_features[img_name] = extract_combined_features(img_path)
    except Exception as e:
        print(f"Error processing {img_name}: {e}")

print(f"✅ Extracted combined features for {len(image_features)} images!")

# ✅ Save combined features to .npy file
output_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_train.npy"
np.save(output_path, image_features)

print(f"✅ Combined features saved at: {output_path}")


100%|██████████| 2472/2472 [1:04:17<00:00,  1.56s/it]


✅ Extracted combined features for 2472 images!
✅ Combined features saved at: /content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_train.npy


In [7]:
import numpy as np
import json
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm

# ✅ Load preprocessed data (train only)
processed_data_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data.json"
with open(processed_data_path, "r", encoding="utf-8") as f:
    processed_data = json.load(f)

# ✅ Load XLM-R model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
xlm_r_model = AutoModel.from_pretrained(model_name).to(device)
xlm_r_model.eval()

# ✅ Load pre-extracted VGG16 image features
vgg16_features_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_train.npy"
vgg16_image_features = np.load(vgg16_features_path, allow_pickle=True).item()  # dict[img_name] = np.array([...])

# ✅ Function to get XLM-R embedding for text
def get_xlm_r_embedding(text):
    """Returns average pooled embedding of input text using XLM-R"""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = xlm_r_model(**inputs)
        embeddings = outputs.last_hidden_state  # shape: [1, seq_len, hidden_size]
        return embeddings.mean(dim=1).squeeze().cpu().numpy()  # shape: [768]

# ✅ Build fused data using XLM-R + VGG16 features
fused_data = []

for entry in tqdm(processed_data):
    encounter_id = entry["encounter_id"]

    # 📝 Combine title and content
    query_title = entry.get("query_title_en", "")
    query_content = entry.get("query_content_en", "")
    en_text = query_title + " " + query_content  # Combine title and content

    # 🔠 XLM-R embedding for the combined text
    text_vector = get_xlm_r_embedding(en_text)

    # 🖼️ VGG16 image embeddings (average if multiple images)
    image_vectors = []
    for img_id in entry["image_ids"]:
        img_file = img_id + ".jpg"
        if img_file in vgg16_image_features:
            image_vectors.append(vgg16_image_features[img_file])

    if image_vectors:
        image_vector = np.mean(image_vectors, axis=0)
    else:
        image_vector = np.zeros(4096)  # Zero vector if no image found

    # 🔗 Concatenate text and image vectors → final multimodal feature
    final_vector = np.concatenate((text_vector, image_vector))  # shape: (768 + 4096 = 4864)

    # ✅ Add to fused list
    fused_data.append({
        "encounter_id": encounter_id,
        "features": final_vector.tolist(),
        "qa_pairs": entry["qa_pairs"]
    })

# 💾 Save fused output
fused_out = "/content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16.json"
with open(fused_out, "w", encoding="utf-8") as f:
    json.dump(fused_data, f, indent=4)

print(f"✅ Fused data saved at: {fused_out}")


100%|██████████| 300/300 [02:23<00:00,  2.09it/s]


✅ Fused data saved at: /content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16.json


BERT+ViT

In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import json
from sklearn.model_selection import train_test_split

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load fused data
fused_data_path = "/content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16.json"
with open(fused_data_path, "r", encoding="utf-8") as f:
    fused_data = json.load(f)

# Prepare input features and labels
text_inputs, attention_masks, image_tensors, labels = [], [], [], []

for entry in fused_data:
    # Use existing preprocessed features instead of raw text
    features_vector = entry["features"]
    text_inputs.append(features_vector[:768])  # First 768 features for text
    attention_masks.append(np.ones(768))  # Fake attention mask (all ones)
    image_tensors.append(features_vector[768:])  # Remaining features for image
    # Collect labels
    labels.append(list(entry["qa_pairs"].values()))

# Convert to PyTorch tensors
text_inputs_tensor = torch.tensor(np.array(text_inputs), dtype=torch.float32)
attention_masks_tensor = torch.tensor(np.array(attention_masks), dtype=torch.float32)
image_tensors_tensor = torch.tensor(np.array(image_tensors), dtype=torch.float32)
y_tensor = torch.tensor(np.array(labels), dtype=torch.long)

# Train-test split (80% train, 20% test)
X_train_text, X_test_text, X_train_mask, X_test_mask, X_train_img, X_test_img, y_train, y_test = train_test_split(
    text_inputs_tensor, attention_masks_tensor, image_tensors_tensor, y_tensor, test_size=0.2, random_state=42
)

# Define Multimodal Model
class MultimodalModel(nn.Module):
    def __init__(self, num_classes=12):
        super(MultimodalModel, self).__init__()

        # Reduce Image Feature Dimensionality
        self.img_fc = nn.Linear(4096, 256)  # Updated: 4096 input features (VGG16)

        # Fusion Layer + Classification
        self.fc = nn.Sequential(
            nn.Linear(768 + 256, 256),  # 768 (text) + 256 (reduced image)
            nn.ReLU(),
            nn.Linear(256, num_classes * 27)  # 27 questions × 12 answer choices
        )

    def forward(self, text_features, attention_mask, image_tensor):
        # Reduce Image Feature Dimensionality
        image_features = self.img_fc(image_tensor)

        # Fuse Text + Image Features
        fused = torch.cat((text_features, image_features), dim=1)
        output = self.fc(fused)

        # Reshape output to match [batch, 27, num_classes]
        output = output.view(-1, 27, num_classes)
        return output

# Initialize model
model = MultimodalModel().to(device)

# Define Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Training Loop
epochs = 1000
batch_size = 17
num_classes = 12

for epoch in range(epochs):
    model.train()
    for i in range(0, len(X_train_text), batch_size):
        batch_text = X_train_text[i:i+batch_size].to(device)
        batch_mask = X_train_mask[i:i+batch_size].to(device)  # Not needed but kept for compatibility
        batch_img = X_train_img[i:i+batch_size].to(device)
        batch_y = y_train[i:i+batch_size].to(device)

        optimizer.zero_grad()
        outputs = model(batch_text, batch_mask, batch_img)  # Shape [batch, 27, num_classes]

        # Reshape outputs and labels correctly
        outputs = outputs.view(-1, num_classes)  # Reshape to [batch * 27, num_classes]
        batch_y = batch_y.view(-1).long()  # Reshape labels to [batch * 27]

        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

# Save model
torch.save(model.state_dict(), "/content/drive/MyDrive/MAGIC_Dataset/multimodal_vit.pth")
print("✅ Model training complete! Saved model to /content/drive/MyDrive/MAGIC_Dataset/multimodal_vit.pth")


Epoch 1/1000, Loss: 2.3575
Epoch 2/1000, Loss: 2.2125
Epoch 3/1000, Loss: 2.0214
Epoch 4/1000, Loss: 1.7731
Epoch 5/1000, Loss: 1.4767
Epoch 6/1000, Loss: 1.1659
Epoch 7/1000, Loss: 0.8933
Epoch 8/1000, Loss: 0.6985
Epoch 9/1000, Loss: 0.5806
Epoch 10/1000, Loss: 0.5150
Epoch 11/1000, Loss: 0.4788
Epoch 12/1000, Loss: 0.4583
Epoch 13/1000, Loss: 0.4461
Epoch 14/1000, Loss: 0.4384
Epoch 15/1000, Loss: 0.4331
Epoch 16/1000, Loss: 0.4292
Epoch 17/1000, Loss: 0.4262
Epoch 18/1000, Loss: 0.4237
Epoch 19/1000, Loss: 0.4217
Epoch 20/1000, Loss: 0.4200
Epoch 21/1000, Loss: 0.4186
Epoch 22/1000, Loss: 0.4173
Epoch 23/1000, Loss: 0.4163
Epoch 24/1000, Loss: 0.4153
Epoch 25/1000, Loss: 0.4144
Epoch 26/1000, Loss: 0.4136
Epoch 27/1000, Loss: 0.4129
Epoch 28/1000, Loss: 0.4122
Epoch 29/1000, Loss: 0.4116
Epoch 30/1000, Loss: 0.4109
Epoch 31/1000, Loss: 0.4104
Epoch 32/1000, Loss: 0.4098
Epoch 33/1000, Loss: 0.4092
Epoch 34/1000, Loss: 0.4087
Epoch 35/1000, Loss: 0.4082
Epoch 36/1000, Loss: 0.4077
E

Preprocessing ,feature extraction for valid


In [10]:
import json

def merge_vqa_data(train_cvqa_path, train_json_path, questions_definitions_path, output_path):
    # Load train_cvqa.json (Question-Answer pairs)
    with open(train_cvqa_path, "r", encoding="utf-8") as f:
        train_cvqa = json.load(f)

    # Load train.json (Text + Images)
    with open(train_json_path, "r", encoding="utf-8") as f:
        train_data = json.load(f)

    # Load closedquestions_definitions_imageclef2025.json (Question Bank)
    with open(questions_definitions_path, "r", encoding="utf-8") as f:
        question_definitions = json.load(f)

    # Create a mapping of encounter_id to train.json data for quick lookup
    train_data_dict = {entry["encounter_id"]: entry for entry in train_data}

    # Create a mapping of CQIDxxx to question details
    question_dict = {
        question["qid"]: {
            "question_text": question["question_en"],
            "options": question["options_en"]
        }
        for question in question_definitions
    }

    # Merge train_cvqa with train.json based on encounter_id
    merged_data = []
    for qa_entry in train_cvqa:
        encounter_id = qa_entry.get("encounter_id")

        if encounter_id in train_data_dict:
            train_entry = train_data_dict[encounter_id]

            merged_entry = {
                "encounter_id": encounter_id,
                "image_ids": train_entry.get("image_ids", []),
                "query_title_en": train_entry.get("query_title_en", ""),
                "query_content_en": train_entry.get("query_content_en", ""),

                "qa_pairs": {key: value for key, value in qa_entry.items() if key.startswith("CQID")},
                "questions": {}
            }

            # Map question IDs to actual questions and answers
            for qid, answer_index in merged_entry["qa_pairs"].items():
                if qid in question_dict:
                    try:
                        # Ensure answer_index is an integer and within bounds
                        index = int(answer_index)
                        options = question_dict[qid]["options"]
                        correct_answer = options[index] if 0 <= index < len(options) else None

                        merged_entry["questions"][qid] = {
                            "qid": qid,
                            "question_text": question_dict[qid]["question_text"],
                            "options": options,
                            "correct_answer": correct_answer
                        }
                    except Exception as e:
                        print(f"[ERROR] qid: {qid}, encounter_id: {encounter_id}, error: {str(e)}")
                else:
                    print(f"[WARN] Question ID {qid} not found in question definitions.")

            merged_data.append(merged_entry)
        else:
            print(f"[WARN] Encounter ID {encounter_id} not found in train_data_dict.")

    # Save processed data to JSON
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(merged_data, f, indent=4)

    # Log sample for verification
    print(f"\n✅ Processed data saved to: {output_path}")
    print("🔍 Sample entry:")
    print(json.dumps(merged_data[:2], indent=4))  # Display first 2 entries


# ==== File paths ====
train_cvqa_path = "/content/drive/MyDrive/MAGIC_Dataset/valid_cvqa.json"
train_json_path = "/content/drive/MyDrive/MAGIC_Dataset/valid_ht_v2.json"
questions_definitions_path = "/content/drive/MyDrive/MAGIC_Dataset/closedquestions_definitions_imageclef2025.json"
output_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data_valid.json"

# ==== Run the processor ====
merge_vqa_data(train_cvqa_path, train_json_path, questions_definitions_path, output_path)



✅ Processed data saved to: /content/drive/MyDrive/MAGIC_Dataset/processed_data_valid.json
🔍 Sample entry:
[
    {
        "encounter_id": "ENC00852",
        "image_ids": [
            "IMG_ENC00852_00001.jpg",
            "IMG_ENC00852_00002.jpg"
        ],
        "query_title_en": "Is this Vitiligo?  Please see picture.",
        "query_content_en": "The patient is a middle age female, about 50 years old. There are dark red rashes on the back of the hand.  They gradually turn into Leukoplakia, but would become reddish on rubbing locally.  Patient feels no specific symptoms, but exhibits problem mentally.  Treatment based on diagnosis as eczema and  Vitiligo.  However, the symptom is getting worse: rashes are getting bigger and there come macula in light red on the face one month later, without skin scales.  There is no picture for the face.  Patient refuses to take a pathological examination.",
        "qa_pairs": {
            "CQID010-001": 1,
            "CQID011-001": 2,
      

In [11]:
import torch
import os
import json
from PIL import Image
import numpy as np
from torchvision import models, transforms
from tqdm import tqdm
import glob
import cv2
from skimage.feature import local_binary_pattern
from skimage.transform import resize

# ✅ Setup VGG16
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg16 = models.vgg16(pretrained=True).to(device)
vgg16.eval()

# ✅ Preprocessing for VGG16
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ✅ Image directory
image_dir = "/content/drive/MyDrive/MAGIC_Dataset/images_final/images_valid"

# ✅ LBP Feature Extraction
def extract_lbp_features(image_path, P=8, R=1):
    # Read image and convert to grayscale
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Compute LBP
    lbp = local_binary_pattern(image, P, R, method='uniform')

    # Resize to match VGG shape
    lbp_resized = resize(lbp, (224, 224), mode='reflect', anti_aliasing=True)

    # Flatten and normalize
    lbp_flat = lbp_resized.flatten()
    lbp_norm = (lbp_flat - np.mean(lbp_flat)) / (np.std(lbp_flat) + 1e-6)

    return lbp_norm.astype(np.float32)

# ✅ VGG16 + LBP Feature Extraction
def extract_combined_features(image_path):
    # VGG16 features
    image = preprocess(Image.open(image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        features = vgg16.features(image)
        features = features.view(features.size(0), -1)
        vgg_feat = features.squeeze(0).cpu().numpy()

    # LBP features
    lbp_feat = extract_lbp_features(image_path)

    # Combine VGG16 + LBP features
    combined_feat = np.concatenate((vgg_feat, lbp_feat))
    return combined_feat

# ✅ Process all training images
image_features = {}
image_paths = glob.glob(os.path.join(image_dir, "*.jpg"))  # Make sure your images are .jpg

for img_path in tqdm(image_paths):
    img_name = os.path.basename(img_path)
    try:
        image_features[img_name] = extract_combined_features(img_path)
    except Exception as e:
        print(f"Error processing {img_name}: {e}")

print(f"✅ Extracted combined features for {len(image_features)} images!")

# ✅ Save combined features to .npy file
output_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_valid.npy"
np.save(output_path, image_features)

print(f"✅ Combined features saved at: {output_path}")


100%|██████████| 155/155 [05:00<00:00,  1.94s/it]


✅ Extracted combined features for 155 images!
✅ Combined features saved at: /content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_valid.npy


In [12]:
import numpy as np
import json
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm

# ✅ Load preprocessed data (train only)
processed_data_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data_valid.json"
with open(processed_data_path, "r", encoding="utf-8") as f:
    processed_data = json.load(f)

# ✅ Load XLM-R model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
xlm_r_model = AutoModel.from_pretrained(model_name).to(device)
xlm_r_model.eval()

# ✅ Load pre-extracted VGG16 image features
vgg16_features_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_valid.npy"
vgg16_image_features = np.load(vgg16_features_path, allow_pickle=True).item()  # dict[img_name] = np.array([...])

# ✅ Function to get XLM-R embedding for text
def get_xlm_r_embedding(text):
    """Returns average pooled embedding of input text using XLM-R"""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = xlm_r_model(**inputs)
        embeddings = outputs.last_hidden_state  # shape: [1, seq_len, hidden_size]
        return embeddings.mean(dim=1).squeeze().cpu().numpy()  # shape: [768]

# ✅ Build fused data using XLM-R + VGG16 features
fused_data = []

for entry in tqdm(processed_data):
    encounter_id = entry["encounter_id"]

    # 📝 Combine title and content
    query_title = entry.get("query_title_en", "")
    query_content = entry.get("query_content_en", "")
    en_text = query_title + " " + query_content  # Combine title and content

    # 🔠 XLM-R embedding for the combined text
    text_vector = get_xlm_r_embedding(en_text)

    # 🖼️ VGG16 image embeddings (average if multiple images)
    image_vectors = []
    for img_id in entry["image_ids"]:
        img_file = img_id + ".jpg"
        if img_file in vgg16_image_features:
            image_vectors.append(vgg16_image_features[img_file])

    if image_vectors:
        image_vector = np.mean(image_vectors, axis=0)
    else:
        image_vector = np.zeros(4096)  # Zero vector if no image found

    # 🔗 Concatenate text and image vectors → final multimodal feature
    final_vector = np.concatenate((text_vector, image_vector))  # shape: (768 + 4096 = 4864)

    # ✅ Add to fused list
    fused_data.append({
        "encounter_id": encounter_id,
        "features": final_vector.tolist(),
        "qa_pairs": entry["qa_pairs"]
    })

# 💾 Save fused output
fused_out = "/content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16_valid.json"
with open(fused_out, "w", encoding="utf-8") as f:
    json.dump(fused_data, f, indent=4)

print(f"✅ Fused data saved at: {fused_out}")


100%|██████████| 56/56 [00:22<00:00,  2.49it/s]


✅ Fused data saved at: /content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16_valid.json


Preprocessing ,feature extraction for test






In [13]:
import json

def preprocess_test_data(test_json_path, questions_definitions_path, output_path):
    # Load test.json (text + images)
    with open(test_json_path, "r", encoding="utf-8") as f:
        test_data = json.load(f)

    # Load question definitions
    with open(questions_definitions_path, "r", encoding="utf-8") as f:
        question_definitions = json.load(f)

    # Create a dictionary of questions
    question_dict = {
        question["qid"]: {
            "qid": question["qid"],
            "question_text": question["question_en"],
            "options": question["options_en"]
        }
        for question in question_definitions
    }

    # Prepare test entries
    processed_data = []
    for entry in test_data:
        processed_entry = {
            "encounter_id": entry["encounter_id"],
            "image_ids": entry.get("image_ids", []),
            "query_title_en": entry.get("query_title_en", ""),
            "query_content_en": entry.get("query_content_en", ""),
            "questions": question_dict,  # Attach all questions
            "qa_pairs": {}  # Empty as no labels are present
        }
        processed_data.append(processed_entry)

    # Save
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(processed_data, f, indent=4)

    print(f"✅ Test data processed and saved to: {output_path}")
    print(json.dumps(processed_data[:2], indent=4))  # show sample


# === File paths ===
test_json_path = "/content/drive/MyDrive/MAGIC_Dataset/test_ht_v3_spanishtestsetcorrected.json"
questions_definitions_path = "/content/drive/MyDrive/MAGIC_Dataset/closedquestions_definitions_imageclef2025.json"
output_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data_test.json"

# === Run ===
preprocess_test_data(test_json_path, questions_definitions_path, output_path)


✅ Test data processed and saved to: /content/drive/MyDrive/MAGIC_Dataset/processed_data_test.json
[
    {
        "encounter_id": "ENC00908",
        "image_ids": [
            "IMG_ENC00908_00001.jpg",
            "IMG_ENC00908_00002.jpg"
        ],
        "query_title_en": "Take a look. Is this a skin disease?",
        "query_content_en": "Picture 1:  On the outside of the thigh, there is a small circle of lump.  Approximately 2 months.\nPicture 2:  Small red spots on the palm.  There is slight numbness in the palm.",
        "questions": {
            "CQID010-001": {
                "qid": "CQID010-001",
                "question_text": "How much of the body is affected?",
                "options": [
                    "single spot",
                    "limited area",
                    "widespread",
                    "Not mentioned"
                ]
            },
            "CQID011-001": {
                "qid": "CQID011-001",
                "question_text": "1 Where 

In [14]:
import torch
import os
import json
from PIL import Image
import numpy as np
from torchvision import models, transforms
from tqdm import tqdm
import glob
import cv2
from skimage.feature import local_binary_pattern
from skimage.transform import resize

# ✅ Setup VGG16
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg16 = models.vgg16(pretrained=True).to(device)
vgg16.eval()

# ✅ Preprocessing for VGG16
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ✅ Image directory for test images (adjust path as necessary)
image_dir = "/content/drive/MyDrive/MAGIC_Dataset/images_final/images_test"

# ✅ LBP Feature Extraction function
def extract_lbp_features(image_path, P=8, R=1):
    # Read image and convert to grayscale
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Compute LBP
    lbp = local_binary_pattern(image, P, R, method='uniform')

    # Resize to match VGG shape
    lbp_resized = resize(lbp, (224, 224), mode='reflect', anti_aliasing=True)

    # Flatten and normalize
    lbp_flat = lbp_resized.flatten()
    lbp_norm = (lbp_flat - np.mean(lbp_flat)) / (np.std(lbp_flat) + 1e-6)

    return lbp_norm.astype(np.float32)

# ✅ VGG16 + LBP Feature Extraction function
def extract_combined_features(image_path):
    # VGG16 features
    image = preprocess(Image.open(image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        features = vgg16.features(image)
        features = features.view(features.size(0), -1)
        vgg_feat = features.squeeze(0).cpu().numpy()

    # LBP features
    lbp_feat = extract_lbp_features(image_path)

    # Combine VGG16 + LBP features
    combined_feat = np.concatenate((vgg_feat, lbp_feat))
    return combined_feat

# ✅ Process all test images
test_image_features = {}
test_image_paths = glob.glob(os.path.join(image_dir, "*.jpg"))  # Ensure your images are .jpg

for img_path in tqdm(test_image_paths):
    img_name = os.path.basename(img_path)
    try:
        test_image_features[img_name] = extract_combined_features(img_path)
    except Exception as e:
        print(f"Error processing {img_name}: {e}")

print(f"✅ Extracted combined features for {len(test_image_features)} test images!")

# ✅ Save combined features to .npy file for test set
output_test_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_test.npy"
np.save(output_test_path, test_image_features)

print(f"✅ Combined test features saved at: {output_test_path}")


100%|██████████| 312/312 [08:44<00:00,  1.68s/it]


✅ Extracted combined features for 312 test images!
✅ Combined test features saved at: /content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_test.npy


In [16]:
import numpy as np
import json
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm

# ✅ Load preprocessed test data
test_processed_data_path = "/content/drive/MyDrive/MAGIC_Dataset/processed_data_test.json"  # Adjust the path for your test set
with open(test_processed_data_path, "r", encoding="utf-8") as f:
    test_processed_data = json.load(f)

# ✅ Load XLM-R model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
xlm_r_model = AutoModel.from_pretrained(model_name).to(device)
xlm_r_model.eval()

# ✅ Load pre-extracted VGG16 image features for the test set
vgg16_features_test_path = "/content/drive/MyDrive/MAGIC_Dataset/vgg16_lbp_image_features_test.npy"
vgg16_image_features_test = np.load(vgg16_features_test_path, allow_pickle=True).item()  # dict[img_name] = np.array([...])

# ✅ Function to get XLM-R embedding for text
def get_xlm_r_embedding(text):
    """Returns average pooled embedding of input text using XLM-R"""
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = xlm_r_model(**inputs)
        embeddings = outputs.last_hidden_state  # shape: [1, seq_len, hidden_size]
        return embeddings.mean(dim=1).squeeze().cpu().numpy()  # shape: [768]

# ✅ Build fused data using XLM-R + VGG16 for the test set
fused_test_data = []

for entry in tqdm(test_processed_data):
    encounter_id = entry["encounter_id"]

    # 📝 Combine title and content
    query_title = entry.get("query_title_en", "")
    query_content = entry.get("query_content_en", "")
    en_text = query_title + " " + query_content  # Combine title and content

    # 🔠 XLM-R embedding for the combined text
    text_vector = get_xlm_r_embedding(en_text)

    # 🖼️ VGG16 image embeddings (average if multiple images)
    image_vectors = []
    for img_id in entry["image_ids"]:
        img_file = img_id + ".jpg"
        if img_file in vgg16_image_features_test:
            image_vectors.append(vgg16_image_features_test[img_file])

    if image_vectors:
        image_vector = np.mean(image_vectors, axis=0)
    else:
        image_vector = np.zeros(4096)  # Zero vector if no image found

    # 🔗 Concatenate text and image vectors → final multimodal feature
    final_vector = np.concatenate((text_vector, image_vector))  # shape: (768 + 4096 = 4864)

    # ✅ Add to fused list
    fused_test_data.append({
        "encounter_id": encounter_id,
        "features": final_vector.tolist(),
        "qa_pairs": entry["qa_pairs"]
    })

# 💾 Save fused test data
fused_test_out = "/content/drive/MyDrive/MAGIC_Dataset/fused_test_data_xlmr_vgg16.json"
with open(fused_test_out, "w", encoding="utf-8") as f:
    json.dump(fused_test_data, f, indent=4)

print(f"✅ Fused test data saved at: {fused_test_out}")


100%|██████████| 100/100 [00:50<00:00,  1.97it/s]


✅ Fused test data saved at: /content/drive/MyDrive/MAGIC_Dataset/fused_test_data_xlmr_vgg16.json


In [35]:
import torch
import torch.nn as nn
import json
import numpy as np

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define Multimodal Model
class MultimodalModel(nn.Module):
    def __init__(self, num_classes=12):
        super(MultimodalModel, self).__init__()

        # Reduce Image Feature Dimensionality
        self.img_fc = nn.Linear(4096, 256)  # Updated: 4096 input features (VGG16)

        # Fusion Layer + Classification
        self.fc = nn.Sequential(
            nn.Linear(768 + 256, 256),  # 768 (text) + 256 (reduced image)
            nn.ReLU(),
            nn.Linear(256, num_classes * 27)  # 27 questions × 12 answer choices
        )

    def forward(self, text_features, attention_mask, image_tensor):
        # Reduce Image Feature Dimensionality
        image_features = self.img_fc(image_tensor)

        # Fuse Text + Image Features
        fused = torch.cat((text_features, image_features), dim=1)
        output = self.fc(fused)

        # Reshape output to match [batch, 27, num_classes]
        output = output.view(-1, 27, num_classes)
        return output

# Initialize model
model = MultimodalModel().to(device)




model.load_state_dict(torch.load("/content/drive/MyDrive/MAGIC_Dataset/multimodal_vit.pth"))
model.eval()

# Define function to process any dataset
def load_and_predict(json_path, save_path=None):
    # Load the dataset
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    encounter_ids = []
    text_inputs, image_inputs, attention_masks = [], [], []

    # Prepare inputs for model
    for entry in data:
        features = entry["features"]
        text_inputs.append(features[:768])
        image_inputs.append(features[768:])
        attention_masks.append(np.ones(768))  # dummy
        encounter_ids.append(entry["encounter_id"])

    # Convert to tensors
    text_tensor = torch.tensor(text_inputs, dtype=torch.float32).to(device)
    image_tensor = torch.tensor(image_inputs, dtype=torch.float32).to(device)
    attention_tensor = torch.tensor(attention_masks, dtype=torch.float32).to(device)

    # Run inference in batches
    batch_size = 17
    predictions = []

    for i in range(0, len(text_tensor), batch_size):
        batch_text = text_tensor[i:i+batch_size]
        batch_image = image_tensor[i:i+batch_size]
        batch_mask = attention_tensor[i:i+batch_size]

        with torch.no_grad():
            outputs = model(batch_text, batch_mask, batch_image)
            preds = torch.argmax(outputs, dim=2)  # Shape: [batch, 27]
            predictions.extend(preds.cpu().numpy())

    # Prepare the final output format
    qid_list = [
        "CQID010-001", "CQID011-001", "CQID011-002", "CQID011-003", "CQID011-004", "CQID011-005", "CQID011-006",
        "CQID012-001", "CQID012-002", "CQID012-003", "CQID012-004", "CQID012-005", "CQID012-006",
        "CQID015-001", "CQID020-001", "CQID020-002", "CQID020-003", "CQID020-004", "CQID020-005",
        "CQID020-006", "CQID020-007", "CQID020-008", "CQID020-009",
        "CQID025-001", "CQID034-001", "CQID035-001", "CQID036-001"
    ]

    predictions_list = []

    # Structure output as per requirement
    for enc_id, pred_row in zip(encounter_ids, predictions):
        pred_entry = {"encounter_id": enc_id}
        for i, qid in enumerate(qid_list):
            pred_entry[qid] = int(pred_row[i])  # Ensure it's an int
        predictions_list.append(pred_entry)

    # Save the predictions to file if path is provided
    if save_path:
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(predictions_list, f, indent=4)
        print(f"✅ Saved predictions to {save_path}")

    return predictions_list


# --------- RUN ON VALIDATION SET ---------
val_json_path = "/content/drive/MyDrive/MAGIC_Dataset/fused_data_xlmr_vgg16_valid.json"  # Validation set JSON
val_output_path = "/content/drive/MyDrive/MAGIC_Dataset/val_predictions.json"
val_predictions = load_and_predict(val_json_path, val_output_path)

# --------- RUN ON TEST SET (for submission) ---------
test_json_path = "/content/drive/MyDrive/MAGIC_Dataset/fused_test_data_xlmr_vgg16.json"  # Test set JSON
test_output_path = "/content/drive/MyDrive/MAGIC_Dataset/data_cvqa.json"
test_predictions = load_and_predict(test_json_path, test_output_path)


✅ Saved predictions to /content/drive/MyDrive/MAGIC_Dataset/val_predictions.json
✅ Saved predictions to /content/drive/MyDrive/MAGIC_Dataset/data_cvqa.json


In [36]:
import sys
import json

# QID List and Parents (This can remain the same)
QIDS = [
    "CQID010-001", "CQID011-001", "CQID011-002", "CQID011-003", "CQID011-004", "CQID011-005", "CQID011-006",
    "CQID012-001", "CQID012-002", "CQID012-003", "CQID012-004", "CQID012-005", "CQID012-006", "CQID015-001",
    "CQID020-001", 'CQID020-002', 'CQID020-003', 'CQID020-004', 'CQID020-005', 'CQID020-006', 'CQID020-007',
    'CQID020-008', 'CQID020-009', "CQID025-001", "CQID034-001", "CQID035-001", "CQID036-001",
]

QIDS_PARENTS = sorted(list(set([x.split('-')[0] for x in QIDS])))

# Functions for calculating accuracy, organizing data, and main logic

def calculate_accuracy(qid2val_byencounterid_gold, qid2val_byencounterid_sys, qidparents=QIDS_PARENTS):
    results = {}
    x_all = []
    y_all = []
    encounter_ids = list(qid2val_byencounterid_gold.keys())

    for qid in qidparents:
        goldlist = [qid2val_byencounterid_gold[encounter_id][qid] for encounter_id in encounter_ids]
        syslist = [qid2val_byencounterid_sys[encounter_id][qid] for encounter_id in encounter_ids]
        x_all.extend(goldlist)
        y_all.extend(syslist)
        results['accuracy_{}'.format(qid)] = get_accuracy_score(goldlist, syslist)

    results['accuracy_{}'.format('all')] = get_accuracy_score(x_all, y_all)
    return results

def get_accuracy_score(gold_items, sys_items):
    total = 0
    weight_sum = 0
    for x, y in zip(gold_items, sys_items):
        weight = len(set(x).intersection(set(y)))
        weight_sum += weight / max(len(set(x)), len(set(y)))
        total += 1
    return weight_sum / total

def organize_values(data):
    qid2val_byencounterid = {}
    for item in data:
        encounter_id = item['encounter_id'].split('-')[0]
        qid2val_byencounterid[encounter_id] = qid2val_byencounterid.get(encounter_id, {})
        for key, val in item.items():
            if key == 'encounter_id' or len(key) != 11:
                continue
            qid, _ = key.split('-')
            qid2val_byencounterid[encounter_id][qid] = qid2val_byencounterid[encounter_id].get(qid, [])
            qid2val_byencounterid[encounter_id][qid].append(val)
    return qid2val_byencounterid

def main(reference_fn, prediction_fn):
    with open(reference_fn) as f:
        data_ref = json.load(f)
    with open(prediction_fn) as f:
        data_sys = json.load(f)

    print('Detected {} instances for reference.'.format(len(data_ref)), file=sys.stderr)
    print('Detected {} instances for predictions.'.format(len(data_sys)), file=sys.stderr)

    if len(data_sys) == 0:
        return {
            "accuracy_CQID010": 0.0,
            "accuracy_CQID011": 0.0,
            "accuracy_CQID012": 0.0,
            "accuracy_CQID015": 0.0,
            "accuracy_CQID020": 0.0,
            "accuracy_CQID025": 0.0,
            "accuracy_CQID034": 0.0,
            "accuracy_CQID035": 0.0,
            "accuracy_CQID036": 0.0,
            "accuracy_all": 0.0,
            "number_cvqa_instances": 0
        }

    encounterids_ref = set([x['encounter_id'] for x in data_ref])
    encounterids_sys = set([x['encounter_id'] for x in data_sys])
    print('ENCOUNTERID-MATCH: {}'.format(encounterids_ref == encounterids_sys), file=sys.stderr)

    print('Organizing Values by Questionids', file=sys.stderr)
    qid2val_byencounterid_gold = organize_values(data_ref)
    qid2val_byencounterid_sys = organize_values(data_sys)

    print('Calculating Accuracy', file=sys.stderr)
    results = calculate_accuracy(qid2val_byencounterid_gold, qid2val_byencounterid_sys)
    results['number_cvqa_instances'] = len(encounterids_ref)
    return results

if __name__ == "__main__":
    reference_fn = '/content/drive/MyDrive/MAGIC_Dataset/valid_cvqa.json'  # Your ground truth file
    prediction_fn = '/content/drive/MyDrive/MAGIC_Dataset/val_predictions.json'  # Your predicted output file
    score_dir = '/content/drive/MyDrive/MAGIC_Dataset/'

    results = main(reference_fn, prediction_fn)
    with open('{}/scores_cvqa.json'.format(score_dir), 'w') as f:
        json.dump(results, f, indent=4)

    print(f"✅ Results saved successfully to {score_dir}/scores_cvqa.json")


✅ Results saved successfully to /content/drive/MyDrive/MAGIC_Dataset//scores_cvqa.json


Detected 56 instances for reference.
Detected 56 instances for predictions.
ENCOUNTERID-MATCH: True
Organizing Values by Questionids
Calculating Accuracy
