# LWCC

https://github.com/tersekmatija/lwcc


## Configuration

In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

In [None]:
# Installation
!pip install lwcc

In [None]:
# Python Imports
import cv2
import glob
import h5py
import json
import numpy as np
import math
import matplotlib.pyplot as plt
import os
import pandas as pd
import scipy.io as io
import sklearn.metrics as skmetrics
import skimage.metrics as skimetrics
import time

from pathlib import Path
from PIL import Image
from matplotlib import cm as c
from tqdm import tqdm
from scipy.ndimage import gaussian_filter

from lwcc import LWCC
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# !pip install -Uqq ipdb
# import ipdb
# %pdb on

In [None]:
# Constants
DEVICE = 'cpu'

DRIVE_PATH = "/content/drive/MyDrive"
COLLAB_PATH = f"{DRIVE_PATH}/Colab Notebooks CC"
DATASETS_PATH = f"{COLLAB_PATH}/Datasets"
MODELS_PATH = f"{COLLAB_PATH}/Models"

In [None]:
class Metrics(object):
  def __init__(self, eval_name, gt_count, pred_count, img_paths):
    self.eval_name = eval_name
    self.imgs = [os.path.basename(path) for path in img_paths]
    self.gt_count = np.array(gt_count); self.pred_count = np.array(pred_count)
    self.calculate_count_metrics()
    self.PSNR, self.SSIM, self.GAME4, self.GAME16 = [], [], [], []
    self.AVG_PSNR, self.AVG_SSIM, self.AVG_GAME4, self.AVG_GAME16 = None, None, None, None
    self.organize_by_img()

  def __init__(self, eval_name, gt_count, gt_density, pred_count, pred_density, img_paths):
    self.eval_name = eval_name
    self.imgs = [os.path.basename(path) for path in img_paths]
    self.gt_count, self.pred_count = np.array(gt_count), np.array(pred_count)
    self.calculate_count_metrics()
    self.calculate_density_metrics(gt_density, pred_density)
    self.organize_by_img()

  def calculate_count_metrics(self):
    self.MAE, self.MSE, self.RMSE = [], [], []
    for gt_count, pred_count in zip(self.gt_count, self.pred_count):
      self.MAE.append(skmetrics.mean_absolute_error([gt_count], [pred_count]))
      self.MSE.append(skmetrics.mean_squared_error([gt_count], [pred_count]))
      self.RMSE.append(skmetrics.mean_squared_error([gt_count], [pred_count], squared=False))
    self.AVG_MAE = np.mean(self.MAE)
    self.AVG_MSE = np.mean(self.MSE)
    self.AVG_RMSE = math.sqrt(np.mean(self.MSE))

  def calculate_density_metrics(self, gt_density, pred_density):
    self.PSNR, self.SSIM, self.GAME4, self.GAME16 = [], [], [], []
    for gt_den, pred_den, pred_count in zip(gt_density, pred_density, self.pred_count):
      pred_den = cv2.resize(pred_den, dsize=(gt_den.shape[1], gt_den.shape[0]), interpolation=cv2.INTER_CUBIC)
      if np.sum(pred_den):
        pred_den = pred_count * pred_den / np.sum(pred_den)
      self.PSNR.append(skimetrics.peak_signal_noise_ratio(gt_den, pred_den))
      self.SSIM.append(skimetrics.structural_similarity(gt_den, pred_den))
      self.GAME4.append(self.calculate_game(gt_den, pred_den, blocks=4))
      self.GAME16.append(self.calculate_game(gt_den, pred_den, blocks=16))
    self.AVG_PSNR = np.mean(self.PSNR)
    self.AVG_SSIM = np.mean(self.SSIM)
    self.AVG_GAME4 = np.mean(self.GAME4)
    self.AVG_GAME16 = np.mean(self.GAME16)

  def organize_by_img(self):
    metrics = [m for m in [self.MAE, self.MSE, self.RMSE, self.PSNR, self.SSIM] if m]
    self.metrics_by_img = {
        img_name: {
            "MAE": self.MAE[i],
            "MSE": self.MSE[i],
            "RMSE": self.RMSE[i],
            "PSNR": self.PSNR[i] if self.PSNR else None,
            "SSIM": self.SSIM[i] if self.SSIM else None,
            "GAME4": self.GAME4[i] if self.GAME4 else None,
            "GAME16": self.GAME16[i] if self.GAME16 else None
        } for i, img_name in enumerate(self.imgs)
    }

  def display_metrics(self, img_name = None):
    if img_name:
      metrics = self.metrics_by_img[img_name]
      mae, mse, rmse, game4, game16, psnr, ssim = metrics["MAE"], metrics["MSE"], metrics["RMSE"], metrics["GAME4"], metrics["GAME16"], metrics["PSNR"], metrics["SSIM"]
    else:
      mae, mse, rmse, game4, game16, psnr, ssim = self.AVG_MAE, self.AVG_MSE, self.AVG_RMSE, self.AVG_GAME4, self.AVG_GAME16, self.AVG_PSNR, self.AVG_SSIM
    print(f'{self.eval_name}, %.4f, %.4f, %.4f, %.4f, %.4f, %.4f, %.4f' % (mae, mse, rmse, game4, game16, psnr, ssim))

  def calculate_game(self, gt_den, pred_den, blocks=4):
    result = 0  # Aggregation of errors per block (mean is calculated afterwards)
    gt_den_s = self.split_matrix(gt_den, blocks=blocks)
    pred_den_s = self.split_matrix(pred_den, blocks=blocks)
    for i, (gden, pden) in enumerate(zip(gt_den_s, pred_den_s), start=1):
      block_result = abs(np.sum(pden) - np.sum(gden))
      result += block_result
    return result

  def split_matrix(self, matrix, blocks=4):
    if blocks == 4:
      T, B = np.array_split(matrix, 2)
      TL, TR = np.array_split(T, 2, axis=1)
      BL, BR = np.array_split(B, 2, axis=1)
      return [TL, TR, BL, BR]
    elif blocks == 16:
      T, MT, MB, B = np.array_split(matrix, 4)
      A11, A12, A13, A14 = np.array_split(T, 4, axis=1)
      A21, A22, A23, A24 = np.array_split(MT, 4, axis=1)
      A31, A32, A33, A34 = np.array_split(MB, 4, axis=1)
      A41, A42, A43, A44 = np.array_split(B, 4, axis=1)
      return [A11, A12, A13, A14, A21, A22, A23, A24, A31, A32, A33, A34, A41, A42, A43, A44]

