In [8]:
import numpy as np
from PIL import Image
import tensorflow as tf
from glob import glob
from scipy.io import loadmat
import os

class CrowdData(tf.data.Dataset):
    def __new__(cls, img_path, dot_ann_path, mode='train', is_gray=False, min_size=0, max_size=np.inf, target_size=(512, 512)):
        img_list = sorted(glob(os.path.join(img_path, '*.jpg')))
        dot_ann_list = sorted(glob(os.path.join(dot_ann_path, '*.mat')))
        
        if len(img_list) == 0:
            raise ValueError(f"No .jpg files found in directory {img_path}")
        if len(dot_ann_list) == 0:
            raise ValueError(f"No .mat files found in directory {dot_ann_path}")
        if len(img_list) != len(dot_ann_list):
            raise ValueError(f"Mismatch in number of images ({len(img_list)}) and annotations ({len(dot_ann_list)})")

        dataset = tf.data.Dataset.from_tensor_slices((img_list, dot_ann_list))
        dataset = dataset.map(lambda x, y: cls._process_data(x, y, is_gray, min_size, max_size, mode, target_size))
        
        if mode == 'train':
            dataset = dataset.shuffle(len(img_list)).batch(32)  # adjust batch size as needed
        else:
            dataset = dataset.batch(32)
        
        return dataset

    @staticmethod
    def _process_data(img_path, dot_ann_path, is_gray, min_size, max_size, mode, target_size):
        def load_and_preprocess(img_path, dot_ann_path):
            img_path = tf.compat.as_str(img_path.numpy())
            dot_ann_path = tf.compat.as_str(dot_ann_path.numpy())
            
            img = Image.open(img_path)
            if is_gray:
                img = img.convert('L')
            else:
                img = img.convert('RGB')
            
            w, h = img.size
            if min([w, h]) < min_size:
                r = min_size / min([w, h])
                img = img.resize((int(w * r), int(h * r)))
            elif min([w, h]) > max_size:
                r = max_size / min([w, h])
                img = img.resize((int(w * r), int(h * r)))
            
            img = np.array(img, dtype=np.float32) / 255.0
            img = (img - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
            
            gt_data = loadmat(dot_ann_path)['image_info'][0][0][0][0][0]
            dot_ann = gt_data[:, :2] if gt_data.shape[0] > 0 else np.zeros((0, 2), dtype=np.float32)
            
            return img, dot_ann

        img, dot_ann = tf.py_function(
            func=load_and_preprocess,
            inp=[img_path, dot_ann_path],
            Tout=[tf.float32, tf.float32]
        )
        
        # Set an initial shape for img to allow resizing
        img.set_shape([None, None, 3])

        # Resize image to target size for consistent batching
        img = tf.image.resize(img, target_size)
        img.set_shape([*target_size, 3])  # Explicit shape for batching
        dot_ann.set_shape([None, 2])      # Variable shape for dot annotations

        if mode == 'train':
            chf = tf.zeros([64, 64], dtype=tf.float32)
            return img, chf
        else:
            return img, dot_ann, tf.shape(dot_ann)[0], img_path

# Instantiate the dataset
img_path = 'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\images'
dot_ann_path = 'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth'
train_dataset = CrowdData(img_path=img_path, dot_ann_path=dot_ann_path, mode='train', is_gray=False, min_size=256, max_size=1024, target_size=(512, 512))

for img, chf in train_dataset.take(1):
    print("Image shape:", img.shape)
    print("Characteristic function shape:", chf.shape)


Image shape: (32, 512, 512, 3)
Characteristic function shape: (32, 64, 64)


In [10]:
import numpy as np
import tensorflow as tf
from PIL import Image

class ImgTensorDotTensorProcessing:
    @staticmethod
    def crop(img_tensor, dot_tensor, crop_position):
        '''
        Crop image and dot tensors.

        Args:
            img_tensor: Image tensor (H, W, C)
            dot_tensor: Tensor containing coordinates of dots
            crop_position: Tuple of four integers (left, upper, right, lower)

        Returns: Cropped image tensor and adjusted dot tensor
        '''
        assert 0 <= crop_position[0] < crop_position[2] <= img_tensor.shape[1] and \
               0 <= crop_position[1] < crop_position[3] <= img_tensor.shape[0]

        img_tensor = img_tensor[crop_position[1]:crop_position[3], crop_position[0]:crop_position[2], :]

        if dot_tensor.shape[0] > 0:
            mask = (dot_tensor[:, 0] > crop_position[0]) & (dot_tensor[:, 0] < crop_position[2]) & \
                   (dot_tensor[:, 1] > crop_position[1]) & (dot_tensor[:, 1] < crop_position[3])
            dot_tensor = dot_tensor[mask]
            dot_tensor = dot_tensor - tf.constant(crop_position[0:2], dtype=dot_tensor.dtype)

        return img_tensor, dot_tensor

    @staticmethod
    def random_crop(img_tensor, dot_tensor, size, crop_mode):
        '''
        Random crop an image tensor and dot tensor.
        
        Args:
            img_tensor: Image tensor (H, W, C)
            dot_tensor: Tensor containing coordinates of dots
            size: Crop size, single value or (w, h)
            crop_mode: Crop function to use

        Returns: Cropped image and dot tensor
        '''
        size = np.array(size)
        selectable_range = np.array([img_tensor.shape[1], img_tensor.shape[0]]) - size
        assert (selectable_range >= 0).all()

        left_up = (np.random.rand(2) * selectable_range).astype(int)
        right_down = (left_up + size).astype(int)
        return crop_mode(img_tensor, dot_tensor, (left_up[0], left_up[1], right_down[0], right_down[1]))

    @staticmethod
    def random_mirror(img_tensor, dot_tensor):
        '''
        Randomly mirror the image tensor.

        Args:
            img_tensor: Image tensor (H, W, C)
            dot_tensor: Tensor containing coordinates of dots

        Returns: Mirrored image tensor and dot tensor
        '''
        if np.random.rand() > 0.5:
            img_tensor = tf.image.flip_left_right(img_tensor)
            if dot_tensor.shape[0] > 0:
                dot_tensor = tf.concat([img_tensor.shape[1] - dot_tensor[:, 0:1], dot_tensor[:, 1:2]], axis=1)
        return img_tensor, dot_tensor


class ImageDotmapProcessing:
    @staticmethod
    def crop(img, dotted_map, crop_position=(0, 0, 512, 512)):
        '''
        Crop image and dotted map.

        Args:
            img: PIL Image
            dotted_map: Array of dot coordinates
            crop_position: Tuple (left, upper, right, lower)

        Returns: Cropped image and dotted map
        '''
        img = img.crop(crop_position)
        if dotted_map.shape[0] > 0:
            mask = (dotted_map[:, 0] > crop_position[0]) & (dotted_map[:, 0] < crop_position[2]) & \
                   (dotted_map[:, 1] > crop_position[1]) & (dotted_map[:, 1] < crop_position[3])
            dotted_map = dotted_map[mask]
            dotted_map[:, 0:2] -= np.array(crop_position[0:2])

        return img, dotted_map

    @staticmethod
    def random_crop(img, dotted_map, size):
        '''
        Randomly crop image and dotted map.

        Args:
            img: PIL Image
            dotted_map: Array of dot coordinates
            size: Crop size, single value or (w, h)

        Returns: Cropped image and dotted map
        '''
        size = np.array(size)
        selectable_range = np.array(img.size) - size
        assert (selectable_range >= 0).all()

        left_up = np.random.rand(2) * selectable_range
        right_down = left_up + size
        return ImageDotmapProcessing.crop(img, dotted_map, tuple(np.concatenate((left_up, right_down))))

    @staticmethod
    def resize(img, dotted_map, size=512):
        '''
        Resize image and dotted map.

        Args:
            img: PIL Image
            dotted_map: Array of dot coordinates
            size: Resize dimensions, single value or (w, h)

        Returns: Resized image and dotted map
        '''
        size = np.array(size)
        ratio = size / np.array(img.size)
        img = img.resize(size)

        if dotted_map.shape[0] > 0:
            dotted_map *= ratio

        return img, dotted_map

    @staticmethod
    def random_mirror(img, dotted_map):
        '''
        Randomly mirror image and dotted map.

        Args:
            img: PIL Image
            dotted_map: Array of dot coordinates

        Returns: Mirrored image and dotted map
        '''
        w, h = img.size
        if np.random.rand() > 0.5:
            img = img.transpose(Image.FLIP_LEFT_RIGHT)
            if dotted_map.shape[0] > 0:
                dotted_map[:, 0] = w - dotted_map[:, 0]
        return img, dotted_map


class GeneratingDataFromDottedAnnotation:
    @staticmethod
    def construct_characteristic_function(head_position, bandwidth, origin=0,
                                          step=30, step_length=0.01):
        '''
        Construct discretized characteristic function.

        Args:
            head_position: 2D tensor with head coordinates
            bandwidth: Bandwidth of Gaussian
            origin: Origin of the image plane
            step: Steps the function spans in each direction
            step_length: Span of each step

        Returns: Characteristic function tensor
        '''
        if head_position.shape[0] > 0:
            gauss_mean = head_position - origin
            bandwidth = tf.reshape(bandwidth, (1, 1, head_position.shape[0])) if not isinstance(bandwidth, (int, float)) else bandwidth

            grid_x, grid_y = tf.meshgrid(tf.range(-step, step, dtype=tf.float32),
                                         tf.range(-step, step, dtype=tf.float32))
            plane = tf.stack([grid_x * step_length, grid_y * step_length], axis=2)

            angle = tf.einsum('ijk,lk->ijl', plane, gauss_mean)
            length = tf.exp(-0.5 * tf.reduce_sum(tf.square(plane), axis=2, keepdims=True) * bandwidth ** 2)
            angle_real = tf.cos(angle)
            angle_img = tf.sin(angle)

            cf_real = tf.reduce_sum(angle_real * length, axis=2, keepdims=True)
            cf_img = tf.reduce_sum(angle_img * length, axis=2, keepdims=True)
            return tf.concat([cf_real, cf_img], axis=2)
        else:
            return tf.zeros((step * 2, step * 2, 2), dtype=tf.float32)


In [16]:
import os
import glob
import numpy as np
from scipy.io import loadmat
from PIL import Image

class TransGtToNdarray:
    @staticmethod
    def trans_ann_to_npy_SHTC(target_path: str, save_path: str):
        """
        Converts ShanghaiTech Part A `.mat` annotation files to `.npy` format.
        
        Args:
            target_path (str): Directory containing the original `.mat` annotation files.
            save_path (str): Directory where the converted `.npy` annotation files will be saved.
        """
        assert target_path != save_path, "Target and save paths must be different."
        if not os.path.exists(save_path):
            os.makedirs(save_path)
            
        for file in glob.glob(target_path + '/*.mat'):
            dot = loadmat(file)
            x = dot[list(dot.keys())[-1]][0, 0]['location'][0, 0].astype(np.float32)
            np.save(os.path.join(save_path, os.path.basename(file).split('.')[0] + '.npy'), x)

class DirectoryPath:
    @staticmethod
    def prefix_suffix(dataset_name: str):
        if 'shanghaitech' in dataset_name.lower() and 'a' in dataset_name.lower():
            return 'Dataset/ShanghaiTech/part_A_final/', '_data/images', '_data/ground_truth_npy'

    @staticmethod
    def get_name_from_no(dataset_name: str, set_type: str, prefix: str, img_suffix: str, dotmap_suffix: str, img_no):
        """
        Generates file paths for images and annotation files based on image number.

        Args:
            dataset_name (str): Name of the dataset.
            set_type (str): Data subset type ('train' or 'test').
            prefix (str): Dataset base directory.
            img_suffix (str): Image directory suffix.
            dotmap_suffix (str): Annotation directory suffix.
            img_no (int or str): Image number.

        Returns:
            Tuple: Image path and dot map path.
        """
        if isinstance(img_no, int):
            img_path = os.path.join(prefix, set_type, img_suffix, f'IMG_{img_no}.jpg')
            dotmap_path = os.path.join(prefix, set_type, dotmap_suffix, f'GT_IMG_{img_no}.npy')
        elif isinstance(img_no, str):
            img_path = os.path.join(prefix, set_type, img_suffix, f'{img_no}.jpg')
            dotmap_path = os.path.join(prefix, set_type, dotmap_suffix, f'GT_{img_no}.npy')
        else:
            raise ValueError("img_no must be an int or str.")
        return img_path, dotmap_path

class DatasetPreparation:
    @staticmethod
    def SHTCA():
        """
        Prepares the ShanghaiTech Part A dataset by converting `.mat` files to `.npy` format.
        """
        TransGtToNdarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\1'
        )
        TransGtToNdarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\11'
        )

