In [1]:
from google.colab import drive
import os

drive.mount('/content/drive')

# Adjust the path if it's located in a subfolder
project_path = "/content/drive/MyDrive/CrowdCounting"

# Check if the path exists
if os.path.exists(project_path):
  print(f"CrowdCounting directory found at: {project_path}")
  # Change the working directory to CrowdCounting
  os.chdir(project_path)
  print(f"Current working directory changed to: {os.getcwd()}")
else:
  print(f"Error: CrowdCounting directory not found at {project_path}")
  print("Please ensure the path is correct and the directory exists.")


Mounted at /content/drive
CrowdCounting directory found at: /content/drive/MyDrive/CrowdCounting
Current working directory changed to: /content/drive/MyDrive/CrowdCounting


In [11]:
# unmount drive in case the modified files couldn't updated to notebook then run mount again
drive.flush_and_unmount()
print('All changes made in this colab session should now be visible in Drive.')


All changes made in this colab session should now be visible in Drive.


In [8]:
# import all library needed
import json
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

from model import CSRNet
import importlib
import dataset  # pertama kali import
importlib.reload(dataset)
from dataset import listDataset

from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
import cv2

# configurations
TRAIN_JSON       = 'test_data.json'         # define test set
CHECKPOINT_PATH  = 'hajj2-mask1-model_best.pth.tar' # define model that will evaluated
DEVICE           = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') # define gpu number
BATCH_SIZE       = 1                          # define batch size, keep it 1 due to different images dimension
NUM_WORKERS      = 2                          # number of worker for dataloader, keep it 2
MAX_BATCHES      = None                       # set None to conduct full evaluation

# define function for data loader
def get_dataloader(json_path, batch_size, num_workers):
    with open(json_path, 'r') as f:
        img_list = json.load(f)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    dataset = listDataset(img_list, transform=transform, train=False)
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    return loader

train_loader = get_dataloader(TRAIN_JSON, BATCH_SIZE, NUM_WORKERS)

# define function for load model
def load_model(checkpoint_path, device):
    model = CSRNet().to(device)
    ckpt  = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(ckpt['state_dict'])
    model.eval()
    return model

# define object for the model
model = load_model(CHECKPOINT_PATH, DEVICE)

# define function to compute all metrics are needed
def evaluate_all_metrics(loader, model, device, max_batches=None):
    mae_sum, mse_sum, mnae_sum = 0.0, 0.0, 0.0
    psnr_list, ssim_list = [], []
    n = 0

    for i, (imgs, targets) in enumerate(tqdm(loader, desc='Evaluating')):
        if max_batches is not None and i >= max_batches:
            break
        imgs, targets = imgs.to(device), targets.to(device)
        with torch.no_grad():
            preds = model(imgs)

        pred_cnts = preds.sum(dim=(1,2,3)).cpu().numpy()
        true_cnts = targets.sum(dim=(1,2)).cpu().numpy()

        mae_sum  += np.abs(pred_cnts - true_cnts).sum()
        mse_sum  += ((np.abs(pred_cnts - true_cnts))**2).sum()
        mnae_sum += (np.abs(pred_cnts - true_cnts) / (true_cnts + 1e-8)).sum()

        for b in range(imgs.size(0)):
            pred_map = preds[b].squeeze().cpu().numpy()
            gt_map = targets[b].squeeze().cpu().numpy()
            # Resize pred_map to gt_map size
            pred_map_resized = cv2.resize(pred_map, (gt_map.shape[1], gt_map.shape[0]), interpolation=cv2.INTER_CUBIC)
            psnr_val = psnr(gt_map, pred_map_resized, data_range=gt_map.max() - gt_map.min())
            ssim_val = ssim(gt_map, pred_map_resized, data_range=gt_map.max() - gt_map.min())
            psnr_list.append(psnr_val)
            ssim_list.append(ssim_val)

        n += imgs.size(0)

    mae  = mae_sum / n
    mse  = mse_sum / n
    rmse = np.sqrt(mse_sum / n)
    mnae = mnae_sum / n
    avg_psnr = np.mean(psnr_list)
    avg_ssim = np.mean(ssim_list)

    return mae, mse, rmse, mnae, avg_psnr, avg_ssim

mae, mse, rmse, mnae, avg_psnr, avg_ssim = evaluate_all_metrics(train_loader, model, DEVICE, MAX_BATCHES)

print(f"\n== Evaluation Metrics ==")
print(f"MAE   : {mae:.3f}")
print(f"MSE   : {mse:.3f}")
print(f"RMSE  : {rmse:.3f}")
print(f"MNAE  : {mnae:.3f}")
print(f"PSNR  : {avg_psnr:.3f} dB")
print(f"SSIM  : {avg_ssim:.4f}")



