In [None]:
import pandas as pd
import numpy as np
from PIL import Image
import cv2
import argparse
import sys
import os
from sklearn.cluster import KMeans
import sklearn
import torch

import progressbar

In [None]:
from video_handler.VideoHandler import VideoHandler
from detection.Detection import Detection
from model.SiameseReId import SiameseReId

In [None]:
# Path of the input video handed to VideoHandler.
video_path = "test.mp4"
# Number of frames to process; -1 means "use the frame count reported for the video".
n_frames = 10

In [None]:
# Pipeline components: video I/O, detection/segmentation and the re-id network.
__video_handler: VideoHandler = VideoHandler(video_path)
__detection: Detection = Detection()
__siamese_net: SiameseReId = SiameseReId("./model/weights/model_final.pt")
# Known identities: the row index is used as track id, "fv" holds the feature
# vector and "color" the color used when drawing that identity.
__features: pd.DataFrame = pd.DataFrame(columns=["fv", "color"])
__vid_w, __vid_h, video_frame_count = __video_handler.get_video_w_h_fc()

# Default to the frame count reported for the video; n_frames can only lower it.
__frame_count: int = video_frame_count

if n_frames != -1:
    if n_frames <= video_frame_count:
        __frame_count = n_frames
    else:
        sys.exit(f"Error: "
                 f"The number of frames ({n_frames}) has to be "
                 f"less or equal than the number of frames "
                 f"of the video (for {video_path}: {video_frame_count})"
                 f"")

# TODO: test this and add a fix if needed
# Largest common divisor between the frame count and any of 5..10, so that the
# frame count is a multiple of the batch size. Cast to a plain int: the value
# is used for slicing, list repetition and range(), and is annotated as int.
__batch_size: int = int(max(np.gcd(i, __frame_count) for i in range(5, 11)))
print("Batch size: ", __batch_size)
print("Frames count ", __frame_count)

if __batch_size < 5:
    sys.exit(f"Error generating batch size with frame number = {__frame_count}")

# Every slot initially references the same zero frame; that is safe because
# slots are rebound to new arrays when frames are read, never written in place.
__frames_buffer: list[np.ndarray] = [np.zeros((__vid_h, __vid_w, 3), dtype="uint8")] * __batch_size
__detections_buffer: list[pd.DataFrame] = [None] * __batch_size

# box id (as str) -> votes / elected track id, filled by the counting step.
__counter = {}

widgets = [' [',
           progressbar.Timer(format='elapsed time: %(elapsed)s'),
           '] ',
           progressbar.Bar('*'), ' (',
           progressbar.ETA(), ") \n",
           ]
__progress_bar: progressbar.ProgressBar = progressbar.ProgressBar(max_value=__frame_count, widgets=widgets).start()

In [None]:
def __get_buffer_frames(buffer_head: int) -> bool:
    """Fill the frame buffer slots from ``buffer_head`` up to the batch size.

    Frames are pulled one at a time from the video handler. As soon as a read
    reports failure the function stops and hands back that (falsy) status,
    leaving the remaining slots untouched.

    Returns:
        True when every requested slot was filled, otherwise the failed
        read status.
    """
    for slot in range(buffer_head, __batch_size):
        ok, frame = __video_handler.video_read_frame()
        if not ok:
            return ok
        # TODO: is the copy actually needed?
        __frames_buffer[slot] = frame.copy()
    return True

In [None]:
# First buffer slot to (re)fill; 0 means the whole batch is read.
buffer_head = 0

In [None]:
# Read frames into the buffer slots from buffer_head onward; the cell output
# is the success flag returned by the reader.
__get_buffer_frames(buffer_head)

In [None]:
# Preview every buffered frame; the channel order is converted from BGR to RGB
# before building the PIL image that gets displayed.
for frame in __frames_buffer:
    display(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))

In [None]:
# detections for frames in frames_buffer
# Run the detector on the buffered frames from buffer_head onward and store
# its per-frame results in the matching slots of the detections buffer.
__detections_buffer[buffer_head:__batch_size] = \
    __detection.get_segmentation_list_of_dataframes(
        __frames_buffer[buffer_head:__batch_size]
    )

In [None]:
# Inspect the raw per-frame detections returned by the detector.
__detections_buffer

