# Crowd Counting by Estimating a Density Map With Convolutional Neural Networks

The main idea is to count objects indirectly by estimating a density map. The first step is to prepare training samples, so that for every image there is a corresponding density map. 

![image.png](https://miro.medium.com/max/875/1*kZ6LcW9VQx8v27kTHBFPZA.png)

The image in Fig. 2 (left) is annotated with points at the positions of the pedestrians' heads (Fig. 2, right). A density map is obtained by convolving these point annotations with a Gaussian kernel, normalized so that integrating the map gives the number of objects. The density map for the example above is presented in Fig. 3.

![image1.png](https://miro.medium.com/max/875/1*BwFvVyyQW6hxp6K-5dG3Lg.png)
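The annotation-to-density step described above can be sketched as follows. This is a minimal NumPy illustration, not the preprocessing code used to produce the figures: the function name `density_map` and the kernel width `sigma=4.0` are assumptions chosen for the example. Each annotated point contributes one Gaussian blob that is normalized to sum to 1 over the image, so the whole map sums to the number of annotated objects.

```python
import numpy as np


def density_map(points, shape, sigma=4.0):
    """Build a density map from point annotations.

    points: iterable of (row, col) head positions (hypothetical format)
    shape:  (height, width) of the image
    sigma:  Gaussian width in pixels (assumed value, tune per dataset)
    """
    height, width = shape
    rows = np.arange(height)[:, None]   # column vector of row indices
    cols = np.arange(width)[None, :]    # row vector of column indices
    density = np.zeros(shape, dtype=np.float64)
    for r, c in points:
        blob = np.exp(-((rows - r) ** 2 + (cols - c) ** 2) / (2 * sigma ** 2))
        density += blob / blob.sum()    # each object contributes exactly 1
    return density


annotations = [(10, 12), (30, 40), (5, 5)]
density = density_map(annotations, shape=(64, 64))
print(density.sum())  # close to 3, one unit per annotated point
```

Normalizing each blob inside the image keeps the count exact even for points near the border, at the cost of slightly distorting the blob shape there.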

Now, the goal is to train a fully convolutional network (FCN) to map an image to a density map, which can later be integrated (summed over all pixels) to get the number of objects. So far, we have considered two FCN architectures: U-Net [8] and the Fully Convolutional Regression Network (FCRN) [7].
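To make the training objective concrete, here is a rough sketch of one optimization step followed by counting. Everything in it is an assumption made for illustration: the single-layer stand-in model, the pixel-wise MSE loss, the SGD optimizer with its learning rate, and the random input data. The text above does not specify any of these choices.

```python
import torch
from torch import nn

torch.manual_seed(0)

# stand-in for a real FCN: maps (B, 3, H, W) images to (B, 1, H, W) maps
model = nn.Conv2d(3, 1, kernel_size=3, padding=1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()  # assumed loss: pixel-wise error between density maps

# fake batch: random images, each with a single "object" in the target map
images = torch.rand(4, 3, 32, 32)
targets = torch.zeros(4, 1, 32, 32)
targets[:, 0, 16, 16] = 1.0


def training_step(images, targets):
    """Run one gradient step and return the loss value."""
    optimizer.zero_grad()
    loss = loss_fn(model(images), targets)
    loss.backward()
    optimizer.step()
    return loss.item()


def predicted_counts(images):
    """Integrate each predicted density map to get one count per image."""
    with torch.no_grad():
        return model(images).sum(dim=(1, 2, 3))


print(training_step(images, targets))
print(predicted_counts(images).shape)  # torch.Size([4])
```

The count is simply the sum over the channel and spatial dimensions, so no extra layer is needed to turn a density map into a number.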

### U-Net

U-Net is a widely used FCN for image segmentation, very often applied to biomedical data. It has an autoencoder-like structure (see Fig. 4). An input image is processed by a block of convolutional layers, followed by a pooling layer (downsampling). This procedure is repeated several times on the outputs of subsequent blocks, as shown on the left side of Fig. 4. In this way the network encodes (and compresses) the key features of the input image. The second part of U-Net is symmetric, but the pooling layers are replaced with upsampling, so that the output dimensions match the size of the input image. The information from the higher-resolution layers in the downsampling part is passed to the corresponding layers in the upsampling part, which allows the network to reuse the features learned there to decode the contracted layers more precisely.

![image2.png](https://miro.medium.com/max/875/1*_6mXlLMKU0Vf1IJjBn7xkQ.png)

### FCRN

The Fully Convolutional Regression Network (FCRN) was proposed in [7]. Its architecture is very similar to U-Net. The main difference is that the information from the higher-resolution layers in the downsampling part is not passed directly to the corresponding layers in the upsampling part. The paper proposes two networks, FCRN-A and FCRN-B, which differ in downsampling intensity: FCRN-A performs pooling after every convolutional layer, while FCRN-B does so after every second layer.

# Implementation

Currently, U-Net and FCRN-A are implemented. Both use three downsampling and three upsampling convolutional blocks with a fixed 3×3 filter size. By default, each block contains two convolutional layers for U-Net and one for FCRN-A. For U-Net we keep a constant number of filters in all convolutional layers, while for FCRN-A we increase this number with every subsequent downsampling block to compensate for the loss of higher-resolution information caused by pooling (which is not passed directly, as it is in U-Net).

In [2]:
# Import libraries
from typing import Tuple

import numpy as np
import torch
from torch import nn

## Convolutional Block

The basic building block of both U-Net and FCRN is a convolutional block, consisting of a convolutional layer, batch normalization, and an activation function. The conv_block function creates N such layers, each with OUT filters, batch normalization, and a ReLU activation function:

In [None]:
def conv_block(channels: Tuple[int, int],
               size: Tuple[int, int],
               stride: Tuple[int, int]=(1, 1),
               N: int=1):
    """
    Create a block of N convolutional layers, each followed by batch
    normalization and a ReLU activation function.
    The first layer is IN x OUT, and all others are OUT x OUT.
    Args:
        channels: (IN, OUT) - no. of input and output channels
        size: kernel size (fixed for all convolutions in a block)
        stride: stride (fixed for all convolutions in a block)
        N: no. of convolutional layers
    Returns:
        A sequential container of N convolutional layers.
    """
    # a single convolution + batch normalization + ReLU block
    block = lambda in_channels: nn.Sequential(
        nn.Conv2d(in_channels=in_channels,
                  out_channels=channels[1],
                  kernel_size=size,
                  stride=stride,
                  bias=False,
                  padding=(size[0] // 2, size[1] // 2)),
        nn.BatchNorm2d(num_features=channels[1]),
        nn.ReLU()
    )
    # create and return a sequential container of convolutional layers
    # input size = channels[0] for the first layer and channels[1] for all others
    return nn.Sequential(*[block(channels[0] if i == 0 else channels[1])
                           for i in range(N)])
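As a quick illustration of what one such layer does to a tensor, the sketch below builds a single convolution + batch normalization + ReLU layer by hand, using the same settings as above. The channel counts and the input size are arbitrary values picked for the example. With an odd kernel size, stride 1, and padding equal to half the kernel size, the spatial dimensions are preserved and only the number of channels changes.

```python
import torch
from torch import nn

kernel = (3, 3)
layer = nn.Sequential(
    # bias is omitted because BatchNorm2d adds its own learnable shift
    nn.Conv2d(in_channels=3, out_channels=32, kernel_size=kernel, bias=False,
              padding=(kernel[0] // 2, kernel[1] // 2)),
    nn.BatchNorm2d(num_features=32),
    nn.ReLU(),
)

images = torch.rand(2, 3, 64, 48)   # (batch, channels, height, width)
features = layer(images)
print(features.shape)  # torch.Size([2, 32, 64, 48])
```

Because the spatial size is untouched by the convolutions, all resolution changes in both networks come from the pooling and upsampling layers.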



## FCRN-A architecture

The FCRN-A architecture is obtained by stacking multiple such blocks followed by either downsampling (max pooling) or upsampling layers:

In [None]:
class FCRN_A(nn.Module):
    """
    Fully Convolutional Regression Network A
    Ref. W. Xie et al. 'Microscopy Cell Counting with Fully Convolutional
    Regression Networks'
    """

    def __init__(self, N: int=1, input_filters: int=3, **kwargs):
        """
        Create FCRN-A model with:
            * fixed kernel size = (3, 3)
            * fixed max pooling kernel size = (2, 2) and upsampling factor = 2
            * no. of filters as defined in the original model:
              input size -> 32 -> 64 -> 128 -> 512 -> 128 -> 64 -> 1
        Args:
            N: no. of convolutional layers per block (see conv_block)
            input_filters: no. of input channels
        """
        super(FCRN_A, self).__init__()
        self.model = nn.Sequential(
            # downsampling
            conv_block(channels=(input_filters, 32), size=(3, 3), N=N),
            nn.MaxPool2d(2),

            conv_block(channels=(32, 64), size=(3, 3), N=N),
            nn.MaxPool2d(2),

            conv_block(channels=(64, 128), size=(3, 3), N=N),
            nn.MaxPool2d(2),

            # "convolutional fully connected"
            conv_block(channels=(128, 512), size=(3, 3), N=N),

            # upsampling
            nn.Upsample(scale_factor=2),
            conv_block(channels=(512, 128), size=(3, 3), N=N),

            nn.Upsample(scale_factor=2),
            conv_block(channels=(128, 64), size=(3, 3), N=N),

            nn.Upsample(scale_factor=2),
            conv_block(channels=(64, 1), size=(3, 3), N=N),
        )

    def forward(self, input: torch.Tensor):
        """Forward pass."""
        return self.model(input)
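One practical consequence of three 2×2 pooling layers is that the output matches the input size only when the height and width are divisible by 8, because each pooling step rounds down. The sketch below checks this with a stand-in that contains only the pooling and upsampling layers (the 3×3 convolutions with padding 1 do not change the spatial size). The helper name `output_size` is hypothetical.

```python
import torch
from torch import nn

# only the resolution-changing layers of an FCRN-A-like network
resampling_only = nn.Sequential(
    nn.MaxPool2d(2), nn.MaxPool2d(2), nn.MaxPool2d(2),
    nn.Upsample(scale_factor=2),
    nn.Upsample(scale_factor=2),
    nn.Upsample(scale_factor=2),
)


def output_size(height, width):
    """Return the spatial size produced for a (height, width) input."""
    sample = torch.zeros(1, 1, height, width)
    return tuple(resampling_only(sample).shape[-2:])


print(output_size(224, 224))  # (224, 224)
print(output_size(100, 100))  # (96, 96): 100 -> 50 -> 25 -> 12 -> 24 -> 48 -> 96
```

Cropping or padding images to a multiple of 8 before training is one way to keep predicted and ground-truth density maps aligned.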

## U-Net architecture

U-Net additionally concatenates the output of each downsampling block with the input to the corresponding block in the upsampling path. This is handled by the ConvCat class:

In [None]:
class ConvCat(nn.Module):
    """Convolution with upsampling + concatenate block."""

    def __init__(self,
                 channels: Tuple[int, int],
                 size: Tuple[int, int],
                 stride: Tuple[int, int]=(1, 1),
                 N: int=1):
        """
        Create a sequential container with convolutional block (see conv_block)
        with N convolutional layers and upsampling by factor 2.
        """
        super(ConvCat, self).__init__()
        self.conv = nn.Sequential(
            conv_block(channels, size, stride, N),
            nn.Upsample(scale_factor=2)
        )

    def forward(self, to_conv: torch.Tensor, to_cat: torch.Tensor):
        """Forward pass.
        Args:
            to_conv: input passed to convolutional block and upsampling
            to_cat: input concatenated with the output of a conv block
        """
        return torch.cat([self.conv(to_conv), to_cat], dim=1)
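The effect of the concatenation on tensor shapes can be seen in isolation. In this sketch the tensors are zero-filled placeholders and the sizes (64 filters, 28×28 and 56×56 feature maps) are assumptions chosen for the example. The upsampled decoder features and the skip-connection features must have the same height and width, and the result has the sum of their channel counts.

```python
import torch
from torch import nn

filters = 64
skip_features = torch.zeros(1, filters, 56, 56)   # from the downsampling path
decoder_input = torch.zeros(1, filters, 28, 28)   # lower-resolution features

upsampled = nn.Upsample(scale_factor=2)(decoder_input)   # 28x28 -> 56x56
merged = torch.cat([upsampled, skip_features], dim=1)    # stack along channels
print(merged.shape)  # torch.Size([1, 128, 56, 56])
```

This doubling of channels is why the blocks that consume a concatenated tensor take `2 * filters` input channels.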

In [None]:
class UNet(nn.Module):
    """
    U-Net implementation.
    Ref. O. Ronneberger et al. "U-net: Convolutional networks for biomedical
    image segmentation."
    """

    def __init__(self, filters: int=64, input_filters: int=3, **kwargs):
        """
        Create U-Net model with:
            * fixed kernel size = (3, 3)
            * fixed max pooling kernel size = (2, 2) and upsampling factor = 2
            * fixed no. of convolutional layers per block = 2 (see conv_block)
            * constant no. of filters for convolutional layers
        Args:
            filters: no. of filters for convolutional layers
            input_filters: no. of input channels
        """
        super(UNet, self).__init__()
        # first block channels size
        initial_filters = (input_filters, filters)
        # channels size for downsampling
        down_filters = (filters, filters)
        # channels size for upsampling (input doubled because of concatenate)
        up_filters = (2 * filters, filters)

        # downsampling
        self.block1 = conv_block(channels=initial_filters, size=(3, 3), N=2)
        self.block2 = conv_block(channels=down_filters, size=(3, 3), N=2)
        self.block3 = conv_block(channels=down_filters, size=(3, 3), N=2)

        # upsampling
        self.block4 = ConvCat(channels=down_filters, size=(3, 3), N=2)
        self.block5 = ConvCat(channels=up_filters, size=(3, 3), N=2)
        self.block6 = ConvCat(channels=up_filters, size=(3, 3), N=2)

        # density prediction
        self.block7 = conv_block(channels=up_filters, size=(3, 3), N=2)
        self.density_pred = nn.Conv2d(in_channels=filters, out_channels=1,
                                      kernel_size=(1, 1), bias=False)

    def forward(self, input: torch.Tensor):
        """Forward pass."""
        # use the same max pooling kernel size (2, 2) across the network
        pool = nn.MaxPool2d(2)

        # downsampling
        block1 = self.block1(input)
        pool1 = pool(block1)
        block2 = self.block2(pool1)
        pool2 = pool(block2)
        block3 = self.block3(pool2)
        pool3 = pool(block3)

        # upsampling
        block4 = self.block4(pool3, block3)
        block5 = self.block5(block4, block2)
        block6 = self.block6(block5, block1)

        # density prediction
        block7 = self.block7(block6)
        return self.density_pred(block7)

In [None]:
# --- PYTESTS --- #
from typing import Type


def run_network(network: Type[nn.Module], input_channels: int):
    """Run a dummy all-ones image through a network and check output size."""
    sample = torch.ones((1, input_channels, 224, 224))
    result = network(input_filters=input_channels)(sample)
    assert result.shape == (1, 1, 224, 224)


def test_UNet_color():
    """Test U-Net on RGB images."""
    run_network(UNet, 3)


def test_UNet_grayscale():
    """Test U-Net on grayscale images."""
    run_network(UNet, 1)


def test_FCRN_color():
    """Test FCRN-A on RGB images."""
    run_network(FCRN_A, 3)


def test_FCRN_grayscale():
    """Test FCRN-A on grayscale images."""
    run_network(FCRN_A, 1)