# Amsterdam Crowd Counter

**Upload a picture of a crowd and let our Crowd Counter count/estimate the number of people!**

For more information, please get in touch at [crowdcounter@amsterdam.nl](mailto:crowdcounter@amsterdam.nl).

In [1]:
%%capture
# Change the working directory to the parent folder (cell output is suppressed by %%capture).
# NOTE(review): presumably this is the repository root, so that the project-local imports
# (`models`, `datasets`) and the relative weights path resolve — confirm.
cd ..

In [57]:
import os
import io
import math
import datetime
# from tqdm.notebook import tqdm

import numpy as np
import torch

from PIL import Image
import matplotlib.pyplot as plt
from matplotlib import cm

import models.ViCCT_models
import models.Swin_ViCCT_models
from timm.models import create_model

from datasets.dataset_utils import img_equal_split, img_equal_unsplit
import torchvision.transforms as standard_transforms

import gradio as gr

In [3]:
# Explains some parameters.

# First, which model will we use?
# The generic ViCCT version 1 model is specified with 'ViCCT_base'. 
# The version 2 ViCCT model, which has Swin as its base, is specified with 'Swin_ViCCT_large_22k'.
# model_name = 'ViCCT_base'
# model_name = 'Swin_ViCCT_large_22k'

# The model is trained to perform crowd counting. We specify here where the weights of this trained model are located.
# weights_path = 'models/trained_models/ViCCT_base_generic_1300_epochs.pth'
# weights_path = 'models/trained_models/Swin_ViCCT_large_22k_generic_1600_epochs.pth'

# Some images are of extremely large resolution. When the heads in an image each occupy many pixels (e.g. something like
# 100 x 100 pixels), the model is unable to make good predictions. One way to overcome this issue is to scale the image
# by some factor. This factor is specified here. A factor of 1. means no scaling is performed.
# scale_factor = 1.

# Lastly, do we use cuda? If you have cuda, it's advised to use it.
# use_cuda = True

In [4]:
# Module-level settings shared by the functions below. Meant for advanced users; normally left untouched.

# (mean, std.dev.) per colour channel of ImageNet, used to normalise input images.
mean_std = (
    [0.485, 0.456, 0.406],
    [0.229, 0.224, 0.225],
)

# Crops are guaranteed to share at least this many pixels with their neighbours.
overlap = 32

# Number of pixels on each crop-prediction border that is ignored when the full density map is rebuilt.
ignore_buffer = 16

# Converts an image to a tensor and normalises it with the ImageNet statistics above.
train_img_transform = standard_transforms.Compose([
    standard_transforms.ToTensor(),
    standard_transforms.Normalize(mean=mean_std[0], std=mean_std[1]),
])

# Placeholder for the crowd-counting model; it is assigned the result of load_model() further down.
model = None

In [5]:
def load_model(model_name='Swin_ViCCT_large_22k',
               weights_path='models/trained_models/Swin_ViCCT_large_22k_generic_1600_epochs.pth',
               use_cuda=True):
    """Create the ViCCT model, initialise it with the specified weights and return it in eval mode.

    Args:
        model_name: Name of the architecture to build, e.g. 'ViCCT_base' or 'Swin_ViCCT_large_22k'.
        weights_path: Location of the trained crowd-counting weights.
        use_cuda: Whether to place the model on the GPU. Booleans are used as-is. For backward
            compatibility strings are accepted too: '', '0', 'false', 'no' and 'off'
            (case-insensitive) mean False, any other string means True.

    Returns:
        The created model, switched to evaluation mode.
    """
    # A plain truthiness test would treat the string "False" as True, so strings are parsed explicitly.
    if isinstance(use_cuda, str):
        use_cuda = use_cuda.strip().lower() not in ('', '0', 'false', 'no', 'off')

    is_swin = 'Swin' in model_name

    model = create_model(  # From the timm library. This function creates the model-specific architecture.
        model_name,
        init_path=weights_path,
        pretrained_cc=True,
        drop_rate=None if is_swin else 0.,  # Dropout
        # Despite its name this is 'drop_connect' rather than a drop-path rate.
        # NOTE(review): its meaning for the Swin version is not established here.
        drop_path_rate=None if is_swin else 0.,
        drop_block_rate=None,  # Presumably drops entire Transformer blocks. Not used for ViCCT.
    )

    if use_cuda:
        model = model.cuda()  # Place model on GPU

    return model.eval()

