In [1]:
import os
import shutil
import time
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader

# 1. Device Config
# Prefer the GPU whenever CUDA is usable; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# 2. Reproducibility (Crucial for scientific results)
def seed_everything(seed=42):
    """Seed every random number generator this notebook draws from.

    Covers the stdlib ``random`` module (used by ``random.sample`` when the
    adversary/target pairs are picked), NumPy's legacy global RNG, and the
    PyTorch CPU and CUDA generators, and asks cuDNN for deterministic kernels.

    Args:
        seed: Integer seed applied to all generators.
    """
    # NOTE(review): setting PYTHONHASHSEED here is only inherited by child
    # processes started afterwards; the hash seed of the already-running
    # interpreter is presumably fixed at startup -- confirm if hash order matters.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds every visible GPU, not just the current one; silently ignored
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels from run to run.
    torch.backends.cudnn.benchmark = False
seed_everything(42)

# 3. Physics-Aware Normalization Helpers
# These allow us to add the pattern in "Real World" pixel space (0-1)
# BEFORE the model normalizes it for processing.
class ReIDNormalize(nn.Module):
    """Channel-wise normalisation ``(x - mean) / std`` packaged as a module.

    Args:
        mean: Per-channel means (sequence of floats, one entry per channel).
        std: Per-channel standard deviations, same length as ``mean``.

    The statistics are reshaped to ``(1, C, 1, 1)`` so they broadcast over a
    ``(N, C, H, W)`` batch. ``C`` is taken from the length of ``mean``/``std``
    rather than being fixed at 3.
    """

    def __init__(self, mean, std):
        super().__init__()
        # Buffers rather than parameters: they follow .to(device) and are
        # stored in the state_dict, but are never touched by an optimizer.
        # torch.tensor(..., dtype=float32) replaces the legacy torch.Tensor()
        # constructor, which interprets a bare int as a size, not a value.
        self.register_buffer('mean', torch.tensor(mean, dtype=torch.float32).view(1, -1, 1, 1))
        self.register_buffer('std', torch.tensor(std, dtype=torch.float32).view(1, -1, 1, 1))

    def forward(self, tensor):
        """Return ``tensor`` shifted by ``mean`` and scaled by ``1 / std``."""
        return (tensor - self.mean) / self.std

class ReIDUnNormalize(nn.Module):
    """Inverse of channel-wise normalisation: ``x * std + mean``.

    Args:
        mean: Per-channel means (sequence of floats, one entry per channel).
        std: Per-channel standard deviations, same length as ``mean``.

    The statistics are reshaped to ``(1, C, 1, 1)`` so they broadcast over a
    ``(N, C, H, W)`` batch. ``C`` is taken from the length of ``mean``/``std``
    rather than being fixed at 3.
    """

    def __init__(self, mean, std):
        super().__init__()
        # Buffers rather than parameters: they follow .to(device) and are
        # stored in the state_dict, but are never touched by an optimizer.
        # torch.tensor(..., dtype=float32) replaces the legacy torch.Tensor()
        # constructor, which interprets a bare int as a size, not a value.
        self.register_buffer('mean', torch.tensor(mean, dtype=torch.float32).view(1, -1, 1, 1))
        self.register_buffer('std', torch.tensor(std, dtype=torch.float32).view(1, -1, 1, 1))

    def forward(self, tensor):
        """Return ``tensor`` scaled by ``std`` and shifted by ``mean``."""
        return (tensor * self.std) + self.mean

# Standard ImageNet Normalization
# Both helpers share the same statistics so that unnorm_layer undoes norm_layer.
norm_layer = ReIDNormalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
).to(device)
unnorm_layer = ReIDUnNormalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
).to(device)
print("Environment & Helpers Ready.")

Using device: cuda
Environment & Helpers Ready.


In [2]:
# --- CONFIGURATION ---
DATA_ROOT = './Market-1501-v15.09.15' # Ensure this matches your folder name

