In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install torch torchvision torchaudio onnx2torch albumentations pandas scikit-learn opencv-python-headless matplotlib

#Importing all the libraries


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from torch.utils.data import Dataset, DataLoader
from onnx2torch import convert
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt

#reading the dataset

In [None]:
import os
import pandas as pd

BASE_DIR = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0"
IMG_DIR = os.path.join(BASE_DIR, "gmdb_align")
META_DIR = os.path.join(BASE_DIR, "gmdb_metadata")

image_metadata = pd.read_csv(os.path.join(META_DIR, "image_metadata_v1.1.0.tsv"), sep='\t')
train_df = pd.read_csv(os.path.join(META_DIR, "gmdb_train_images_v1.1.0.csv"))
val_df = pd.read_csv(os.path.join(META_DIR, "gmdb_val_images_v1.1.0.csv"))
frequent_gallery_df = pd.read_csv(os.path.join(META_DIR, "gmdb_frequent_gallery_images_v1.1.0.csv"))
frequent_test_df = pd.read_csv(os.path.join(META_DIR, "gmdb_frequent_test_images_v1.1.0.csv"))

id_to_syndrome = image_metadata.set_index('image_id')['internal_syndrome_name'].to_dict()

train_df['syndrome_name'] = train_df['image_id'].map(id_to_syndrome)
val_df['syndrome_name'] = val_df['image_id'].map(id_to_syndrome)
frequent_gallery_df['syndrome_name'] = frequent_gallery_df['image_id'].map(id_to_syndrome)
frequent_test_df['syndrome_name'] = frequent_test_df['image_id'].map(id_to_syndrome)

syndrome_counts = train_df['syndrome_name'].value_counts()
frequent_syndromes = syndrome_counts[syndrome_counts > 1].index

train_df = train_df[train_df['syndrome_name'].isin(frequent_syndromes)]
val_df = val_df[val_df['syndrome_name'].isin(frequent_syndromes)]
frequent_gallery_df = frequent_gallery_df[frequent_gallery_df['syndrome_name'].isin(frequent_syndromes)]
frequent_test_df = frequent_test_df[frequent_test_df['syndrome_name'].isin(frequent_syndromes)]

print("Training Data:", train_df.head(), sep='\n')
print("\nValidation Data:", val_df.head(), sep='\n')
print("\nFrequent Gallery Data:", frequent_gallery_df.head(), sep='\n')
print("\nFrequent Test Data:", frequent_test_df.head(), sep='\n')

#defining of loading arcface weights into pytorch model

In [None]:
import torch
import torch.nn as nn
from onnx2torch import convert

class ArcFaceClassifier(nn.Module):
    def __init__(self, backbone_path, num_classes, device='cuda'):
        super(ArcFaceClassifier, self).__init__()
        self.backbone = convert(backbone_path).to(device)
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, x):
        with torch.no_grad():
            features = self.backbone(x)
        out = self.classifier(features)
        return out

#loading downloaded models in pytorch

In [None]:
num_classes = len(train_df['syndrome_name'].unique())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

arcface_r50_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r50.onnx"
arcface_r100_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r100.onnx"

model_r50 = ArcFaceClassifier(arcface_r50_path, num_classes, device=device).to(device)
model_r100 = ArcFaceClassifier(arcface_r100_path, num_classes, device=device).to(device)

# print(model_r50)
# print(model_r100)

#augmenting images, (horizontal flip, color jitter,random scaling, resize and normalize

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import numpy as np
import torch

train_transforms = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.5),
    A.RandomScale(scale_limit=0.2, p=0.3),
    A.Resize(112, 112),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

val_transforms = A.Compose([
    A.Resize(112, 112),
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

#making dataset for gmdb

In [None]:
from torch.utils.data import Dataset
import os
import cv2

class GMDBDataset(Dataset):
    def __init__(self, dataframe, img_dir, transforms=None):
        self.dataframe = dataframe
        self.img_dir = img_dir
        self.transforms = transforms

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_id = self.dataframe.iloc[idx]['image_id']
        img_path = os.path.join(self.img_dir, f"{img_id}_aligned.jpg")
        image = cv2.imread(img_path)
        if image is None:
            print(f"Image not found {img_path}")
        else:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            label = self.dataframe.iloc[idx]['label']
            if self.transforms:
                augmented = self.transforms(image=image)
                image = augmented['image']
            return image, label

#partitioning dataset into training and validation

In [None]:
from torch.utils.data import DataLoader

img_dir = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align"

train_dataset = GMDBDataset(train_df, img_dir, transforms=train_transforms)
val_dataset = GMDBDataset(val_df, img_dir, transforms=val_transforms)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)

#training and saving the r50 model

In [None]:
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
import datetime
import os

num_epochs = 30
batch_size = 32
learning_rate = 0.001
checkpoint_dir = "/content/drive/MyDrive/GestaltMatcher/checkpoints"

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_r50.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

writer = SummaryWriter(comment=f"model_r50_training_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}")

def save_checkpoint(epoch, model, optimizer, scheduler, path):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict()
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch} to {path}")