In [6]:
def fig2img(fig):
    """Render a Matplotlib figure to an in-memory PNG and return it as a PIL Image."""
    png_buffer = io.BytesIO()
    fig.savefig(png_buffer, format='png', bbox_inches='tight')
    # Rewind so that PIL starts reading at the beginning of the PNG data.
    png_buffer.seek(0)
    return Image.open(png_buffer)

In [7]:
def create_density_map_image(den, pred_cnt):
    """Draw the density map with the jet colour map, titled with the predicted count, and return it as an image."""

    width_px, height_px, dpi = 1440, 810, 100
    figure = plt.figure(figsize=(width_px / dpi, height_px / dpi), dpi=dpi)

    axes = figure.gca()
    axes.set_title(f'Predicted count: {pred_cnt:.1f}')
    axes.imshow(den, cmap=cm.jet)

    # Hide both axes so that only the map and its title are shown.
    axes.xaxis.set_visible(False)
    axes.yaxis.set_visible(False)

    rendered = fig2img(figure)

    # Release the figure's memory.
    figure.clear()
    plt.close(figure)

    return rendered

In [8]:
def create_overlay_image(input_image, den, pred_cnt):
    """Blend a density map into its source image and return the titled result as an image.

    Args:
        input_image: The image the density map was predicted for. It is converted with np.array and
            indexed as (row, column, channel) — assumes a 3-channel image; TODO confirm against callers.
        den: Density map tensor. Assumes a CPU tensor whose height and width match `input_image`.
        pred_cnt: Predicted count, shown in the figure title.

    Returns:
        The overlay figure, as produced by fig2img.
    """

    img_heat = np.array(input_image)
    den_heat = den.numpy() / 3000  # Scale values to original domain (the division creates a new array).

    den_heat[den_heat < 0] = 0  # Remove negative values

    # Normalise between 0 and 1. A map without any positive value is left at all zeros,
    # which avoids a division by zero that would fill the map with NaNs.
    peak = den_heat.max()
    if peak > 0:
        den_heat = den_heat / peak

    den_heat **= 0.5  # Reduce large values, increase small values
    den_heat *= 255  # Values from 0 to 255 now
    den_heat[den_heat < 50] = 0  # Threshold of 50

    # Pixels that receive an overlay: halve the first two channels and write the density into the third.
    highlight = den_heat > 0
    img_heat[highlight, 0] = img_heat[highlight, 0] / 2
    img_heat[highlight, 1] = img_heat[highlight, 1] / 2
    img_heat[highlight, 2] = den_heat[highlight]

    fig = plt.figure(figsize=(1440/100, 810/100), dpi=100)
    plt.title(f'Predicted count: {pred_cnt:.1f}')
    plt.imshow(img_heat, cmap=cm.jet)

    ax = plt.gca()
    ax.axes.xaxis.set_visible(False)
    ax.axes.yaxis.set_visible(False)
    overlay_im = fig2img(fig)

    # Clean up memory.
    fig.clear()
    plt.close(fig)

    return overlay_im

