In [2]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
from skimage import measure
import csv
from collections import defaultdict
import time
import tracemalloc
import os

## Preprocess Image
---
1. Loads the image directly in grayscale. It is a simple and fast conversion
2. Enhances the local contrast in the image:
> Using a medium-size tileGridSize for a contrast that is neither too local nor too global.
> Using a medium-size clipLimit to increase the contrast, but it is not a high value so as not to cause excessive noise, but keep in mind that some people can be seen incompletely or far away, so a low value does not work.

3. Apply Gaussian: We want to reduce noise, but since the people are small silhouettes, we don't want their details to disappear, so we set the kernel smaller. The standard deviation is set to 0 for automatic adjustment. This setting consists of OpenCV calculating a value based on the kernel size, balancing smoothing and detail preservation.
4. Maintain detail in darker areas while controlling the effect of the overexposed regions. thresh 127 is balanced, if it is smaller it causes overexposure, and if it is larger it causes more “noise”.

In [3]:
def load_image(image_name, show_image=False):
    image_path=f"data/{image_name}"
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        print(f"The image cannot be loaded: {image_path}")
        return None
    if show_image:
        plt.imshow(image_path)
    return image

def clahe(image: np.ndarray,show_image=False, clip_limit=20.0, grid_size=(14, 14)) -> np.ndarray:
    """Contrast-limited adaptive histogram equalization."""
    image = image.astype('uint8')   # Ensure that the image is of type uint8
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=grid_size)
    if show_image:
        plt.title('Clahe')
        plt.imshow(clahe)
    return clahe.apply(image)

def preprocess_image(image_name,show_image=False):
    image_grayscale = load_image(image_name,show_image)
    if image_grayscale is None:
        return None

    img_equalized = clahe(image_grayscale,show_image)
    blurred_image = cv2.GaussianBlur(img_equalized, (3, 3), 0)
    _, binary_image = cv2.threshold(blurred_image, 127, 255, cv2.THRESH_TRUNC)
    return binary_image

### Performance
---

