In [40]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch
from torchvision import transforms
from face_alignment import align
from backbones import get_model
from sklearn.metrics.pairwise import cosine_similarity
from ultralytics import YOLO
from PIL import Image
import cv2
import torch.nn.functional as F
from skimage.metrics import structural_similarity as ssim
from torchmetrics.functional import structural_similarity_index_measure as ssim

In [109]:
# Run on the GPU when CUDA is present; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Preprocessing pipeline: resize to 112x112, convert to a tensor, then shift and
# scale each channel with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((112, 112)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

In [13]:
def compute_ssim(pil1, pil2):
    """Structural similarity between two PIL images, compared as 112x112 greyscale.

    Args:
        pil1: First PIL image.
        pil2: Second PIL image.

    Returns:
        The mean SSIM score computed by scikit-image (higher means more similar).
    """
    # Imported locally under an unambiguous name: the notebook's import cell binds
    # `ssim` twice (scikit-image first, torchmetrics second), so the global `ssim`
    # is the torchmetrics tensor function, which accepts neither NumPy arrays nor
    # the `full=True` keyword the original call relied on.
    from skimage.metrics import structural_similarity

    img1 = np.array(pil1.resize((112, 112)).convert("L"))
    img2 = np.array(pil2.resize((112, 112)).convert("L"))
    # Only the scalar score is needed, so the full per-pixel SSIM map is not requested.
    return structural_similarity(img1, img2)

def compare_histogram(pil1, pil2):
    """Bhattacharyya distance between the hue/saturation histograms of two PIL images.

    Args:
        pil1: First PIL image (any mode; converted to RGB internally).
        pil2: Second PIL image (any mode; converted to RGB internally).

    Returns:
        The value of ``cv2.compareHist(..., cv2.HISTCMP_BHATTACHARYYA)``:
        0.0 for identical histograms, larger for more dissimilar ones.
    """

    def _hue_sat_hist(pil_img):
        # Force three RGB channels first: RGBA, greyscale or palette images would
        # otherwise make the RGB->HSV conversion fail or misread the channels.
        hsv = cv2.cvtColor(np.array(pil_img.convert("RGB")), cv2.COLOR_RGB2HSV)
        # 2-D histogram over hue (50 bins, 0-180) and saturation (60 bins, 0-256).
        hist = cv2.calcHist([hsv], [0, 1], None, [50, 60], [0, 180, 0, 256])
        cv2.normalize(hist, hist)
        return hist

    return cv2.compareHist(
        _hue_sat_hist(pil1), _hue_sat_hist(pil2), cv2.HISTCMP_BHATTACHARYYA
    )




In [7]:
# Load the pretrained YOLO11n detection model
yolo_model = YOLO("yolo11n.pt")
yolo_model = yolo_model.to(device)  # Move the model to GPU if available

# Face-embedding model
face_model_name = "edgeface_xs_gamma_06"
face_model = get_model(face_model_name)
checkpoint_path = f'checkpoints/{face_model_name}.pt'
# map_location follows `device` (instead of a hard-coded 'cuda') so the checkpoint
# also loads on machines where `device` fell back to the CPU.
face_model.load_state_dict(torch.load(checkpoint_path, map_location=device))
face_model.to(device)  # Move the model to GPU if available
# Inference only: switch off training-time behaviour (dropout, batch-norm updates).
face_model.eval()

# NOTE(review): this transform has no Resize step, unlike the other `transform`
# definitions in this notebook; presumably the aligned faces are already the
# size the model expects — confirm, since whichever cell runs last wins.
transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
            ])

# Check the device for yolo_model
yolo_device = next(yolo_model.model.parameters()).device
print(f"YOLO model is on device: {yolo_device}")
# Check the device for face_model
face_device = next(face_model.parameters()).device
print(f"Face model is on device: {face_device}")

YOLO model is on device: cuda:0
Face model is on device: cuda:0


In [None]:
def get_bboxes(image, person_class=0):
    """Run the global YOLO model on ``image`` and keep the boxes of one class.

    Args:
        image: Image passed straight to ``yolo_model``.
        person_class: Class id to keep (defaults to 0).

    Returns:
        The ``xyxy`` rows of the first result whose class equals ``person_class``.
    """
    detections = yolo_model(image)[0].boxes
    is_wanted_class = detections.cls == person_class
    return detections.xyxy[is_wanted_class]
    