In [9]:
def process_image(img_stack, pred_stack, img_h, img_w, use_cuda=True):
    """Run the ViCCT model over every prepared crop and assemble the full density map.

    Returns the density map (channel dimension removed) and the predicted count.
    """

    # Warn when predicting on the CPU with more than 100 image crops.
    if not (use_cuda or img_stack.shape[0] <= 100):
        print('\033[93m'
              'WARNING: you are making a prediction on a very large image. This might take a long time! '
              'You may want to use a lower "Scale Factor" value for faster processing. '
              'You can stop a running process by pressing F5.'
              '\033[0m')

    with torch.no_grad():  # No gradients are needed for inference.
        print(f"Processing {len(img_stack)} image parts.")
        for crop_idx, crop in enumerate(img_stack):
            batch_of_one = crop.unsqueeze(0)
            crop_prediction = model.forward(batch_of_one)
            pred_stack[crop_idx] = crop_prediction.cpu()
    print('Done!')

    # Stitch the prediction crops back together to obtain the density map of the entire image,
    # then remove the channel dimension.
    density = img_equal_unsplit(pred_stack, overlap, ignore_buffer, img_h, img_w, 1).squeeze()

    # The predicted count is the sum of the entire density map. The model is trained with density maps
    # scaled by a factor of 3000 (see sec 5.2 of the thesis: https://scripties.uba.uva.nl/search?id=723178),
    # so that factor is divided out again here.
    count = density.sum() / 3000

    return density, count

In [10]:
def prepare_loaded_image(img, use_cuda=True):
    """Normalise an image, split it into crops and allocate storage for the crop predictions.

    Returns the crop stack, the (zero-filled) prediction stack, the image height and the image width.
    """

    crop_size = 224
    width, height = img.size

    # Normalise the image, then split it into crops that overlap by at least 'overlap' pixels.
    crops = img_equal_split(train_img_transform(img), crop_size, overlap)
    if use_cuda:
        crops = crops.cuda()  # Place the crops on the GPU

    # One single-channel prediction slot per crop; the model predictions are stored here later.
    predictions = torch.zeros(crops.shape[0], 1, crop_size, crop_size)

    return crops, predictions, height, width

In [11]:
def rescale_image(img, scale_factor):
    """Return the image resized by `scale_factor`; a factor of 1 returns the image itself."""

    width, height = img.size

    # Nothing to do when no scaling is requested.
    if scale_factor == 1.:
        return img

    target_size = (round(width * scale_factor), round(height * scale_factor))
    return img.resize(target_size)

In [12]:
def compute_scale_factor(image, ideal_min_res=2000):
    """Compute the factor by which an image should be downscaled.

    Args:
        image: Object whose `size` attribute holds its (width, height) in pixels.
        ideal_min_res: Target length, in pixels, of the image's shortest side.

    Returns:
        1 when the shortest side is at most `ideal_min_res` (images are never upscaled),
        otherwise `ideal_min_res / shortest side`, which is below 1.
    """

    shortest_side = min(image.size)

    # '<=' also covers a shortest side of exactly `ideal_min_res`, so a factor is always returned.
    if shortest_side <= ideal_min_res:
        return 1

    return ideal_min_res / shortest_side

In [13]:
def normalize_image_orientation(img):
    """Return the image rotated/flipped to its normal orientation, based on its exif orientation value.

    The image is returned unchanged when it carries no orientation value or the value needs no correction.
    """

    orientation_key = 274  # The exif key for image orientation.

    # Read the orientation from the exif data; without it there is nothing to correct.
    try:
        orientation = dict(img.getexif().items())[orientation_key]
    except KeyError:
        return img

    # Correction to apply for each orientation value that needs one.
    corrections = {
        2: lambda im: im.transpose(Image.FLIP_LEFT_RIGHT),
        3: lambda im: im.rotate(180),
        4: lambda im: im.rotate(180).transpose(Image.FLIP_LEFT_RIGHT),
        5: lambda im: im.rotate(-90, expand=True).transpose(Image.FLIP_LEFT_RIGHT),
        6: lambda im: im.rotate(-90, expand=True),
        7: lambda im: im.rotate(90, expand=True).transpose(Image.FLIP_LEFT_RIGHT),
        8: lambda im: im.rotate(90, expand=True),
    }

    correct = corrections.get(orientation)
    if correct is None:
        return img
    return correct(img)