class Dataset(object):
  def __init__(self, name, root_path, img_format = ".jpg"):
    self.name = name
    self.root = f"{DATASETS_PATH}/{root_path}"
    self.img_path = f"{root_path}/images"
    self.gt_path = f"{root_path}/ground_truth"
    self.img_format = img_format
    self.SIGMA = 15

  def get_image_paths(self):
    return sorted(glob.glob(os.path.join(self.img_path, f'*{self.img_format}')))

  def get_gt_for_img(self, img_path, return_density = True):
    raw_image = cv2.imread(img_path)
    gt_density = np.zeros(raw_image.shape[:2], dtype=np.float32).tolist()
    gt_count, gt_density = self.gt_to_binary_matrix(img_path, gt_density)

    if return_density:
      gt_density = np.array(gt_density, dtype=np.float32)
      gt_density = gaussian_filter(gt_density, self.SIGMA, mode='constant')
      gt_density = gt_count * gt_density / np.sum(gt_density)
      return gt_count, gt_density
    return gt_count

  def gt_to_binary_matrix(self, gt_path, gt_density):
    raise NotImplementedError

class ShanghaiTech(Dataset):
  def __init__(self, id = "A"):
    super().__init__(name=f"SH{id}", root_path=f"{DATASETS_PATH}/Shanghaitech/part_{id}_final/test_data")

  def gt_to_binary_matrix(self, img_path, gt_density):
    file_name = Path(img_path).stem
    gt_path = img_path.replace(file_name, f"GT_{file_name}").replace(self.img_format, ".mat").replace('images', 'ground_truth')

    mat = io.loadmat(gt_path)["image_info"]
    gt_count = mat[0,0][0,0][1][0][0]
    gt_points = mat[0,0][0,0][0]
    for x, y in gt_points:
      x = max(0, min(int(x), len(gt_density[0])-1)); y = max(0, min(int(y), len(gt_density)-1))
      gt_density[y][x] = 1
    return gt_count, gt_density