def crop_image_from_bbox(image, bbox):
    """
    Cut the rectangle described by ``bbox`` out of a PIL image.

    bbox: [x1, y1, x2, y2] in pixel coordinates; each value is truncated to int.
    """
    left, top, right, bottom = (int(coord) for coord in bbox)
    return image.crop((left, top, right, bottom))

## get the embeddings for various faces in the image
def get_embeddings(image):
    """Detect people in ``image`` and embed every face that can be aligned.

    Args:
        image: PIL image to search.

    Returns:
        Tuple ``(embeddings, bboxes, idxes)``:
          * embeddings: one row per successfully embedded face, on ``device``
            (shape ``(0, 512)`` when no face could be embedded).
          * bboxes: every person box returned by ``get_bboxes``.
          * idxes: long tensor mapping each embedding row to its row in ``bboxes``.
    """
    bboxes = get_bboxes(image)
    transformed_faces = []
    idexes = []
    for idx, bbox in enumerate(bboxes):
        cropped_person = crop_image_from_bbox(image, bbox)
        aligned_face = align.get_aligned_face_from_image(cropped_person)
        # Skip people whose face could not be aligned explicitly, rather than
        # relying on the transform blowing up on a missing image.
        if aligned_face is None:
            continue
        try:
            transformed_face = transform(aligned_face)
        except Exception:
            # Best effort: a crop the transform cannot handle is skipped.
            continue
        transformed_faces.append(transformed_face)
        idexes.append(idx)

    if transformed_faces:
        transformed = torch.stack(transformed_faces).to(device)
        # Inference only: no autograd graph is needed, which saves memory per frame.
        with torch.no_grad():
            embeddings = face_model(transformed)
    else:
        embeddings = torch.empty((0, 512), device=device)  # Assuming 512 is embedding size
    # Always an integer tensor, so it can be used for indexing even when empty.
    idxes = torch.tensor(idexes, dtype=torch.long, device=device)
    return embeddings, bboxes, idxes

def get_single_emb(image_path):
    """Embed the face found in the image at ``image_path``.

    Args:
        image_path: Path of an image containing the face to embed.

    Returns:
        The face model's output for that single face (a batch of one), on ``device``.

    Raises:
        ValueError: If the aligner returns no face for the image.
    """
    aligned = align.get_aligned_face(image_path)  # align face
    if aligned is None:
        # Fail with a clear message instead of an obscure error inside the transform.
        raise ValueError(f"No face could be aligned in image: {image_path}")
    transformed_input = transform(aligned).to(device)
    # Inference only: skip building an autograd graph.
    with torch.no_grad():
        emb = face_model(transformed_input.unsqueeze(0))
    return emb

In [42]:
# Pick the compute device: CUDA when available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [142]:
# Preprocessing pipeline: 112x112 resize, tensor conversion, then per-channel
# normalisation with mean 0.5 and std 0.5.
_resize_step = transforms.Resize((112, 112))
_normalize_step = transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
transform = transforms.Compose([_resize_step, transforms.ToTensor(), _normalize_step])
del _resize_step, _normalize_step

In [143]:
# Sample image used to sanity-check the preprocessing transform in the next cells.
pil_img = Image.open("new_test/r1.JPG")

In [144]:
# Preprocess the sample image and place the resulting tensor on the compute device.
t_out = transform(pil_img).to(device)

In [145]:
# Sanity check: the transformed image should be a (3, 112, 112) tensor.
t_out.shape

torch.Size([3, 112, 112])

In [135]:
# Smallest pixel value after normalisation; Normalize(mean=0.5, std=0.5) maps
# ToTensor's [0, 1] output onto [-1, 1], so -1 is the lowest possible value.
t_out.min()

tensor(-1., device='cuda:0')