In [32]:
def count_people(image_input):
    """Count the people in an image.

    Args:
        image_input: The uploaded image (a PIL image, as configured on the Gradio input), or None
            when nothing was uploaded.

    Returns:
        A tuple of the density map image, the overlay image and the predicted count rounded to one decimal.

    Raises:
        gr.Error: If no image was provided, or if the image (after possible downscaling) is smaller
            than 224 pixels in width or height.
    """

    # Pressing the button without an image passes None; report that instead of failing on an attribute access.
    if image_input is None:
        raise gr.Error("No image provided, please upload an image and try again.")

    t0 = datetime.datetime.now()

    # Normalize image orientation.
    image_input = normalize_image_orientation(image_input)

    # Rescale image.
    scale_factor = compute_scale_factor(image_input)
    image = rescale_image(image_input, scale_factor)

    # Give the user an error for images with a too low resolution. (alternative: upscale)
    w = image.width
    h = image.height
    if w < 224 or h < 224:
        raise gr.Error("Image is too small, please provide a bigger image (224x224 or larger) and try again.")

    # Prepare and process image (create prediction).
    img_stack, pred_stack, img_h, img_w = prepare_loaded_image(image)
    den, pred_cnt = process_image(img_stack, pred_stack, img_h, img_w)

    # Create density map image.
    den_im = create_density_map_image(den, pred_cnt)

    # Create overlay image.
    overlay_im = create_overlay_image(image, den, pred_cnt)

    # Log successful counting (appended to log.txt in the current working directory).
    t1 = datetime.datetime.now()
    processing_time = (t1 - t0).total_seconds()
    with open("log.txt", "a") as myfile:
        myfile.write(f"{t1}; succesfully processed an image of size {w}*{h} (w*h) -after possible downscaling- in {processing_time} seconds.\n")

    return den_im, overlay_im, round(float(pred_cnt), 1)

In [33]:
# Load the model with load_model()'s default arguments and keep it in the module-level `model`,
# which process_image() reads when it makes predictions.
model = load_model()

In [59]:
# Launch the demo website.

def launch_demo():
    """Build the Gradio page of the Crowd Counter and launch it (without a public share link)."""

    with gr.Blocks(title="Crowd Counter") as demo:

        # Page header.
        gr.Markdown("# Amsterdam Crowd Counter")
        gr.Markdown("Upload an image & count people.")

        # Input image, trigger button, hidden numeric result and the two result images side by side.
        uploaded_image = gr.Image(type='pil')
        trigger = gr.Button("Count People")
        people_count = gr.Number(label="People Count", elem_id='count', visible=False)
        with gr.Row():
            overlay_view = gr.Image(elem_id='output_image')
            density_view = gr.Image(elem_id='output_image')

        # count_people returns (density map image, overlay image, count), in that order.
        trigger.click(fn=count_people, inputs=uploaded_image, outputs=[density_view, overlay_view, people_count])

        # Background information and disclaimer.
        gr.Markdown("""Counting results are generated using an AI model called [ViCCT](https://github.com/jongstra/ViCCT).
                       This model is trained using multiple annotated datasets with large amounts of crowds.
                       The resulting model is only usable for counting people and estimating crowd densities,
                       not for identifying individuals.""")
        gr.Markdown("""This service is in testing phase and is provided "as-is",
                       without warranty of any kind, nor any guarantees about correctness of results.
                       This service should never be used as a sole means of crowd size estimation,
                       but is intended to be used for human-assisted solutions.""")
        gr.Markdown("For questions/feedback, contact us at [crowdcounter@amsterdam.nl](mailto:crowdcounter@amsterdam.nl).")

    # Alternative with a fixed port: demo.launch(server_port=8800, share=False)
    demo.launch(share=False)

In [60]:
if __name__ == "__main__":
    gr.close_all()  # Try to close any running Gradio processes, to free up ports.
    # No `global` statement here: this code already runs at module scope, and declaring `model`
    # global after it has been assigned earlier in the same module is rejected by the compiler.
    # Only load the model when that has not happened yet, so the weights are not loaded twice.
    if globals().get('model') is None:
        model = load_model()
    launch_demo()

Running on local URL:  http://127.0.0.1:7865

To create a public link, set `share=True` in `launch()`.