class UCF(Dataset):
  def __init__(self, id = "CC50"):
    if id == "CC50":
      super().__init__(name="UCF-CC50", root_path=f"{DATASETS_PATH}/UCF_CC")
    elif id == "QNRF":
      super().__init__(name="UCF-QNRF", root_path=f"{DATASETS_PATH}/UCF-QNRF_ECCV18/Test")
    else:
      raise ValueError

  def gt_to_binary_matrix(self, img_path, gt_density):
    file_name = Path(img_path).stem
    gt_path = img_path.replace(file_name, f"{file_name}_ann").replace(self.img_format, ".mat").replace('images', 'ground_truth')

    mat = io.loadmat(gt_path)["annPoints"]
    gt_count = len(mat)
    gt_points = mat
    for x, y in gt_points:
      x = max(0, min(int(x), len(gt_density[0])-1)); y = max(0, min(int(y), len(gt_density)-1))
      gt_density[y][x] = 1
    return gt_count, gt_density

class NWPUCounting(Dataset):
  def __init__(self):
    super().__init__(name="NWPUC", root_path=f"{DATASETS_PATH}/UCF-NWPU_Crowd_min_576x768_mod16_2048/test_data")

class MT(Dataset):
  def __init__(self, id="A"):
    super().__init__(name=f"MT", root_path=f"{DATASETS_PATH}/MT")

  def gt_to_binary_matrix(self, img_path, gt_density):
    gt_path = img_path.replace(self.img_format, ".json").replace('images', 'ground_truth')
    with open(gt_path, 'r') as json_data:
      gt_raw = json.loads(json_data.read())
    gt_count = gt_raw["human_num"]
    for p in gt_raw["points"]:
      x = max(0, min(int(p["x"]), len(gt_density[0])-1)); y = max(0, min(int(p["y"]), len(gt_density)-1))
      gt_density[y][x] = 1
    return gt_count, gt_density

# Models pretrained on SHA or QNRF should perform best on dense crowds, SHB for sparse crowds.
#   Using SHAweights on relatively sparse crowds might also give very wrong results.
#   On the other hand, SHB might perform better as the weights were trained on Shanghai B data set, which containts images with relatively sparse crowds.
#   Using high quality images with sparse crowds might also yield bad results, as the algorithms might mistake some textures of clothings for a crowd.
WEIGHTS_FOR_MODELS = {"CSRNet": ["SHA", "SHB"], "Bay": ["SHA", "SHB", "QNRF"], "DM-Count": ["SHA", "SHB", "QNRF"], "SFANet": ["SHB"]}

class ModelEvaluator(object):
  def __init__(self, dataset_name, model_name, model_weights):
    self.dataset_name = dataset_name
    if model_name not in ["CSRNet", "Bay", "DM-Count", "SFANet"]:
      raise ValueError(model_name)
    self.model_name = model_name
    if model_weights not in WEIGHTS_FOR_MODELS[model_name]:
      raise ValueError(model_weights)
    self.model_weights = model_weights
    self.model = LWCC.load_model(model_name=model_name, model_weights=model_weights)
    self.model_str = f"{self.model_name}-{self.model_weights}"

  def evaluate(self, img_paths, gt_count, gt_density, vis = 0, is_gray = False):
    pred_count, pred_density = self._inference(img_paths, is_gray=is_gray)
    metrics = Metrics(self.model_str, gt_count, gt_density, pred_count, pred_density, img_paths)
    pred_vis = {self.model_str: {
        "count": pred_count[:vis], "density": pred_density[:vis],
        "img_metrics": [metrics.metrics_by_img[os.path.basename(p)] for p in img_paths[:vis]]
    }}
    return metrics, pred_vis

  def _inference(self, image_paths, resize_image = True, return_density = True, is_gray = False):
    """ Returns two dictionaries: {img_name: count}, {img_name: density} """
    pred_count = []; pred_density = []
    for img_path in tqdm(image_paths, desc=f"{self.model_name} - {self.model_weights} on {self.dataset_name}", disable=True):
      pred_c, pred_d = LWCC.get_count(
        img_path, resize_img=resize_image, return_density=return_density, model=self.model, is_gray = is_gray
      )
      pred_count.append(pred_c); pred_density.append(pred_d)
      # print(f'{os.path.basename(img_path).replace(".jpg", "")}, {pred_c}')
    if return_density:
      return pred_count, pred_density
    else:
      return pred_count