In [None]:
def __get_kmeans_model(max_bb: int) -> KMeans:
    """Fit a KMeans model on the detection centers of the whole batch.

    Args:
        max_bb: Number of clusters to fit; the caller passes the highest
            number of detections found in a single buffered frame.

    Returns:
        The fitted KMeans estimator.

    Raises:
        ValueError: If no buffered frame contains a detection.
    """
    # Gather the "center" column of every frame that has detections, in frame
    # order, and join them once. Whether anything was collected is decided by
    # the number of frames, not by the truthiness of the collected values.
    per_frame_centers = [det["center"] for det in __detections_buffer if not det.empty]
    if not per_frame_centers:
        raise ValueError("Cannot fit KMeans: no detections in the current batch")
    centers = pd.concat(per_frame_centers, axis=0)

    # fit kmeans cluster with n_cluster equal to max detections per frame
    return KMeans(n_clusters=max_bb, random_state=0, n_init="auto").fit(centers.tolist())

In [None]:
def __find_box_id() -> None:
    """Assign a ``box_id`` to every detection of the batch.

    A KMeans model is fitted with as many clusters as the busiest frame has
    detections; each detection then receives the cluster predicted for its
    center. Nothing happens when no frame contains a detection.
    """
    detections_per_frame = [len(frame_bb) for frame_bb in __detections_buffer]
    max_bb_in_batch = max(detections_per_frame)
    if max_bb_in_batch <= 0:  # no frame with persons
        return

    kmeans = __get_kmeans_model(max_bb_in_batch)

    def cluster_of(center):
        # predict expects a 2D input, hence the single-element list
        return kmeans.predict([np.array(center)]).tolist()[0]

    for df in __detections_buffer:
        df["box_id"] = df["center"].apply(cluster_of)

In [None]:
# Cluster the detection centers of the batch and tag each detection with a box id.
__find_box_id()

In [None]:
# Inspect the detections again, now with the "box_id" column filled in.
__detections_buffer

In [None]:
def __get_track_id_of_detection_img(img: Image.Image, threshold: float = 0.8) -> int:
    """Look up the known identity that best matches a detection crop.

    Args:
        img: Cropped image of the detection.
        threshold: Similarity that the best match must exceed to be accepted.

    Returns:
        The index (track id) of the most similar entry of ``__features`` when
        its similarity is above ``threshold``, otherwise -1. Also -1 when no
        identity is known yet.
    """
    if __features.empty:
        return -1

    # Similarity of the crop against every stored feature vector.
    similarity = __features["fv"].apply(
        lambda fv: __siamese_net.similarity(img, torch.tensor(fv)).detach().cpu().numpy()[0][0]
    )
    best_tid = similarity.idxmax()
    return best_tid if similarity.loc[best_tid] > threshold else -1

In [None]:
def __crop_segmentation(mask: list, box: list, img: np.ndarray) -> Image.Image:
    """Cut a segmented detection out of a frame.

    Args:
        mask: Segmentation mask of the detection; it is resized to the frame
            size and every value greater than 0 is kept.
        box: Bounding box indexed as [x_min, y_min, x_max, y_max].
        img: BGR frame the detection belongs to.

    Returns:
        RGB PIL image of the box area with everything outside the mask blacked out.
    """
    # ndarray shape is (rows, cols, channels), i.e. (height, width, channels)
    height, width, _ = img.shape
    # cv2.resize takes the target size as (width, height)
    binary_mask = (cv2.resize(np.array(mask), (width, height)) > 0).astype("uint8")
    img_segm = cv2.bitwise_and(img, img, mask=binary_mask)
    # rows are sliced with the y coordinates, columns with the x coordinates
    img_segm = img_segm[box[1]:box[3], box[0]:box[2]]
    return Image.fromarray(cv2.cvtColor(img_segm, cv2.COLOR_BGR2RGB))