To evaluate the algorithm using [wrapper](https://stackoverflow.com/questions/36610950/passing-kwargs-received-in-a-wrapper-function-definition-to-arguments-of-an-e).  
- tracemalloc to monitor memory.
- time for time
- os for CPU cycles

In [4]:
def measure_performance(func):
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        start_time = time.perf_counter()
        start_cpu = os.times().user
        result = func(*args, **kwargs)
        end_cpu = os.times().user
        end_time = time.perf_counter()
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print(f"Function {func.__name__} took {end_time - start_time:.4f} seconds")
        print(f"CPU cycles used: {end_cpu - start_cpu:.4f}")
        print(f"Current memory usage is {current / 10**6:.4f} MB; Peak was {peak / 10**6:.4f} MB")
        return result
    return wrapper

## Extract differences
---
1. The XOR operator will highlight the differences between the two images, which in this case will be the additional objects on the background
2. Apply opening to remove small noise

In [5]:
def segmentation_foreground(binary_image: np.ndarray, binary_bckg_image: np.ndarray,show_image=False):
    segmented_image = cv2.bitwise_xor(binary_bckg_image, binary_image)
    # Delete small differences
    kernel = np.ones((5, 5), np.uint8)
    refined_image = cv2.morphologyEx(segmented_image, cv2.MORPH_OPEN, kernel)
    return refined_image

## Detect edges
---
1. Detect edges and using "adhoc" numbers
2. Enhance edges improve to separate them from the background

In [6]:
def edge_detection(image: np.ndarray,show_image=False, min_threshold=230, max_threshold=180):
    edges = cv2.Canny(image, min_threshold, max_threshold)
    intensified_edges = cv2.Laplacian(edges, cv2.CV_64F)
    return intensified_edges

## Obtain person detected coordinates
---
1. Convert image from float64 to 8uint and 1 channel to be used by findContours
2. Obtain contours, and filter by the perimeter size
3. Obtain the center points of the contours

In [None]:
def obtain_contours(image: np.ndarray,show_image=False):
    image_8b_c1 = cv2.convertScaleAbs(image, alpha=(255.0/np.max(image)))
    contours, _ = cv2.findContours(image_8b_c1, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    min_perimeter = 50
    filtered_contours = [cnt for cnt in contours if cv2.arcLength(cnt, True) > min_perimeter]
    return filtered_contours

def obtain_center_points(filtered_contours,show_image=False):
    y_min = 420
    contour_points = []
    for contorno in filtered_contours:
        M = cv2.moments(contorno)
        if M['m00'] != 0:
            cx = int(M['m10'] / M['m00'])
            cy = int(M['m01'] / M['m00'])
            if cy > y_min:
                contour_points.append((cx, cy))
    return contour_points

In [8]:
@measure_performance
def process_image(image_path, bckg_image,show_image=False):
    binary_image = preprocess_image(image_path)
    if binary_image is None:
        return None

    segmented_image = segmentation_foreground(binary_image, bckg_image,show_image)
    edges = edge_detection(segmented_image,show_image)
    contours = obtain_contours(edges,show_image)
    center_points = obtain_center_points(contours,show_image)
    return {
        'image': image_path,
        'count': len(center_points),
        'points': center_points
    }

In [9]:
def process_images(image_names, bckg_image,show_image=False):
    processed_images = []
    for name in image_names:
        processed_image = process_image(name, bckg_image,show_image)
        if processed_image is not None:
            processed_images.append(processed_image)
    return processed_images

## Obtain ground truth data

In [17]:
def read_manual_annotations(input_file,with_headers=False):
    file_path=f"data/{input_file}"
    # Diccionario para almacenar los datos
    data = defaultdict(lambda: {'people_count': 0, 'coordinates': []})
    # No headers
    with open(file_path, 'r', newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            if with_headers:
                name = row['name']
                x = int(row['x'])
                y = int(row['y'])
            else:  
                # Extract values based on the order: label, x, y, name, height, width
                name = row[3]
                x = int(row[1])
                y = int(row[2])
            data[name]['people_count'] += 1
            data[name]['coordinates'].append((x, y))
    return data

## Validation
---
True Positives (TP):
- A reference point (manual) is considered correctly detected (TP) if there is at least one detection point at a distance of 30 pixels on the Y-axis and 12 pixels on the X-axis.
- Only the detection point closest to the reference point is counted as a TP to avoid inflating the number of TPs.

False Negatives (FN):
- A fiducial point is considered not detected (FN) if there is no detection point within the limits of 30 pixels on the Y-axis and 12 pixels on the X-axis.

False Positive (FP):
- A detection point is considered a false positive (FP) if it is not within the limits of 30 pixels on the Y-axis and 12 pixels on the X-axis of any reference point.

Calculation Steps:
- Initialization: point lists are converted to Numpy arrays and counters for TP, FP and FN are initialized.
- Manual Point Traversal: For each manual point, the closest detection point that meets the distance thresholds in X and Y is searched.
- Closeness Check: If a close detection point is found, the TP counter is incremented and that detection point is marked as used.
- False Negative Detection: If no nearby detection point is found, the FN counter is incremented.
- False Positive Calculation: All unused detection points are counted as FP.

True Positives (TP):  
- A detected point is considered a true positive if it is close enough to a manual point. In this case, “close enough” means that the distance on the y-axis is less than or equal to 30 pixels and the distance on the x-axis is less than or equal to 12 pixels. If a detected point meets these criteria for a manual point, it is counted as a TP.

False Negatives (FN):
- A manual point is considered a false negative if no detected point is close enough to it (according to the criteria mentioned above). This means that the algorithm did not detect a person who was present according to the manual count.

False Positives (FP):
- A detected point is considered a false positive if it has not been matched with any manual point. 


In [11]:
def calculate_tp_fp_fn(manual_points, detected_points, y_thresh=30, x_thresh=12):
    manual_points = np.array(manual_points)
    detected_points = np.array(detected_points)
    tp = 0
    fp = 0
    fn = 0
    detected_used = np.zeros(len(detected_points), dtype=bool)
    for manual_point in manual_points:
        detected = False
        closest_index = None
        closest_distance = float('inf')
        
        for i, detected_point in enumerate(detected_points):
            if not detected_used[i]:
                distance_y = abs(manual_point[1] - detected_point[1])
                distance_x = abs(manual_point[0] - detected_point[0])
                if distance_y <= y_thresh and distance_x <= x_thresh:
                    distance = np.sqrt(distance_y ** 2 + distance_x ** 2)
                    if distance < closest_distance:
                        closest_distance = distance
                        closest_index = i
                        detected = True
        
        if detected and closest_index is not None:
            tp += 1
            detected_used[closest_index] = True
        else:
            fn += 1

    fp = np.sum(~detected_used)

    return tp, fp, fn

Precision: It is the proportion of true positives (TP) over the total positive predictions (TP + FP).  
Recall: It is the proportion of true positives over the total number of instances that are truly positive (TP + FN).  
F1 Score: It is the harmonic mean of accuracy and completeness, and is calculated as follows.

In [12]:
def calculate_F1score_recall_precision(manual_points, detected_points):
    tp, fp, fn=calculate_tp_fp_fn(manual_points,detected_points)
    precision=tp/(tp+fp)
    recall=tp/(tp+fn)
    f1score=2*((precision*recall)/(precision+recall))
    return precision, recall, f1score
    

In [13]:
def validation(manual_results, results):
    comparison = []
    for result in results:
        image_name = result['image']
        manual_data = manual_results.get(image_name, {'people_count': 0, 'coordinates': []})
        manual_count = manual_data['people_count']
        manual_points = manual_data['coordinates']
        detected_count = result['count']
        detected_points = result['points']
        precision, recall, f1score=calculate_F1score_recall_precision(manual_points,detected_points)
        mse=np.mean((manual_count - detected_count) ** 2)#sustraendo debe ser la predicción
        comparison.append({
            'image': image_name,
            'manual_count': manual_count,
            'detected_count': detected_count,
            'MSE':mse,
            'recall':recall,
            'precision':precision,
            'f1score': f1score
        })
    return comparison

### MAIN
---

In [19]:
manual_path = "manual_alt.csv"
manual_results = read_manual_annotations(manual_path)
bckg_image_name = '1660284000.jpg'
bckg_image = preprocess_image(bckg_image_name)
image_names = ["1660287600.jpg", "1660294800.jpg","1660320000.jpg"]

results = process_images(image_names, bckg_image)
comparison = validation(manual_results, results)

for comp in comparison:
    print(f"Image: {comp['image']}, Manual Count: {comp['manual_count']}, Detected Count: {comp['detected_count']}")
    print(f" MSE Image level: {comp['MSE']}, Recall: {comp['recall']}, precision: {comp['precision']} f1score: {comp['f1score']}")

Function process_image took 0.0295 seconds
CPU cycles used: 0.0400
Current memory usage is 0.0075 MB; Peak was 23.0908 MB
Function process_image took 0.0199 seconds
CPU cycles used: 0.0200
Current memory usage is 0.0089 MB; Peak was 23.1816 MB
Function process_image took 0.0239 seconds
CPU cycles used: 0.0400
Current memory usage is 0.0131 MB; Peak was 23.2914 MB
Image: 1660287600.jpg, Manual Count: 19, Detected Count: 107
 MSE Image level: 7744.0, Recall: 0.42105263157894735, precision: 0.07476635514018691 f1score: 0.12698412698412698
Image: 1660294800.jpg, Manual Count: 66, Detected Count: 139
 MSE Image level: 5329.0, Recall: 0.22727272727272727, precision: 0.1079136690647482 f1score: 0.1463414634146341
Image: 1660320000.jpg, Manual Count: 155, Detected Count: 218
 MSE Image level: 3969.0, Recall: 0.44516129032258067, precision: 0.3165137614678899 f1score: 0.36997319034852544


Notes:  
- Currently, the background image keeps with the 2 persons.
- They are not equally enlightened in the same way; however, on the test done, the contrast is lowered (in the same image).
- Not establishing specific contours for a person (different cases).
. How do you achieve accuracy with the points? Establish a certain distance.