In [136]:
def image_(image_path):
    """Load an image file and return it as a preprocessed tensor on ``device``.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The result of the global ``transform`` applied to the 112x112 RGB image,
        moved to ``device``.
    """
    # The context manager closes the file handle deterministically.
    with Image.open(image_path) as img_file:
        # Force three channels: the 3-channel Normalize step in `transform`
        # fails on greyscale or RGBA files.
        image = img_file.convert("RGB").resize((112, 112))
    return transform(image).to(device)

def compute_histogram(img, bins=256, range=(-1, 1)):
    """
    Compute a normalized intensity histogram over every element of a tensor.

    Args:
        img: floating-point torch tensor of any shape; all elements are pooled
            into one histogram.
        bins: number of equal-width bins.
        range: (min, max) value range covered by the bins. The default (-1, 1)
            matches images normalised with mean 0.5 / std 0.5. Elements outside
            the range are ignored by ``torch.histc``.

    Returns:
        1-D tensor of length ``bins`` that sums to 1, or all zeros when no
        element falls inside ``range``.
    """
    hist = torch.histc(img, bins=bins, min=range[0], max=range[1])
    # Counts are whole numbers, so clamping the total to >= 1 only changes the
    # empty case, where it avoids a 0/0 = NaN histogram.
    return hist / torch.clamp(hist.sum(), min=1.0)

def compare_histograms_cosine(hist1, hist2):
    """
    Cosine similarity between two 1-D histograms, returned as a Python float.

    Identical histograms score 1.0; orthogonal ones score 0.0.
    """
    # cosine_similarity reduces over dim 1, so give each histogram a batch axis.
    lhs = hist1.unsqueeze(0)
    rhs = hist2.unsqueeze(0)
    similarity = F.cosine_similarity(lhs, rhs)
    return similarity.item()


In [138]:
# Random batch of four 3x224x224 "images", created directly on the compute device.
random_tensor = torch.rand((4, 3, 224, 224), device=device)
print(random_tensor.shape)

torch.Size([4, 3, 224, 224])


In [150]:
# Histogram of one random image. torch.rand samples from [0, 1), so with
# compute_histogram's default range of (-1, 1) only the upper half of the bins fills.
h_one = compute_histogram(torch.rand( 3, 224, 224, device=device))

In [151]:
# Expect one entry per histogram bin (256 by default).
h_one.shape

torch.Size([256])

In [146]:
# NOTE(review): no module-level `compute_histogram_batch` is defined in the visible
# part of this notebook — only the `PersonTracker.compute_histogram_batch` method
# further down — so this cell raises NameError on a fresh kernel unless the
# function is defined elsewhere.
h = compute_histogram_batch(random_tensor)

In [153]:
# Cosine similarity of each batch histogram against the single-image histogram;
# `h_one.unsqueeze(0)` adds a leading axis so it broadcasts across the rows of `h`.
F.cosine_similarity(h, h_one.unsqueeze(0))

tensor([0.9943, 0.9942, 0.9945, 0.9944], device='cuda:0')

In [147]:
# Confirm which device the batch histograms were computed on.
h.device

device(type='cuda', index=0)