class ImageDotmapProcessing:
    @staticmethod
    def resize(img, dotted_map, size=(512, 512)):
        """
        Resizes image and adjusts dot annotations.

        Args:
            img (PIL.Image): PIL image.
            dotted_map (numpy.ndarray): Annotation points.
            size (tuple): Target size (width, height).

        Returns:
            tuple: Resized image and adjusted annotations.
        """
        size = np.array(size)
        ratio = size / np.array(img.size)
        img = img.resize(size)
        
        if dotted_map.shape[0] > 0:
            dotted_map = dotted_map * ratio
        
        return img, dotted_map

class BatchImageDotmapProcessing:
    @staticmethod
    def resize(dataset_name: str, set_type: str, min_side_length: int, max_side_length: int, dotmap_together=True, is_gray=False):
        """
        Batch processes images and dot maps in the dataset by resizing them.

        Args:
            dataset_name (str): Dataset name.
            set_type (str): Data subset type ('train' or 'test').
            min_side_length (int): Minimum side length for resizing.
            max_side_length (int): Maximum side length for resizing.
            dotmap_together (bool): Whether to resize the dot map together with the image.
            is_gray (bool): Whether to load images as grayscale.
        """
        def resize_img(img_path, dotmap_path, dotmap_together, is_gray):
            img = Image.open(img_path).convert('L') if is_gray else Image.open(img_path).convert('RGB')
            dot_ann = np.load(dotmap_path) if dotmap_together else None

            w, h = img.size
            new_h, new_w, ratio = cal_new_size(h, w, min_side_length, max_side_length)
            
            if ratio != 1:
                img = img.resize((new_w, new_h))
                img.save(img_path, quality=95)
                if dotmap_together and dot_ann.shape[0] > 0:
                    dot_ann = dot_ann * ratio
                    np.save(dotmap_path, dot_ann)
                print(f"Processed: {img_path}, {dotmap_path}")

        prefix, img_suffix, dotmap_suffix = DirectoryPath.prefix_suffix(dataset_name)
        for img_path in glob.glob(os.path.join(prefix, set_type, img_suffix, '*.jpg')):
            num = DirectoryPath.get_name_from_no(dataset_name, set_type, prefix, img_suffix, dotmap_suffix, os.path.basename(img_path).split('_')[-1].split('.')[0])
            resize_img(img_path, num[1], dotmap_together, is_gray)


In [19]:

import os
import glob
import numpy as np
from scipy.io import loadmat
from PIL import Image

class TransGtToNdarray:
    @staticmethod
    def trans_ann_to_npy_SHTC(target_path: str, save_path: str):
        """
        Converts ShanghaiTech Part A `.mat` annotation files to `.npy` format.
        
        Args:
            target_path (str): Directory containing the original `.mat` annotation files.
            save_path (str): Directory where the converted `.npy` annotation files will be saved.
        """
        assert target_path != save_path, "Target and save paths must be different."
        if not os.path.exists(save_path):
            os.makedirs(save_path)
        
        for file in glob.glob(os.path.join(target_path, '*.mat')):
            dot = loadmat(file)
            # Extract 'location' key for annotations
            x = dot[list(dot.keys())[-1]][0, 0]['location'][0, 0].astype(np.float32)
            npy_file_path = os.path.join(save_path, os.path.basename(file).split('.')[0] + '.npy')
            np.save(npy_file_path, x)
            print(f"Saved annotation to: {npy_file_path}")

