In [None]:
# ✅ General Purpose
import os
import glob
import random
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ✅ Image Processing
import cv2
from scipy.io import loadmat
from scipy.ndimage import gaussian_filter

# ✅ PyTorch and Neural Network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as T


In [None]:
# Load one example image from ShanghaiTech part B and display it.
path_img_ex = '../input/shanghaitech/ShanghaiTech/part_B/train_data/images/IMG_6.jpg'
image_bgr_ex = cv2.imread(path_img_ex)
# cv2.imread does not raise on a missing/unreadable file; it returns None.
if image_bgr_ex is None:
    raise FileNotFoundError(f"Could not read image: {path_img_ex}")
# OpenCV decodes to BGR channel order; matplotlib expects RGB.
image_ex = cv2.cvtColor(image_bgr_ex, cv2.COLOR_BGR2RGB)
figure = plt.figure(figsize=(5, 5))
plt.imshow(image_ex)
plt.show()

In [None]:
# Load the MATLAB ground-truth file whose name matches the example image (GT_IMG_6 <-> IMG_6).
path_gt_ex = "../input/shanghaitech/ShanghaiTech/part_B/train_data/ground-truth/GT_IMG_6.mat"
gt_ex = loadmat(path_gt_ex)
print('type: ', type(gt_ex))
# Prints every key/value pair of the loaded object, including the raw annotation arrays.
print(gt_ex.items())

In [None]:
# Show only the top-level keys of the loaded ground-truth object.
print(gt_ex.keys())

In [None]:
# Unwrap the nested MATLAB struct down to the array of head coordinates.
# Direct indexing (instead of .get) raises a clear KeyError if 'image_info' is missing,
# and matches the access pattern used by EnhancedShanghaiTechDataset.
gt_coor_ex = gt_ex['image_info'][0, 0][0, 0][0]
print('Shape of coordinates: ', gt_coor_ex.shape)


In [None]:
figure = plt.figure(figsize=(5,5))

# cv2.drawMarker paints directly into the array it is given. Draw on a copy so that
# `image_ex` stays a clean image for the following cells and this cell can be re-run
# without stacking markers on top of each other.
image_marked_ex = image_ex.copy()
for x_cor, y_cor in gt_coor_ex:
    cv2.drawMarker(image_marked_ex, (int(x_cor), int(y_cor)), (255, 0, 0), thickness=3)

plt.imshow(image_marked_ex)
plt.title("Image and Coordinate")
plt.show()

In [None]:
import numpy as np
from scipy.ndimage import gaussian_filter

def gen_density_map_gaussian(image, coords, sigma=5, truncate=3):
    """
    Generate a density map from point coordinates using a Gaussian kernel.

    Each in-bounds annotation adds one unit of mass at its (truncated) pixel
    position, so several annotations that fall on the same pixel are all
    counted. The impulse map is then blurred and rescaled so that it sums to
    ``len(coords)``.

    Args:
        image (np.array): Reference image (only used for its height and width).
        coords (array-like): Array/List of (x, y) coordinates.
        sigma (float): Gaussian standard deviation.
        truncate (float): Truncation factor for kernel size (default: 3).

    Returns:
        np.array: float32 density map of shape (H, W). All zeros when
        ``coords`` is empty or no coordinate lies inside the image.
    """
    h, w = image.shape[:2]
    density_map = np.zeros((h, w), dtype=np.float32)

    n_points = len(coords)
    if n_points == 0:
        return density_map

    for x, y in coords:
        col, row = int(x), int(y)
        if 0 <= col < w and 0 <= row < h:
            # Accumulate instead of assigning 1: plain assignment would merge
            # annotations that share a pixel into a single impulse.
            density_map[row, col] += 1

    # mode='constant' pads with zeros, so kernels near the border lose some mass;
    # the rescaling below compensates for that.
    density_map = gaussian_filter(density_map, sigma=sigma, mode='constant', truncate=truncate)

    # Rescale so the map integrates to the number of annotated points.
    total = density_map.sum()
    if total > 0:
        density_map = density_map * (n_points / total)

    return density_map


In [None]:
density_map_ex = gen_density_map_gaussian(image_ex, gt_coor_ex, 5)