# Standard Transforms
# Resize to 256x128 (H, W), convert to a [0, 1] tensor, then apply the same
# ImageNet statistics that norm_layer / unnorm_layer use.
# NOTE(review): interpolation=3 is presumably PIL's bicubic code -- confirm
# against the installed torchvision version.
transform_test = transforms.Compose([
    transforms.Resize(size=(256, 128), interpolation=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load Data
# One ImageFolder per evaluation split: split name -> sub-directory of DATA_ROOT.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(DATA_ROOT, folder), transform_test)
    for split, folder in (('gallery', 'bounding_box_test'), ('query', 'query'))
}

# shuffle=False keeps loader order aligned with dataset.imgs, which the
# helper functions rely on when they parse person IDs from file names.
dataloaders = {
    split: DataLoader(split_dataset, batch_size=32, shuffle=False, num_workers=2)
    for split, split_dataset in image_datasets.items()
}

# --- MODEL B (ResNet-50) ---
class ModelB_ResNet50(nn.Module):
    """ResNet-50 with its final fully-connected layer replaced by an
    identity classifier of ``num_classes`` outputs.

    The backbone is stored as ``self.model`` so that state-dict keys are
    prefixed with ``model.``.
    """

    def __init__(self, num_classes=751):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        self.in_features = backbone.fc.in_features
        # Swap the stock classifier head for one sized to the ID count.
        backbone.fc = nn.Linear(self.in_features, num_classes)
        self.model = backbone

    def forward(self, x):
        """Return classification logits from the full network."""
        return self.model(x)

    def get_embedding(self, x):
        """Return the pooled, flattened features that feed the classifier.

        Runs every backbone stage up to and including global average
        pooling, skipping ``fc``; the result has shape ``(N, in_features)``.
        """
        # Extract features (IDE baseline)
        net = self.model
        feature_stages = (
            net.conv1, net.bn1, net.relu, net.maxpool,
            net.layer1, net.layer2, net.layer3, net.layer4,
            net.avgpool,
        )
        for stage in feature_stages:
            x = stage(x)
        return torch.flatten(x, 1)

# Initialize and Load Weights
model_b = ModelB_ResNet50(num_classes=751).to(device)
if os.path.exists('model_b_market1501.pth'):
    # map_location remaps the stored tensors onto the active device, so a
    # checkpoint saved on a GPU machine still loads on a CPU-only one.
    # NOTE(review): torch.load unpickles the file -- only load checkpoints
    # you produced yourself or otherwise trust.
    model_b.load_state_dict(torch.load('model_b_market1501.pth', map_location=device))
    print("Model B Loaded Successfully.")
else:
    # NOTE(review): execution continues with a model whose classifier head is
    # freshly initialised; downstream metrics are not meaningful in that case.
    print("Error: 'model_b_market1501.pth' not found. Please copy it from your previous project.")

  model_b.load_state_dict(torch.load('model_b_market1501.pth'))


Model B Loaded Successfully.


In [9]:
# --- HELPER FUNCTIONS (UPDATED) ---

def get_images_by_pid(dataset, target_pid, max_images=50):
    """Collect the transformed images of one person ID into a single batch.

    The person ID of each sample is the integer before the first ``_`` in
    its file name (taken from ``dataset.imgs``).

    Args:
        dataset: Indexable dataset exposing ``imgs`` as ``(path, label)``
            entries and returning ``(image_tensor, label)`` from ``dataset[i]``.
        target_pid: Person ID to look for.
        max_images: Stop scanning once this many images have been collected.

    Returns:
        A tensor of the stacked images on the notebook-level ``device``, or
        ``None`` when no file name carries ``target_pid``.
    """
    collected = []
    for idx in range(len(dataset)):
        file_name = os.path.basename(dataset.imgs[idx][0])
        if int(file_name.split('_')[0]) != target_pid:
            continue
        # Only matching samples are actually loaded and transformed.
        collected.append(dataset[idx][0].unsqueeze(0))
        if len(collected) >= max_images:
            break
    if not collected:
        return None
    return torch.cat(collected, dim=0).to(device)

def extract_features_robust(model, dataloader, device):
    """Embed every batch of ``dataloader`` and return unit-length features.

    Args:
        model: Object providing ``eval()`` and ``get_embedding(batch)``.
        dataloader: Iterable of ``(images, label)`` pairs; labels are ignored.
        device: Device the image batches are moved to before embedding.

    Returns:
        A CPU tensor with one L2-normalised row per input image, in loader
        order.
    """
    model.eval()
    chunks = []
    with torch.no_grad():
        for batch, _ in dataloader:
            embedding = model.get_embedding(batch.to(device))
            # Row-wise L2 norm, kept as a column so the division broadcasts.
            row_norm = torch.norm(embedding, p=2, dim=1, keepdim=True)
            chunks.append((embedding / row_norm).cpu())
    return torch.cat(chunks, dim=0)

def get_id_and_cam(dataset):
    """Return the person ID of every sample in ``dataset.imgs`` as an array.

    The ID is the integer before the first ``_`` in each file name. Despite
    the function's name, only person IDs are returned -- no camera IDs.
    """
    return np.array([
        int(os.path.basename(img_path).split('_')[0])
        for img_path, _ in dataset.imgs
    ])

def calculate_metrics_exact(q_feats, g_feats, g_pids, target_pid):
    """Calculates detailed metrics (R1, R5, R10, mAP, SS) for a specific target ID.

    Every query row is ranked against the whole gallery by dot-product
    similarity (cosine similarity when both feature sets are unit-norm), and
    gallery entries whose ID equals ``target_pid`` count as correct matches.

    Args:
        q_feats: Query feature tensor, one row per query.
        g_feats: Gallery feature tensor, one row per gallery image, on the
            same device as ``q_feats``.
        g_pids: NumPy array with the person ID of each gallery row.
        target_pid: The ID treated as ground truth for every query.

    Returns:
        Dict with keys ``'rank-1'``, ``'rank-5'``, ``'rank-10'``, ``'mAP'``
        and ``'ss'`` (mean similarity of the best-ranked correct match).
        All five keys are present, with value 0.0, when ``target_pid`` never
        occurs in the gallery.
    """
    # Compute Similarity
    distmat = torch.mm(q_feats, g_feats.t())
    distmat = distmat.cpu().numpy()

    cmc_scores = []
    ap_scores = []
    ss_scores = []

    for i in range(q_feats.size(0)):
        sims = distmat[i]
        # Highest similarity first.
        indices = np.argsort(sims)[::-1]

        # Check Matches
        matches = (g_pids[indices] == target_pid).astype(np.int32)

        # Similarity Score (SS) of top match
        if matches.sum() > 0:
            first_match_idx = np.where(matches == 1)[0][0]
            ss_scores.append(sims[indices[first_match_idx]])
        else:
            ss_scores.append(0.0)

        if matches.sum() == 0: continue

        # CMC: 1 from the first correct match onwards, truncated to top-10.
        cmc = matches.cumsum()
        cmc[cmc > 1] = 1
        cmc_scores.append(cmc[:10])

        # mAP: precision@k, averaged over the positions of correct matches.
        num_rel = matches.sum()
        tmp_cmc = matches.cumsum()
        tmp_cmc = [x / (j + 1.) for j, x in enumerate(tmp_cmc)]
        tmp_cmc = np.asarray(tmp_cmc) * matches
        ap_scores.append(tmp_cmc.sum() / num_rel)

    # The fallback must expose the same keys as the normal result: callers
    # index the returned dict by every metric name, including 'rank-10'.
    if len(cmc_scores) == 0:
        return {'rank-1': 0.0, 'rank-5': 0.0, 'rank-10': 0.0, 'mAP': 0.0, 'ss': 0.0}

    all_cmc = np.mean(cmc_scores, axis=0)
    return {
        'rank-1': all_cmc[0],
        'rank-5': all_cmc[4],
        'rank-10': all_cmc[9],
        'mAP': np.mean(ap_scores),
        'ss': np.mean(ss_scores)
    }

In [4]:
# --- IMPERSONATION ATTACK CLASS ---

class AdvPatternAttack_Impersonation:
    """Optimises a single additive pattern that pulls an adversary's
    embedding towards the embedding centroid of a target identity.

    Depends on the notebook-level ``norm_layer`` / ``unnorm_layer`` helpers
    to move between normalised model inputs and [0, 1] pixel space.
    """

    def __init__(self, model, device, lr=0.05):
        """Store the attack settings and switch ``model`` to eval mode.

        Args:
            model: Network exposing ``get_embedding``.
            device: Device on which the mask and pattern are created.
            lr: Initial Adam learning rate for the pattern.
        """
        self.model, self.device, self.lr = model, device, lr
        self.model.eval()

        # Jitter: Essential. If we don't jitter, the pattern overfits
        # to the specific pixel alignment of the training images.
        self.jitter = T.Compose([
            T.RandomAffine(degrees=0, translate=(0.05, 0.05), scale=(0.95, 1.05))
        ])

    def generate_impersonation_pattern(self, adversary_imgs, target_imgs, num_steps=2500):
        """Run the optimisation and return the learned pattern and its mask.

        Args:
            adversary_imgs: Batch of adversary images; passed through
                ``unnorm_layer`` first, so it is expected in normalised form
                and broadcast-compatible with ``(1, 3, 256, 128)``.
            target_imgs: Batch of target-identity images in the same form.
            num_steps: Number of Adam steps.

        Returns:
            ``(delta, mask)``: the detached, *unmasked* pattern from the last
            step and the binary region mask, both of shape
            ``(1, 3, 256, 128)``. Callers apply ``delta * mask``.
        """
        print(f"--- Starting IMPERSONATION Attack ({num_steps} steps) ---")

        with torch.no_grad():
            # 1. Target centroid: un-normalise, clamp to valid pixels,
            #    re-normalise, embed, L2-normalise, then average over images.
            tgt_pixels = unnorm_layer(target_imgs)
            tgt_pixels.clamp_(0, 1)
            tgt_embed = self.model.get_embedding(norm_layer(tgt_pixels))
            tgt_embed = torch.nn.functional.normalize(tgt_embed, p=2, dim=1)
            target_centroid = tgt_embed.mean(dim=0, keepdim=True).detach()

            # 2. Adversary images in [0, 1] pixel space.
            clean_adv = unnorm_layer(adversary_imgs)
            clean_adv.clamp_(0, 1)

        # 3. Mask (Chest Only - Paper Exact): ones inside rows 60-159, cols 20-107.
        mask = torch.zeros((1, 3, 256, 128)).to(self.device)
        mask[:, :, 60:160, 20:108] = 1.0

        # 4. Pattern initialised uniformly at random, optimised directly.
        delta = torch.rand((1, 3, 256, 128)).to(self.device)
        delta.requires_grad = True

        optimizer = optim.Adam([delta], lr=self.lr)
        # Learning rate is divided by 10 at steps 1000 and 2000.
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[1000, 2000], gamma=0.1)

        # With a label of +1 this loss is (1 - cosine similarity).
        cosine_loss = nn.CosineEmbeddingLoss(margin=0.0)

        for step in range(num_steps):
            optimizer.zero_grad()

            # Masked pattern, randomly shifted/scaled, added in pixel space.
            adv_pixels = clean_adv + self.jitter(delta * mask)
            adv_pixels.clamp_(0, 1)

            adv_feats = self.model.get_embedding(norm_layer(adv_pixels))

            # Every adversary image is pulled towards the same centroid;
            # the 20x factor only rescales the loss (and its gradients).
            similar_label = torch.ones(adv_feats.size(0)).to(self.device)
            centroid_batch = target_centroid.expand_as(adv_feats)
            loss = cosine_loss(adv_feats, centroid_batch, similar_label) * 20.0

            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep the pattern itself inside the valid pixel range.
            with torch.no_grad():
                delta.clamp_(0, 1)

            if step % 500 == 0:
                print(f"Step {step}, Loss: {loss.item():.4f}")

        return delta.detach(), mask

print("Impersonation Class Defined.")

Impersonation Class Defined.


In [None]:
# --- EXECUTE MULTI-PAIR IMPERSONATION (TABLE 3) ---

def run_table3_replication(model, dataloaders, num_pairs=5, num_steps=2000):
    """Train and evaluate impersonation patterns for random (adversary, target) pairs.

    For each pair a pattern is trained on the adversary's gallery images
    against the target's gallery images, then applied to the adversary's
    query images. The patched queries are scored twice against the gallery:
    once with the target ID as ground truth, once with the adversary's own ID.

    Args:
        model: Network exposing ``get_embedding``.
        dataloaders: Dict with ``'gallery'`` and ``'query'`` loaders whose
            datasets expose ``imgs``.
        num_pairs: Number of random pairs to draw (via ``random.sample``).
        num_steps: Optimisation steps per pattern.

    Returns:
        ``(avg_tgt, avg_slf)``: two dicts keyed by ``'rank-1'``, ``'rank-5'``,
        ``'rank-10'``, ``'mAP'`` and ``'ss'`` holding the mean over all
        evaluated pairs.
    """
    print(f"--- Starting Full Table 3 Replication ({num_pairs} Pairs) ---")

    # 1. Get Valid PIDs: IDs present in both splits, excluding non-positive IDs.
    gal_pids = set([int(os.path.basename(path).split('_')[0]) for path, _ in dataloaders['gallery'].dataset.imgs])
    qry_pids = set([int(os.path.basename(path).split('_')[0]) for path, _ in dataloaders['query'].dataset.imgs])
    valid_pids = list(gal_pids.intersection(qry_pids))
    valid_pids = [p for p in valid_pids if p > 0]

    # 2. Prepare Gallery Features (Static)
    g_feats = extract_features_robust(model, dataloaders['gallery'], device)
    g_pids = get_id_and_cam(dataloaders['gallery'].dataset)
    # Move Gallery to GPU for fast calculation
    g_feats = g_feats.to(device)

    # Storage
    agg_tgt = {'rank-1': [], 'rank-5': [], 'rank-10': [], 'mAP': [], 'ss': []}
    agg_slf = {'rank-1': [], 'rank-5': [], 'rank-10': [], 'mAP': [], 'ss': []}

    for i in range(num_pairs):
        # Pick Pair
        adv_pid, tgt_pid = random.sample(valid_pids, 2)
        print(f"\n[Pair {i+1}] Adv: {adv_pid} -> Tgt: {tgt_pid}")

        # Get Data
        adv_gs = get_images_by_pid(dataloaders['gallery'].dataset, adv_pid)
        tgt_gs = get_images_by_pid(dataloaders['gallery'].dataset, tgt_pid)
        adv_ts = get_images_by_pid(dataloaders['query'].dataset, adv_pid)

        # adv_ts is dereferenced below, so it needs the same None check as
        # the two gallery batches (mirrors run_table3_experiment).
        if adv_gs is None or tgt_gs is None or adv_ts is None:
            print("Skipping pair (data missing)")
            continue

        # Train Pattern
        attacker = AdvPatternAttack_Impersonation(model, device, lr=0.05)
        pat, mask = attacker.generate_impersonation_pattern(adv_gs, tgt_gs, num_steps=num_steps)

        # Generate Adversarial Queries: patch in pixel space, then re-normalise.
        with torch.no_grad():
            q_raw = unnorm_layer(adv_ts.to(device))
            q_adv = norm_layer((q_raw + (pat * mask)).clamp(0, 1))
            q_feat = torch.nn.functional.normalize(model.get_embedding(q_adv), p=2, dim=1)

        # Calc Metrics (Success: Match Target)
        m_tgt = calculate_metrics_exact(q_feat, g_feats, g_pids, tgt_pid)
        # Calc Metrics (Fail: Match Self)
        m_slf = calculate_metrics_exact(q_feat, g_feats, g_pids, adv_pid)

        print(f"   > Target Rank-1: {m_tgt['rank-1']:.1%}")

        for k in agg_tgt:
            agg_tgt[k].append(m_tgt[k])
            agg_slf[k].append(m_slf[k])

    # Averages
    avg_tgt = {k: np.mean(v) for k, v in agg_tgt.items()}
    avg_slf = {k: np.mean(v) for k, v in agg_slf.items()}
    return avg_tgt, avg_slf

# Run it
# Trains one pattern per pair (5 pairs x 2000 steps), so this cell is slow.
# Both results are dicts keyed by 'rank-1', 'rank-5', 'rank-10', 'mAP', 'ss'.
final_tgt, final_self = run_table3_replication(model_b, dataloaders, num_pairs=5, num_steps=2000)

# --- PRINT TABLE 3 ---
# The "5 Pairs" in the title is hard-coded; keep it in sync with num_pairs above.
print("\n" + "="*95)
print(f"Table 3. Digital-environment attack results (Average of 5 Pairs)")
print("="*95)
print(f"{'Matched Person':<20} {'rank-1':<12} {'rank-5':<12} {'rank-10':<12} {'mAP':<12} {'ss':<12}")
print("-" * 95)
# Row 1: patched queries scored with the target ID as ground truth.
print(f"{'Target (Success)':<20} {final_tgt['rank-1']:.1%}      {final_tgt['rank-5']:.1%}      {final_tgt['rank-10']:.1%}      {final_tgt['mAP']:.1%}      {final_tgt['ss']:.3f}")
# Row 2: the same queries scored with the adversary's own ID as ground truth.
print(f"{'Adversary (Fail)':<20} {final_self['rank-1']:.1%}      {final_self['rank-5']:.1%}      {final_self['rank-10']:.1%}      {final_self['mAP']:.1%}      {final_self['ss']:.3f}")
print("="*95)

--- Starting Full Table 3 Replication (5 Pairs) ---

[Pair 1] Adv: 791 -> Tgt: 1190
--- Starting IMPERSONATION Attack (2000 steps) ---
Step 0, Loss: 9.7505


In [7]:
def calculate_metrics_exact(q_feats, g_feats, g_pids, target_pid):
    """
    Rank the gallery for every query and score it against one target ID.

    Gallery rows whose ID equals ``target_pid`` are the correct matches.
    Returns a dict with 'rank-1', 'rank-5', 'rank-10' (CMC at those ranks),
    'mAP', and 'ss' (mean similarity of the best-ranked correct match).
    All values are 0.0 when the target ID never appears in the gallery.
    """
    # Dot-product similarity between every query and every gallery row.
    similarity = torch.mm(q_feats, g_feats.t()).cpu().numpy()

    top10_hits = []
    avg_precisions = []
    best_match_sims = []

    for query_sims in similarity:
        # Gallery indices ordered from most to least similar.
        ranking = np.argsort(query_sims)[::-1]
        hits = (g_pids[ranking] == target_pid).astype(np.int32)
        n_hits = hits.sum()

        if n_hits == 0:
            # No correct match exists: record a zero score and skip CMC / AP.
            best_match_sims.append(0.0)
            continue

        # Similarity of the highest-ranked correct match.
        first_hit = np.flatnonzero(hits)[0]
        best_match_sims.append(query_sims[ranking[first_hit]])

        # CMC: 1 from the first hit onwards, truncated to the top 10.
        running_hits = hits.cumsum()
        top10_hits.append(np.minimum(running_hits, 1)[:10])

        # AP: precision@k summed over the hit positions, divided by #hits.
        precision_at_k = running_hits / np.arange(1., running_hits.size + 1)
        avg_precisions.append((precision_at_k * hits).sum() / n_hits)

    if not top10_hits:
        return {'rank-1': 0.0, 'rank-5': 0.0, 'rank-10': 0.0, 'mAP': 0.0, 'ss': 0.0}

    mean_cmc = np.mean(top10_hits, axis=0)
    return {
        'rank-1': mean_cmc[0],
        'rank-5': mean_cmc[4],
        'rank-10': mean_cmc[9],
        'mAP': np.mean(avg_precisions),
        'ss': np.mean(best_match_sims),
    }

In [8]:
def run_table3_experiment(model, dataloaders, num_pairs=5, num_steps=2000):
    """Train and evaluate impersonation patterns for random (adversary, target) pairs.

    All pairs are drawn up front with ``random.sample``. For each pair a
    pattern is trained on the adversary's gallery images against the
    target's gallery images and applied to the adversary's query images;
    the patched queries are then scored against the gallery with the target
    ID and with the adversary's own ID as ground truth.

    Returns:
        ``(avg_target, avg_self)``: dicts keyed by 'rank-1', 'rank-5',
        'rank-10', 'mAP' and 'ss' with the mean over all evaluated pairs.
    """
    print(f"--- Starting Table 3 Replication ({num_pairs} Pairs) ---")

    gallery_set = dataloaders['gallery'].dataset
    query_set = dataloaders['query'].dataset

    def _pid_set(image_folder):
        # Person ID = integer before the first '_' of each file name.
        return set(int(os.path.basename(path).split('_')[0]) for path, _ in image_folder.imgs)

    # 1. Valid PIDs: present in both Gallery and Query, junk (<= 0) removed.
    shared_pids = _pid_set(gallery_set).intersection(_pid_set(query_set))
    valid_pids = [pid for pid in shared_pids if pid > 0]

    # 2. Select Random Pairs
    pairs = [random.sample(valid_pids, 2) for _ in range(num_pairs)]
    print(f"Test Pairs (Adv -> Tgt): {pairs}")

    # Per-metric lists, averaged at the end.
    metric_names = ('rank-1', 'rank-5', 'rank-10', 'mAP', 'ss')
    agg_target_metrics = {name: [] for name in metric_names}
    agg_self_metrics = {name: [] for name in metric_names}

    # 3. Gallery features are computed once and kept on the compute device.
    g_feats = extract_features_robust(model, dataloaders['gallery'], device).to(device)
    g_pids = get_id_and_cam(gallery_set)

    # 4. Loop
    for i, (adv_pid, tgt_pid) in enumerate(pairs):
        print(f"\n[Pair {i+1}/{num_pairs}] Training: PID {adv_pid} -> PID {tgt_pid}")

        # Training images come from the gallery, test images from the query set.
        adv_gs = get_images_by_pid(gallery_set, adv_pid)
        tgt_gs = get_images_by_pid(gallery_set, tgt_pid)
        adv_ts = get_images_by_pid(query_set, adv_pid)

        if any(batch is None for batch in (adv_gs, tgt_gs, adv_ts)):
            print("Skipping pair (data missing)")
            continue

        # Train Pattern
        attacker = AdvPatternAttack_Impersonation(model, device, lr=0.05)
        pat, mask = attacker.generate_impersonation_pattern(adv_gs, tgt_gs, num_steps=num_steps)

        # Patch the queries in pixel space, re-normalise, embed, L2-normalise.
        with torch.no_grad():
            clean_query = unnorm_layer(adv_ts.to(device))
            patched_query = norm_layer((clean_query + (pat * mask)).clamp(0, 1))
            q_feat_adv = torch.nn.functional.normalize(model.get_embedding(patched_query), p=2, dim=1)

        # Metric A -- "Did we look like the Target?" (ground truth = tgt_pid)
        m_tgt = calculate_metrics_exact(q_feat_adv, g_feats, g_pids, tgt_pid)
        # Metric B -- "Did we still look like ourselves?" (ground truth = adv_pid)
        m_self = calculate_metrics_exact(q_feat_adv, g_feats, g_pids, adv_pid)

        print(f"   > Target Match R1: {m_tgt['rank-1']:.1%}")
        print(f"   > Self Match R1:   {m_self['rank-1']:.1%}")

        for name in metric_names:
            agg_target_metrics[name].append(m_tgt[name])
            agg_self_metrics[name].append(m_self[name])

    # 5. Compute Final Averages
    avg_target = {name: np.mean(values) for name, values in agg_target_metrics.items()}
    avg_self = {name: np.mean(values) for name, values in agg_self_metrics.items()}

    return avg_target, avg_self

# --- EXECUTE & PRINT TABLE ---
# Run on 5 pairs to get a robust average
# Trains one pattern per pair (5 pairs x 2000 steps), so this cell is slow.
# Both results are dicts keyed by 'rank-1', 'rank-5', 'rank-10', 'mAP', 'ss'.
final_tgt, final_self = run_table3_experiment(model_b, dataloaders, num_pairs=5, num_steps=2000)

# The "5 Pairs" in the title is hard-coded; keep it in sync with num_pairs above.
print("\n" + "="*95)
print(f"Table 3. Digital-environment attack results (Replicated on Model B - Avg of 5 Pairs)")
print("="*95)
print(f"{'Matched Person':<20} {'rank-1':<12} {'rank-5':<12} {'rank-10':<12} {'mAP':<12} {'ss':<12}")
print("-" * 95)

# Row 1: Matched as Target (Success) -- patched queries scored against the target ID
print(f"{'Target (TS)':<20} {final_tgt['rank-1']:.1%}      {final_tgt['rank-5']:.1%}      {final_tgt['rank-10']:.1%}      {final_tgt['mAP']:.1%}      {final_tgt['ss']:.3f}")

# Row 2: Matched as Adversary (Failure) -- same queries scored against the adversary's own ID
print(f"{'Adversary (TS)':<20} {final_self['rank-1']:.1%}      {final_self['rank-5']:.1%}      {final_self['rank-10']:.1%}      {final_self['mAP']:.1%}      {final_self['ss']:.3f}")
print("="*95)

--- Starting Table 3 Replication (5 Pairs) ---
Test Pairs (Adv -> Tgt): [[1272, 624], [576, 33], [85, 219], [1131, 687], [1299, 870]]

[Pair 1/5] Training: PID 1272 -> PID 624
--- Starting IMPERSONATION Attack (2000 steps) ---
Step 0, Loss: 14.1843
Step 500, Loss: 7.8425
Step 1000, Loss: 7.0179
Step 1500, Loss: 7.6609
   > Target Match R1: 20.0%
   > Self Match R1:   0.0%

[Pair 2/5] Training: PID 576 -> PID 33
--- Starting IMPERSONATION Attack (2000 steps) ---
Step 0, Loss: 14.0802
Step 500, Loss: 7.0775
Step 1000, Loss: 7.9431
Step 1500, Loss: 7.9175
   > Target Match R1: 0.0%
   > Self Match R1:   0.0%

[Pair 3/5] Training: PID 85 -> PID 219
--- Starting IMPERSONATION Attack (2000 steps) ---
Step 0, Loss: 13.1561
Step 500, Loss: 7.9163
Step 1000, Loss: 7.6638
Step 1500, Loss: 7.9920
   > Target Match R1: 0.0%
   > Self Match R1:   50.0%

[Pair 4/5] Training: PID 1131 -> PID 687
--- Starting IMPERSONATION Attack (2000 steps) ---
Step 0, Loss: 11.5202
Step 500, Loss: 4.7454
Step 1000,