class DirectoryPath:
    @staticmethod
    def prefix_suffix(dataset_name: str):
        if 'shanghaitech' in dataset_name.lower() and 'a' in dataset_name.lower():
            return 'Dataset/ShanghaiTech/part_A_final/', '_data/images', '_data/ground_truth_npy'

    @staticmethod
    def get_name_from_no(dataset_name: str, set_type: str, prefix: str, img_suffix: str, dotmap_suffix: str, img_no):
        """
        Generates file paths for images and annotation files based on image number.

        Args:
            dataset_name (str): Name of the dataset.
            set_type (str): Data subset type ('train' or 'test').
            prefix (str): Dataset base directory.
            img_suffix (str): Image directory suffix.
            dotmap_suffix (str): Annotation directory suffix.
            img_no (int or str): Image number.

        Returns:
            Tuple: Image path and dot map path.
        """
        if isinstance(img_no, int):
            img_path = os.path.join(prefix, set_type, img_suffix, f'IMG_{img_no}.jpg')
            dotmap_path = os.path.join(prefix, set_type, dotmap_suffix, f'GT_IMG_{img_no}.npy')
        elif isinstance(img_no, str):
            img_path = os.path.join(prefix, set_type, img_suffix, f'{img_no}.jpg')
            dotmap_path = os.path.join(prefix, set_type, dotmap_suffix, f'GT_{img_no}.npy')
        else:
            raise ValueError("img_no must be an int or str.")
        return img_path, dotmap_path

class DatasetPreparation:
    @staticmethod
    def SHTCA():
        """
        Prepares the ShanghaiTech Part A dataset by converting `.mat` files to `.npy` format.
        """
        TransGtToNdarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth_npy'
        )
        TransGtToNdarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth_npy'
        )

class ImageDotmapProcessing:
    @staticmethod
    def resize(img, dotted_map, size=(512, 512)):
        """
        Resizes image and adjusts dot annotations.

        Args:
            img (PIL.Image): PIL image.
            dotted_map (numpy.ndarray): Annotation points.
            size (tuple): Target size (width, height).

        Returns:
            tuple: Resized image and adjusted annotations.
        """
        size = np.array(size)
        ratio = size / np.array(img.size)
        img = img.resize(size)
        
        if dotted_map.shape[0] > 0:
            dotted_map = dotted_map * ratio
        
        return img, dotted_map

class BatchImageDotmapProcessing:
    @staticmethod
    def cal_new_size(h, w, min_side_length, max_side_length):
        """
        Calculates new size for an image while preserving the aspect ratio.

        Args:
            h (int): Original height.
            w (int): Original width.
            min_side_length (int): Minimum side length.
            max_side_length (int): Maximum side length.

        Returns:
            tuple: (new height, new width, resize ratio)
        """
        if min(h, w) < min_side_length:
            ratio = min_side_length / min(h, w)
        elif max(h, w) > max_side_length:
            ratio = max_side_length / max(h, w)
        else:
            ratio = 1.0
        return int(h * ratio), int(w * ratio), ratio

    @staticmethod
    def resize(dataset_name: str, set_type: str, min_side_length: int, max_side_length: int, dotmap_together=True, is_gray=False):
        """
        Batch processes images and dot maps in the dataset by resizing them.

        Args:
            dataset_name (str): Dataset name.
            set_type (str): Data subset type ('train' or 'test').
            min_side_length (int): Minimum side length for resizing.
            max_side_length (int): Maximum side length for resizing.
            dotmap_together (bool): Whether to resize the dot map together with the image.
            is_gray (bool): Whether to load images as grayscale.
        """
        prefix, img_suffix, dotmap_suffix = DirectoryPath.prefix_suffix(dataset_name)
        
        def resize_img(img_path, dotmap_path, dotmap_together, is_gray):
            img = Image.open(img_path).convert('L') if is_gray else Image.open(img_path).convert('RGB')
            dot_ann = np.load(dotmap_path) if dotmap_together else None

            w, h = img.size
            new_h, new_w, ratio = BatchImageDotmapProcessing.cal_new_size(h, w, min_side_length, max_side_length)
            
            if ratio != 1:
                img = img.resize((new_w, new_h))
                img.save(img_path, quality=95)
                if dotmap_together and dot_ann.shape[0] > 0:
                    dot_ann = dot_ann * ratio
                    np.save(dotmap_path, dot_ann)
                print(f"Processed: {img_path}, {dotmap_path}")

        for img_path in glob.glob(os.path.join(prefix, set_type, img_suffix, '*.jpg')):
            num = DirectoryPath.get_name_from_no(dataset_name, set_type, prefix, img_suffix, dotmap_suffix, os.path.basename(img_path).split('_')[-1].split('.')[0])
            resize_img(img_path, num[1], dotmap_together, is_gray)


In [20]:
 DatasetPreparation.SHTCA() 

Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_1.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_10.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_100.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_101.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_102.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_103.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Crowd_Counting_Dataset\part_A_final\train_data\ground_truth_npy\GT_IMG_104.npy
Saved annotation to: C:\Users\hp\Downloads\ShanghaiTech_Cr

In [23]:
import tensorflow as tf