# Keep the [0, 1]-scaled tensor under its own name. Rebinding `image_ex` would turn
# the uint8 array into a tensor and rescale it again on every re-run of this cell.
image_ex_tensor = torch.tensor(image_ex / 255, dtype=torch.float)

figure = plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.xlabel(image_ex_tensor.shape)
plt.title('GT: '+str(gt_coor_ex.shape[0]))
plt.imshow(image_ex_tensor)

plt.subplot(1,2,2)
plt.xlabel(density_map_ex.shape)
plt.title('DM: '+str(np.sum(density_map_ex)))
plt.imshow(density_map_ex, cmap="jet")

# Value ranges of the scaled image (1) and of the density map (2).
print('max1 : ', image_ex_tensor.max())
print('max2 : ', density_map_ex.max())
print('min1 : ', image_ex_tensor.min())
print('min2 : ', density_map_ex.min())

In [None]:
import torch
import torchvision.transforms as T
import numpy as np
import cv2
import os
import random
from torch.utils.data import Dataset
from scipy.io import loadmat
from scipy.ndimage import gaussian_filter

class EnhancedShanghaiTechDataset(Dataset):
    """Crowd-counting dataset that serves (image, density map, head count) triples.

    ``root_dir`` must contain ``images/<name>.jpg`` and
    ``ground-truth/GT_<name>.mat``. Every image is resized to ``target_size``
    and its density map is computed once in ``__init__`` and cached in memory.

    Args:
        root_dir (str): Directory holding the ``images`` and ``ground-truth`` folders.
        gt_downsample (int): Factor by which the density map is smaller than the image tensor.
        shuffle (bool): Shuffle the file list once at construction (uses the ``random`` module).
        sigma_base (float): Base Gaussian sigma; halved for images with more than 100 annotations.
        augment (bool): Apply a random horizontal flip (image and density map together)
            and colour jitter (image only) in ``__getitem__``.
        target_size (tuple): (Height, Width) every image is resized to.
        normalize_dm (bool): Rescale each density map so it sums to the annotation count.
    """

    def __init__(self, root_dir, gt_downsample=4, shuffle=False, sigma_base=5, augment=False, target_size=(512, 512), normalize_dm=True):
        self.root_dir = root_dir
        self.gt_downsample = gt_downsample
        self.shuffle = shuffle
        self.sigma_base = sigma_base
        self.augment = augment
        self.target_size = target_size  # (Height, Width)
        self.normalize_dm = normalize_dm

        # ✅ Load image filenames (sorted: os.listdir returns entries in arbitrary order)
        self.img_names = sorted(
            f for f in os.listdir(os.path.join(root_dir, 'images')) if f.endswith('.jpg')
        )
        if self.shuffle:
            random.shuffle(self.img_names)

        self.n_people = {}  # image path -> number of annotated points
        self.DMs = {}       # image path -> density map at target_size resolution

        # ✅ Precompute density maps and GT counts
        for img_name in self.img_names:
            img_path = os.path.join(root_dir, 'images', img_name)
            GT_path = os.path.join(root_dir, 'ground-truth', f"GT_{os.path.splitext(img_name)[0]}.mat")

            GT = loadmat(GT_path)['image_info'][0, 0][0, 0][0]
            img = self._read_rgb(img_path)
            resized_img = self._resize_to_target(img)

            dm = self.generate_density_map(resized_img, GT, original_shape=img.shape[:2])
            self.DMs[img_path] = dm
            self.n_people[img_path] = len(GT)

        # ✅ Photometric augmentation (image only).
        # Rotation / random-resized-crop are deliberately not part of this pipeline:
        # applied to the image alone they would leave the cached density map and the
        # stored head count pointing at different pixels than the augmented image.
        # The only geometric augmentation is the paired flip in __getitem__.
        self.augmentation = T.Compose([
            T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
        ])

    @staticmethod
    def _read_rgb(img_path):
        """Read an image file as an RGB array, raising if it cannot be decoded."""
        bgr = cv2.imread(img_path)
        # cv2.imread signals failure by returning None rather than raising.
        if bgr is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    def _resize_to_target(self, img):
        """Resize ``img`` to ``self.target_size``, which is stored as (Height, Width)."""
        target_h, target_w = self.target_size
        # cv2.resize takes dsize as (width, height).
        return cv2.resize(img, (target_w, target_h))

    def generate_density_map(self, img, points, original_shape):
        """Build the density map for one image.

        Args:
            img (np.array): Resized image; only its height and width are used.
            points (array-like): (x, y) annotations in original-image coordinates.
            original_shape (tuple): (height, width) of the image before resizing.

        Returns:
            np.array: float32 map of shape (H, W); all zeros when there are no points.
        """
        h, w = img.shape[:2]
        density_map = np.zeros((h, w), dtype=np.float32)

        n_points = len(points)
        if n_points == 0:
            return density_map

        h_scale = w / original_shape[1]
        v_scale = h / original_shape[0]

        for point in points:
            x = int(point[0] * h_scale)
            y = int(point[1] * v_scale)

            if 0 <= x < w and 0 <= y < h:
                # Accumulate: after downscaling, several heads can land on one pixel.
                density_map[y, x] += 1

        # Sigma depends only on the annotation count, so compute it once, outside the
        # loop; that also keeps it defined when every point is out of bounds.
        sigma = self.sigma_base * (0.5 if n_points > 100 else 1)
        density_map = gaussian_filter(density_map, sigma=sigma, truncate=3)

        total = density_map.sum()
        if self.normalize_dm and total > 0:
            density_map *= (n_points / total)

        return density_map

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Return ``(img_tensor, dm_tensor, gt_n_people)`` for sample ``idx``.

        ``img_tensor`` is a float CHW tensor scaled to [0, 1]; ``dm_tensor`` is a
        float tensor of shape (1, H // gt_downsample, W // gt_downsample);
        ``gt_n_people`` is the number of annotated points for the image.
        """
        img_name = self.img_names[idx]
        img_path = os.path.join(self.root_dir, 'images', img_name)
        img = self._resize_to_target(self._read_rgb(img_path))

        gt_density_map = self.DMs[img_path]
        gt_n_people = self.n_people[img_path]

        # ✅ Augmentation
        if self.augment:
            if random.random() < 0.5:
                # Flip image and density map together so they stay aligned.
                # New arrays are created, so the cached map is not modified.
                img = np.ascontiguousarray(img[:, ::-1])
                gt_density_map = np.ascontiguousarray(gt_density_map[:, ::-1])
            # torchvision's ColorJitter works on PIL images.
            img = np.array(self.augmentation(T.ToPILImage()(img)))

        # ✅ Downsample
        h, w = img.shape[:2]
        ds_h, ds_w = h // self.gt_downsample, w // self.gt_downsample

        # Snap the image to a multiple of the downsample factor.
        img = cv2.resize(img, (ds_w * self.gt_downsample, ds_h * self.gt_downsample))
        gt_density_map = cv2.resize(gt_density_map, (ds_w, ds_h))
        # Each output cell stands for gt_downsample**2 input pixels; scaling by that
        # area keeps the map's sum close to the original count.
        gt_density_map = gt_density_map[np.newaxis, :, :] * (self.gt_downsample ** 2)

        # ✅ Convert to torch tensors
        img_tensor = torch.from_numpy(img.transpose(2, 0, 1)).float() / 255.0
        dm_tensor = torch.from_numpy(gt_density_map).float()

        return img_tensor, dm_tensor, gt_n_people


In [None]:
# Build a dataset over the part_B test split; the next cell previews samples from it.
# Construction reads every image and annotation file under root_dir and precomputes
# the density maps, so the I/O cost grows with the size of the split.
root_dir = "../input/shanghaitech/ShanghaiTech/part_B/test_data/"
dataset = EnhancedShanghaiTechDataset(root_dir, gt_downsample=4, shuffle=True, sigma_base=5)


In [None]:
# Preview the first two samples: input image on the left, ground-truth density map on the right.
for i in range(min(2, len(dataset))):
    img, gt_dmap, n_people = dataset[i]

    fig, (ax_img, ax_dm) = plt.subplots(1, 2, figsize=(10, 5))

    # Tensors are channel-first; matplotlib wants channel-last.
    ax_img.imshow(img.permute(1, 2, 0))
    ax_img.set_xlabel(img.shape)
    ax_img.set_title('GT: ' + str(n_people))

    ax_dm.imshow(gt_dmap.permute(1, 2, 0), cmap="jet")
    ax_dm.set_xlabel(gt_dmap.shape)
    ax_dm.set_title('DM: ' + str(np.sum(gt_dmap.numpy())))

    plt.show()


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FourColumnMDNN(nn.Module):
    """Four-column CNN that maps an RGB image to a single-channel density map.

    The columns run in parallel on the same input and differ in kernel sizes and
    channel widths. Each column contains two ``MaxPool2d(2)`` stages, so the output
    has 1/4 of the input's height and width. Column outputs are concatenated along
    the channel axis and fused by two 1x1 convolutions, each followed by ReLU.
    """

    @staticmethod
    def _make_column(stages):
        """Build one column from ``(in_channels, out_channels, kernel_size)`` stages.

        Every stage is Conv2d -> BatchNorm2d -> ReLU with "same" padding; a
        MaxPool2d(2) follows every stage except the last one. Layer order (and
        therefore the Sequential indices used in state-dict keys) is fixed.
        """
        layers = []
        last = len(stages) - 1
        for position, (c_in, c_out, k) in enumerate(stages):
            layers.append(nn.Conv2d(c_in, c_out, kernel_size=k, padding=k // 2))
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU())
            if position != last:
                layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)

    def __init__(self):
        super().__init__()

        # ✅ Very Large Receptive Field Column
        self.column1 = self._make_column([(3, 16, 11), (16, 32, 9), (32, 64, 7)])

        # ✅ Large Receptive Field Column
        self.column2 = self._make_column([(3, 16, 9), (16, 32, 7), (32, 64, 5)])

        # ✅ Medium Receptive Field Column
        self.column3 = self._make_column([(3, 20, 7), (20, 40, 5), (40, 80, 3)])

        # ✅ Small Receptive Field Column (fine details for dense crowds)
        self.column4 = self._make_column([(3, 24, 5), (24, 48, 3), (48, 96, 3)])

        # ✅ Fusion Layer: 1x1 convs over the concatenated column outputs
        fused_channels = 64 + 64 + 80 + 96
        self.fusion = nn.Sequential(
            nn.Conv2d(fused_channels, 64, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(64, 1, kernel_size=1),
            nn.ReLU(),
        )

    def forward(self, x):
        """Run all four columns on ``x`` and fuse them into one density map."""
        columns = (self.column1, self.column2, self.column3, self.column4)
        stacked = torch.cat([column(x) for column in columns], dim=1)
        return self.fusion(stacked)


In [None]:
# Shape smoke test: push one random 768x1024 RGB tensor through an untrained network.
img=torch.rand((1,3,768,1024),dtype=torch.float)
mcnn=FourColumnMDNN()
out_dmap=mcnn(img)
# Each column contains two MaxPool2d(2) stages, so the spatial size should be 1/4 of the input.
print(out_dmap.shape)

In [None]:
import torch
import numpy as np
from torch.utils.data import DataLoader, Subset

batch_size = 8
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# Fixed seed so the train/validation split does not change between kernel restarts
# (given that the image directory lists the same files).
SPLIT_SEED = 42
# The dataset's shuffle=True goes through the `random` module, so seed it first.
random.seed(SPLIT_SEED)

train_root_dir = "../input/shanghaitech/ShanghaiTech/part_B/train_data/"
init_training_set = EnhancedShanghaiTechDataset(train_root_dir, gt_downsample=4, shuffle=True)

# ✅ Split part of the training set as validation set (90% / 10%)
train_size = int(0.9 * len(init_training_set))
val_size = len(init_training_set) - train_size

# Seeded permutation of sample indices; .tolist() yields plain Python ints for Subset.
indices = np.random.default_rng(SPLIT_SEED).permutation(len(init_training_set)).tolist()

train_indices = indices[:train_size]
val_indices = indices[train_size:]
assert len(train_indices) + len(val_indices) == len(init_training_set)

# Both subsets are views onto the same underlying dataset object.
train_dataset = Subset(init_training_set, train_indices)
val_dataset = Subset(init_training_set, val_indices)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# ✅ Test set
test_root_dir = "../input/shanghaitech/ShanghaiTech/part_B/test_data/"
test_set = EnhancedShanghaiTechDataset(test_root_dir, gt_downsample=4, shuffle=False)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

print("Number of batches in train_loader:", len(train_loader))
print("Number of batches in val_loader:", len(val_loader))
print("Number of batches in test_loader:", len(test_loader))


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch

def plot_corresponding_pairs(batch1, batch2, plot_map='jet'):
    """Plot each image of a batch above its density map, four pairs per row band.

    Args:
        batch1 (torch.Tensor): Images, indexed as ``batch1[i]`` -> (C, H, W).
        batch2 (torch.Tensor): Density maps, one per image; singleton dims are squeezed.
        plot_map (str): Matplotlib colormap name used for the density maps.

    Returns:
        None. The figure is shown with ``plt.show()``. An empty batch draws nothing.
    """
    num_images = batch1.shape[0]
    if num_images == 0:
        # plt.subplots cannot create a grid with zero rows.
        return

    num_cols = 4  # Number of images per row
    num_bands = int(np.ceil(num_images / num_cols))
    num_rows = num_bands * 2  # 2 rows per band (RGB + Density Map)

    # squeeze=False always returns a 2D array of axes.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, num_rows * 2), squeeze=False)

    for i in range(num_images):
        row_rgb = (i // num_cols) * 2
        col = i % num_cols
        row_dm = row_rgb + 1

        # ✅ Plot RGB image (min-max rescaled for display; skipped for a constant image)
        img_np = batch1[i].detach().cpu().numpy().transpose(1, 2, 0)
        img_min, img_max = img_np.min(), img_np.max()
        if img_max - img_min > 1e-5:
            img_np = (img_np - img_min) / (img_max - img_min)
        axes[row_rgb, col].imshow(img_np)
        axes[row_rgb, col].axis('off')
        axes[row_rgb, col].set_title(f"Image {i}")

        # ✅ Plot Density Map
        dm_np = batch2[i].detach().cpu().squeeze().numpy()
        axes[row_dm, col].imshow(dm_np, cmap=plot_map)
        axes[row_dm, col].axis('off')
        pred_count = np.sum(dm_np)
        axes[row_dm, col].set_title(f"Pred Count: {pred_count:.2f}")

    # ✅ Hide unused subplots. Empty slots only occur in the last band, and each empty
    # slot has both an image axis and a density-map axis, so walk the same
    # (band, column) mapping used above.
    for i in range(num_images, num_bands * num_cols):
        row_rgb = (i // num_cols) * 2
        col = i % num_cols
        axes[row_rgb, col].axis('off')
        axes[row_rgb + 1, col].axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Pull a single batch from the training loader.
dataiter = iter(train_loader)
ex_images, ex_dmaps, ex_n_people = next(dataiter)

# ✅ Show the input images together with their ground-truth density maps
plot_corresponding_pairs(ex_images, ex_dmaps, plot_map='jet')

# ✅ One ground-truth head count per image in the batch
print("Ground Truth Counts per Image:")
count_strings = [
    f'{ex_n_people[j].item():5.1f}'
    for j in range(ex_images.size(0))
]
print(' '.join(count_strings))


In [None]:
import torch
import torch.nn as nn

class CombinedLoss(nn.Module):
    """Weighted sum of a pixel-wise density-map MSE and a per-image count MAE.

    Args:
        weight_dmap (float): Weight of the MSE between predicted and target density maps.
        weight_sum_gt (float): Weight of the MAE between predicted and ground-truth counts.
    """

    def __init__(self, weight_dmap=0.9, weight_sum_gt=0.1):
        super(CombinedLoss, self).__init__()
        self.weight_dmap = weight_dmap          # Weight for pixel-wise density map loss
        self.weight_sum_gt = weight_sum_gt      # Weight for total count loss

        self.img_loss = nn.MSELoss()            # Density map (image-level) loss
        self.gt_loss_mae = nn.L1Loss()          # Count-level MAE loss

    def forward(self, logits, batch_dmap, batch_gts):
        """Compute the combined loss.

        Args:
            logits (torch.Tensor): Predicted density maps, (B, C, H, W).
            batch_dmap (torch.Tensor): Target density maps, same shape as ``logits``.
            batch_gts (torch.Tensor): Ground-truth counts, one value per sample.

        Returns:
            tuple: ``(combined_loss, count_mae)`` as scalar tensors.
        """
        # One count per sample as a flat (B,) vector. Flattening stops a (B, 1)
        # target from broadcasting against the (B,) prediction into a (B, B) matrix.
        batch_gts = batch_gts.float().reshape(-1)

        # ✅ Density Map Loss (MSE over pixels)
        img_loss = self.img_loss(logits, batch_dmap)

        # ✅ Count Loss (MAE between predicted total and GT total)
        # Summing over every non-batch dim always yields shape (B,), including B == 1,
        # where a squeeze would collapse the result to a 0-dim tensor.
        pred_counts = logits.sum(dim=(1, 2, 3))
        gt_loss_mae = self.gt_loss_mae(pred_counts, batch_gts)

        # ✅ Final Combined Loss
        combined_loss = self.weight_dmap * img_loss + self.weight_sum_gt * gt_loss_mae

        return combined_loss, gt_loss_mae


In [None]:
# import torch
# import torch.optim as optim
# import numpy as np

# num_epochs = 200
# train_losses = []
# val_losses = []
# train_mae_losses = []
# val_mae_losses = []

# model = FourColumnMDNN().to(device)
# criterion = CombinedLoss(0.9, 0.1)
# optimizer = optim.Adam(model.parameters(), lr=1e-4)

# best_val_loss = np.inf
# best_nr_epoch = 0

# for epoch in range(num_epochs):
#     print(f"\nEpoch {epoch+1}/{num_epochs}:")
#     model.train()
#     tr_loss_acc = 0.0
#     tr_mae_acc = 0.0

#     for batch_img, batch_dmap, batch_gts in train_loader:
#         batch_img, batch_dmap, batch_gts = batch_img.to(device), batch_dmap.to(device), batch_gts.to(device)

#         optimizer.zero_grad()
#         logits = model(batch_img)
#         loss, mae_loss = criterion(logits, batch_dmap, batch_gts)
#         loss.backward()
#         optimizer.step()

#         tr_loss_acc += loss.item() * batch_img.size(0)
#         tr_mae_acc += mae_loss.item() * batch_img.size(0)

#     tr_loss = tr_loss_acc / len(train_loader.dataset)
#     tr_mae = tr_mae_acc / len(train_loader.dataset)

#     print(f">> TRAIN: Loss: {tr_loss:.6f}, MAE: {tr_mae:.6f}")

#     # Validation phase
#     model.eval()
#     val_loss_acc = 0.0
#     val_mae_acc = 0.0

#     with torch.inference_mode():
#         for batch_img_val, batch_dmap_val, batch_gts_val in val_loader:
#             batch_img_val, batch_dmap_val, batch_gts_val = batch_img_val.to(device), batch_dmap_val.to(device), batch_gts_val.to(device)
#             logits = model(batch_img_val)
#             loss, mae_loss = criterion(logits, batch_dmap_val, batch_gts_val)

#             val_loss_acc += loss.item() * batch_img_val.size(0)
#             val_mae_acc += mae_loss.item() * batch_img_val.size(0)

#     val_loss = val_loss_acc / len(val_loader.dataset)
#     val_mae = val_mae_acc / len(val_loader.dataset)

#     print(f">> VAL:   Loss: {val_loss:.6f}, MAE: {val_mae:.6f}")

#     # Save best model
#     if val_loss < best_val_loss:
#         best_val_loss = val_loss
#         best_nr_epoch = epoch
#         torch.save(model.state_dict(), './crowd_counting.pth')
#         print(f"✅ Best model saved at epoch {epoch+1} with val_loss {val_loss:.6f}")

#     # Track history
#     train_losses.append(tr_loss)
#     train_mae_losses.append(tr_mae)
#     val_losses.append(val_loss)
#     val_mae_losses.append(val_mae)

# print("\n✅ Training Complete!")
# print(f"Best epoch: {best_nr_epoch+1}")
# print(f"Best Train MAE: {train_mae_losses[best_nr_epoch]:.6f}")
# print(f"Best Val MAE:   {val_mae_losses[best_nr_epoch]:.6f}")


In [None]:
# import matplotlib.pyplot as plt

# plt.figure(figsize=(12, 5))

# # ✅ Plot Weighted Loss (Combined Loss)
# plt.subplot(1, 2, 1)
# plt.plot(train_losses, label='Training Weighted Loss')
# plt.plot(val_losses, label='Validation Weighted Loss')
# plt.title('Training vs Validation Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()
# plt.grid(True)

# # ✅ Plot MAE (Crowd Count Error)
# plt.subplot(1, 2, 2)
# plt.plot(train_mae_losses, label='Training MAE')
# plt.plot(val_mae_losses, label='Validation MAE')
# plt.title('Training vs Validation MAE')
# plt.xlabel('Epoch')
# plt.ylabel('Mean Absolute Error (MAE)')
# plt.legend()
# plt.grid(True)

# plt.tight_layout()
# plt.show()


In [None]:
# best_model = FourColumnMDNN().to(device)
# best_model.load_state_dict(torch.load('./crowd_counting.pth'))
# best_model.eval()  # ✅ Important: Set model to evaluation mode

# # Get a batch of validation images
# dataiter = iter(val_loader)
# ex_images, _, ex_gts = next(dataiter)

# # Run model inference
# with torch.no_grad():
#     pred_dms = best_model(ex_images.to(device))

# # Visualize input images and predicted density maps
# plot_corresponding_pairs(ex_images.cpu(), pred_dms.cpu(), plot_map='twilight')

# # Print ground truth people counts for each image in the batch
# print('Ground Truth Counts:', ' '.join(f'{ex_gts[j].item():5.1f}' for j in range(ex_images.shape[0])))


In [None]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ✅ Load the model
best_model = FourColumnMDNN().to(device)
best_model.load_state_dict(torch.load('/kaggle/input/crowd_counting/tensorflow2/default/1/crowd_counting_MDNN.pth', map_location=device))
best_model.eval()

# Sum (not mean) of absolute count errors per batch: the accumulator is divided by
# the number of test samples at the end, which gives the per-image MAE even when the
# last batch is smaller than the others.
criterion = nn.L1Loss(reduction='sum')
test_loss_acc = 0.0

# ✅ Testing loop on full test set
with torch.inference_mode():
    for batch_img, batch_dmap, batch_gts in test_loader:
        batch_img, batch_dmap, batch_gts = batch_img.to(device), batch_dmap.to(device), batch_gts.to(device)

        logits = best_model(batch_img)
        # One predicted count per image, always shape (B,), including B == 1.
        pred_counts = logits.sum(dim=(1, 2, 3))
        # Counts are cast to float to match the float predictions.
        loss = criterion(pred_counts, batch_gts.float().reshape(-1))

        test_loss_acc += loss.item()

        # ✅ Optional: Visualize current batch
        pred_dms = logits.cpu()
        gt_counts = batch_gts.cpu()

        # Loop through each image in batch
        for i in range(batch_img.size(0)):
            fig, axs = plt.subplots(1, 2, figsize=(10, 4))

            # Original image, min-max rescaled for display (skipped for a constant
            # image to avoid dividing by zero)
            img_np = batch_img[i].cpu().numpy().transpose(1, 2, 0)
            img_min, img_max = img_np.min(), img_np.max()
            if img_max - img_min > 1e-5:
                img_np = (img_np - img_min) / (img_max - img_min)
            axs[0].imshow(img_np)
            axs[0].set_title(f"Test Image - GT Count: {gt_counts[i].item():.2f}")
            axs[0].axis('off')

            # Predicted Density Map
            density_map = pred_dms[i, 0, :, :].numpy()
            axs[1].imshow(density_map, cmap='jet')
            axs[1].set_title(f"Pred Count: {density_map.sum():.2f}")
            axs[1].axis('off')

            plt.tight_layout()
            plt.show()
            # Release the figure; one is created per test image.
            plt.close(fig)

print('TEST: test_MAE: {:.3f}'.format(test_loss_acc / len(test_loader.dataset)))