Skipping weight copy for: 0.weight due to shape mismatch


Evaluating: 100%|██████████| 34/34 [00:20<00:00,  1.66it/s]


== Evaluation Metrics ==
MAE   : 54.429
MSE   : 5795.169
RMSE  : 76.126
MNAE  : 0.114
PSNR  : 27.017 dB
SSIM  : 0.7648





## SHAP implementation
We need make some adjustment to use shap for interpreting crowd counting model. It because basically shap to interpret classification output

In [9]:
import torch.nn as nn

# Wrap model to return total count
class CSRNetWrapper(nn.Module):
    def __init__(self, model):
        super(CSRNetWrapper, self).__init__()
        self.model = model

    def forward(self, x):
        density_map = self.model(x)   # (1, 1, H, W)
        count = density_map.view(x.size(0), -1).sum(dim=1, keepdim=True) # (1, 1)

        return count

# set the relu to false due to shap didn't support relu.
def disable_inplace_relu(module):
    for name, child in module.named_children():
        if isinstance(child, nn.ReLU):
            setattr(module, name, nn.ReLU(inplace=False))
        else:
            disable_inplace_relu(child)

# Patch all inplace ReLU
disable_inplace_relu(model)
# Wrapped model
wrapped_model = CSRNetWrapper(model)
# Set model to eval mode
wrapped_model.eval()


