In [29]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import json

# Constants
ALPHA = 0.7
BETA = 4
GAMMA = np.log(0.01)
PATCH_SIZE = 64
TOP_K_PATCHES = 128

def patch_quality(patch):
    Q = 0
    for channel in range(3):  # R, G, B
        patch_channel = patch[:, :, channel]
        mu_c = np.mean(patch_channel)
        sigma_c = np.std(patch_channel)
        Q += ALPHA * BETA * (mu_c - mu_c**2) + (1 - ALPHA) * (1 - np.exp(GAMMA * sigma_c))
    return Q / 3

def extract_top_k_patches(image, k=TOP_K_PATCHES, patch_size=PATCH_SIZE):
    h, w, _ = image.shape
    patches = []
    qualities = []

    for i in range(0, h - patch_size + 1, patch_size):
        for j in range(0, w - patch_size + 1, patch_size):
            patch = image[i:i+patch_size, j:j+patch_size]
            q = patch_quality(patch)
            patches.append(patch)
            qualities.append(q)

    if len(patches) == 0:
        return []

    sorted_indices = np.argsort(qualities)[::-1]
    top_indices = sorted_indices[:min(k, len(patches))]
    top_patches = [patches[i] for i in top_indices]

    if len(top_patches) < k:
        padding = [np.zeros_like(top_patches[0]) for _ in range(k - len(top_patches))]
        top_patches.extend(padding)

    return top_patches

def prepare_dataset_as_images(dataset_path, output_path):
    for mode in ['train', 'test']:
        os.makedirs(os.path.join(output_path, mode), exist_ok=True)

    split_dir = os.path.join(output_path, 'splits')
    os.makedirs(split_dir, exist_ok=True)

    for class_name in sorted(os.listdir(dataset_path)):
        class_path = os.path.join(dataset_path, class_name)
        if not os.path.isdir(class_path):
            continue

        image_files = sorted([f for f in os.listdir(class_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))])
        split_file_path = os.path.join(split_dir, f"{class_name}_split.json")

        if os.path.exists(split_file_path):
            with open(split_file_path, 'r') as f:
                split_data = json.load(f)
                train_files = split_data['train']
                test_files = split_data['test']
        else:
            train_files, test_files = train_test_split(image_files, test_size=0.2, random_state=42)
            split_data = {'train': train_files, 'test': test_files}
            with open(split_file_path, 'w') as f:
                json.dump(split_data, f, indent=2)

        for mode, file_list in [('train', train_files), ('test', test_files)]:
            save_dir = os.path.join(output_path, mode, class_name)
            os.makedirs(save_dir, exist_ok=True)

            for idx, file in enumerate(tqdm(file_list, desc=f"{mode.upper()} - {class_name}")):
                file_path = os.path.join(class_path, file)
                image = cv2.imread(file_path)
                if image is None:
                    continue

                patches = extract_top_k_patches(image)
                for i, patch in enumerate(patches):
                    patch_filename = f"{os.path.splitext(file)[0]}_patch_{i}.jpg"
                    patch_path = os.path.join(save_dir, patch_filename)
                    cv2.imwrite(patch_path, patch)

    print("Dataset prepared as image patches with persistent train/test splits!")

# Example usage for Kaggle
dataset_path = './camera_model_data/train/'  # or wherever your Kaggle dataset is mounted
output_path = './camera_model_data/processed_dataset'

prepare_dataset_as_images(dataset_path, output_path)