class ChfLoss(tf.keras.layers.Layer):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, is_dense: bool):
        """
        Args:
            chf_step (int): Number of steps the characteristic function spans in each direction.
            chf_tik (float): Span of each step, determining the range of the characteristic function.
            sample_step (float): Sampling interval for the image plane.
            is_dense (bool): Whether the dataset is dense, affecting the choice of loss function.
        """
        super(ChfLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.is_dense = is_dense

        self.plane_shape = None
        self.real_template = None
        self.img_template = None

    def make_template(self, dnn_output):
        # Construct the spatial domain
        x_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-1] * self.sample_step - self.sample_step / 2, dnn_output.shape[-1])
        y_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-2] * self.sample_step - self.sample_step / 2, dnn_output.shape[-2])
        
        # Create sample coordinates
        sample_coordinates = tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0)
        sample_coordinates = tf.cast(sample_coordinates, dnn_output.dtype)
        
        # Construct characteristic function (frequency domain) plane
        grid_x = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        grid_y = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        plane = tf.stack(tf.meshgrid(grid_x, grid_y), axis=-1)
        
        # Calculate the angle for characteristic function templates
        angle = tf.einsum('ijk,lm->ijlm', plane, sample_coordinates)
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, dnn_output, chf):
        if self.plane_shape is None or self.plane_shape != dnn_output.shape[-2:]:
            self.make_template(dnn_output)
            self.plane_shape = dnn_output.shape[-2:]
        
        # Compute the characteristic function of the prediction
        flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1])
        chf_real = tf.reduce_sum(self.real_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        chf_img = tf.reduce_sum(self.img_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        derived_chf = tf.concat([tf.expand_dims(chf_real, -1), tf.expand_dims(chf_img, -1)], axis=-1)

        # Choose loss based on density
        if not self.is_dense:
            loss = tf.reduce_sum(tf.norm(tf.reshape(derived_chf - chf, [chf.shape[0], -1]), axis=1) * self.chf_tik)
        else:
            loss = tf.reduce_sum(tf.norm(derived_chf - chf, axis=-1)) * self.chf_tik ** 2

        return loss / tf.cast(chf.shape[0], dtype=chf.dtype)


class ChfLikelihoodLoss(tf.keras.layers.Layer):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, likelihood):
        """
        Args:
            chf_step (int): Number of steps the characteristic function spans in each direction.
            chf_tik (float): Span of each step, determining the range of the characteristic function.
            sample_step (float): Sampling interval for the image plane.
            likelihood: Likelihood function to calculate loss.
        """
        super(ChfLikelihoodLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.likelihood = likelihood
        self.scale = 1

        self.plane_shape = None
        self.real_template = None
        self.img_template = None

    def make_template(self, dnn_output):
        # Construct the spatial domain
        x_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-1] * self.sample_step - self.sample_step / 2, dnn_output.shape[-1])
        y_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-2] * self.sample_step - self.sample_step / 2, dnn_output.shape[-2])
        
        # Create sample coordinates
        sample_coordinates = tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0)
        sample_coordinates = tf.cast(sample_coordinates, dnn_output.dtype)
        
        # Construct characteristic function (frequency domain) plane
        grid_x = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        grid_y = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        plane = tf.stack(tf.meshgrid(grid_x, grid_y), axis=-1)
        
        # Calculate the angle for characteristic function templates
        angle = tf.einsum('ijk,lm->ijlm', plane, sample_coordinates)
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, dnn_output, chf):
        if self.plane_shape is None or self.plane_shape != dnn_output.shape:
            self.make_template(dnn_output)
            self.plane_shape = dnn_output.shape

        # Compute the characteristic function of the prediction
        flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1])
        chf_real = tf.reduce_sum(self.real_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        chf_img = tf.reduce_sum(self.img_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        derived_chf = tf.concat([tf.expand_dims(chf_real, -1), tf.expand_dims(chf_img, -1)], axis=-1)

        # Use the likelihood function to calculate loss
        loss = self.likelihood(derived_chf, chf, self.scale)
        return loss


In [24]:
import tensorflow as tf

class ChfLoss(tf.keras.layers.Layer):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, is_dense: bool):
        """
        Args:
            chf_step (int): Number of steps the characteristic function spans in each direction.
            chf_tik (float): Span of each step, determining the range of the characteristic function.
            sample_step (float): Sampling interval for the image plane.
            is_dense (bool): Whether the dataset is dense, affecting the choice of loss function.
        """
        super(ChfLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.is_dense = is_dense

        self.plane_shape = None
        self.real_template = None
        self.img_template = None

    def make_template(self, dnn_output):
        # Construct the spatial domain
        x_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-1] * self.sample_step - self.sample_step / 2, dnn_output.shape[-1])
        y_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-2] * self.sample_step - self.sample_step / 2, dnn_output.shape[-2])
        
        # Create sample coordinates
        sample_coordinates = tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0)
        sample_coordinates = tf.cast(sample_coordinates, dnn_output.dtype)
        
        # Construct characteristic function (frequency domain) plane
        grid_x = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        grid_y = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        plane = tf.stack(tf.meshgrid(grid_x, grid_y), axis=-1)
        
        # Calculate the angle for characteristic function templates
        angle = tf.einsum('ijk,lm->ijlm', plane, sample_coordinates)
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, dnn_output, chf):
        if self.plane_shape is None or self.plane_shape != dnn_output.shape[-2:]:
            self.make_template(dnn_output)
            self.plane_shape = dnn_output.shape[-2:]
        
        # Compute the characteristic function of the prediction
        flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1])
        chf_real = tf.reduce_sum(self.real_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        chf_img = tf.reduce_sum(self.img_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        derived_chf = tf.concat([tf.expand_dims(chf_real, -1), tf.expand_dims(chf_img, -1)], axis=-1)

        # Choose loss based on density
        if not self.is_dense:
            loss = tf.reduce_sum(tf.norm(tf.reshape(derived_chf - chf, [chf.shape[0], -1]), axis=1) * self.chf_tik)
        else:
            loss = tf.reduce_sum(tf.norm(derived_chf - chf, axis=-1)) * self.chf_tik ** 2

        return loss / tf.cast(chf.shape[0], dtype=chf.dtype)


class ChfLikelihoodLoss(tf.keras.layers.Layer):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, likelihood):
        """
        Args:
            chf_step (int): Number of steps the characteristic function spans in each direction.
            chf_tik (float): Span of each step, determining the range of the characteristic function.
            sample_step (float): Sampling interval for the image plane.
            likelihood: Likelihood function to calculate loss.
        """
        super(ChfLikelihoodLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.likelihood = likelihood
        self.scale = 1

        self.plane_shape = None
        self.real_template = None
        self.img_template = None

    def make_template(self, dnn_output):
        # Construct the spatial domain
        x_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-1] * self.sample_step - self.sample_step / 2, dnn_output.shape[-1])
        y_axis = tf.linspace(self.sample_step / 2, dnn_output.shape[-2] * self.sample_step - self.sample_step / 2, dnn_output.shape[-2])
        
        # Create sample coordinates
        sample_coordinates = tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0)
        sample_coordinates = tf.cast(sample_coordinates, dnn_output.dtype)
        
        # Construct characteristic function (frequency domain) plane
        grid_x = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        grid_y = tf.range(-self.chf_step, self.chf_step, dtype=dnn_output.dtype) * self.chf_tik
        plane = tf.stack(tf.meshgrid(grid_x, grid_y), axis=-1)
        
        # Calculate the angle for characteristic function templates
        angle = tf.einsum('ijk,lm->ijlm', plane, sample_coordinates)
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, dnn_output, chf):
        if self.plane_shape is None or self.plane_shape != dnn_output.shape:
            self.make_template(dnn_output)
            self.plane_shape = dnn_output.shape

        # Compute the characteristic function of the prediction
        flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1])
        chf_real = tf.reduce_sum(self.real_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        chf_img = tf.reduce_sum(self.img_template * tf.expand_dims(flatten_output, axis=-1), axis=2)
        derived_chf = tf.concat([tf.expand_dims(chf_real, -1), tf.expand_dims(chf_img, -1)], axis=-1)

        # Use the likelihood function to calculate loss
        loss = self.likelihood(derived_chf, chf, self.scale)
        return loss


In [27]:
def call(self, dnn_output, chf):
    if self.plane_shape is None or self.plane_shape != dnn_output.shape[-2:]:
        self.make_template(dnn_output)
        self.plane_shape = dnn_output.shape[-2:]
    
    # Compute the characteristic function of the prediction
    flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1, 1])  # Reshape to [batch, flattened, 1] for broadcasting
    
    # Adjust self.real_template and self.img_template to include batch dimension
    real_template_expanded = tf.expand_dims(self.real_template, axis=0)  # Shape: [1, 2*chf_step, 2*chf_step, num_pixels]
    img_template_expanded = tf.expand_dims(self.img_template, axis=0)    # Shape: [1, 2*chf_step, 2*chf_step, num_pixels]
    
    # Calculate real and imaginary parts of the derived characteristic function
    chf_real = tf.reduce_sum(real_template_expanded * flatten_output, axis=3, keepdims=True)
    chf_img = tf.reduce_sum(img_template_expanded * flatten_output, axis=3, keepdims=True)
    derived_chf = tf.concat([chf_real, chf_img], axis=3)
    
    # Choose loss based on density
    if not self.is_dense:
        loss = tf.reduce_sum(tf.norm(tf.reshape(derived_chf - chf, [chf.shape[0], -1]), axis=1) * self.chf_tik)
    else:
        loss = tf.reduce_sum(tf.norm(derived_chf - chf, axis=2)) * self.chf_tik ** 2

    return loss / tf.cast(chf.shape[0], dtype=chf.dtype)


In [32]:
def call(self, dnn_output, chf):
    if self.plane_shape is None or self.plane_shape != dnn_output.shape[-2:]:
        self.make_template(dnn_output)
        self.plane_shape = dnn_output.shape[-2:]
    
    # Flatten dnn_output and ensure compatible shape with real_template and img_template
    flatten_output = tf.reshape(dnn_output, [dnn_output.shape[0], -1, 1])  # Shape: [batch, num_pixels, 1]
    
    # Expand dimensions of templates to match with flattened DNN output
    real_template_expanded = tf.expand_dims(self.real_template, axis=0)  # Shape: [1, 2*chf_step, 2*chf_step, num_pixels]
    img_template_expanded = tf.expand_dims(self.img_template, axis=0)    # Shape: [1, 2*chf_step, 2*chf_step, num_pixels]

    # Make sure templates are compatible with the batch size and flatten_output
    real_template_expanded = tf.broadcast_to(real_template_expanded, [dnn_output.shape[0], *self.real_template.shape])
    img_template_expanded = tf.broadcast_to(img_template_expanded, [dnn_output.shape[0], *self.img_template.shape])

    # Multiply and sum across appropriate axis
    chf_real = tf.reduce_sum(real_template_expanded * flatten_output, axis=2, keepdims=True)
    chf_img = tf.reduce_sum(img_template_expanded * flatten_output, axis=2, keepdims=True)
    derived_chf = tf.concat([chf_real, chf_img], axis=3)

    # Choose loss function based on density
    if not self.is_dense:
        loss = tf.reduce_sum(tf.norm(tf.reshape(derived_chf - chf, [chf.shape[0], -1]), axis=1) * self.chf_tik)
    else:
        loss = tf.reduce_sum(tf.norm(derived_chf - chf, axis=2)) * (self.chf_tik ** 2)

    return loss / tf.cast(chf.shape[0], dtype=chf.dtype)


In [34]:
!pip install tensorflow-hub


Collecting tensorflow-hub
  Downloading tensorflow_hub-0.16.1-py2.py3-none-any.whl (30 kB)
Collecting tf-keras>=2.14.1
  Downloading tf_keras-2.18.0-py3-none-any.whl (1.7 MB)
     ---------------------------------------- 1.7/1.7 MB 12.2 MB/s eta 0:00:00