CSRNetWrapper(
  (model): CSRNet(
    (frontend): Sequential(
      (0): Conv2d(4, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): ReLU()
      (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (6): ReLU()
      (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): ReLU()
      (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU()
      (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (13): ReLU()
      (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (15): ReLU()
      (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)


In [10]:
from torchvision import transforms
from torch.utils.data import DataLoader
import json

# Since SHAP needs same dimension size for all images, we force dimension the input size
def get_dataloader(json_path, batch_size=1, num_workers=0, resize=(512, 512)):
    with open(json_path, 'r') as f:
        img_list = json.load(f)

    transform = transforms.Compose([
        transforms.ToTensor(),  # ubah dari np.ndarray ke torch.Tensor
        #transforms.Resize((512, 512)),  # Resize tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])


    dataset = listDataset(img_list, transform=transform, train=False)

    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    return loader


In [11]:
# Define function for denormalize image
def denormalize(img_tensor,*, mean, std):
    mean = torch.tensor(mean).view(-1, 1, 1)
    std = torch.tensor(std).view(-1, 1, 1)
    return img_tensor * std + mean

# Since SHAP needs same dimension size for all images, we force the dimension to same face
def resize_tensor_image(tensor_img, size=(512, 512)):
    return F.interpolate(tensor_img, size=size, mode='bilinear', align_corners=False)

# Define function to plot original image
def image_rgb(image_tensor,idx):

    img_denorm = denormalize(image_tensor[0, :3].cpu(), mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    image_np = np.transpose(img_denorm.numpy(), (1, 2, 0))  # (C, H, W) → (H, W, C)

    plt.figure(figsize=(10, 4))
    plt.imshow(image_np)
    plt.title(f'Sample #{idx}')
    plt.axis('off')
    plt.show()

# Define function to visualize SHAP value
def visualize_shap_channel(image_tensor, shap_value_tensor, idx, channel_idx, channel_name):

    channel_input = image_tensor[0, channel_idx].detach().cpu().numpy()
    channel_shap  = shap_value_tensor[0, channel_idx]

    vmax = max(np.max(np.abs(channel_shap)), 1e-3)  # batas minimum untuk kontras


    plt.figure(figsize=(10, 4))

    plt.subplot(1, 2, 1)
    if channel_name == "Edge":
      plt.imshow(channel_input, cmap='gray')
    else:
      plt.imshow(channel_input,cmap=channel_name)
    plt.title(f'Sample #{idx} - {channel_name} Channel Input')
    plt.axis('off')
    plt.subplot(1, 2, 2)
    im = plt.imshow(channel_shap, cmap='seismic', vmin=-vmax, vmax=vmax)
    plt.title(f'Sample #{idx} - SHAP for {channel_name}')
    plt.colorbar(im, fraction=0.046, pad=0.04)
    plt.tight_layout()
    plt.axis('off')
    plt.show()

# Define function to visualize SHAP feature important per chanel (SHAP mean)
def plot_shap_bar(avg_shaps, channel_names, idx):
    plt.figure(figsize=(6, 4))
    plt.bar(channel_names, avg_shaps, color='skyblue')
    plt.title(f"Sample #{idx} - Avg |SHAP| per Channel")
    plt.ylabel("Mean(|SHAP|)")
    plt.tight_layout()
    plt.grid(True, axis='y')
    plt.show()

# Define function to plot gt dan pred density map
def plot_density(gt, pred, idx):
    # 1. Ground Truth Density Map
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(gt.squeeze(), cmap='jet')
    plt.title(f"GT Density Map\nCount: {gt.sum():.1f}")
    plt.axis("off")

    # 2. Predicted Density Map
    plt.subplot(1, 2, 2)
    plt.imshow(pred.squeeze(), cmap='jet')
    plt.title(f"Predicted Map\nCount: {pred.sum():.1f}")
    plt.axis("off")
    plt.show()

In [12]:
# import all library are needed
import shap
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision import transforms
import torchvision.transforms.functional as TF
import torch.nn.functional as F

# Initialize the image loader
shap_loader = get_dataloader(TRAIN_JSON, batch_size=1, num_workers=0)
shap_iter = iter(shap_loader)

# Name of channle
channel_names = ["Reds", "Greens", "Blues", "Edge"]

# define list to save mae value ratio, this is needed for get the best and worst case
mae_ratios = []

# define list to save background for shap
background_list = []

# run looping for all test set images
for idx, (image, target) in enumerate(shap_iter):
    #image, target = next(shap_iter)
    image_resized = resize_tensor_image(image, size=(512, 512)).cpu() # forcing image size
    # take 5 images for shap background
    if len(background_list) < 5:
        background_list.append(image_resized.clone())
    # predicting original images (not resized)
    with torch.no_grad():
        output = model(image.to(DEVICE))
        pred = output.cpu().numpy()
        true = target.cpu().numpy()
        pred_cnts = output.sum(dim=(1,2,3)).cpu().numpy()
        true_cnts = target.sum(dim=(1,2)).cpu().numpy()
        abs_diff = np.abs(pred_cnts - true_cnts)[0]
        gt = true_cnts[0]

        # Avoid devide by zero
        ratio = abs_diff / (gt + 1e-8)

    # Save information needed for next step to list
    mae_ratios.append({
        'idx': idx,
        'ratio': ratio,
        'mae': abs_diff,
        'gt': gt,
        'pred': pred_cnts[0],
        'd_gt': true,
        'd_pred': pred,
        'image': image,
        'image_resized': image_resized
    })

# Take sample for the best and worst case
min_ratio_sample = min(mae_ratios, key=lambda x: x['ratio'])
max_ratio_sample = max(mae_ratios, key=lambda x: x['ratio'])


# save the best and worst value to list
samples_to_show = [min_ratio_sample, max_ratio_sample]
# save background for shap analysis
background = torch.cat(background_list, dim=0)


In [13]:
# Loop for those cases
for sample in samples_to_show:
    # get all information are needed from the list
    idx = sample['idx']
    image = sample['image']
    image_resized = sample['image_resized']
    gt = sample['gt']
    pred = sample['pred']
    ratio = sample['ratio']
    mae_val = sample['mae']
    gt_density = sample['d_gt']
    pred_density = sample['d_pred']

    # Run SHAP
    wrapped_model = wrapped_model.cpu()                       # Used this model to align shap requirement
    wrapped_model.eval()                                      # set model to evaluation mode
    explainer = shap.DeepExplainer(wrapped_model, background) # define explainer, we use deep explainer
    shap_values = explainer.shap_values(image_resized)        # get shape values for best and worst value

    # Plot the rgp image
    image_rgb(image, idx)

    # Plot all density (gt and pred)
    plot_density(gt_density, pred_density, idx)

    # Print all informations about the sample (GT, PRED, MAE and Ratio)
    print(f"\nSample #{idx} - GT: {gt:.2f}, Pred: {pred:.2f}, MAE: {mae_val:.2f}, Ratio: {ratio:.4f}")

    # Shap visualization per channel
    avg_shaps = []
    for ch in range(4):
        max_val = np.abs(shap_values[0][0, ch]).max()
        mean_val = np.abs(shap_values[0][0, ch]).mean()
        avg_shaps.append(mean_val)
        # print shap (mean) value per channel
        print(f"Channel {ch} ({channel_names[ch]}): max SHAP = {max_val:.6f}, mean SHAP = {mean_val:.6f}")
        # plot image per channel
        visualize_shap_channel(image, shap_values[0], idx=idx, channel_idx=ch, channel_name=channel_names[ch])

    # bar plot for shap (feature importance per channel)
    plot_shap_bar(avg_shaps, channel_names, idx)


Output hidden; open in https://colab.research.google.com to view.