def load_checkpoint(model, optimizer, scheduler, path):
    if os.path.exists(path):
        checkpoint = torch.load(path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resuming training from epoch {start_epoch}")
        return start_epoch
    else:
        print("No checkpoint found, starting from scratch")
        return 0

checkpoint_path = os.path.join(checkpoint_dir, "model_r50_checkpoint.pth")
start_epoch = load_checkpoint(model_r50, optimizer, scheduler, checkpoint_path)

def top_n_accuracy(output, target, n=5):
    _, pred = output.topk(n, 1, True, True)
    correct = pred.eq(target.view(-1, 1).expand_as(pred))
    return correct.float().sum().item() / target.size(0)

for epoch in range(start_epoch, num_epochs):
    model_r50.train()
    running_loss = 0.0
    correct_top1 = 0
    total = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model_r50(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

        _, predicted = outputs.max(1)
        correct_top1 += predicted.eq(labels).sum().item()
        total += labels.size(0)

    avg_loss = running_loss / len(train_loader)
    train_top1_acc = correct_top1 / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Top-1 Accuracy: {train_top1_acc:.4f}")

    writer.add_scalar('Training/Loss', avg_loss, epoch)
    writer.add_scalar('Training/Top-1 Accuracy', train_top1_acc, epoch)

    model_r50.eval()
    val_loss = 0.0
    val_top1_correct = 0
    val_total = 0
    val_top5_correct = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model_r50(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = outputs.max(1)
            val_top1_correct += predicted.eq(labels).sum().item()
            val_total += labels.size(0)
            val_top5_correct += top_n_accuracy(outputs, labels, n=5) * labels.size(0)

    avg_val_loss = val_loss / len(val_loader)
    val_top1_acc = val_top1_correct / val_total
    val_top5_acc = val_top5_correct / val_total
    print(f"Validation Loss: {avg_val_loss:.4f}, Top-1 Accuracy: {val_top1_acc:.4f}, Top-5 Accuracy: {val_top5_acc:.4f}")

    writer.add_scalar('Validation/Loss', avg_val_loss, epoch)
    writer.add_scalar('Validation/Top-1 Accuracy', val_top1_acc, epoch)
    writer.add_scalar('Validation/Top-5 Accuracy', val_top5_acc, epoch)

    scheduler.step(val_top1_acc)
    save_checkpoint(epoch, model_r50, optimizer, scheduler, checkpoint_path)

final_model_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r50_trained_final.pth"
torch.save(model_r50.state_dict(), final_model_path)
print(f"Final model saved to {final_model_path}")

writer.close()

#training and saving the r100 model

In [None]:
def top_n_accuracy(output, target, n=5):
    with torch.no_grad():
        top_n_preds = output.topk(n, dim=1).indices
        correct = top_n_preds.eq(target.view(-1, 1).expand_as(top_n_preds))
        top_n_acc = correct.any(dim=1).float().mean().item()
    return top_n_acc

import torch
import datetime
import os

num_epochs_r100 = 30
batch_size_r100 = 32
learning_rate_r100 = 0.001
checkpoint_dir_r100 = "/content/drive/MyDrive/GestaltMatcher/checkpoints"

writer_r100 = SummaryWriter(log_dir="/content/drive/MyDrive/GestaltMatcher/tensorboard_logs/model_r100")

criterion_r100 = nn.CrossEntropyLoss()
optimizer_r100 = optim.Adam(model_r100.parameters(), lr=learning_rate_r100)
scheduler_r100 = optim.lr_scheduler.ReduceLROnPlateau(optimizer_r100, mode='max', factor=0.5, patience=5)

def save_checkpoint_r100(epoch, model, optimizer, scheduler, path):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_val_accuracy': best_val_accuracy
    }
    torch.save(checkpoint, path)

checkpoint_path_r100 = os.path.join(checkpoint_dir_r100, "model_r100_checkpoint.pth")
if os.path.exists(checkpoint_path_r100):
    checkpoint = torch.load(checkpoint_path_r100)
    model_r100.load_state_dict(checkpoint['model_state_dict'])
    optimizer_r100.load_state_dict(checkpoint['optimizer_state_dict'])
    scheduler_r100.load_state_dict(checkpoint['scheduler_state_dict'])
    start_epoch_r100 = checkpoint['epoch'] + 1
    best_val_accuracy = checkpoint.get('best_val_accuracy', 0)
else:
    start_epoch_r100 = 0
    best_val_accuracy = 0

for epoch in range(start_epoch_r100, num_epochs_r100):
    model_r100.train()
    running_loss = 0.0
    correct_top1 = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer_r100.zero_grad()
        outputs = model_r100(images)
        loss = criterion_r100(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer_r100.step()
        _, predicted = outputs.max(1)
        correct_top1 += predicted.eq(labels).sum().item()
        total += labels.size(0)

    avg_loss = running_loss / len(train_loader)
    train_top1_acc = correct_top1 / total

    writer_r100.add_scalar('Training/Loss', avg_loss, epoch)
    writer_r100.add_scalar('Training/Top-1 Accuracy', train_top1_acc, epoch)

    model_r100.eval()
    val_loss = 0.0
    val_top1_correct = 0
    val_total = 0
    val_top5_correct = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model_r100(images)
            loss = criterion_r100(outputs, labels)
            val_loss += loss.item()
            _, predicted = outputs.max(1)
            val_top1_correct += predicted.eq(labels).sum().item()
            val_total += labels.size(0)
            val_top5_correct += top_n_accuracy(outputs, labels, n=5) * labels.size(0)

    avg_val_loss = val_loss / len(val_loader)
    val_top1_acc = val_top1_correct / val_total
    val_top5_acc = val_top5_correct / val_total

    writer_r100.add_scalar('Validation/Loss', avg_val_loss, epoch)
    writer_r100.add_scalar('Validation/Top-1 Accuracy', val_top1_acc, epoch)
    writer_r100.add_scalar('Validation/Top-5 Accuracy', val_top5_acc, epoch)

    scheduler_r100.step(val_top1_acc)

    if val_top1_acc > best_val_accuracy:
        best_val_accuracy = val_top1_acc
        save_checkpoint_r100(epoch, model_r100, optimizer_r100, scheduler_r100, checkpoint_path_r100)

final_model_path_r100 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r100_trained_final.pth"
torch.save(model_r100.state_dict(), final_model_path_r100)
writer_r100.close()

#loading saved r50 model

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
arcface_r50_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r50.onnx"
num_classes = len(train_df['syndrome_name'].unique())

model_r50 = ArcFaceClassifier(arcface_r50_path, num_classes, device=device).to(device)

final_model_path_r50 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r50_trained_final_new.pth"
model_r50.load_state_dict(torch.load(final_model_path_r50, map_location=device))

model_r50.eval()

#loading r100 model

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
arcface_r100_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r100.onnx"
num_classes = len(train_df['syndrome_name'].unique())

model_r100 = ArcFaceClassifier(arcface_r100_path, num_classes, device=device).to(device)

final_model_path_r100 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r100_trained_final_new.pth"
model_r100.load_state_dict(torch.load(final_model_path_r100, map_location=device))

model_r100.eval()

#making and saving encondings

In [None]:
import os
import torch
import pandas as pd
import numpy as np
from tqdm import tqdm
from albumentations import Compose, Normalize, HorizontalFlip, Resize
from albumentations.pytorch import ToTensorV2
import cv2

IMG_DIR = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align"
OUTPUT_CSV = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align/all_encodings_new.csv"
MODEL_R50_PATH = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r50_trained_final.pth"
MODEL_R100_PATH = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r100_trained_final.pth"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

base_transforms = Compose([
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

flip_transforms = Compose([
    HorizontalFlip(p=1.0),
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

def preprocess_image(image_path, transforms):
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    augmented = transforms(image=image)
    return augmented['image'].unsqueeze(0)

def generate_encodings(models, image_tensor):
    encodings = []
    for model in models:
        with torch.no_grad():
            encoding = model(image_tensor.to(device)).cpu().numpy()
            encodings.append(encoding.squeeze())
    return encodings

output_data = []
image_files = sorted([f for f in os.listdir(IMG_DIR) if f.endswith('_aligned.jpg')])

for img_file in tqdm(image_files, desc="Generating Encodings"):
    img_path = os.path.join(IMG_DIR, img_file)
    img_name = img_file.split('_')[0]

    img_tensor = preprocess_image(img_path, base_transforms)
    original_encodings = generate_encodings([model_r50, model_r100], img_tensor)

    flipped_tensor = preprocess_image(img_path, flip_transforms)
    flipped_encodings = generate_encodings([model_r50, model_r100], flipped_tensor)

    avg_encodings = [
        np.mean([orig, flip], axis=0)
        for orig, flip in zip(original_encodings, flipped_encodings)
    ]

    for idx, (orig, flip, avg) in enumerate(zip(original_encodings, flipped_encodings, avg_encodings)):
        output_data.append({
            "img_name": img_name,
            "model": f"model_r{50 if idx == 0 else 100}",
            "flip": int(idx == 1),
            "class_conf": np.max(avg),
            "representations": avg.tolist()
        })

output_df = pd.DataFrame(output_data)
output_df.to_csv(OUTPUT_CSV, index=False)

print(f"Encodings saved to {OUTPUT_CSV}")

In [None]:
import json
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
from ast import literal_eval

BASE_DIR = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0"
ENCODINGS_PATH = os.path.join(BASE_DIR, "gmdb_align/all_encodings.csv")
META_DIR = os.path.join(BASE_DIR, "gmdb_metadata")

encodings_df = pd.read_csv(ENCODINGS_PATH)
frequent_gallery_df = pd.read_csv(os.path.join(META_DIR, "gmdb_frequent_gallery_images_v1.1.0.csv"))
frequent_test_df = pd.read_csv(os.path.join(META_DIR, "gmdb_frequent_test_images_v1.1.0.csv"))

encodings_df['representations'] = encodings_df['representations'].apply(literal_eval)

def prepare_encodings(df, encodings_df):
    print("frequent_gallery_df columns:", df.columns)
    print("encodings_df columns:", encodings_df.columns)

    merged_df = df.merge(encodings_df, left_on="image_id", right_on="img_name", how="inner")

    print("Merged DataFrame columns:", merged_df.columns)

    if 'representations_y' in merged_df.columns:
        merged_df["representations"] = merged_df["representations_y"].apply(lambda x: np.array(x) if isinstance(x, list) else np.array(eval(x)))
    elif 'representations' in merged_df.columns:
        merged_df["representations"] = merged_df["representations"].apply(lambda x: np.array(x) if isinstance(x, list) else np.array(eval(x)))
    else:
        raise KeyError("'representations' column is missing in the merged DataFrame.")

    return merged_df

gallery = prepare_encodings(frequent_gallery_df, encodings_df)
test_set = prepare_encodings(frequent_test_df, encodings_df)

gallery["syndrome_name"] = gallery["label"]
test_set["syndrome_name"] = test_set["label"]

def compute_top_n_accuracy(similarities, gallery_labels, test_labels, n=1):
    correct = 0
    for i, test_label in enumerate(test_labels):
        top_n_indices = np.argsort(similarities[i])[::-1][:n]
        top_n_labels = gallery_labels[top_n_indices]
        if test_label in top_n_labels:
            correct += 1
    return correct / len(test_labels)

def evaluate_model(gallery, test_set, top_ns=[1, 5, 10, 30]):
    gallery_encodings = np.vstack(gallery["representations"].values)
    gallery_labels = gallery["syndrome_name"].values
    test_encodings = np.vstack(test_set["representations"].values)
    test_labels = test_set["syndrome_name"].values

    similarities = cosine_similarity(test_encodings, gallery_encodings)

    top_n_results = {}
    for n in top_ns:
        top_n_results[f"Top-{n}"] = compute_top_n_accuracy(similarities, gallery_labels, test_labels, n=n)
    return top_n_results

print("Evaluating model ensemble...")
results_r50 = evaluate_model(gallery, test_set)
results_r100 = evaluate_model(gallery, test_set)

def evaluate_ensemble(gallery, test_set, encodings_df, top_ns=[1, 5, 10, 30]):
    print("start r_50 encodings_df..")
    r50_encodings = encodings_df[encodings_df["model"] == "model_r50"]
    print("start r_100 encodings_df..")
    r100_encodings = encodings_df[encodings_df["model"] == "model_r100"]

    print("start r_50 prepare encodings..")
    r50_test = prepare_encodings(test_set, r50_encodings)
    print("start r_100 prepare encodings..")
    r100_test = prepare_encodings(test_set, r100_encodings)
    combined_test_encodings = (np.vstack(r50_test["representations"].values) +
                               np.vstack(r100_test["representations"].values)) / 2

    gallery_encodings = np.vstack(gallery["representations"].values)
    gallery_labels = gallery["syndrome_name"].values
    similarities = cosine_similarity(combined_test_encodings, gallery_encodings)

    top_n_results = {}
    for n in top_ns:
        top_n_results[f"Top-{n}"] = compute_top_n_accuracy(similarities, gallery_labels, test_set["syndrome_name"].values, n=n)
    return top_n_results

ensemble_results = evaluate_ensemble(gallery, test_set, encodings_df)

print("Results for model_r50:", results_r50)
print("Results for model_r100:", results_r100)
print("Ensemble Results:", ensemble_results)

results_df = pd.DataFrame([
    {"Model": "model_r50", **results_r50},
    {"Model": "model_r100", **results_r100},
    {"Model": "Ensemble", **ensemble_results}
])
results_df.to_csv("evaluation_results.csv", index=False)
print("Evaluation results saved to evaluation_results.csv.")

#adding syndrom names to encoding

In [None]:
metadata_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_metadata/image_metadata_v1.1.0.tsv"
image_metadata = pd.read_csv(metadata_path, sep='\t')

id_to_syndrome = image_metadata.set_index('image_id')['internal_syndrome_name'].to_dict()

encodings_df['syndrome_name'] = encodings_df['img_name'].map(id_to_syndrome)

print(encodings_df.head())

updated_encodings_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align/all_encodings_with_syndrome_name.csv"

encodings_df.to_csv(updated_encodings_path, index=False)

print(f"Updated encodings file with syndrome_name saved to {updated_encodings_path}")

#testing for a single image

In [None]:
import torch
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import cv2
from albumentations import Compose, Resize, Normalize, HorizontalFlip
from albumentations.pytorch import ToTensorV2
from ast import literal_eval

base_transforms = Compose([
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

flip_transforms = Compose([
    HorizontalFlip(p=1.0),
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

def preprocess_image(image_path, transforms):
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    augmented = transforms(image=image)
    return augmented['image'].unsqueeze(0)

def generate_encodings(models, image_tensor):
    encodings = []
    for model in models:
        with torch.no_grad():
            encoding = model(image_tensor.to(device)).cpu().numpy()
            encodings.append(encoding.squeeze())
    return encodings

print(encodings_df['representations'].head())

def get_top_n_syndromes(image_path, encodings_df, n=10):
    img_tensor = preprocess_image(image_path, base_transforms)
    original_encodings = generate_encodings([model_r50, model_r100], img_tensor)
    flipped_tensor = preprocess_image(image_path, flip_transforms)
    flipped_encodings = generate_encodings([model_r50, model_r100], flipped_tensor)
    avg_encodings = [
        np.mean([orig, flip], axis=0)
        for orig, flip in zip(original_encodings, flipped_encodings)
    ]
    all_gallery_encodings = np.vstack(encodings_df['representations'].values)
    similarities = cosine_similarity([avg_encodings[0]], all_gallery_encodings)[0]
    syndrome_similarity_pairs = list(zip(encodings_df['syndrome_name'], similarities))
    unique_syndromes = {}
    for syndrome, similarity in syndrome_similarity_pairs:
        if syndrome not in unique_syndromes:
            unique_syndromes[syndrome] = similarity
        else:
            unique_syndromes[syndrome] = max(unique_syndromes[syndrome], similarity)
    sorted_syndromes = sorted(unique_syndromes.items(), key=lambda x: x[1], reverse=True)
    top_n_syndromes = [syndrome for syndrome, _ in sorted_syndromes[:n]]
    top_n_similarities = [similarity for _, similarity in sorted_syndromes[:n]]
    return top_n_syndromes, top_n_similarities

image_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align/10000_aligned.jpg"
top_1_syndromes, top_1_similarities = get_top_n_syndromes(image_path, encodings_df, n=1)
top_5_syndromes, top_5_similarities = get_top_n_syndromes(image_path, encodings_df, n=5)
top_10_syndromes, top_10_similarities = get_top_n_syndromes(image_path, encodings_df, n=10)

print("Top-1 Syndrome:", top_1_syndromes)
print("Top-5 Syndromes:", top_5_syndromes)
print("Top-10 Syndromes:", top_10_syndromes)

#Web App Embedded code Starts here

In [None]:
!pip install flask
!pip install flask-ngrok
!pip install pyngrok
!pip install flask pyngrok albumentations torch pandas numpy opencv-python-headless
checkpoint = torch.load(final_model_path_r50, map_location=device)
print("Checkpoint keys:", checkpoint.keys())


print("Checking 'encodings_df' columns before adding 'syndrome_name':")
print(encodings_df.columns)

#Adding syndrome name if missing
if "syndrome_name" not in encodings_df.columns:
    metadata_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_metadata/image_metadata_v1.1.0.tsv"
    image_metadata = pd.read_csv(metadata_path, sep='\t')
    id_to_syndrome = image_metadata.set_index('image_id')['internal_syndrome_name'].to_dict()
    encodings_df['syndrome_name'] = encodings_df['img_name'].map(id_to_syndrome)

print("Checking 'encodings_df' columns after adding 'syndrome_name':")
print(encodings_df.columns)
print(encodings_df.head())


from pyngrok import ngrok

# Set your Ngrok authtoken
ngrok.set_auth_token("put you ngork authtoken here")

#flask app function

without css

In [None]:
from flask import Flask, request, jsonify, render_template_string
from pyngrok import ngrok
import torch
import numpy as np
import pandas as pd
from albumentations import Compose, Normalize, Resize
from albumentations.pytorch import ToTensorV2
import cv2
import os
from onnx2torch import convert

app = Flask(__name__)

ngrok.set_auth_token("your ngork token here")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

arcface_r50_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r50.onnx"
arcface_r100_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r100.onnx"
final_model_path_r50 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r50_trained_final.pth"
final_model_path_r100 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r100_trained_final.pth"

class ArcFaceClassifier(torch.nn.Module):
    def __init__(self, backbone_path, num_classes, device="cuda"):
        super(ArcFaceClassifier, self).__init__()
        self.backbone = convert(backbone_path).to(device)
        self.classifier = torch.nn.Linear(512, num_classes)

    def forward(self, x):
        with torch.no_grad():
            features = self.backbone(x)
        out = self.classifier(features)
        return out

def load_model(model, checkpoint_path, num_classes):
    checkpoint = torch.load(checkpoint_path, map_location=device)
    checkpoint_num_classes = checkpoint["classifier.weight"].size(0)
    if num_classes != checkpoint_num_classes:
        model.classifier = torch.nn.Linear(512, checkpoint_num_classes).to(device)
    model.load_state_dict(checkpoint, strict=False)
    return model

num_classes = 275
model_r50 = ArcFaceClassifier(arcface_r50_path, num_classes, device=device).to(device)
model_r50 = load_model(model_r50, final_model_path_r50, num_classes)
model_r50.eval()

model_r100 = ArcFaceClassifier(arcface_r100_path, num_classes, device=device).to(device)
model_r100 = load_model(model_r100, final_model_path_r100, num_classes)
model_r100.eval()

encodings_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align/all_encodings_with_syndrome_name.csv"
encodings_df = pd.read_csv(encodings_path)
encodings_df["representations"] = encodings_df["representations"].apply(eval)

if 'syndrome_name' not in encodings_df.columns:
    metadata_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_metadata/image_metadata_v1.1.0.tsv"
    image_metadata = pd.read_csv(metadata_path, sep='\t')
    id_to_syndrome = image_metadata.set_index('image_id')['internal_syndrome_name'].to_dict()
    encodings_df['syndrome_name'] = encodings_df['img_name'].map(id_to_syndrome)

if encodings_df["syndrome_name"].isnull().sum() > 0:
    raise ValueError("The 'syndrome_name' column contains missing values.")

base_transforms = Compose([
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

def preprocess_image(image_path, transforms):
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    augmented = transforms(image=image)
    return augmented['image'].unsqueeze(0)

def generate_encodings(models, image_tensor):
    encodings = []
    for model in models:
        with torch.no_grad():
            encoding = model(image_tensor.to(device)).cpu().numpy()
            encodings.append(encoding.squeeze())
    return np.mean(encodings, axis=0)

def get_top_n_syndromes(image_path, encodings_df, n=10):
    image_tensor = preprocess_image(image_path, base_transforms)
    models = [model_r50, model_r100]
    query_encoding = generate_encodings(models, image_tensor)

    gallery_encodings = np.vstack(encodings_df["representations"].values)
    similarities = np.dot(gallery_encodings, query_encoding) / (
        np.linalg.norm(gallery_encodings, axis=1) * np.linalg.norm(query_encoding)
    )

    top_n_indices = np.argsort(similarities)[::-1][:n]
    top_n_syndromes = encodings_df.iloc[top_n_indices]["syndrome_name"].values
    top_n_similarities = similarities[top_n_indices]
    return top_n_syndromes, top_n_similarities

@app.route("/")
def home():
    html = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Gestalt Matcher</title>
        <style>
            body { font-family: Arial, sans-serif; text-align: center; margin: 50px; }
            input[type="file"] { margin: 20px 0; }
        </style>
    </head>
    <body>
        <h1>Upload an Image for Syndrome Prediction</h1>
        <form action="/predict" method="post" enctype="multipart/form-data">
            <input type="file" name="file" />
            <button type="submit">Submit</button>
        </form>
    </body>
    </html>
    """
    return render_template_string(html)

@app.route("/predict", methods=["POST"])
def predict():
    if "file" not in request.files:
        return render_template_string("<h1>Error: No file provided</h1><a href='/'>Go back</a>"), 400

    file = request.files["file"]
    if file.filename == "":
        return render_template_string("<h1>Error: No file selected</h1><a href='/'>Go back</a>"), 400

    image_path = "/content/uploaded_image.jpg"
    file.save(image_path)

    try:
        top_10_syndromes, top_10_similarities = get_top_n_syndromes(image_path, encodings_df, n=10)
        results = [{"syndrome": syndrome, "similarity": similarity}
                   for syndrome, similarity in zip(top_10_syndromes, top_10_similarities)]
        html_results = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Results</title>
        </head>
        <body>
            <h1>Top-10 Predicted Syndromes</h1>
            <ul>
                {"".join([f"<li><strong>{r['syndrome']}</strong>: {r['similarity']:.4f}</li>" for r in results])}
            </ul>
            <a href="/">Go back</a>
        </body>
        </html>
        """
        return render_template_string(html_results)
    except Exception as e:
        error_message = f"<h1>Error: {str(e)}</h1><a href='/'>Go back</a>"
        return render_template_string(error_message), 500

if __name__ == "__main__":
    public_url = ngrok.connect(5000)
    print(f"Public URL: {public_url}")
    app.run(port=5000)

In [None]:
!pip install Flask-SQLAlchemy

#with css

In [None]:
from flask import Flask, request, jsonify, render_template_string
from pyngrok import ngrok
import torch
import numpy as np
import pandas as pd
from albumentations import Compose, Normalize, Resize
from albumentations.pytorch import ToTensorV2
import cv2
import os
from onnx2torch import convert

app = Flask(__name__)
ngrok.set_auth_token("2poeuBY1WjxBOxfakPm3xBtMCVM_2FiQjjAAEbuTb4JXhuBqR")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

arcface_r50_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r50.onnx"
arcface_r100_path = "/content/drive/MyDrive/GestaltMatcher/saved_models/glint360k_r100.onnx"
final_model_path_r50 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r50_trained_final.pth"
final_model_path_r100 = "/content/drive/MyDrive/GestaltMatcher/saved_models/model_r100_trained_final.pth"

class ArcFaceClassifier(torch.nn.Module):
    def __init__(self, backbone_path, num_classes, device="cuda"):
        super(ArcFaceClassifier, self).__init__()
        self.backbone = convert(backbone_path).to(device)
        self.classifier = torch.nn.Linear(512, num_classes)

    def forward(self, x):
        with torch.no_grad():
            features = self.backbone(x)
        out = self.classifier(features)
        return out

def load_model(model, checkpoint_path, num_classes):
    checkpoint = torch.load(checkpoint_path, map_location=device)
    checkpoint_num_classes = checkpoint["classifier.weight"].size(0)
    if num_classes != checkpoint_num_classes:
        model.classifier = torch.nn.Linear(512, checkpoint_num_classes).to(device)
    model.load_state_dict(checkpoint, strict=False)
    return model

num_classes = 275
model_r50 = ArcFaceClassifier(arcface_r50_path, num_classes, device=device).to(device)
model_r50 = load_model(model_r50, final_model_path_r50, num_classes)
model_r50.eval()

model_r100 = ArcFaceClassifier(arcface_r100_path, num_classes, device=device).to(device)
model_r100 = load_model(model_r100, final_model_path_r100, num_classes)
model_r100.eval()

encodings_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_align/all_encodings_with_syndrome_name.csv"
encodings_df = pd.read_csv(encodings_path)
encodings_df["representations"] = encodings_df["representations"].apply(eval)

if 'syndrome_name' not in encodings_df.columns:
    metadata_path = "/content/drive/MyDrive/GestaltMatcher/data/GestaltMatcherDB/v1.1.0/gmdb_metadata/image_metadata_v1.1.0.tsv"
    image_metadata = pd.read_csv(metadata_path, sep='\t')
    id_to_syndrome = image_metadata.set_index('image_id')['internal_syndrome_name'].to_dict()
    encodings_df['syndrome_name'] = encodings_df['img_name'].map(id_to_syndrome)

if encodings_df["syndrome_name"].isnull().sum() > 0:
    raise ValueError("The 'syndrome_name' column contains missing values.")

base_transforms = Compose([
    Resize(112, 112),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])

def preprocess_image(image_path, transforms):
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    augmented = transforms(image=image)
    return augmented['image'].unsqueeze(0)

def generate_encodings(models, image_tensor):
    encodings = []
    for model in models:
        with torch.no_grad():
            encoding = model(image_tensor.to(device)).cpu().numpy()
            encodings.append(encoding.squeeze())
    return np.mean(encodings, axis=0)

def get_top_n_syndromes(image_path, encodings_df, n=10):
    image_tensor = preprocess_image(image_path, base_transforms)
    models = [model_r50, model_r100]
    query_encoding = generate_encodings(models, image_tensor)

    gallery_encodings = np.vstack(encodings_df["representations"].values)
    similarities = np.dot(gallery_encodings, query_encoding) / (
        np.linalg.norm(gallery_encodings, axis=1) * np.linalg.norm(query_encoding)
    )

    top_n_indices = np.argsort(similarities)[::-1][:n]
    top_n_syndromes = encodings_df.iloc[top_n_indices]["syndrome_name"].values
    top_n_similarities = similarities[top_n_indices]
    return top_n_syndromes, top_n_similarities

glassmorphism_css = """
body {
    font-family: Arial, sans-serif;
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: center;
    height: 100vh;
    margin: 0;
    background: linear-gradient(135deg, #74ebd5, #9face6);
    overflow: hidden;
}
.glass-container {
    background: rgba(255, 255, 255, 0.15);
    border-radius: 16px;
    box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1);
    backdrop-filter: blur(10px);
    -webkit-backdrop-filter: blur(10px);
    border: 1px solid rgba(255, 255, 255, 0.3);
    padding: 20px;
    max-width: 500px;
    width: 90%;
    text-align: center;
}
button {
    background-color: #4CAF50;
    border: none;
    color: white;
    padding: 10px 20px;
    text-align: center;
    text-decoration: none;
    display: inline-block;
    font-size: 16px;
    border-radius: 8px;
    cursor: pointer;
    margin-top: 20px;
    transition: background-color 0.3s ease;
}
button:hover {
    background-color: #45a049;
}
input[type="file"] {
    margin: 20px 0;
    padding: 10px;
    font-size: 16px;
}
ul {
    list-style-type: none;
    padding: 0;
}
li {
    margin: 5px 0;
}
"""

@app.route("/")
def home():
    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Gestalt Matcher</title>
        <style>{glassmorphism_css}</style>
    </head>
    <body>
        <div class="glass-container">
            <h1>Upload an Image for Syndrome Prediction</h1>
            <form action="/predict" method="post" enctype="multipart/form-data">
                <input type="file" name="file" />
                <button type="submit">Submit</button>
            </form>
        </div>
    </body>
    </html>
    """
    return render_template_string(html)

@app.route("/predict", methods=["POST"])
def predict():
    if "file" not in request.files:
        return render_template_string("<h1>Error: No file provided</h1><a href='/'>Go back</a>"), 400

    file = request.files["file"]
    if file.filename == "":
        return render_template_string("<h1>Error: No file selected</h1><a href='/'>Go back</a>"), 400

    image_path = "/content/uploaded_image.jpg"
    file.save(image_path)

    try:
        top_10_syndromes, top_10_similarities = get_top_n_syndromes(image_path, encodings_df, n=10)
        results = [{"syndrome": syndrome, "similarity": similarity}
                   for syndrome, similarity in zip(top_10_syndromes, top_10_similarities)]
        html_results = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Results</title>
            <style>{glassmorphism_css}</style>
        </head>
        <body>
            <div class="glass-container">
                <h1>Top-10 Predicted Syndromes</h1>
                <ul>
                    {"".join([f"<li><strong>{r['syndrome']}</strong>: {r['similarity']:.4f}</li>" for r in results])}
                </ul>
                <a href="/"><button>Go back</button></a>
            </div>
        </body>
        </html>
        """
        return render_template_string(html_results)
    except Exception as e:
        error_message = f"<h1>Error: {str(e)}</h1><a href='/'>Go back</a>"
        return render_template_string(error_message), 500

if __name__ == "__main__":
    public_url = ngrok.connect(5000)
    print(f"Public URL: {public_url}")
    app.run(port=5000)