In [None]:
def __calculate_features_vectors(buffer_head: int) -> None:
    """Resolve track ids and compute feature vectors for the buffered detections.

    For every non-empty frame, detections whose ``track_id`` is -1 are matched
    against the known identities; other ids (including None) are left as they
    are. Feature vectors are only computed for frames at or after
    ``buffer_head``.
    """
    for frame_idx, detections in enumerate(__detections_buffer):
        if detections.empty:
            continue

        def crop(row, idx=frame_idx):
            # segmented crop of this detection taken from its own frame
            return __crop_segmentation(row["mask"], row["xyxy"], __frames_buffer[idx])

        def resolve_track_id(row):
            current = row["track_id"]
            if current is not None and int(current) == -1:
                return __get_track_id_of_detection_img(crop(row))
            return current

        detections["track_id"] = detections.apply(resolve_track_id, axis=1)

        if frame_idx < buffer_head:
            continue

        detections["fv"] = detections.apply(
            lambda row: __siamese_net.fv_encoding(crop(row)).cpu().numpy(),
            axis=1
        )

In [None]:
# Resolve track ids and compute feature vectors for the current batch.
__calculate_features_vectors(buffer_head)

In [None]:
# Inspect the detections after the track id lookup and feature extraction.
__detections_buffer

In [None]:
def __update_counter(box_id: int, track_id: int):
    """Record one ``track_id`` vote for ``box_id`` in ``__counter``.

    Votes are kept in a list stored under the string form of the box id.
    """
    key = str(box_id)
    if key not in __counter:
        __counter[key] = []
    __counter[key].append(track_id)

In [None]:
def __count_box_ids():
    """Elect one track id per box id by majority vote over the batch.

    Every detection votes with its ``track_id`` for its ``box_id``. For each
    box id the most voted track id wins if it collected at least half a batch
    of votes, otherwise the box gets None. A real track id (anything but -1)
    can be owned by a single box id only: later boxes electing the same id get
    None. The elected id is finally written back to the ``track_id`` column of
    every detection, and ``__counter`` is left holding the box id -> elected
    id mapping.
    """
    # Start from a clean tally: after a previous call the dict holds elected
    # ids instead of vote lists, and appending to those entries would fail.
    __counter.clear()

    for det in __detections_buffer:
        if det.empty:
            continue
        for box_id, track_id in zip(det["box_id"], det["track_id"]):
            # Missing ids vote as "unknown" (-1); everything else becomes a
            # plain int so that equal ids compare equal whatever their type.
            vote = -1 if pd.isna(track_id) else int(track_id)
            __update_counter(box_id, vote)

    min_votes = __batch_size // 2
    used = set()

    for key in __counter:
        values, counts = np.unique(np.array(__counter[key]), return_counts=True)
        winner = counts.argmax()  # first maximum: the smallest id wins a tie
        tid = int(values[winner]) if counts[winner] >= min_votes else None
        # if None don't evaluate & ignore -1 (to add)
        if tid is not None and tid != -1:
            if tid in used:
                tid = None
            else:
                used.add(tid)
        __counter[key] = tid

    for det in __detections_buffer:
        # object dtype keeps None as None instead of turning it into NaN
        det["track_id"] = pd.Series(
            [__counter[str(box_id)] for box_id in det["box_id"]],
            index=det.index,
            dtype=object,
        )

In [None]:
# Elect a track id per box id, then show the resulting box id -> track id mapping.
__count_box_ids()
__counter

In [None]:
# Inspect the detections after the elected track ids were written back.
__detections_buffer

In [None]:
def __add_features_vectors():
    """Register a new identity for every box that is still unknown.

    Detections whose ``track_id`` is -1 get a freshly created track id. All
    detections sharing a ``box_id`` within the batch receive the same new id;
    the feature vector stored for it is the one of the first such detection,
    paired with a random color. The new rows are appended to the module-level
    ``__features`` table, whose positional index is the track id.
    """
    global __features

    new_fvs = []
    new_colors = []
    added = {}  # box_id -> track id created for it in this batch
    # Rows are only ever appended with a fresh 0..n-1 index, so the next free
    # track id is the current number of rows.
    next_tid = len(__features)

    for det in __detections_buffer:
        for index, row in det.iterrows():  # doesn't loop on empty datasets
            track_id = row["track_id"]
            # Skip missing ids and already identified detections.
            if pd.isna(track_id) or int(track_id) != -1:
                continue

            box_id = row["box_id"]
            if box_id not in added:
                new_fvs.append(row["fv"])
                new_colors.append(np.random.randint(0, 255, 3).tolist())
                added[box_id] = next_tid
                next_tid += 1
            det.at[index, "track_id"] = added[box_id]

    if new_fvs:
        new_rows = pd.DataFrame({"fv": new_fvs, "color": new_colors})
        if __features.empty:
            __features = new_rows
        else:
            __features = pd.concat([__features, new_rows], ignore_index=True)