class Evaluator(object):
  def evaluate(self, dataset, vis=0, eval_limit=0, batch=0, is_gray=False):
    img_paths = dataset.get_image_paths()
    if eval_limit:
      img_paths = img_paths[:eval_limit]
    if batch == 1:
      # Process the first half of the images
      img_paths = img_paths[:int(len(img_paths)/2)]
    elif batch == 2:
      # Process the final half of the images
      img_paths = img_paths[int(len(img_paths)/2):]
    if batch != 0:
      print(f"Batch {batch}/2:", [os.path.basename(i) for i in img_paths])
    # Else: Don't split into batches
    gt_count, gt_density = zip(*[dataset.get_gt_for_img(img_path) for img_path in img_paths])

    metrics = {}
    preds_vis = [{} for _ in range(vis)]
    for model in WEIGHTS_FOR_MODELS:
      metrics[model] = {}
      for weights in WEIGHTS_FOR_MODELS[model]:
        # print()
        # print(model, weights)
        evaluator = ModelEvaluator(dataset.name, model, weights)
        metric, pred_vis = evaluator.evaluate(img_paths, gt_count, gt_density, vis=vis, is_gray=is_gray)
        metrics[model][weights] = metric
        for model_name, preds in pred_vis.items():
          for i in range(vis):
            preds_vis[i][model_name] = {"count": preds["count"][i], "density": preds["density"][i], "img_metrics": preds["img_metrics"][i]}
    self.print_metrics(metrics, dataset)
    self.visualize_results(gt_count, gt_density, preds_vis, img_paths, metrics, dataset.name)
    return metrics

  def visualize_results(self, gt_count, gt_density, predictions, img_paths, metrics, dataset_name):
    print()
    print("Visualization")
    for gt_c, gt_d, preds, img_p in zip(gt_count, gt_density, predictions, img_paths):
      raw_image = cv2.cvtColor(cv2.imread(img_p), cv2.COLOR_BGR2RGB)
      plt.rcParams['axes.facecolor'] = 'white'
      fig = plt.figure(figsize=(18, 10))
      fig.add_subplot(3, 4, 1); plt.imshow(raw_image); plt.axis('off'); plt.title(r"$\bf{" + f"{dataset_name}:"  + r"}$ " + f"{os.path.basename(img_p)}")
      fig.add_subplot(3, 4, 2); plt.imshow(gt_d); plt.axis('off'); plt.title(r"$\bf{" + f"Ground \ Truth: {gt_c}" + r"}$")
      for i, (model, pred) in enumerate(preds.items(), start=3):
        p_c, p_d, p_m = pred["count"], pred["density"], pred["img_metrics"]
        fig.add_subplot(3, 4, i); plt.imshow(p_d); plt.axis('off');
        plt.title((r"$\bf{" + f"{model}: %.1f" % p_c) + r"}$" + "\n" + ("MAE: %.1f, GAME16: %.1f") % (p_m["MAE"], p_m["GAME16"]))
      plt.show()
      plt.close()

  def print_metrics(self, metrics, dataset):
    print()
    print(f"Metrics for {dataset.name}:")
    print("Model, MAE, MSE, RMSE, GAME4, GAME16, PSNR, SSIM")
    for model in metrics:
      for weights, metric in metrics[model].items():
        metric.display_metrics()
  
evaluator = Evaluator()
metrics = {}

## Visualization

In [None]:
evaluator.evaluate(ShanghaiTech("A"), vis=1, eval_limit=1)

## Inference

In [None]:
metrics["MT"] = evaluator.evaluate(MT(), vis=56)

In [None]:
metrics["SHA"] = evaluator.evaluate(ShanghaiTech("A"), vis=60)

In [None]:
metrics["SHB"] = evaluator.evaluate(ShanghaiTech("B"), vis=60, batch=1)

In [None]:
metrics["SHB2"] = evaluator.evaluate(ShanghaiTech("B"), vis=60, batch=2)

In [None]:
metrics["UCFCC50"] = evaluator.evaluate(UCF("CC50"), vis=50, is_gray=True)

In [None]:
metrics["UCF-QNRF"] = evaluator.evaluate(UCF("QNRF"), vis=60, batch=1)

In [None]:
metrics["UCF-QNRF_2"] = evaluator.evaluate(UCF("QNRF"), vis=60, batch=2)

# SASNet

https://github.com/TencentYoutuResearch/CrowdCounting-SASNet

In [None]:
import sys
import torch

from torchvision import transforms

SASNET_PATH = f"{MODELS_PATH}/SASNet"
SASNET_WEIGHTS = f"{SASNET_PATH}/checkpoints"
sys.path.append(os.path.abspath(SASNET_PATH))
from model import SASNet
from lwcc.util.functions import load_image

DEVICE = 'cpu'
DEVICE = 'cuda'