TRAIN - HTC-1-M7: 100%|███████████████████████████████████████████████████████| 220/220 [00:50<00:00,  4.34it/s]
TEST - HTC-1-M7: 100%|██████████████████████████████████████████████████████████| 55/55 [00:13<00:00,  4.18it/s]
TRAIN - LG-Nexus-5x: 100%|████████████████████████████████████████████████████| 220/220 [02:28<00:00,  1.48it/s]
TEST - LG-Nexus-5x: 100%|███████████████████████████████████████████████████████| 55/55 [00:36<00:00,  1.49it/s]
TRAIN - Motorola-Droid-Maxx: 100%|████████████████████████████████████████████| 220/220 [02:05<00:00,  1.76it/s]
TEST - Motorola-Droid-Maxx: 100%|███████████████████████████████████████████████| 55/55 [00:31<00:00,  1.75it/s]
TRAIN - Motorola-Nexus-6: 100%|███████████████████████████████████████████████| 220/220 [02:37<00:00,  1.40it/s]
TEST - Motorola-Nexus-6: 100%|██████████████████████████████████████████████████| 55/55 [00:38<00:00,  1.42it/s]
TRAIN - Motorola-X: 100%|█████████████████████████████████████████████████████| 220/220 [02:39<0

Dataset prepared as image patches with persistent train/test splits!





In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms
import numpy as np

# Constants
NUM_CLASSES = 10
BATCH_SIZE = 64
LEARNING_RATE = 0.0001
EPOCHS = 20
FEATURE_SAVE_PATH = '/camera_model_data/features_rn50.npy'
LABEL_SAVE_PATH = '/camera_model_data/true_labels_rn50.npy'
PREDICTION_SAVE_PATH = '/camera_model_data/predictions_rn50.npy'

# Dataset path
dataset_path = './camera_model_data/processed_dataset'

# Transforms
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Datasets and Dataloaders
train_dataset = datasets.ImageFolder(root=os.path.join(dataset_path, 'train'), transform=transform)
test_dataset = datasets.ImageFolder(root=os.path.join(dataset_path, 'test'), transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# Print class mapping
print("Class to index mapping:", train_dataset.class_to_idx)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# ResNet50 Model for classification + feature extraction
class ResNet50FeatureExtractor(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet50FeatureExtractor, self).__init__()
        self.resnet = models.resnet50(pretrained=True)
        self.feature_extractor = nn.Sequential(*list(self.resnet.children())[:-1])  # exclude final fc
        in_features = self.resnet.fc.in_features
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        features = self.feature_extractor(x)  # [B, 2048, 1, 1]
        features = features.view(features.size(0), -1)  # flatten to [B, 2048]
        logits = self.classifier(features)
        return logits, features

model = ResNet50FeatureExtractor(num_classes=NUM_CLASSES).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Training loop
def train_model():
    best_acc = 0.0
    for epoch in range(EPOCHS):
        model.train()
        running_loss = 0.0
        correct, total = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs, _ = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        acc = correct / total
        print(f"Epoch [{epoch+1}/{EPOCHS}] - Loss: {running_loss:.4f}, Accuracy: {acc:.4f}")

        # Evaluation after each epoch
        test_acc = evaluate_model()
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), './camera_model_data/best_resnet50_rgb.pth')

# Evaluation and feature saving
def evaluate_model(save_results=False):
    model.eval()
    all_preds, all_labels, all_features = [], [], []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs, features = model(images)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_features.append(features.cpu().numpy())

    all_features = np.concatenate(all_features, axis=0)
    acc = np.mean(np.array(all_preds) == np.array(all_labels))
    print(f"Test Accuracy: {acc:.4f}")

    if save_results:
        np.save(FEATURE_SAVE_PATH, all_features)
        np.save(LABEL_SAVE_PATH, np.array(all_labels))
        np.save(PREDICTION_SAVE_PATH, np.array(all_preds))

    return acc

# Run training and save features
train_model()
evaluate_model(save_results=True)

Class to index mapping: {'HTC-1-M7': 0, 'LG-Nexus-5x': 1, 'Motorola-Droid-Maxx': 2, 'Motorola-Nexus-6': 3, 'Motorola-X': 4, 'Samsung-Galaxy-Note3': 5, 'Samsung-Galaxy-S4': 6, 'Sony-NEX-7': 7, 'iPhone-4s': 8, 'iPhone-6': 9}
Using device: cuda
Epoch [1/20] - Loss: 610.8714, Accuracy: 0.9537
Test Accuracy: 0.9575
Epoch [2/20] - Loss: 242.4945, Accuracy: 0.9815
Test Accuracy: 0.9486


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sns

# Load saved predictions and labels
predictions = np.load('./camera_model_data/processed_dataset/predictions_rn50.npy')
true_labels = np.load('./camera_model_data/processed_dataset/true_labels_rn50.npy')

# Compute confusion matrix
cm = confusion_matrix(true_labels, predictions)

# Optional: Normalize the confusion matrix
cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

# Plot
plt.figure(figsize=(8, 6))
sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues', xticklabels=range(10), yticklabels=range(10))
plt.title("Normalized Confusion Matrix - ResNet50")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.tight_layout()
plt.show()

In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

# Load predictions from CSV
df = pd.read_csv('./camera_model_data/processed_dataset/resnet50_classifier_outputs.csv')

# Extract predictions and true labels
true_labels = df['True_Label'].values
svm_preds = df['SVM'].values
knn_preds = df['KNN'].values
rf_preds = df['RF'].values
ensemble_preds = df['Ensemble'].values

# Compute accuracies
svm_acc = accuracy_score(true_labels, svm_preds) * 100
knn_acc = accuracy_score(true_labels, knn_preds) * 100
rf_acc = accuracy_score(true_labels, rf_preds) * 100
ensemble_acc = accuracy_score(true_labels, ensemble_preds) * 100

# Plot
accuracies = {
    'SVM': svm_acc,
    'KNN': knn_acc,
    'Random Forest': rf_acc,
    'Ensemble': ensemble_acc
}

plt.figure(figsize=(10,6))
bars = plt.bar(accuracies.keys(), accuracies.values(), color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'])
plt.ylim(0, 105)
plt.ylabel('Accuracy (%)')
plt.title('ResNet50 ML Model Accuracy Comparison')
plt.grid(axis='y', linestyle='--', alpha=0.7)

for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval + 1, f'{yval:.2f}%', ha='center', va='bottom')

plt.show()