In [None]:
# Create identities for the detections that are still marked as unknown (-1).
__add_features_vectors()

In [None]:
# Inspect the detections after new track ids were handed out.
__detections_buffer

In [None]:
# Inspect the table of known identities (feature vector and color per row).
__features

In [None]:
def __buffers_swap(head: int) -> None:
    """Shift both buffers left by ``head`` slots, keeping their length.

    The elements from position ``head`` onward are moved to the front of the
    frame and detection buffers. The trailing slots keep their previous
    content and are expected to be overwritten by the next read.

    Args:
        head: Number of leading slots to discard.
    """
    for buffer in (__frames_buffer, __detections_buffer):
        kept = buffer[head:]
        # Assign onto a slice of the same length so the list can never grow
        # or shrink, whatever the relation between head and the buffer size.
        buffer[:len(kept)] = kept

In [None]:
def __draw_video_frames() -> None:
    """Annotate and write the first half of the batch to the output video.

    Every detection of a frame is drawn through the video handler with the
    color and id of its identity; detections without a usable track id
    (missing or -1) are drawn in white and labelled "Unknown". Frames without
    detections are written unchanged.
    """
    for i in range(__batch_size // 2):
        det = __detections_buffer[i]
        drew_frame = __frames_buffer[i]

        for _, row in det.iterrows():  # doesn't loop on empty datasets
            color = (255, 255, 255)
            text = "Unknown"

            track_id = row["track_id"]
            # pd.isna covers both None and NaN, which int() cannot convert
            if not pd.isna(track_id) and int(track_id) != -1:
                color = __features.loc[int(track_id), "color"]
                text = str(int(track_id))

            drew_frame = __video_handler.frame_draw_info(drew_frame, row["xyxy"], color, text)

        __video_handler.video_write(drew_frame)

In [None]:
def __clean_detections_box_ids_track_ids():
    """Reset per-batch state on every buffered detection table.

    All ``box_id`` values are set to -1, and every missing ``track_id`` is
    replaced by -1 so that the detection is looked up again in the next batch.
    """
    for det in __detections_buffer:
        det["box_id"] = [-1] * len(det)
        # Comparing a Series with `== None` yields False for every element, so
        # missing ids are selected with isna(); the boolean mask also keeps the
        # assignment aligned with the frame's own index labels.
        det.loc[det["track_id"].isna(), "track_id"] = -1

In [None]:
# Each iteration processes a full batch but only emits its first part; the
# remaining frames are carried over and processed again with the next frames.
# NOTE(review): the exploratory cells above already read one batch from the
# video handler; confirm whether the video must be reopened before this loop.
__half_batch = __batch_size // 2
buffer_head = 0
frame_nr = 0

try:
    while frame_nr < __frame_count:
        # (re)fill the frames buffer from buffer_head onward
        ret = __get_buffer_frames(buffer_head)

        # detections for frames in frames_buffer
        __detections_buffer[buffer_head:__batch_size] = \
            __detection.get_segmentation_list_of_dataframes(
                __frames_buffer[buffer_head:__batch_size]
            )

        __find_box_id()

        __calculate_features_vectors(buffer_head)

        __count_box_ids()

        __add_features_vectors()

        # draw detections
        __draw_video_frames()

        # clean detection processing
        __clean_detections_box_ids_track_ids()

        # drop the frames that were just drawn, carrying the rest over
        __buffers_swap(__half_batch)

        # New frames go right after the carried-over ones; this equals
        # __half_batch for even batch sizes and is one more for odd ones.
        buffer_head = __batch_size - __half_batch
        frame_nr += __half_batch
        # never report more than the configured maximum to the progress bar
        __progress_bar.update(min(frame_nr, __frame_count))

        if not ret:
            __video_handler.set_last_frame()
            break
finally:
    # release the video resources even when a processing step raises
    __video_handler.release()