Collecting tensorflow<2.19,>=2.18
  Downloading tensorflow-2.18.0-cp39-cp39-win_amd64.whl (7.5 kB)
Collecting tensorflow-intel==2.18.0
  Downloading tensorflow_intel-2.18.0-cp39-cp39-win_amd64.whl (390.0 MB)
     -------------------------------------- 390.0/390.0 MB 1.1 MB/s eta 0:00:00
Collecting tensorboard<2.19,>=2.18
  Downloading tensorboard-2.18.0-py3-none-any.whl (5.5 MB)
     ---------------------------------------- 5.5/5.5 MB 1.3 MB/s eta 0:00:00
Installing collected packages: tensorboard, tensorflow-intel, tensorflow, tf-keras, tensorflow-hub
  Attempting uninstall: tensorboard
    Found existing installation: tensorboard 2.17.1
    Uninstalling tensorboard-2.17.1:
      Successfully uninstalled tensorboard-2.17.1
  Attempting

ERROR: Could not install packages due to an OSError: [WinError 5] Access is denied: 'C:\\Users\\hp\\anaconda3\\Lib\\site-packages\\~ensorflow\\compiler\\mlir\\lite\\python\\_pywrap_converter_api.pyd'
Consider using the `--user` option or check the permissions.



In [36]:
import tensorflow as tf
from tensorflow.keras import layers, Model

# Define the configuration for VGG layers
cfg = {
    'Baysian_Ma': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512]
}

# Helper function to create VGG layers
def make_layers(cfg, use_batch_norm=True):
    """Creates layers based on configuration, similar to PyTorch make_layers"""
    layers_list = []
    in_channels = 3  # Initial input channel count for RGB images
    
    for v in cfg:
        if v == 'M':
            layers_list.append(layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        else:
            conv_layer = layers.Conv2D(v, kernel_size=3, padding="same")
            if use_batch_norm:
                layers_list.append(conv_layer)
                layers_list.append(layers.BatchNormalization())
                layers_list.append(layers.ReLU())
            else:
                layers_list.append(conv_layer)
                layers_list.append(layers.ReLU())
    return tf.keras.Sequential(layers_list)

# Define the VGG-based model for Bayesian Ma
class VGG_Baysian_Ma(Model):
    def __init__(self, feature_layers):
        super(VGG_Baysian_Ma, self).__init__()
        self.features = feature_layers
        self.reg_layer = tf.keras.Sequential([
            layers.Conv2D(256, kernel_size=3, padding="same", activation="relu"),
            layers.Conv2D(128, kernel_size=3, padding="same", activation="relu"),
            layers.Conv2D(1, kernel_size=1, padding="same", activation="relu")
        ])

    def call(self, x):
        x = self.features(x)
        x = tf.image.resize(x, [tf.shape(x)[1] * 2, tf.shape(x)[2] * 2], method="bilinear")
        x = self.reg_layer(x)
        return tf.abs(x)

def vgg19(use_batch_norm=True, layers='Baysian_Ma', weights=None):
    """
    Creates a VGG 19-layer model with optional batch normalization.
    The model is compatible with the Bayesian Ma loss function for crowd counting.
    """
    # Initialize the feature extractor layers
    feature_layers = make_layers(cfg[layers], use_batch_norm=use_batch_norm)
    
    # Initialize the VGG model
    model = VGG_Baysian_Ma(feature_layers)
    
    # Load weights if provided
    if weights:
        model.load_weights(weights)
    else:
        print("No pre-trained weights provided. Using default initialization.")
    
    return model

# Example usage
model = vgg19()
input_tensor = tf.random.normal([1, 256, 256, 3])  # Batch size 1, 256x256 RGB image
output = model(input_tensor)
print("Output shape:", output.shape)


No pre-trained weights provided. Using default initialization.
Output shape: (1, 32, 32, 1)


In [40]:
import os
import glob
import numpy as np
from scipy.io import loadmat
from PIL import Image

# Define Dataset Preparation class and methods
class Dataset_preparation:
    @staticmethod
    def SHTCA():
        """Prepares the ShanghaiTech Part A dataset by converting `.mat` files to `.npy` format."""
        Trans_gt_to_ndarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\1'
        )
        Trans_gt_to_ndarray.trans_ann_to_npy_SHTC(
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth',
            'C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\11'
        )
        print('ShanghaiTech Part A (SHTCA) dataset is prepared.')

class Trans_gt_to_ndarray:
    @staticmethod
    def trans_ann_to_npy_SHTC(target_path: str, save_path: str):
        """Converts ShanghaiTech Part A `.mat` annotation files to `.npy` format."""
        assert target_path != save_path, "Target and save paths must be different."
        if not os.path.exists(save_path):
            os.makedirs(save_path)
            
        for file in glob.glob(target_path + '/*.mat'):
            dot = loadmat(file)
            x = dot[list(dot.keys())[-1]][0, 0]['location'][0, 0].astype(np.float32)
            np.save(os.path.join(save_path, os.path.basename(file).split('.')[0] + '.npy'), x)

# Run the dataset preparation for SHTCA
if __name__ == '__main__':
    Dataset_preparation.SHTCA()


ShanghaiTech Part A (SHTCA) dataset is prepared.


In [44]:
# Define the configuration for VGG layers
def make_vgg_layers(config, batch_norm=True):
    vgg_layers = []
    for v in config:
        if v == 'M':  # Apply MaxPooling only when 'M' is encountered
            vgg_layers.append(layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        else:  # Only pass integers for Conv2D filters
            conv2d = layers.Conv2D(int(v), kernel_size=(3, 3), padding='same')
            if batch_norm:
                vgg_layers.extend([conv2d, layers.BatchNormalization(), layers.ReLU()])
            else:
                vgg_layers.extend([conv2d, layers.ReLU()])
    return vgg_layers

# VGG model definition
config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512]

def vgg19():
    inputs = layers.Input(shape=(None, None, 3))
    x = inputs
    for layer in make_vgg_layers(config):
        x = layer(x)
    x = layers.Conv2D(256, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(128, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(1, kernel_size=(1, 1))(x)
    model = Model(inputs=inputs, outputs=tf.abs(x))  # Apply absolute value to match PyTorch's torch.abs
    return model


In [4]:
import tensorflow as tf

class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, is_dense: bool, name="chf_loss"):
        super(CHFLoss, self).__init__(name=name)
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.is_dense = is_dense
        self.real_template = None
        self.img_template = None

    def build_templates(self, dnn_output_shape):
        x_axis = tf.linspace(
            self.sample_step / 2, 
            dnn_output_shape[2] * self.sample_step - self.sample_step / 2, 
            dnn_output_shape[2]
        )
        y_axis = tf.linspace(
            self.sample_step / 2, 
            dnn_output_shape[1] * self.sample_step - self.sample_step / 2, 
            dnn_output_shape[1]
        )
        # Flatten sample coordinates for use in einsum
        sample_coordinates = tf.reshape(
            tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0),
            [2, -1]
        )

        # Create plane for characteristic function
        plane_x = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane_y = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane = tf.reshape(tf.stack(tf.meshgrid(plane_x, plane_y), axis=-1), [-1, 2])

        # Compute angle matrix
        angle = tf.einsum('ij,jk->ik', plane, sample_coordinates)

        # Build real and imaginary templates
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, y_true, y_pred):
        if self.real_template is None or self.img_template is None:
            self.build_templates(y_pred.shape)

        flatten_pred = tf.reshape(y_pred, [y_pred.shape[0], -1])
        chf_real = tf.reduce_sum(self.real_template * tf.expand_dims(flatten_pred, axis=-1), axis=1)
        chf_img = tf.reduce_sum(self.img_template * tf.expand_dims(flatten_pred, axis=-1), axis=1)
        derived_chf = tf.stack([chf_real, chf_img], axis=-1)

        if not self.is_dense:
            loss = tf.reduce_sum(tf.norm(derived_chf - y_true, axis=-1)) * self.chf_tik
        else:
            loss = tf.reduce_sum(tf.square(derived_chf - y_true)) * (self.chf_tik ** 2)
        
        return tf.reduce_mean(loss)


In [6]:
import tensorflow as tf

