In [None]:
from PIL import Image
import pandas as pd
from sklearn.metrics import confusion_matrix
import seaborn as sns
import cv2
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod


# Data Loading

In [None]:
# # run this once no need to run again
# df2 = pd.read_csv('annotation/2.csv', header=None)
# df4 = pd.read_csv('annotation/4.csv', header=None)
# lab1 = pd.read_csv('annotation/labels_my-project-name_2023-11-23-11-28-38.csv', header=None)
# lab2 = pd.read_csv('annotation/labels_my-project-name_2023-11-26-12-25-12.csv', header=None)
# # concat all together
# # run this once to get proper image.
# df = pd.concat([df2, df4, lab1, lab2])
# df.columns = ['label', 'x', 'y', 'image_name', 'image_width', 'image_height']
# df['image_name'] = df['image_name'].apply(lambda name: 'images/' + name)
# df.to_csv('annotation/all.csv')

In [None]:
annotations_df = pd.read_csv('annotation/all.csv', index_col=0)


# Main Code classes

In [None]:


class ImagePreprocessor(ABC):
    @abstractmethod
    def preprocess(self, image):
        pass

    def show_images(self, image):
        prepro_img = self.preprocess(image)
        fig, axes = plt.subplots(1, 2, figsize=(20,20))
        axes = axes.flatten()
        axes[0].imshow(image, cmap='gray', vmin = 0, vmax = 255)
        axes[1].imshow(prepro_img, cmap='gray', vmin = 0, vmax = 255)
        return prepro_img


class BackgroundPreprocessor(ImagePreprocessor):
    def __init__(self, background_points, width=1920, height=1080):
        self.background_points = background_points
        self.background_mask = np.zeros((height, width), dtype=np.uint8)
        cv2.fillPoly(self.background_mask, [self.background_points], color=255)
        self.background_mask = cv2.bitwise_not(self.background_mask)

    def preprocess(self, image):
        # Create a mask for the background points
        return cv2.bitwise_and(image, image, mask=self.background_mask)


class ImageAveragingPreprocessor(ImagePreprocessor):
    def __init__(self, background_images):
        self.background_images = [cv2.GaussianBlur(i, ksize=(7,7), sigmaX=0) for i in background_images]
        self.average_image = np.mean(self.background_images, axis=0).astype(np.uint8)

    def preprocess(self, image):
        return image - self.average_image # image - self.average_image #  cv2.absdiff(image, self.average_image)