In [None]:
class SASNetModelEvaluator(ModelEvaluator):
  def __init__(self, dataset_name, model_weights):
    self.dataset_name = dataset_name
    self.model_name = "SASNet"
    if model_weights not in ["SHHA", "SHHB"]:
      raise ValueError(model_weights)
    self.model_weights = model_weights
    self.log_para = 1000 # Post-processing: Model magnify the target density map, must be reduced
    self.model = SASNet()
    device = torch.device(DEVICE)
    self.model.to(device)
    self.model = self.model.cuda()
    self.model.load_state_dict(torch.load(f"{SASNET_WEIGHTS}/{model_weights}.pth", map_location=device))
    self.model.eval() # Evaluation mode https://stackoverflow.com/questions/60018578/what-does-model-eval-do-in-pytorch
    self.model_str = f'{self.model_name}-{("SHA" if model_weights == "SHHA" else "SHB")}'

  def _inference(self, image_paths, is_gray=False):
    pred_count = []; pred_density = []
    with torch.no_grad():
      for img_path in tqdm(image_paths, desc=f"{self.model_str} on {self.dataset_name}", disable=False):
        # LWCC Load image. Applies transform https://github.com/tersekmatija/lwcc/blob/3675ce7a197eea992d522ee2320e1503439e5d19/lwcc/util/functions.py#L44
        #   Same as SASNet and other CC loaders
        img, name = load_image(img_path, self.model_name, is_gray=is_gray)
        img = torch.Tensor(img)
        img = img.to(DEVICE)
        result = self.model(img).cpu()
        pred_d = result[0, 0, :, :].numpy()
        pred_d = pred_d/self.log_para # Model magnifies target density map, it must be reduced
        pred_c = np.sum(pred_d)
        pred_count.append(pred_c); pred_density.append(pred_d)
        # print(f'{os.path.basename(img_path).replace(".jpg", "")}, {pred_c}')
      return pred_count, pred_density

class SASNetEvaluator(Evaluator):
  def evaluate(self, dataset, vis=0, eval_limit=0, batch=0, is_gray=False):
    img_paths = dataset.get_image_paths()
    if eval_limit:
      img_paths = img_paths[:eval_limit]
    if batch == 1:
      # Process the first half of the images
      img_paths = img_paths[:int(len(img_paths)/2)]
    elif batch == 2:
      # Process the final half of the images
      img_paths = img_paths[int(len(img_paths)/2):]
    if batch != 0:
      print(f"Batch {batch}/2:", [os.path.basename(i) for i in img_paths])
    # Else: Don't split into batches
    gt_count, gt_density = zip(*[dataset.get_gt_for_img(img_path) for img_path in img_paths])

    metrics = {"SASNet": {}}
    preds_vis = [{} for _ in range(vis)]
    for weights in ["SHHA", "SHHB"]:
      # print()
      # print("SASNet", weights)
      evaluator = SASNetModelEvaluator(dataset.name, weights)
      metric, pred_vis = evaluator.evaluate(img_paths, gt_count, gt_density, vis=vis, is_gray=is_gray)
      metrics["SASNet"][weights] = metric
      for model_name, preds in pred_vis.items():
        for i in range(vis):
          preds_vis[i][model_name] = {"count": preds["count"][i], "density": preds["density"][i], "img_metrics": preds["img_metrics"][i]}
    self.print_metrics(metrics, dataset)
    self.visualize_results(gt_count, gt_density, preds_vis, img_paths, metrics, dataset.name)
    return metrics

sasnet_evaluator = SASNetEvaluator();
metrics = {}

In [None]:
sasnet_evaluator.evaluate(ShanghaiTech("A"), vis=1, eval_limit=1);

In [None]:
metrics["MT"] = sasnet_evaluator.evaluate(MT(), vis=56)

In [None]:
metrics["SAS-SHA"] = sasnet_evaluator.evaluate(ShanghaiTech("A"), vis=60)

In [None]:
metrics["SAS-SHB"] = sasnet_evaluator.evaluate(ShanghaiTech("B"), vis=60, batch=1)

In [None]:
metrics["SAS-SHB2"] = sasnet_evaluator.evaluate(ShanghaiTech("B"), vis=60, batch=2)

In [None]:
metrics["SAS-UCFCC50"] = sasnet_evaluator.evaluate(UCF("CC50"), vis=50, is_gray=True)

In [None]:
metrics["SAS-UCF-QNRF"] = sasnet_evaluator.evaluate(UCF("QNRF"), vis=60, batch=1)

In [None]:
metrics["SAS-UCF-QNRF_2"] = sasnet_evaluator.evaluate(UCF("QNRF"), vis=60, batch=2)

In [None]:
metrics["MT"]["SASNet"]["SHHB"].metrics_by_img