class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step: int, chf_tik: float, sample_step: float, is_dense: bool, name="chf_loss"):
        super(CHFLoss, self).__init__(name=name)
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.is_dense = is_dense
        self.real_template = None
        self.img_template = None

    def build_templates(self, dnn_output_shape):
        x_axis = tf.linspace(
            self.sample_step / 2, 
            dnn_output_shape[2] * self.sample_step - self.sample_step / 2, 
            dnn_output_shape[2]
        )
        y_axis = tf.linspace(
            self.sample_step / 2, 
            dnn_output_shape[1] * self.sample_step - self.sample_step / 2, 
            dnn_output_shape[1]
        )
        # Flatten sample coordinates for use in einsum
        sample_coordinates = tf.reshape(
            tf.stack([tf.repeat(x_axis, len(y_axis)), tf.tile(y_axis, [len(x_axis)])], axis=0),
            [2, -1]
        )

        # Create plane for characteristic function
        plane_x = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane_y = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane = tf.reshape(tf.stack(tf.meshgrid(plane_x, plane_y), axis=-1), [-1, 2])

        # Compute angle matrix
        angle = tf.einsum('ij,jk->ik', plane, sample_coordinates)

        # Build real and imaginary templates with an extra batch dimension
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)
        # Reshape templates to have a batch dimension for later broadcasting
        self.real_template = tf.reshape(self.real_template, [1, -1, angle.shape[1]])
        self.img_template = tf.reshape(self.img_template, [1, -1, angle.shape[1]])

    def call(self, y_true, y_pred):
        if self.real_template is None or self.img_template is None:
            self.build_templates(y_pred.shape)

        # Reshape prediction to match template shapes for multiplication
        flatten_pred = tf.reshape(y_pred, [y_pred.shape[0], -1, 1])

        chf_real = tf.reduce_sum(self.real_template * flatten_pred, axis=1)
        chf_img = tf.reduce_sum(self.img_template * flatten_pred, axis=1)
        derived_chf = tf.stack([chf_real, chf_img], axis=-1)

        # Calculate loss
        if not self.is_dense:
            loss = tf.reduce_sum(tf.norm(derived_chf - y_true, axis=-1)) * self.chf_tik
        else:
            loss = tf.reduce_sum(tf.square(derived_chf - y_true)) * (self.chf_tik ** 2)
        
        return tf.reduce_mean(loss)


In [1]:
import tensorflow as tf
import numpy as np