In [None]:
class PersonTracker:
    """Follow one reference person in a camera stream.

    Each frame is searched for people with a YOLO detector. When a face can be
    aligned, the person whose face embedding is closest (cosine similarity) to
    the reference embedding is selected. When no face is usable, the person
    crop whose intensity histogram is closest to the last selected crop is
    chosen instead.

    Attributes set by ``__init__``: ``face_model``, ``detection_model``,
    ``device``, ``transform``, ``align``, ``refrence_embedding``, ``cap``,
    ``last_frame`` (preprocessed tensor of the last selected person crop) and
    ``last_hist`` (histogram of ``last_frame`` from the latest comparison).
    """

    # Width of the face-model embedding, used for the empty "no faces" result.
    _EMBEDDING_DIM = 512

    def __init__(self, face_model_name, yolo_detection_path, refrence_image_path,
                 device='cuda', checkpoint_path=None):
        """Load both models and embed the reference face.

        Args:
            face_model_name: Model name passed to ``get_model``.
            yolo_detection_path: Weights file passed to ``YOLO``.
            refrence_image_path: Image containing the face of the person to track.
            device: Torch device string for both models.
            checkpoint_path: Face-model weights file. Defaults to
                ``checkpoints/{face_model_name}.pt``.

        Raises:
            ValueError: If no face can be aligned in the reference image.
        """
        self.device = torch.device(device)

        self.face_model = get_model(face_model_name)
        # Load the trained weights; without this step the embeddings would come
        # from whatever state `get_model` leaves the network in.
        if checkpoint_path is None:
            checkpoint_path = f'checkpoints/{face_model_name}.pt'
        self.face_model.load_state_dict(
            torch.load(checkpoint_path, map_location=self.device)
        )
        self.face_model.to(self.device)
        self.face_model.eval()

        self.detection_model = YOLO(yolo_detection_path)
        self.detection_model.to(self.device)
        # NOTE(review): eval() is applied to the wrapped torch module rather than
        # to the YOLO wrapper. nn.Module.eval() calls self.train(False), and the
        # wrapper's train() is its training entry point — confirm against the
        # installed Ultralytics version.
        self.detection_model.model.eval()

        self.transform = transforms.Compose([
            transforms.Resize((112, 112)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
        self.align = align.get_aligned_face_from_image

        # Force RGB so the 3-channel Normalize step works for any image mode,
        # and close the file handle once the pixels are loaded.
        with Image.open(refrence_image_path) as ref_file:
            reference_image = ref_file.convert("RGB")
        aligned_reference = self.align(reference_image)
        if aligned_reference is None:
            raise ValueError(
                f"No face could be aligned in reference image: {refrence_image_path}"
            )
        reference_input = self.transform(aligned_reference).unsqueeze(0).to(self.device)
        with torch.no_grad():  # inference only
            self.refrence_embedding = self.face_model(reference_input)

        self.cap = None
        self.last_frame = None
        self.last_hist = None

    def get_bboxes(self, image, person_class=0):
        """Return the ``xyxy`` boxes of class ``person_class`` detected in ``image``."""
        results = self.detection_model(image)
        idxes = torch.where(results[0].boxes.cls == person_class)
        bboxes = results[0].boxes.xyxy[idxes]
        return bboxes

    def crop_image_from_bbox(self, image, bbox):
        """
        Crop a region from a PIL image using a bounding box.
        bbox: [x1, y1, x2, y2] in pixel coordinates
        """
        x1, y1, x2, y2 = map(int, bbox)
        return image.crop((x1, y1, x2, y2))

    def ssim(self, image):
        """Structural similarity between ``image`` and the stored last frame.

        On the first call (no stored frame) ``image`` is stored and 1 is returned.

        Args:
            image: Image tensor, either (C, H, W) or (B, C, H, W).

        Returns:
            1 on the first call, otherwise the torchmetrics SSIM score tensor.
        """
        if self.last_frame is None:
            self.last_frame = image
            return 1
        # The torchmetrics SSIM requires a batch axis (BxCxHxW); add it to
        # single images.
        reference = self.last_frame
        if reference.dim() == 3:
            reference = reference.unsqueeze(0)
        if image.dim() == 3:
            image = image.unsqueeze(0)
        # Inside a method body the bare name `ssim` resolves to the module-level
        # function, not to this method.
        return ssim(reference, image)

    def get_embeddings(self, image):
        """Detect people in ``image`` and embed every face that can be aligned.

        Returns:
            Tuple ``(embeddings, bboxes, idxes, transformed_faces)``:
              * embeddings: one row per embedded face on ``self.device``
                (shape ``(0, 512)`` when none).
              * bboxes: every person box from ``get_bboxes``.
              * idxes: long tensor mapping each embedding row to its ``bboxes`` row.
              * transformed_faces: list of the preprocessed face tensors.
        """
        bboxes = self.get_bboxes(image)
        transformed_faces = []
        idexes = []
        for idx, bbox in enumerate(bboxes):
            cropped_person = self.crop_image_from_bbox(image, bbox)
            aligned_face = self.align(cropped_person)
            # Skip people without an alignable face explicitly.
            if aligned_face is None:
                continue
            try:
                transformed_face = self.transform(aligned_face)
            except Exception:
                # Best effort: a crop the transform cannot handle is skipped.
                continue
            transformed_faces.append(transformed_face)
            idexes.append(idx)

        if transformed_faces:
            transformed = torch.stack(transformed_faces).to(self.device)
            with torch.no_grad():  # inference only
                embeddings = self.face_model(transformed)
        else:
            embeddings = torch.empty((0, self._EMBEDDING_DIM), device=self.device)
        # Always an integer tensor so it can index `bboxes`, even when empty.
        idxes = torch.tensor(idexes, dtype=torch.long, device=self.device)
        return embeddings, bboxes, idxes, transformed_faces

    def compute_histogram_batch(self, imgs, bins=256, value_range=(-1, 1)):
        """
        Compute one normalized intensity histogram per image in a batch.

        All channels of an image are pooled into a single histogram.

        Args:
            imgs (Tensor): (N, C, H, W) image batch with values in ``value_range``
            bins (int): number of histogram bins
            value_range (tuple): (min, max) range of pixel values; values outside
                it are clamped into the first / last bin

        Returns:
            Tensor: (N, bins) histogram per image, each row summing to 1

        Raises:
            ValueError: If ``imgs`` is not 4-dimensional.
        """
        if imgs.dim() != 4:
            raise ValueError(f"expected a (N, C, H, W) batch, got shape {tuple(imgs.shape)}")
        n_images = imgs.shape[0]
        min_val, max_val = value_range

        # Pool channels and pixels: (N, C*H*W). reshape (not view) also accepts
        # non-contiguous input such as permuted tensors.
        flat = imgs.reshape(n_images, -1)

        # Map each value to a bin index in [0, bins-1].
        scaled = ((flat - min_val) / (max_val - min_val) * (bins - 1)).long()
        scaled = torch.clamp(scaled, 0, bins - 1)  # avoid out-of-bounds

        # Count every bin in a single pass instead of one full scan per bin.
        histograms = torch.zeros(n_images, bins, device=imgs.device)
        histograms.scatter_add_(1, scaled, torch.ones_like(scaled, dtype=histograms.dtype))

        # Normalize; the clamp only matters for images with zero pixels.
        return histograms / histograms.sum(dim=1, keepdim=True).clamp_min(1.0)

    def _draw_box(self, frame, box, label, box_color):
        """Draw ``box`` and a green text ``label`` onto the BGR ``frame`` in place."""
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
        cv2.putText(frame, label, (x1, y1 + 10), cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (0, 255, 0), 2)

    def _process_frame(self, frame, pil_image):
        """Select the tracked person in one frame and annotate ``frame`` in place."""
        # Use this tracker's own models rather than the notebook-level helpers.
        emb, bboxes, idxes, _ = self.get_embeddings(pil_image)

        if emb.numel() > 0:
            # Face path: pick the face closest to the reference embedding.
            cos_sim = F.cosine_similarity(emb, self.refrence_embedding)
            best = int(torch.argmax(cos_sim))
            best_box = bboxes[int(idxes[best])].detach().cpu().numpy()
            self._draw_box(frame, best_box, f"Sim: {cos_sim[best].item():.2f}", (0, 0, 255))
            last_img = self.crop_image_from_bbox(pil_image, best_box)
            self.last_frame = self.transform(last_img).to(self.device)
            return

        if self.last_frame is None:
            print("Need a face in frame once to start tracking")
            return

        if len(bboxes) == 0:
            # Nobody detected at all: nothing to compare against.
            return

        # Appearance path: compare every person crop with the last selected crop.
        print("moving to tracker-with-no-face")
        candidates = torch.stack([
            self.transform(self.crop_image_from_bbox(pil_image, bbox))
            for bbox in bboxes
        ]).to(self.device)
        hist_score = self.get_distribution_score(candidates)
        best = int(torch.argmax(hist_score))
        best_box = bboxes[best].detach().cpu().numpy()
        self._draw_box(frame, best_box,
                       f"Distributed score: {hist_score[best].item():.2f}", (0, 255, 0))
        self.last_frame = candidates[best]

    def start_tracking(self, camera_index=0):
        """Run the tracking loop on a camera until 'q' is pressed or a read fails.

        Args:
            camera_index: Index passed to ``cv2.VideoCapture``.
        """
        self.cap = cv2.VideoCapture(camera_index)
        if not self.cap.isOpened():
            print("Error: Could not open camera.")
            return
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print("Failed to grab frame.")
                    break

                # Convert frame to PIL for consistency
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)

                # Inference; a failing frame is reported and skipped.
                try:
                    self._process_frame(frame, pil_image)
                    cv2.imshow("Tracking", frame)
                except Exception as e:
                    print(f"Error during inference: {e}")

                # Clear unused memory on GPU
                torch.cuda.empty_cache()

                # Exit on 'q'
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the camera and windows even if the loop is interrupted.
            self.cap.release()
            cv2.destroyAllWindows()

    @staticmethod
    def compute_histogram(img, bins=256, range=(0, 1)):
        """
        Compute a normalized histogram over every element of a tensor.

        Declared static because it takes no ``self``; it can be called on the
        class or on an instance.

        Args:
            img: floating-point torch tensor; values outside ``range`` are
                ignored by ``torch.histc``
            bins: number of equal-width bins
            range: (min, max) value range covered by the bins
        """
        hist = torch.histc(img, bins=bins, min=range[0], max=range[1])
        # Counts are whole numbers, so the clamp only avoids 0/0 for empty histograms.
        return hist / torch.clamp(hist.sum(), min=1.0)

    def get_distribution_score(self, curr_image):
        """Histogram cosine similarity between the last selected crop and candidates.

        Args:
            curr_image: (N, C, H, W) batch of candidate crops.

        Returns:
            Tensor of shape (N,): 1.0 for identical histograms, 0.0 for orthogonal
            ones. A tensor (not a float) so callers can take ``argmax`` over it.

        Raises:
            ValueError: If no last frame has been stored yet.
        """
        if self.last_frame is None:
            raise ValueError("last_frame is not set; a reference crop is required")
        reference = self.last_frame
        if reference.dim() == 3:
            # compute_histogram_batch needs a batch axis.
            reference = reference.unsqueeze(0)
        # Recomputed on every call: `last_frame` changes as tracking proceeds, so
        # a histogram cached once would go stale.
        self.last_hist = self.compute_histogram_batch(reference)
        hist = self.compute_histogram_batch(curr_image)
        return F.cosine_similarity(self.last_hist, hist, dim=1)
    

    

In [97]:
# Build a tracker whose reference face comes from new_test/r1.JPG.
tracker = PersonTracker(
    face_model_name="edgeface_xs_gamma_06",
    yolo_detection_path="yolo11n.pt",
    refrence_image_path="new_test/r1.JPG",
)

In [141]:
# Test image; the file name suggests people without visible faces — presumably
# used to exercise the no-face path (verify against the image itself).
image = Image.open("new_test/ppl_no_face.jpg")

In [None]:
# NOTE(review): `trans` is not defined anywhere in the visible notebook, so this
# cell raises NameError; it looks like an unfinished statement — complete or delete it.
t = trans

In [100]:
# PersonTracker.get_embeddings returns four values (embeddings, boxes, indices and
# the list of preprocessed faces); the starred target absorbs anything after the
# first three so the unpacking cannot fail with "too many values to unpack".
emb, boxes, idxes, *_unused = tracker.get_embeddings(image=image)


0: 416x640 7 persons, 18.8ms
Speed: 2.6ms preprocess, 18.8ms inference, 2.5ms postprocess per image at shape (1, 3, 416, 640)
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range
Face detection Failed due to error.
list index out of range


In [102]:
# Total number of embedding values; 0 means no face produced an embedding.
emb.numel()

0

In [48]:

# Compare two test images with both metrics: histogram cosine similarity and SSIM.
# NOTE(review): `load_and_prepare_image` is not defined in the visible part of this
# notebook (the visible `image_` helper has the same one-path signature) — confirm
# where it comes from, otherwise this cell fails on a fresh kernel.
img1_path = "new_test/r2.JPG"
img2_path = "new_test/r11.JPG"
img1 = load_and_prepare_image(img1_path)
img2 = load_and_prepare_image(img2_path)
hist1 = compute_histogram(img1)
hist2 = compute_histogram(img2)

# `ssim` here is whichever import ran last; the torchmetrics version needs a batch
# axis (BxCxHxW), hence the unsqueeze(0) on both images.
similarity = compare_histograms_cosine(hist1, hist2)
ssim_value = ssim(img1.unsqueeze(0), img2.unsqueeze(0))
print(f"Histogram Cosine Similarity: {similarity:.4f}")
print(f"SSIM: {ssim_value:.4f}")


Histogram Cosine Similarity: 0.6534
SSIM: 0.1386


ValueError: Expected `preds` and `target` to have BxCxHxW or BxCxDxHxW shape. Got preds: torch.Size([3, 112, 112]) and target: torch.Size([3, 112, 112]).

In [9]:
# Build a tracker whose reference face comes from new_test/rajnish_crop.JPG.
Tracker = PersonTracker(
    face_model_name="edgeface_xs_gamma_06",
    yolo_detection_path="yolo11n.pt",
    refrence_image_path="new_test/rajnish_crop.JPG",
)

torch.Size([1, 512])

In [9]:
def find_person(image_path, person_image_path):
    """Find the person in one image whose face best matches a reference face.

    Args:
        image_path: Path of the image to search.
        person_image_path: Path of an image showing the face to look for.

    Returns:
        The PIL crop of the best-matching person, or None when no face in the
        searched image could be embedded.
    """
    # Load as RGB and close the file: non-RGB modes fail in the 3-channel
    # preprocessing and would be reported as "no faces detected".
    with Image.open(image_path) as img_file:
        image = img_file.convert("RGB")
    source_embeddings = get_single_emb(person_image_path)
    emb, bboxes, idxes = get_embeddings(image)

    if emb.numel() == 0:
        print("No faces detected in the image.")
        return None

    # Highest cosine similarity to the reference embedding wins.
    cos_sim = torch.nn.functional.cosine_similarity(emb, source_embeddings)
    best_match_idx = int(torch.argmax(cos_sim))

    # `idxes` maps the embedding row back to its row in `bboxes`.
    best_box = bboxes[int(idxes[best_match_idx])]
    return crop_image_from_bbox(image, best_box)

In [None]:
def run_video_tracking(source_image_path="new_test/steve.JPG", camera_index=0):
    """Track the person from ``source_image_path`` in a live camera stream.

    Every frame, the detected person whose face embedding has the highest cosine
    similarity to the reference embedding is outlined. Press 'q' to stop.

    Args:
        source_image_path: Image showing the face to track.
        camera_index: Index passed to ``cv2.VideoCapture``.
    """
    source_embeddings = get_single_emb(source_image_path).to(device)

    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        print("Error: Could not open camera.")
        return

    print("Press 'q' to quit.")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame.")
                break

            # Convert frame to PIL for consistency
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)

            # Inference; a failing frame is reported and skipped.
            try:
                emb, bboxes, idxes = get_embeddings(pil_image)

                if emb.numel() > 0:
                    cos_sim = torch.nn.functional.cosine_similarity(emb, source_embeddings)
                    best_match_idx = int(torch.argmax(cos_sim))
                    # `idxes` maps the embedding row back to its row in `bboxes`.
                    best_box = bboxes[int(idxes[best_match_idx])].detach().cpu().numpy()

                    # Draw bounding box
                    x1, y1, x2, y2 = map(int, best_box)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    cv2.putText(frame, f"Sim: {cos_sim[best_match_idx].item():.2f}",
                                (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                                0.6, (0, 255, 0), 2)

                # Show the frame
                cv2.imshow("Tracking", frame)

            except Exception as e:
                print(f"Error during inference: {e}")

            # Clear unused memory on GPU
            torch.cuda.empty_cache()

            # Exit on 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera and close the window even if the loop is interrupted
        # (e.g. by a kernel interrupt), so the device is not left locked.
        cap.release()
        cv2.destroyAllWindows()

In [14]:
def try_reidentify_by_visual(pil_image, last_patch, lambda_ssim=0.6, threshold=0.55):
    """Pick the detected person that looks most like ``last_patch``.

    Each candidate crop is scored as
    ``lambda_ssim * ssim + (1 - lambda_ssim) * (1 - histogram_distance)``.

    Args:
        pil_image: PIL image to search.
        last_patch: PIL crop of the person as last seen.
        lambda_ssim: Weight of the SSIM term; the histogram term gets the rest.
        threshold: Minimum combined score a candidate must exceed.

    Returns:
        The bounding box of the best candidate above ``threshold``, or None.
    """
    bboxes = get_bboxes(pil_image)
    best_score = 0
    best_box = None

    for bbox in bboxes:
        candidate_patch = crop_image_from_bbox(pil_image, bbox)
        try:
            ssim_score = compute_ssim(candidate_patch, last_patch)
            hist_score = compare_histogram(candidate_patch, last_patch)
        except Exception:
            # Best effort: a candidate that cannot be scored is skipped.
            # `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit stop the loop as intended.
            continue

        # compare_histogram returns a distance, so it is inverted into a similarity.
        combined_score = lambda_ssim * ssim_score + (1 - lambda_ssim) * (1 - hist_score)
        if combined_score > best_score and combined_score > threshold:
            best_score = combined_score
            best_box = bbox

    return best_box


In [17]:
# Swap opencv-python for the contrib build. %pip (not !pip) targets the running
# kernel's environment; the version is pinned to the one this notebook was run with.
%pip uninstall -y opencv-python
%pip install opencv-contrib-python==4.11.0.86


Found existing installation: opencv-python 4.11.0.86
Uninstalling opencv-python-4.11.0.86:
  Successfully uninstalled opencv-python-4.11.0.86


You can safely remove it manually.


Collecting opencv-contrib-python
  Downloading opencv_contrib_python-4.11.0.86-cp37-abi3-win_amd64.whl.metadata (20 kB)
Downloading opencv_contrib_python-4.11.0.86-cp37-abi3-win_amd64.whl (46.2 MB)
   ---------------------------------------- 0.0/46.2 MB ? eta -:--:--
   ----- ---------------------------------- 6.8/46.2 MB 38.1 MB/s eta 0:00:02
   ------------- -------------------------- 15.5/46.2 MB 38.9 MB/s eta 0:00:01
   --------------------- ------------------ 24.6/46.2 MB 41.0 MB/s eta 0:00:01
   ---------------------------- ----------- 32.8/46.2 MB 40.8 MB/s eta 0:00:01
   ----------------------------------- ---- 41.2/46.2 MB 40.9 MB/s eta 0:00:01
   ---------------------------------------  46.1/46.2 MB 40.8 MB/s eta 0:00:01
   ---------------------------------------- 46.2/46.2 MB 37.2 MB/s eta 0:00:00
Installing collected packages: opencv-contrib-python
Successfully installed opencv-contrib-python-4.11.0.86


In [21]:
# NOTE(review): `run_video_tracking_with_visual_reid` is not defined in the visible
# part of this notebook, and the output recorded for this cell reports
# "name 'compute_ssim_score' is not defined" — confirm both names exist before
# re-running.
run_video_tracking_with_visual_reid("new_test/rajnish_crop.JPG")


Press 'q' to quit.

0: 480x640 1 person, 13.4ms
Speed: 1.9ms preprocess, 13.4ms inference, 2.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 17.5ms
Speed: 1.6ms preprocess, 17.5ms inference, 2.9ms postprocess per image at shape (1, 3, 480, 640)
Error: name 'compute_ssim_score' is not defined

0: 480x640 1 person, 18.1ms
Speed: 1.5ms preprocess, 18.1ms inference, 2.4ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 17.6ms
Speed: 1.1ms preprocess, 17.6ms inference, 3.5ms postprocess per image at shape (1, 3, 480, 640)
Error: name 'compute_ssim_score' is not defined

0: 480x640 1 person, 21.6ms
Speed: 1.1ms preprocess, 21.6ms inference, 3.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 15.8ms
Speed: 1.5ms preprocess, 15.8ms inference, 3.5ms postprocess per image at shape (1, 3, 480, 640)
Error: name 'compute_ssim_score' is not defined

0: 480x640 1 person, 14.4ms
Speed: 1.2ms preprocess, 14.4ms inference, 2.6ms pos