class ThresholdingPreprocessor(ImagePreprocessor):
    def __init__(self, lower_threshold, upper_threshold):
        self.lower_threshold = lower_threshold
        self.upper_threshold = upper_threshold

    def preprocess(self, image):
        # Create an inRange mask based on threshold values
        inrange_mask = cv2.inRange(image, self.lower_threshold, self.upper_threshold)
        img = cv2.bitwise_and(image, image, mask=inrange_mask)

        # Apply Otsu's thresholding to further clean the image
        _, img = cv2.threshold(img, self.lower_threshold, self.upper_threshold, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return img

In [None]:
class PreprocessingPipeline(ImagePreprocessor):
    def __init__(self, processors):
        self.processors = processors

    def preprocess(self, image):
        for processor in self.processors:
            image = processor.preprocess(image)
        return image

class AndBitPreprocessingPipeline(ImagePreprocessor):
    def __init__(self, pipeline1, pipeline2):
        self.pipeline1 = pipeline1
        self.pipeline2 = pipeline2

    def preprocess(self, image):
        i1, i2 = image.copy(), image.copy()
        i1 = self.pipeline1.preprocess(i1)
        i2 = self.pipeline2.preprocess(i2)
        img = cv2.bitwise_and(i1, i2)
        return img


In [None]:

class GenericDetector(ABC):
    def __init__(self):
        pass

    @abstractmethod
    def detect(self, image):
        """
        Abstract method to detect objects in an image.
        This method should be implemented by subclasses to perform actual detection.

        Parameters:
        image (numpy.ndarray): A grayscale image in which to perform the detection.

        Returns:
        List[Tuple[int, int]]: A list of (x, y) coordinates for each detected object.
        """
        pass

    def show_detections(self, image):
        """
        Perform detection on the image and show the image with detected objects marked.

        Parameters:
        image (numpy.ndarray): The image on which to perform and show detections.
        """
        plt.figure(figsize=(16, 12))
        detections = self.detect(image)
        plt.imshow(image, cmap='gray')
        for x, y in detections:
            plt.scatter(x, y, c='green', s=30)
        plt.show()

class SimpleBlobDetector(GenericDetector):
    def __init__(self,
                 params = None):
        super().__init__()
        self.params = cv2.SimpleBlobDetector_Params() if params is None else params
        self.detector = cv2.SimpleBlobDetector_create(params)

    def detect(self, image):
        # Detect blobs.
        keypoints = self.detector.detect(image)

        # Convert keypoints to (x, y) coordinates
        return [(int(k.pt[0]), int(k.pt[1])) for k in keypoints]

class HOGSvmDetector:
    def __init__(self):
        # Initialize HOG descriptor with default people detector
        self.hog = cv2.HOGDescriptor()
        self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    def detect(self, image):
        # Detect people in the image
        boxes, _ = self.hog.detectMultiScale(image, winStride=(8, 8), padding=(8, 8), scale=1.05)
        center_points = [(int(x + w/2), int(y + h/2)) for (x, y, w, h) in boxes]

        return center_points

class HaarCascadeDetector(GenericDetector):
    def __init__(self, cascade_path):
        super().__init__()
        self.cascade = cv2.CascadeClassifier(cascade_path)

    def detect(self, image):
        # Detect objects (faces) in the image
        objects = self.cascade.detectMultiScale(image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        # Extract the top-left corner coordinates of each detected object
        return [(x, y) for (x, y, w, h) in objects]

class ContourDetector(GenericDetector):
    def __init__(self, threshold=200):
        super().__init__()
        self.threshold = threshold

    def detect(self, image):
        # Detect objects (faces) in the image
        contours, _ = cv2.findContours(image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        objects = []
        for contour in contours:

            # Ignore small contours (adjust the threshold as needed)
            if cv2.contourArea(contour) < self.threshold:
                continue

            # Calculate the center of mass (centroid) of the contour
            M = cv2.moments(contour)
            if M["m00"] != 0:
                cx = int(M["m10"] / M["m00"])
                cy = int(M["m01"] / M["m00"])
                objects.append((cx, cy))

        return objects



In [None]:


class Evaluation(ABC):
    @abstractmethod
    def evaluate(self, image, ground_truth, predictions):
        pass

    def show_result(self, image, ground_truth, predictions):
        results = self.evaluate(image, ground_truth, predictions)
        matrix = confusion_matrix([1]*len(ground_truth) + [0]*len(predictions),
                                  [1 if result else 0 for result in results])
        sns.heatmap(matrix, annot=True, fmt='g')
        plt.show()

class NearestPointEuclideanEvaluation(Evaluation):
    def __init__(self, threshold):
        self.threshold = threshold

    def evaluate(self, image, ground_truth, predictions):
        tp, fp, tn, fn = 0, 0, 0, len(ground_truth)
        used_ground_truth = set()  # To keep track of matched ground truth points

        for pred in predictions:
            # Find the nearest ground truth point that hasn't been used
            nearest_dist, nearest_gt = None, None
            for gt in ground_truth:
                if gt not in used_ground_truth:
                    dist = np.linalg.norm(np.array(pred) - np.array(gt))
                    if nearest_dist is None or dist < nearest_dist:
                        nearest_dist = dist
                        nearest_gt = gt

            # Check if nearest ground truth is within the threshold
            if nearest_dist is not None and nearest_dist <= self.threshold:
                tp += 1
                fn -= 1
                used_ground_truth.add(nearest_gt)  # Mark this ground truth as used
            else:
                fp += 1

        return tp, fp, tn, fn

class PeopleCount:
    def __init__(self, preprocessing_pipeline, detector, evaluator):
        self.preprocessing_pipeline = preprocessing_pipeline
        self.detector = detector
        self.evaluator = evaluator

    def run(self, dataframe, show_prediction=False):
        detection_results = []
        evaluation_results = []

        for image_name, group in dataframe.groupby('image_name'):
            image = cv2.cvtColor(cv2.imread(image_name), cv2.COLOR_BGR2GRAY)
            preprocessed_image = self.preprocessing_pipeline.preprocess(image)

            detections = self.detector.detect(preprocessed_image)
            ground_truth = list(zip(group['x'], group['y']))
            evaluation = self.evaluator.evaluate(preprocessed_image, ground_truth, detections)

            detection_results.append({
                'image_name': image_name,
                'detections': detections
            })
            evaluation_results.append({
                'image_name': image_name,
                'evaluation': evaluation
            })

            if show_prediction:
                self.show_predictions(cv2.cvtColor(cv2.imread(image_name), cv2.COLOR_BGR2RGB), ground_truth, detections)

        return pd.DataFrame(detection_results), pd.DataFrame(evaluation_results)

    def show_predictions(self, image, ground_truth, detections):
        for x, y in ground_truth:
            cv2.circle(image, (x, y), radius=5, color=(0, 255, 0), thickness=-1)  # Green for ground truth

        for x, y in detections:
            cv2.circle(image, (x, y), radius=5, color=(0, 0, 255), thickness=-1)  # Red for detections
        plt.figure(figsize=(16,12))
        plt.imshow(image)
        plt.title(f"Detections (Red) {len(detections)} vs Ground Truth (Green) {len(ground_truth)}")

In [None]:
def calculate_precision(tp, fp):
    """ Calculate precision based on true positives (TP) and false positives (FP). """
    if tp + fp == 0:
        return 0  # To avoid division by zero
    return tp / (tp + fp)

def calculate_recall(tp, fn):
    """ Calculate recall based on true positives (TP) and false negatives (FN). """
    if tp + fn == 0:
        return 0  # To avoid division by zero
    return tp / (tp + fn)

def display_confusion_matrices(evaluation_results):
    total_confusion_matrix = np.zeros((2, 2), dtype=int)  # For the total confusion matrix
    all_tp, all_fp, all_tn, all_fn = 0, 0, 0, 0
    for index, row in evaluation_results.iterrows():
        tp, fp, tn, fn = row['evaluation']
        all_tp += tp
        all_fp += fp
        all_tn += tn
        all_tn += fn
        precision = calculate_precision(tp=tp, fp=fp)
        recall = calculate_recall(tp=tp, fn=fn)
        confusion_matrix = np.array([[tp, fp], [fn, tn]])

        # Display confusion matrix for each image
        plt.figure(figsize=(5, 4))
        sns.heatmap(confusion_matrix, annot=True, fmt='g', cmap='Blues', yticklabels=['Positive', 'Negative'], xticklabels=['Positive', 'Negative'])
        plt.title(f"Confusion Matrix for {row['image_name']}, Precision {precision:.5f}, Recall {recall:.5f}")
        plt.xlabel('Ground Truth')
        plt.ylabel('Predicted')
        plt.show()

        # Add to total confusion matrix
        total_confusion_matrix += confusion_matrix

    # Display total confusion matrix
    precision = calculate_precision(tp=all_tp, fp=all_fp)
    recall = calculate_recall(tp=all_tp, fn=all_tn)
    plt.figure(figsize=(5, 4))
    sns.heatmap(total_confusion_matrix, annot=True, fmt='g', cmap='Blues', yticklabels=['Positive', 'Negative'], xticklabels=['Positive', 'Negative'])
    plt.title(f"Total Confusion Matrix, Precision {precision:.5f}, Recall {recall:.5f}")
    plt.xlabel('Ground Truth')
    plt.ylabel('Predicted')
    plt.show()


# Experimentations

In [None]:
bg_points = np.array([
    (0, 589),
    (27, 529),
    (97, 530),
    (95, 442),
    (137, 446),
    (223, 440),
    (714, 417),
    (854, 402),
    (1179, 411),
    (1288, 412),
    (1384, 418),
    (1480, 418),
    (1566, 409),
    (1920, 398),
    (1920, 0),
    (0, 0)
])

bg_pro = BackgroundPreprocessor(bg_points)
background1 = cv2.cvtColor(cv2.imread("images/10.jpg"), cv2.COLOR_BGR2GRAY)
background2 = cv2.cvtColor(cv2.imread("images/9.jpg"), cv2.COLOR_BGR2GRAY)
background1 = bg_pro.preprocess(background1)
background2 = bg_pro.preprocess(background2)
average_pro = ImageAveragingPreprocessor([background1])
threshold_pro = ThresholdingPreprocessor(50, 220)

In [None]:
# example of each pipeline
preprocess_pipeline = PreprocessingPipeline([bg_pro, average_pro, threshold_pro])
preprocess_pipeline2 = PreprocessingPipeline([bg_pro, ImageAveragingPreprocessor([background2]), threshold_pro])
andbitpreprocess_pipeline = AndBitPreprocessingPipeline(preprocess_pipeline, preprocess_pipeline2)

In [None]:
# example and how to make blob detector
image_exp = cv2.cvtColor(cv2.imread("images/1.jpg"), cv2.COLOR_BGR2GRAY)
# image_exp = bg_pro.show_images(image_exp)
# image_exp = average_pro.show_images(image_exp)
# image_exp = threshold_pro.show_images(image_exp)
image_exp = andbitpreprocess_pipeline.show_images(image_exp)

params = cv2.SimpleBlobDetector_Params()
# Filter by Area.
params.filterByArea = True
params.minArea = 2
params.maxArea = 100

# Filter by Circularity
params.filterByCircularity = True
params.minCircularity = 0.75

# Filter by Convexity
params.filterByConvexity = True
params.minConvexity = 0.75

# Filter by Inertia
params.filterByInertia = True
params.minInertiaRatio = 0.15
blob_detector = SimpleBlobDetector(params)
blob_detector.show_detections(image_exp)

In [None]:
evaluator = NearestPointEuclideanEvaluation(100)
pc_solo = PeopleCount(preprocessing_pipeline=preprocess_pipeline,
                      detector=blob_detector,
                      evaluator=evaluator)
detection_df_solo, evaluation_result_solo = pc_solo.run(annotations_df, show_prediction=True)
display_confusion_matrices(evaluation_result_solo)

In [None]:
evaluator = NearestPointEuclideanEvaluation(100)
pc_dual = PeopleCount(preprocessing_pipeline=andbitpreprocess_pipeline,
                      detector=blob_detector,
                      evaluator=evaluator)
detection_df_dual, evaluation_result_dual = pc_dual.run(annotations_df, show_prediction=True)
display_confusion_matrices(evaluation_result_dual)

In [None]:
evaluator = NearestPointEuclideanEvaluation(100)
svm_detector = HOGSvmDetector()
pc_dualsvm = PeopleCount(preprocessing_pipeline=andbitpreprocess_pipeline,
                      detector=svm_detector,
                      evaluator=evaluator)
detection_df_dualsvm, evaluation_result_dualsvm = pc_dualsvm.run(annotations_df, show_prediction=True)
display_confusion_matrices(evaluation_result_dualsvm)

In [None]:
evaluator = NearestPointEuclideanEvaluation(100)
contour_detector = ContourDetector()
pc_dualcontour = PeopleCount(preprocessing_pipeline=andbitpreprocess_pipeline,
                         detector=contour_detector,
                         evaluator=evaluator)
detection_df_dualcontour, evaluation_result_dualcontour = pc_dualcontour.run(annotations_df, show_prediction=True)
display_confusion_matrices(evaluation_result_dualcontour)

# initial work

In [None]:
grouped = annotations_df.groupby("image_name")
# Iterate through each group
for image_name, group in grouped:
    # Read the image
    image = Image.open(image_name)  # Load the image using PIL

    # Create a matplotlib figure and axis
    fig, ax = plt.subplots(1)

    # Add points to the image based on "x" and "y" coordinates
    x = group["x"]
    y = group["y"]
    ax.scatter(x, y, color='red', s=10)  # You can customize the point appearance

    # Set axis limits if needed
    image_width = group.iloc[0]['image_width']
    image_height = group.iloc[0]['image_height']
    ax.set_xlim(0, image_width)
    ax.set_ylim(0, image_height)

    # Add labels or annotations if desired
    plt.title(f"Image: {image_name}")
    plt.xlabel("X-coordinate")
    plt.ylabel("Y-coordinate")

    # Show the image with points
    # Display the image with y-axis inverted
    ax.invert_yaxis()  # Invert the y-axis to display the image right-side up
    ax.imshow(image)
    plt.show()


In [None]:


# Load the two background images (images without people)
background1 = cv2.imread("images/10.jpg")
background2 = cv2.imread("images/9.jpg")

# Initialize the background subtractor with the two backgrounds
bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=True)
bg_subtractor.apply(background1, learningRate=0)
bg_subtractor.apply(background2, learningRate=0)

# Group annotations by image_name
grouped_annotations = annotations_df.groupby("image_name")
# Group annotations by image_name

image_names = []
x_values = []
y_values = []

# Iterate through each group (each image)
for image_name, group in grouped_annotations:
    # Load the current image based on the image_name
    current_image = cv2.imread(image_name)

    # Apply background subtraction
    fg_mask = bg_subtractor.apply(current_image)

    # Post-processing to remove noise and enhance the mask
    fg_mask = cv2.erode(fg_mask, None, iterations=2)
    fg_mask = cv2.dilate(fg_mask, None, iterations=2)

    # Find contours in the foreground mask
    contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Initialize a count for detected people
    people_count = 0

    for contour in contours:
        # Ignore small contours (adjust the threshold as needed)
        if cv2.contourArea(contour) < 200:
            continue

        # Calculate the center of mass (centroid) of the contour
        M = cv2.moments(contour)
        if M["m00"] != 0:
            cx = int(M["m10"] / M["m00"])
            cy = int(M["m01"] / M["m00"])
        else:
            cx, cy = 0, 0

        # Draw a dot (circle) at the centroid position
        cv2.circle(current_image, (cx, cy), 5, (0, 255, 0), -1)

        # Increment the people count
        people_count += 1

        # Append the detection count, x, and y values to the lists
        image_names.append(image_name)
        x_values.append(cx)
        y_values.append(cy)

    # Draw annotations (e.g., bounding boxes) on the image
    for _, annotation in group.iterrows():
        x, y = annotation['x'], annotation['y']
        cv2.circle(current_image, (x, y), 5, (0, 0, 255), -1)

    # Display the result with the people count and annotations
    cv2.putText(current_image, f"People Count: {people_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow("Image with People Count and Annotations", current_image)
    # Display the result with the people count and annotations in the Jupyter Notebook
    plt.imshow(cv2.cvtColor(current_image, cv2.COLOR_BGR2RGB))
    plt.title(f"Detection Count: {people_count}, Real Count: {group.shape[0]}")
    plt.axis('off')
    plt.show()

result_df = pd.DataFrame({'x': x_values, 'y': y_values, 'image_name': image_names})
#
# # Release OpenCV windows
# cv2.destroyAllWindows()

In [None]:
# compare annotation with ground truth to see resulting
import numpy as np

# Load your ground truth and detection results DataFrames
annotation_df = pd.read_csv("annotation/all.csv")  # Replace with the actual file paths

# Define the distance threshold for classification
distance_threshold = 5.0  # Adjust the threshold as needed

annotation_df["found"] = False

# Iterate through each row of the result_df DataFrame
annotation_df["found"] = False

# Iterate through each row of the result_df DataFrame
for index, result_row in result_df.iterrows():
    # Calculate the Euclidean distance for each annotation point using apply
    min_dist, idx = annotation_df.apply(
        lambda row: (np.sqrt((row["x"] - result_row["x"]) ** 2 + (row["y"] - result_row["y"]) ** 2), row.name),
        axis=1).sort_values().iloc[0]

    # Check if the minimum distance satisfies the threshold
    if min_dist <= distance_threshold:
        # Mark the nearest annotation as found using the index
        annotation_df.at[idx, "found"] = True

# Count the number of true positives and false positives
true_positives = annotation_df[annotation_df["found"]]["found"].sum()
false_positives = len(result_df) - true_positives

# Count the number of false negatives
false_negatives = len(annotation_df) - true_positives

# Print the results
print("True Positives:", true_positives)
print("False Positives:", false_positives)
print("False Negatives:", false_negatives)