class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step=30, chf_tik=0.01, sample_step=0.1, is_dense=True):
        super(CHFLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_step = sample_step
        self.is_dense = is_dense
        self.real_template = None
        self.img_template = None

    def build_templates(self, dnn_output_shape):
        # Generate sample coordinates as a 1D vector compatible with einsum
        sample_coordinates = tf.range(-self.chf_step, self.chf_step, delta=self.sample_step, dtype=tf.float32)
        
        # Generate a grid for plane coordinates
        plane_x = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane_y = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane = tf.stack(tf.meshgrid(plane_x, plane_y), axis=-1)  # Shape: [M, M, 2]
        plane = tf.reshape(plane, [-1, 2])  # Flatten to [M^2, 2] for einsum compatibility

        # Calculate the angle matrix using matrix multiplication
        angle = tf.einsum('ik,k->i', plane, sample_coordinates)  # Shape [M^2, N]
        angle = tf.reshape(angle, [1, -1])  # Add batch dimension for broadcasting

        # Create the real and imaginary CHF templates
        self.real_template = tf.cos(angle)
        self.img_template = tf.sin(angle)

    def call(self, y_true, y_pred):
        if self.real_template is None or self.img_template is None:
            self.build_templates(y_pred.shape)

        # Flatten y_pred and y_true to match template shapes
        batch_size = tf.shape(y_pred)[0]
        flatten_pred = tf.reshape(y_pred, [batch_size, -1])
        flatten_true = tf.reshape(y_true, [batch_size, -1])

        # Broadcast templates to match batch size
        real_template = tf.tile(self.real_template, [batch_size, 1])
        img_template = tf.tile(self.img_template, [batch_size, 1])

        # Compute CHF real and imaginary components
        chf_real = tf.reduce_sum(real_template * flatten_pred, axis=1)
        chf_img = tf.reduce_sum(img_template * flatten_pred, axis=1)
        derived_chf = tf.stack([chf_real, chf_img], axis=-1)

        # Compute CHF for true values
        true_chf_real = tf.reduce_sum(real_template * flatten_true, axis=1)
        true_chf_img = tf.reduce_sum(img_template * flatten_true, axis=1)
        true_chf = tf.stack([true_chf_real, true_chf_img], axis=-1)

        # Calculate CHF loss
        loss = tf.reduce_mean(tf.reduce_sum(tf.square(derived_chf - true_chf), axis=-1))
        return loss




In [4]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import os
import glob
from PIL import Image
import sys


In [5]:
# Directory paths for the dataset
TRAIN_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\images"
TRAIN_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth_npy"
TEST_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\images"
TEST_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth_npy"


In [6]:
def load_image(image_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return image

def load_annotation(annotation_path):
    annotation = np.load(annotation_path.numpy().decode("utf-8"))
    return annotation.astype(np.float32)

def process_data(img_path, ann_path, img_size=(256, 256)):
    img = load_image(img_path)
    ann = tf.py_function(load_annotation, [ann_path], tf.float32)
    ann.set_shape([None, None])  # Set shape explicitly
    img = tf.image.resize(img, img_size)
    ann = tf.image.resize(tf.expand_dims(ann, axis=-1), img_size)
    return img, ann

def prepare_datasets(batch_size=4, img_size=(256, 256)):
    train_image_paths = glob.glob(os.path.join(TRAIN_IMAGES_DIR, "*.jpg"))
    train_annotation_paths = [os.path.join(TRAIN_ANNOTATIONS_DIR, "GT_" + os.path.basename(img).replace(".jpg", ".npy")) for img in train_image_paths]
    test_image_paths = glob.glob(os.path.join(TEST_IMAGES_DIR, "*.jpg"))
    test_annotation_paths = [os.path.join(TEST_ANNOTATIONS_DIR, "GT_" + os.path.basename(img).replace(".jpg", ".npy")) for img in test_image_paths]
    
    train_data = tf.data.Dataset.from_tensor_slices((train_image_paths, train_annotation_paths))
    train_data = train_data.map(lambda img, ann: process_data(img, ann, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    train_data = train_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    test_data = tf.data.Dataset.from_tensor_slices((test_image_paths, test_annotation_paths))
    test_data = test_data.map(lambda img, ann: process_data(img, ann, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    test_data = test_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    
    return train_data, test_data


In [7]:
def vgg19(input_shape=(256, 256, 3)):
    config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512]
    inputs = layers.Input(shape=input_shape)
    x = inputs
    for v in config:
        if v == 'M':
            x = layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
        else:
            x = layers.Conv2D(v, kernel_size=(3, 3), padding='same')(x)
            x = layers.ReLU()(x)
    x = layers.Conv2D(256, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(128, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(1, kernel_size=(1, 1))(x)
    x = layers.Lambda(lambda x: tf.abs(x))(x)
    model = Model(inputs=inputs, outputs=x)
    return model


In [8]:
class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step=30, chf_tik=0.01, sample_interval=0.1, is_dense=True):
        super(CHFLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.sample_interval = sample_interval
        self.is_dense = is_dense
        self.real_template = None
        self.img_template = None

    def build_templates(self, dnn_output_shape):
        sample_coordinates = tf.range(-self.chf_step, self.chf_step, delta=self.sample_interval, dtype=tf.float32)
        sample_coordinates = tf.reshape(sample_coordinates, [-1, 1])
        plane_x = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane_y = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        plane = tf.stack(tf.meshgrid(plane_x, plane_y), axis=-1)
        plane = tf.reshape(plane, [-1, 2])
        angle = tf.einsum('ij,jk->ik', plane, sample_coordinates)
        self.real_template = tf.expand_dims(tf.cos(angle), axis=0)
        self.img_template = tf.expand_dims(tf.sin(angle), axis=0)

    def call(self, y_true, y_pred):
        if self.real_template is None or self.img_template is None:
            self.build_templates(y_pred.shape)
        batch_size = tf.shape(y_pred)[0]
        flatten_pred = tf.reshape(y_pred, [batch_size, -1])
        flatten_real_template = tf.tile(self.real_template, [batch_size, 1, 1])
        flatten_img_template = tf.tile(self.img_template, [batch_size, 1, 1])
        chf_real = tf.reduce_sum(flatten_real_template * tf.expand_dims(flatten_pred, axis=-1), axis=1)
        chf_img = tf.reduce_sum(flatten_img_template * tf.expand_dims(flatten_pred, axis=-1), axis=1)
        derived_chf = tf.stack([chf_real, chf_img], axis=-1)
        flatten_true = tf.reshape(y_true, [batch_size, -1])
        true_chf_real = tf.reduce_sum(flatten_real_template * tf.expand_dims(flatten_true, axis=-1), axis=1)
        true_chf_img = tf.reduce_sum(flatten_img_template * tf.expand_dims(flatten_true, axis=-1), axis=1)
        true_chf = tf.stack([true_chf_real, true_chf_img], axis=-1)
        loss = tf.reduce_mean(tf.reduce_sum(tf.square(derived_chf - true_chf), axis=-1))
        return loss


In [9]:
def train_model(model, train_data, test_data, chf_loss, optimizer, epochs=10):
    for epoch in range(epochs):
        print(f"\nEpoch {epoch + 1}/{epochs}")
        for step, (x_batch_train, y_batch_train) in enumerate(train_data):
            with tf.GradientTape() as tape:
                predictions = model(x_batch_train, training=True)
                predictions_resized = tf.image.resize(predictions, [256, 256])
                loss_value = chf_loss(y_batch_train, predictions_resized)
            grads = tape.gradient(loss_value, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if step % 10 == 0:
                print(f"Step {step} - Loss: {loss_value.numpy():.4f}")
        mse_metric = tf.keras.metrics.MeanSquaredError()
        mae_metric = tf.keras.metrics.MeanAbsoluteError()
        for x_batch_test, y_batch_test in test_data:
            test_predictions = model(x_batch_test, training=False)
            test_predictions_resized = tf.image.resize(test_predictions, [256, 256])
            mse_metric.update_state(y_batch_test, test_predictions_resized)
            mae_metric.update_state(y_batch_test, test_predictions_resized)
        print(f"Epoch {epoch + 1} - MSE: {mse_metric.result().numpy()}, MAE: {mae_metric.result().numpy()}")


In [2]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
import glob
import matplotlib.pyplot as plt

# Paths for dataset
TRAIN_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\images"
TRAIN_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth_npy"
TEST_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\images"
TEST_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth_npy"

# Data loader functions
def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img

def load_annotation(annotation_path):
    annotation = np.load(annotation_path.numpy().decode("utf-8"))
    return annotation.astype(np.float32)

def process_data(img_path, ann_path, img_size):
    img = load_image(img_path)
    ann = tf.py_function(load_annotation, [ann_path], tf.float32)
    ann.set_shape([None, None])
    img = tf.image.resize(img, img_size)
    ann = tf.image.resize(tf.expand_dims(ann, -1), img_size)
    return img, ann

def prepare_datasets(batch_size=4, img_size=(256, 256)):
    train_images = sorted(glob.glob(f"{TRAIN_IMAGES_DIR}/*.jpg"))
    train_annotations = sorted(glob.glob(f"{TRAIN_ANNOTATIONS_DIR}/*.npy"))

    test_images = sorted(glob.glob(f"{TEST_IMAGES_DIR}/*.jpg"))
    test_annotations = sorted(glob.glob(f"{TEST_ANNOTATIONS_DIR}/*.npy"))

    train_data = tf.data.Dataset.from_tensor_slices((train_images, train_annotations))
    train_data = train_data.map(lambda x, y: process_data(x, y, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    train_data = train_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    test_data = tf.data.Dataset.from_tensor_slices((test_images, test_annotations))
    test_data = test_data.map(lambda x, y: process_data(x, y, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    test_data = test_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_data, test_data

# Define VGG model with custom absolute layer
class AbsoluteLayer(layers.Layer):
    def call(self, inputs):
        return tf.abs(inputs)

def vgg19(input_shape=(256, 256, 3)):
    config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512]
    inputs = layers.Input(shape=input_shape)
    x = inputs
    for v in config:
        if v == 'M':
            x = layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
        else:
            x = layers.Conv2D(v, kernel_size=(3, 3), padding='same')(x)
            x = layers.ReLU()(x)
    x = layers.Conv2D(256, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(128, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(1, kernel_size=(1, 1))(x)
    x = AbsoluteLayer()(x)
    model = Model(inputs=inputs, outputs=x)
    return model

# CHF Loss
class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step=30, chf_tik=0.01):
        super(CHFLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik

    def chf_calculation(self, y_flat):
        freqs = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        freq_grid = tf.reshape(freqs, [1, -1])
        y_flat_expanded = tf.expand_dims(y_flat, -1)
        chf_real = tf.reduce_sum(tf.cos(tf.matmul(y_flat_expanded, freq_grid)), axis=1)
        chf_img = tf.reduce_sum(tf.sin(tf.matmul(y_flat_expanded, freq_grid)), axis=1)
        return chf_real, chf_img

    def call(self, y_true, y_pred):
        y_true_flat = tf.reshape(y_true, [tf.shape(y_true)[0], -1])
        y_pred_flat = tf.reshape(y_pred, [tf.shape(y_pred)[0], -1])
        true_chf_real, true_chf_img = self.chf_calculation(y_true_flat)
        pred_chf_real, pred_chf_img = self.chf_calculation(y_pred_flat)
        real_loss = tf.reduce_mean(tf.square(true_chf_real - pred_chf_real))
        img_loss = tf.reduce_mean(tf.square(true_chf_img - pred_chf_img))
        return (real_loss + img_loss) / 1_000_000  # Scale loss

# Training function with scaling for losses and metrics
def train_model(model, train_data, test_data, chf_loss, optimizer, epochs=10):
    mse_metric = tf.keras.metrics.MeanSquaredError()
    mae_metric = tf.keras.metrics.MeanAbsoluteError()

    for epoch in range(epochs):
        print(f"\nEpoch {epoch + 1}/{epochs}")
        for step, (x_batch, y_batch) in enumerate(train_data):
            with tf.GradientTape() as tape:
                predictions = model(x_batch, training=True)
                loss = chf_loss(y_batch, predictions)

            grads = tape.gradient(loss, model.trainable_weights)
            optimizer.apply_gradients(zip(grads, model.trainable_weights))

            if step % 10 == 0:
                print(f"Step {step} - Loss: {loss:.4f}")

        mse_metric.reset_state()
        mae_metric.reset_state()
        for x_batch_test, y_batch_test in test_data:
            test_predictions = model(x_batch_test, training=False)
            test_predictions_resized = tf.image.resize(test_predictions, [256, 256])

            mse_metric.update_state(y_batch_test, test_predictions_resized)
            mae_metric.update_state(y_batch_test, test_predictions_resized)

        # Print scaled metrics
        mse_scaled = mse_metric.result().numpy() / 100
        mae_scaled = mae_metric.result().numpy() / 10
        print(f"Epoch {epoch + 1} - MSE: {mse_scaled}, MAE: {mae_scaled}")

# Visualization function for density maps
def visualize_predictions(model, test_data, save_dir='predictions'):
    import os
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    for i, (x_batch_test, _) in enumerate(test_data.take(5)):  # visualize 5 test samples
        pred_density_map = model(x_batch_test, training=False)
        pred_density_map_resized = tf.image.resize(pred_density_map, [256, 256]).numpy()

        for j, density_map in enumerate(pred_density_map_resized):
            plt.imshow(density_map.squeeze(), cmap='jet')
            plt.colorbar()
            plt.title(f"Predicted Density Map {i}_{j}")
            plt.savefig(f"{save_dir}/density_map_{i}_{j}.png")
            plt.close()

# Initialize datasets, model, CHF loss, and optimizer
train_data, test_data = prepare_datasets(batch_size=4, img_size=(256, 256))
model = vgg19()
chf_loss = CHFLoss(chf_step=30, chf_tik=0.01)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)

# Train the model
train_model(model, train_data, test_data, chf_loss, optimizer, epochs=10)

# Visualize predictions
visualize_predictions(model, test_data)



Epoch 1/10
Step 0 - Loss: 93.7389
Step 10 - Loss: 115.3169
Step 20 - Loss: 94.5600
Step 30 - Loss: 94.0384
Step 40 - Loss: 107.8531
Step 50 - Loss: 97.1957
Step 60 - Loss: 110.9271
Step 70 - Loss: 110.1096
Epoch 1 - MSE: 632.493828125, MAE: 19.49561309814453

Epoch 2/10
Step 0 - Loss: 93.3881
Step 10 - Loss: 114.5573
Step 20 - Loss: 94.0496
Step 30 - Loss: 93.6874
Step 40 - Loss: 107.0565
Step 50 - Loss: 96.8008
Step 60 - Loss: 110.8149
Step 70 - Loss: 109.8749
Epoch 2 - MSE: 665.335859375, MAE: 19.201513671875

Epoch 3/10
Step 0 - Loss: 93.3804
Step 10 - Loss: 114.5897
Step 20 - Loss: 94.0521
Step 30 - Loss: 93.7049
Step 40 - Loss: 107.1218
Step 50 - Loss: 96.9099
Step 60 - Loss: 110.8794
Step 70 - Loss: 110.1371
Epoch 3 - MSE: 688.03234375, MAE: 19.402401733398438

Epoch 4/10
Step 0 - Loss: 93.4093
Step 10 - Loss: 114.4519
Step 20 - Loss: 94.0513
Step 30 - Loss: 93.6781
Step 40 - Loss: 107.0361
Step 50 - Loss: 96.8038
Step 60 - Loss: 110.8105
Step 70 - Loss: 109.8761
Epoch 4 - MSE: 

In [3]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
import glob
import os
import matplotlib.pyplot as plt

# Paths for dataset
TRAIN_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\images"
TRAIN_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\train_data\\ground_truth_npy"
TEST_IMAGES_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\images"
TEST_ANNOTATIONS_DIR = "C:\\Users\\hp\\Downloads\\ShanghaiTech_Crowd_Counting_Dataset\\part_A_final\\test_data\\ground_truth_npy"

# Data loader functions
def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img

def load_annotation(annotation_path):
    annotation = np.load(annotation_path.numpy().decode("utf-8"))
    return annotation.astype(np.float32)

def process_data(img_path, ann_path, img_size):
    img = load_image(img_path)
    ann = tf.py_function(load_annotation, [ann_path], tf.float32)
    ann.set_shape([None, None])
    img = tf.image.resize(img, img_size)
    ann = tf.image.resize(tf.expand_dims(ann, -1), img_size)
    return img, ann

def prepare_datasets(batch_size=8, img_size=(256, 256)):
    train_images = sorted(glob.glob(f"{TRAIN_IMAGES_DIR}/*.jpg"))
    train_annotations = sorted(glob.glob(f"{TRAIN_ANNOTATIONS_DIR}/*.npy"))
    test_images = sorted(glob.glob(f"{TEST_IMAGES_DIR}/*.jpg"))
    test_annotations = sorted(glob.glob(f"{TEST_ANNOTATIONS_DIR}/*.npy"))

    train_data = tf.data.Dataset.from_tensor_slices((train_images, train_annotations))
    train_data = train_data.map(lambda x, y: process_data(x, y, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    train_data = train_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    test_data = tf.data.Dataset.from_tensor_slices((test_images, test_annotations))
    test_data = test_data.map(lambda x, y: process_data(x, y, img_size), num_parallel_calls=tf.data.AUTOTUNE)
    test_data = test_data.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return train_data, test_data

# Define VGG model with modified regression head
class AbsoluteLayer(layers.Layer):
    def call(self, inputs):
        return tf.abs(inputs)

def vgg19(input_shape=(256, 256, 3)):
    config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512]
    inputs = layers.Input(shape=input_shape)
    x = inputs
    for v in config:
        if v == 'M':
            x = layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
        else:
            x = layers.Conv2D(v, kernel_size=(3, 3), padding='same')(x)
            x = layers.ReLU()(x)
    # Custom regression head
    x = layers.Conv2D(256, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(128, kernel_size=(3, 3), padding='same')(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(1, kernel_size=(1, 1))(x)
    x = AbsoluteLayer()(x)
    model = Model(inputs=inputs, outputs=x)
    return model

# CHF Loss with fine-tuned parameters
class CHFLoss(tf.keras.losses.Loss):
    def __init__(self, chf_step=30, chf_tik=0.01, integral_range=0.3, grid_granularity=0.01):
        super(CHFLoss, self).__init__()
        self.chf_step = chf_step
        self.chf_tik = chf_tik
        self.integral_range = integral_range
        self.grid_granularity = grid_granularity

    def chf_calculation(self, y_flat):
        freqs = tf.range(-self.chf_step, self.chf_step, dtype=tf.float32) * self.chf_tik
        freq_grid = tf.reshape(freqs, [1, -1])
        y_flat_expanded = tf.expand_dims(y_flat, -1)
        chf_real = tf.reduce_sum(tf.cos(tf.matmul(y_flat_expanded, freq_grid)), axis=1)
        chf_img = tf.reduce_sum(tf.sin(tf.matmul(y_flat_expanded, freq_grid)), axis=1)
        return chf_real, chf_img

    def call(self, y_true, y_pred):
        y_true_flat = tf.reshape(y_true, [tf.shape(y_true)[0], -1])
        y_pred_flat = tf.reshape(y_pred, [tf.shape(y_pred)[0], -1])
        true_chf_real, true_chf_img = self.chf_calculation(y_true_flat)
        pred_chf_real, pred_chf_img = self.chf_calculation(y_pred_flat)
        real_loss = tf.reduce_mean(tf.square(true_chf_real - pred_chf_real))
        img_loss = tf.reduce_mean(tf.square(true_chf_img - pred_chf_img))
        return real_loss + img_loss

# Trainer Class with updated evaluation
class CrowdCountingTrainer:
    def __init__(self, model, train_data, test_data, optimizer, chf_loss, epochs=10):
        self.model = model
        self.train_data = train_data
        self.test_data = test_data
        self.optimizer = optimizer
        self.chf_loss = chf_loss
        self.epochs = epochs
        self.mse_metric = tf.keras.metrics.MeanSquaredError()
        self.mae_metric = tf.keras.metrics.MeanAbsoluteError()

    def train_epoch(self):
        for step, (x_batch, y_batch) in enumerate(self.train_data):
            with tf.GradientTape() as tape:
                predictions = self.model(x_batch, training=True)
                loss = self.chf_loss(y_batch, predictions) / 1e6  # Scale for readability
            grads = tape.gradient(loss, self.model.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
            if step % 10 == 0:
                print(f"Step {step} - Loss: {loss:.4f}")

    def evaluate(self):
        self.mse_metric.reset_state()
        self.mae_metric.reset_state()
        for x_batch_test, y_batch_test in self.test_data:
            test_predictions = self.model(x_batch_test, training=False)
            test_predictions_resized = tf.image.resize(test_predictions, [256, 256])
            self.mse_metric.update_state(y_batch_test, test_predictions_resized)
            self.mae_metric.update_state(y_batch_test, test_predictions_resized)
        print(f"MSE: {self.mse_metric.result().numpy() }, MAE: {self.mae_metric.result().numpy() }")

    def train(self):
        for epoch in range(self.epochs):
            print(f"\nEpoch {epoch + 1}/{self.epochs}")
            self.train_epoch()
            if (epoch + 1) % 2 == 0:  # Save model and evaluate every 2 epochs
                self.evaluate()

# Model Training and Evaluation
train_data, test_data = prepare_datasets(batch_size=8, img_size=(256, 256))
model = vgg19()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
chf_loss = CHFLoss(chf_step=30, chf_tik=0.01, integral_range=0.3, grid_granularity=0.01)

trainer = CrowdCountingTrainer(model, train_data, test_data, optimizer, chf_loss, epochs=10)
trainer.train()



Epoch 1/10
Step 0 - Loss: 102.2162
Step 10 - Loss: 96.8978
Step 20 - Loss: 108.1253
Step 30 - Loss: 105.9141

Epoch 2/10
Step 0 - Loss: 102.2069
Step 10 - Loss: 96.6600
Step 20 - Loss: 107.6529
Step 30 - Loss: 105.5431
MSE: 726.014609375, MAE: 19.927342224121094

Epoch 3/10
Step 0 - Loss: 101.7451


KeyboardInterrupt: 