In [53]:
import json
import math
import numpy as np
from itertools import chain
import cv2
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

In [None]:
# Identifier of the video being processed; it is interpolated into the path of
# the annotation file loaded in the next cell.
video_name = "ID-5"

In [254]:
# Load the per-frame predictions for the selected video. The context manager
# closes the file handle as soon as the JSON has been parsed (the bare
# `json.load(open(...))` form leaves the handle open until garbage collection).
with open(f'detr2/{video_name}_annotationsdetr2.json', 'r') as annotations_file:
    annotations = json.load(annotations_file)

In [157]:
def get_tracks(annotations):
    """Group the predictions in `annotations` by track id.

    `annotations` is iterated in order; the position of each key is used as the
    frame index and the position of each prediction inside its frame as the
    prediction index.

    Returns:
        tracks: dict mapping each track id to a dict holding
            'sequence' (entries built by `add_to_tracks`, ordered by frame),
            'mean_conf' / 'median_conf' (rounded mean / median of the entries'
            'conf' values) and 'var_x_n' / 'var_y_n' (variance of the box
            centre coordinates divided by the number of entries).
        not_tracked: list of (frame index, prediction index) pairs for the
            predictions whose 'track_id' is missing or falsy (None, 0, ...).
    """
    tracks = {}
    not_tracked = []
    for frame_pos, frame_key in enumerate(annotations):
        for pred_pos, pred in enumerate(annotations[frame_key]):
            track_id = pred.get('track_id')
            if not track_id:
                not_tracked.append((frame_pos, pred_pos))
                continue
            add_to_tracks(tracks, track_id, pred, pred_pos, frame_pos)

    # Attach per-track summary statistics.
    for track in tracks.values():
        sequence = track['sequence']
        centers_x = [(entry['box']['x1'] + entry['box']['x2']) / 2 for entry in sequence]
        centers_y = [(entry['box']['y1'] + entry['box']['y2']) / 2 for entry in sequence]
        confs = [entry['conf'] for entry in sequence]
        track['mean_conf'] = round(np.mean(confs))
        track['median_conf'] = round(np.median(confs))
        track['var_x_n'] = np.var(centers_x) / len(centers_x)
        track['var_y_n'] = np.var(centers_y) / len(centers_y)
    return tracks, not_tracked
def add_to_tracks(tracks, track_id, pred, pred_idx, frame_idx):
    """Add a prediction to a track, keeping the track's sequence ordered by frame.

    The stored entry holds the frame index, the prediction's index inside its
    frame, the box, and the confidence as a rounded percentage. A new track is
    created when `track_id` is not in `tracks` yet. An entry whose frame equals
    that of existing entries is placed after them.
    """
    entry = {
        'frame': frame_idx,
        'pred_idx': pred_idx,
        'box': pred['box'],
        'conf': round(pred['confidence'] * 100),
    }
    if track_id not in tracks:
        tracks[track_id] = {'sequence': [entry]}
        return

    sequence = tracks[track_id]['sequence']
    for position, existing in enumerate(sequence):
        # Insert in front of the first entry that belongs to a later frame.
        if existing['frame'] > frame_idx:
            tracks[track_id]['sequence'] = sequence[:position] + [entry] + sequence[position:]
            return
    sequence.append(entry)
    
def show_tracks(tracks):
    """Print a summary of every track followed by its per-frame entries."""
    print(f"{len(tracks)} tracks:")
    for track_id, track in tracks.items():
        print("=============================================================")
        print(f"Track {track_id} mean confidence {track['mean_conf']} var_x_n {track['var_x_n']} var_y_n {track['var_y_n']}")
        sequence = track['sequence']
        print(f"Sequence of {len(sequence)}")
        for entry in sequence:
            print(f"{entry['frame']}: {entry['box']} with {entry['conf']}")
def get_tracks_per_frame(annotations):
    """Return, for each frame in order, the list of track ids present in it.

    Predictions whose 'track_id' is missing or falsy are skipped, so a frame
    without tracked predictions yields an empty list.
    """
    return [
        [pred.get('track_id') for pred in annotations[frame_key] if pred.get('track_id')]
        for frame_key in annotations
    ]

def iou(boxA, boxB):
    """Intersection over union of two boxes given as [x1, y1, x2, y2].

    Widths and heights are computed as (max - min + 1), so coordinates are
    treated as inclusive.
    """
    left, top = max(boxA[0], boxB[0]), max(boxA[1], boxB[1])
    right, bottom = min(boxA[2], boxB[2]), min(boxA[3], boxB[3])

    # Clamp each side at zero so disjoint boxes produce an empty intersection.
    overlap = max(0, right - left + 1) * max(0, bottom - top + 1)

    area_a = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    area_b = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)

    return overlap / float(area_a + area_b - overlap)
def get_center(x1, y1, x2, y2):
    """Return the (x, y) centre of the bounding box with corners (x1, y1) and (x2, y2)."""
    return (x1 + x2) / 2, (y1 + y2) / 2


def estimate_linear_trajectory(track, n_frames = 4, fit_thresh=100, fps=25):
    """Fit a straight line to the last `n_frames` box centres of a track.

    Args:
        track: dict with a 'sequence' list whose entries carry a 'box' dict
            (values unpacked in the order x1, y1, x2, y2).
        n_frames: number of trailing entries used for the fit. At least two are
            needed to obtain a displacement.
        fit_thresh: maximum allowed perpendicular distance between any centre
            and the fitted line.
        fps: frame rate used to turn the per-step displacement into a speed.
            Defaults to 25, the value that was previously hard-coded.
            NOTE(review): consecutive entries are treated as one frame apart;
            gaps in the sequence are not accounted for.

    Returns:
        (m, c, speed, direction): slope and intercept of the fitted line
        y = m*x + c, and the speed and direction (radians, from arctan2(dy, dx))
        of the most recent step. (None, None, None, None) when there are not
        enough entries or when a centre lies farther than `fit_thresh` from
        the line.
    """
    if n_frames > len(track['sequence']) or n_frames < 2:
        print("NOT ENOUGH FRAMES")
        return None, None, None, None

    # Use the LAST n_frames entries. The previous slice `[:-n_frames]` dropped
    # them instead, which left no points at all when len(sequence) == n_frames.
    points = []
    for ann_point in track['sequence'][-n_frames:]:
        x1, y1, x2, y2 = ann_point['box'].values()
        points.append(get_center(x1, y1, x2, y2))
    points = np.array(points)
    differences = np.diff(points, axis=0)

    X = points[:, 0].reshape(-1, 1)
    y = points[:, 1]
    linear_model = LinearRegression().fit(X, y)
    m = linear_model.coef_[0]
    c = linear_model.intercept_

    # Perpendicular distance of every centre from the line m*x - y + c = 0.
    displacements = np.abs(m*X.flatten() - y + c) / np.sqrt(m**2 + 1)
    if not np.all(displacements < fit_thresh):
        print("DISPLACEMENT GREATER THAN THRESH")
        return None, None, None, None

    distances = np.sqrt(np.sum(differences**2, axis=1))
    time_interval = 1 / fps
    speeds = distances / time_interval
    directions_radians = np.arctan2(differences[:, 1], differences[:, 0])
    return m, c, speeds[-1], directions_radians[-1]

def generate_points(start_point, speed, direction_radians, num_points, time_interval):
    """Generate points along a straight line, one per time step.

    The first point is `start_point`; every following point is the previous one
    shifted by `speed * time_interval` along `direction_radians`. At least the
    start point is always returned. The result is a NumPy array.
    """
    trajectory = [start_point]
    remaining = num_points - 1
    if remaining > 0:
        # The displacement is the same for every step.
        step_x = speed * np.cos(direction_radians) * time_interval
        step_y = speed * np.sin(direction_radians) * time_interval
        for _ in range(remaining):
            previous = trajectory[-1]
            trajectory.append((previous[0] + step_x, previous[1] + step_y))
    return np.array(trajectory)
    
def update_pred(annotations, frame_idx, pred_idx, new_track_id):
    """Set the track id of one prediction, addressed by frame index and prediction index.

    The frame key is the zero-padded frame index followed by '.png'.
    """
    frame_key = f'{frame_idx:04d}.png'
    target = annotations[frame_key][pred_idx]
    target['track_id'] = new_track_id
    
def insert_pred(annotations, frame_idx, pred):
    """Append a prediction to the frame whose key is the zero-padded index plus '.png'."""
    frame_key = f'{frame_idx:04d}.png'
    annotations[frame_key].append(pred)

In [152]:
# Group the loaded predictions by track id; `not_tracked` holds the
# (frame index, prediction index) pairs that carry no track id.
tracks, not_tracked = get_tracks(annotations)

In [153]:
# Print every track with its summary statistics and entries for manual inspection.
show_tracks(tracks)

6 tracks:
Track 144 mean confidence 72 var_x_n 70.61925566646646 var_y_n 2.8192922737698574
Sequence of 16
2531: {'x1': 1752.0462625, 'y1': 75.64935750000001, 'x2': 1773.1011925, 'y2': 96.55176750000001} with 90
2532: {'x1': 1756.534055, 'y1': 75.254205, 'x2': 1777.5889849999999, 'y2': 96.156615} with 90
2533: {'x1': 1761.0218475, 'y1': 74.85905249999999, 'x2': 1782.0767775, 'y2': 95.7614625} with 90
2534: {'x1': 1765.50964, 'y1': 74.4639, 'x2': 1785.95874, 'y2': 94.6692} with 62
2535: {'x1': 1769.32129, 'y1': 73.09953, 'x2': 1790.54358, 'y2': 93.99798} with 86
2536: {'x1': 1772.91467, 'y1': 72.29421, 'x2': 1793.99658, 'y2': 92.89806} with 86
2537: {'x1': 1777.53357, 'y1': 71.90709, 'x2': 1798.40015, 'y2': 92.21381} with 81
2544: {'x1': 1803.86572, 'y1': 67.53242, 'x2': 1825.16357, 'y2': 87.6513} with 59
2545: {'x1': 1806.5758866666667, 'y1': 67.57886666666667, 'x2': 1827.8737366666667, 'y2': 87.69774666666667} with 90
2546: {'x1': 1809.2860533333333, 'y1': 67.62531333333334, 'x2': 183

In [73]:
def best_merge(current_track, tracks, frame_window, dist_thresh, iou_thresh):
    """Find the track that `current_track` should be merged into, if any.

    A candidate qualifies when one of its end points is less than
    `frame_window` frames away from the facing end point of `current_track`:
    either the candidate's last entry against the current track's first entry,
    or the current track's last entry against the candidate's first entry.
    The two facing entries are then compared:

    * the candidate with the highest IoU above `iou_thresh` wins;
    * when no candidate has passed the IoU test yet, the candidate with the
      smallest centre distance below `dist_thresh` is used as a fallback
      (the same preference order as `get_best_match`).

    Returns the id of the chosen track, or None when nothing qualifies.
    """
    cur_last_pred = current_track['sequence'][-1]
    cur_first_pred = current_track['sequence'][0]
    best_iou = 0
    min_dist = math.inf
    best_match = None

    for next_track, candidate in tracks.items():
        if candidate == current_track:
            continue
        first_pred = candidate['sequence'][0]
        last_pred = candidate['sequence'][-1]

        if abs(cur_first_pred['frame'] - last_pred['frame']) < frame_window:
            # Candidate ends where the current track starts: compare those two
            # entries (previously the candidate's own first and last entries
            # were compared with each other).
            cur_iou, cur_dist = get_preds_iou_and_dist(last_pred, cur_first_pred)
        elif abs(cur_last_pred['frame'] - first_pred['frame']) < frame_window:
            # Candidate starts where the current track ends.
            cur_iou, cur_dist = get_preds_iou_and_dist(cur_last_pred, first_pred)
        else:
            continue

        if cur_iou > iou_thresh and cur_iou > best_iou:
            best_iou = cur_iou
            best_match = next_track
        elif best_iou == 0 and cur_dist < min_dist and cur_dist < dist_thresh:
            # Keep the closest candidate. The old test `cur_dist > min_dist`
            # could never succeed because min_dist starts at infinity.
            min_dist = cur_dist
            best_match = next_track
    return best_match

def update_tracks(annotations, sub):
    """Rename a track id throughout the annotations.

    `sub` is an (old_id, new_id) pair: every prediction whose 'track_id' equals
    old_id gets new_id. Predictions with a missing or falsy track id are left
    alone.
    """
    for preds in annotations.values():
        for pred in preds:
            current_id = pred.get('track_id')
            if current_id and current_id == sub[0]:
                pred['track_id'] = sub[1]
                
def merge_tracks(annotations, frame_window=5, dist_thresh=150, iou_thresh=0.2):
    """Merge each track into its best matching track, where one exists.

    Track ids are visited in the order they appear in the initial grouping.
    When `best_merge` returns a match, the visited track's id is rewritten to
    the match's id in `annotations` and the tracks are regrouped.

    Returns the number of tracks that were merged away.
    """
    tracks, _ = get_tracks(annotations)
    removed_tracks = []
    # Snapshot the ids first: `tracks` is rebuilt after every merge.
    for track_id in list(tracks):
        if track_id in removed_tracks:
            continue
        best_merge_id = best_merge(
            tracks[track_id],
            tracks,
            frame_window=frame_window,
            dist_thresh=dist_thresh,
            iou_thresh=iou_thresh,
        )
        if not best_merge_id:
            continue
        removed_tracks.append(track_id)
        update_tracks(annotations, (track_id, best_merge_id))
        tracks, _ = get_tracks(annotations)
    return len(removed_tracks)

def assign_missing_ids(annotations, frame_window=10, dist_thresh=150, iou_thresh=0.2):
    """Give a track id to untracked predictions, based on IoU or centre distance.

    For each untracked prediction the candidate tracks are those that appear in
    the `frame_window // 2` frames before it or in the `frame_window // 2`
    frames starting at its own frame. `get_best_match` then picks among them.
    Assigned predictions are written back into `annotations` and added to the
    in-memory tracks.

    Returns the number of predictions that received a track id.
    """
    tracks, not_tracked = get_tracks(annotations)
    tracks_per_frame = get_tracks_per_frame(annotations)
    half_window = frame_window // 2
    total_frames = len(tracks_per_frame)
    ids_assigned = 0

    for frame_idx, pred_idx in not_tracked:
        pred = annotations[f'{frame_idx:04d}.png'][pred_idx]
        # The coordinates are not used here; the unpacking raises if the box
        # does not hold exactly four values.
        x1, y1, x2, y2 = pred['box'].values()

        frames_before = tracks_per_frame[max(0, frame_idx - half_window):frame_idx]
        frames_after = tracks_per_frame[frame_idx:min(total_frames, frame_idx + half_window)]
        last = list(set(chain.from_iterable(frames_before)))
        succ = list(set(chain.from_iterable(frames_after)))
        if not last and not succ:
            continue

        best_track = get_best_match(pred, last + succ, tracks, frame_idx, frame_window, dist_thresh, iou_thresh)
        if not best_track:
            continue
        ids_assigned += 1
        add_to_tracks(tracks, best_track, pred, pred_idx, frame_idx)
        pred['track_id'] = best_track
    return ids_assigned
    
def get_best_match(pred, candidates, tracks, frame_idx, frame_window, dist_thresh, iou_thresh):
    """Choose, among candidate track ids, the track that best matches `pred`.

    Only track entries within `frame_window` frames of `frame_idx` are
    considered. The entry with the highest IoU above `iou_thresh` decides the
    match. While no entry has passed the IoU test, the entry with the smallest
    centre distance below `dist_thresh` is used instead.

    Returns the chosen track id, or None when nothing qualifies.
    """
    x1, y1, x2, y2 = pred['box'].values()
    best_track = None
    best_iou = 0
    min_dist = math.inf

    for candidate in candidates:
        for tracked_pred in tracks[candidate]['sequence']:
            if abs(tracked_pred['frame'] - frame_idx) > frame_window:
                continue
            x3, y3, x4, y4 = tracked_pred['box'].values()
            cur_iou = iou([x1, y1, x2, y2], [x3, y3, x4, y4])
            if cur_iou > iou_thresh and cur_iou > best_iou:
                best_iou, best_track = cur_iou, candidate
            elif best_iou == 0:
                # Distance fallback, only while no IoU match has been found.
                cx1, cy1 = get_center(x1, y1, x2, y2)
                cx2, cy2 = get_center(x3, y3, x4, y4)
                dist = math.sqrt((cx1 - cx2)**2 + (cy1 - cy2)**2)
                if dist < dist_thresh and dist < min_dist:
                    min_dist, best_track = dist, candidate
    return best_track
    
def get_preds_iou_and_dist(pred1, pred2):
    """Return (IoU, centre-to-centre distance) for the boxes of two predictions."""
    ax1, ay1, ax2, ay2 = pred1['box'].values()
    bx1, by1, bx2, by2 = pred2['box'].values()
    center_a = get_center(ax1, ay1, ax2, ay2)
    center_b = get_center(bx1, by1, bx2, by2)
    overlap = iou([ax1, ay1, ax2, ay2], [bx1, by1, bx2, by2])
    distance = math.sqrt((center_a[0] - center_b[0])**2 + (center_a[1] - center_b[1])**2)
    return overlap, distance

In [71]:
# Matching thresholds; the names mirror the parameters of assign_missing_ids,
# merge_tracks and best_merge.
# NOTE(review): no call visible in this notebook passes these variables, so the
# functions run with their own defaults — confirm whether that is intended.
frame_window = 9
dist_thresh=150
iou_thresh = 0.1

In [255]:
# Alternate id assignment and track merging until a full pass changes nothing.
# NOTE(review): both calls use the functions' default thresholds.
changed = True
while changed:
    assigned_ids = assign_missing_ids(annotations)
    tracks_merged = merge_tracks(annotations)
    changed = (assigned_ids + tracks_merged) != 0

In [137]:
# Save an intermediate copy of the annotations. The context manager guarantees
# the file is flushed and closed; the file name is derived from `video_name`
# (yielding 'ID-5almostfinal1.json' for video_name == "ID-5").
with open(f'{video_name}almostfinal1.json', 'w') as checkpoint_file:
    json.dump(annotations, checkpoint_file)

In [130]:
def check_inconsistencies(tracks, max_frame_step=1, abrupt_dist_thresh=100):
    """Scan every track for duplicate frames, frame gaps and position jumps.

    Args:
        tracks: dict mapping track id to a dict with a 'sequence' list whose
            entries carry 'frame' and 'box'.
        max_frame_step: largest frame difference between consecutive entries
            that is not reported as a gap (default 1, the previous fixed value).
        abrupt_dist_thresh: centre distance between consecutive entries above
            which an abrupt change is reported (default 100, the previous
            fixed value).

    Returns:
        dict with three lists:
            'duplicate_frames': (track id, frame) for consecutive entries that
                share a frame;
            'large_frame_gaps': (track id, previous frame, frame);
            'abrupt_changes': (track id, previous frame, frame).
    """
    inconsistencies = {
        'duplicate_frames': [],
        'large_frame_gaps': [],
        'abrupt_changes': [],
    }

    for track_id, track in tracks.items():
        prev_frame = None
        prev_box = None

        for detection in track['sequence']:
            frame = detection['frame']
            box = detection['box']

            if prev_frame == frame:
                inconsistencies['duplicate_frames'].append((track_id, frame))

            if prev_frame is not None and frame - prev_frame > max_frame_step:
                inconsistencies['large_frame_gaps'].append((track_id, prev_frame, frame))

            if prev_box is not None:
                x1, y1, x2, y2 = box.values()
                x3, y3, x4, y4 = prev_box.values()
                cx1, cy1 = get_center(x1, y1, x2, y2)
                cx2, cy2 = get_center(x3, y3, x4, y4)
                distance = math.sqrt((cx1 - cx2)**2 + (cy1 - cy2)**2)
                if distance > abrupt_dist_thresh:
                    inconsistencies['abrupt_changes'].append((track_id, prev_frame, frame))

            prev_frame = frame
            prev_box = box
    return inconsistencies

In [224]:
def remove_preds_of_track(annotations, track_id):
    """Delete from `annotations` every prediction that belongs to `track_id`.

    Does nothing when the track does not exist. Predictions are removed from
    the highest prediction index downwards, so that removing one entry of a
    frame does not shift the index of another entry still to be removed from
    that frame.
    """
    tracks, _ = get_tracks(annotations)
    # Only the requested track: the parameter used to be ignored and the
    # predictions of every track were removed.
    track = tracks.get(track_id)
    if track is None:
        return
    ordered = sorted(
        track['sequence'],
        key=lambda entry: (entry['frame'], entry['pred_idx']),
        reverse=True,
    )
    for pred in ordered:
        print(f"Removing pred {pred['pred_idx']} in frame {pred['frame']}")
        # Built with different inner/outer quotes so it also parses on Python
        # versions older than 3.12.
        frame_key = f"{pred['frame']:04d}.png"
        annotations[frame_key].pop(pred['pred_idx'])
            

In [142]:
def low_var_tracks(annotations, var_thresh=0.1, seq_thresh=3):
    """Return the ids of tracks that barely move and are long enough.

    A track is selected when its 'var_x_n' or its 'var_y_n' is below
    `var_thresh` AND its sequence has more than `seq_thresh` entries.
    """
    tracks, _ = get_tracks(annotations)
    selected = []
    for track_id, track in tracks.items():
        # Explicit parentheses: without them `a or b and c` is evaluated as
        # `a or (b and c)`, so the length filter was skipped for the x test.
        low_variance = track['var_x_n'] < var_thresh or track['var_y_n'] < var_thresh
        if low_variance and len(track['sequence']) > seq_thresh:
            selected.append(track_id)
    return selected

In [256]:
# Regroup the current annotations into tracks and list their inconsistencies
# (duplicate frames, frame gaps, abrupt position changes).
tracks, _ = get_tracks(annotations)
inconsistencies = check_inconsistencies(tracks)
inconsistencies

{'duplicate_frames': [(134, 462), (180, 2109), (180, 2112), (201, 2494)],
 'large_frame_gaps': [(201, 2460, 2464), (201, 2476, 2478)],
 'abrupt_changes': [(180, 2109, 2109),
  (180, 2111, 2112),
  (180, 2112, 2112),
  (180, 2112, 2113)]}

In [1]:
def remove_duplicates(annotations, duplicates):
    """Keep one prediction per duplicated (track, frame) pair.

    Args:
        annotations: per-frame predictions, modified in place.
        duplicates: iterable of (track id, frame) pairs, as reported under
            'duplicate_frames' by `check_inconsistencies`.

    For each pair, the predictions of the track in that frame are compared with
    the track's entry one or two frames earlier. The prediction with the
    highest IoU against it is kept; when none overlaps, the closest one is
    kept. All the others have their track id set to 0, which detaches them
    from the track (a falsy id is treated as untracked).

    Returns the number of pairs that were resolved.
    """
    removed = 0
    for dup in duplicates:
        print(f"Removing {dup}")
        # Regroup on every iteration: earlier iterations change track ids.
        tracks, _ = get_tracks(annotations)
        track_id = dup[0]
        dup_frame = dup[1]
        if track_id not in tracks:
            print("Can't remove because the track no longer exists")
            continue

        preds_to_check = []
        last_pred = None
        for pred in tracks[track_id]['sequence']:
            if pred['frame'] == dup_frame:
                preds_to_check.append(pred)
            elif pred['frame'] == dup_frame - 1 or pred['frame'] == dup_frame - 2:
                last_pred = pred
            elif pred['frame'] > dup_frame:
                break
        if last_pred is None or len(preds_to_check) == 0:
            print("Can't remove because there are no previous prediction")
            continue

        best_iou = 0
        min_dist = math.inf
        best_pred = -1
        for pred in preds_to_check:
            cur_iou, cur_dist = get_preds_iou_and_dist(last_pred, pred)
            if cur_iou > best_iou:
                # No early exit: every duplicate is scored so that the highest
                # IoU wins, not merely the first non-zero one.
                best_iou = cur_iou
                best_pred = pred['pred_idx']
            elif best_iou == 0 and cur_dist < min_dist:
                # Distance only decides while nothing overlaps.
                min_dist = cur_dist
                best_pred = pred['pred_idx']

        if best_pred == -1:
            print("No match!!")
        else:
            print(f"Best is {best_pred}")
            for pred in preds_to_check:
                if pred['pred_idx'] != best_pred:
                    update_pred(annotations, pred['frame'], pred['pred_idx'], 0)
            removed += 1
    return removed

def calculate_velocity(box1, box2, frame_gap):
    """Per-frame displacement of the (x1, y1) corner going from `box1` to `box2`.

    Returns (vx, vy): the difference of the 'x1' and 'y1' values divided by
    `frame_gap`.
    """
    return tuple((box2[axis] - box1[axis]) / frame_gap for axis in ('x1', 'y1'))

def fill_gaps(annotations, gaps, max_gap=5):
    """Insert interpolated predictions into the frames missing from a track.

    Args:
        annotations: per-frame predictions, modified in place.
        gaps: iterable of (track id, last frame, next frame) triples, as
            reported under 'large_frame_gaps' by `check_inconsistencies`.
        max_gap: gaps spanning more frames than this are left untouched.

    For each gap the two track entries that border it are located; the box of
    the earlier one is then shifted by a constant per-frame velocity (computed
    from the boxes' (x1, y1) corners) for every missing frame, so the inserted
    boxes keep the size of the earlier box. Inserted predictions use the
    track's mean confidence. Gaps whose track or bordering entries cannot be
    found are skipped.

    Returns the number of predictions inserted.
    """
    tracks, _ = get_tracks(annotations)
    filled = 0
    for gap in gaps:
        print(f"Filling {gap}")
        track_id = gap[0]
        last_frame = gap[1]
        next_frame = gap[2]
        frame_gap = next_frame - last_frame
        # Fewer than two frames apart means there is nothing in between (and
        # a zero gap would divide by zero in the velocity).
        if frame_gap < 2 or frame_gap > max_gap:
            continue
        track = tracks.get(track_id)
        if track is None:
            continue

        sequence = track['sequence']
        for current, following in zip(sequence, sequence[1:]):
            if current['frame'] == last_frame and following['frame'] == next_frame:
                break
        else:
            # No adjacent pair borders this gap: skip it rather than
            # interpolating from whichever pair the loop happened to end on.
            continue

        vx, vy = calculate_velocity(current['box'], following['box'], frame_gap)
        for j in range(1, frame_gap):
            interpolated_box = {
                'name': 'ball',
                'class': 0,
                'box': {
                    'x1': current['box']['x1'] + vx*j,
                    'y1': current['box']['y1'] + vy*j,
                    'x2': current['box']['x2'] + vx*j,
                    'y2': current['box']['y2'] + vy*j,
                },
                # 'mean_conf' is a percentage; predictions store a fraction.
                'confidence': track['mean_conf']/100,
                'track_id': track_id
            }
            insert_pred(annotations, last_frame + j, interpolated_box)
            filled += 1
    return filled

In [257]:
# Resolve the duplicated (track, frame) pairs found above; the displayed value
# is the number of pairs that were resolved.
remove_duplicates(annotations, inconsistencies['duplicate_frames'])

Removing (134, 462)
Best is 0
Removing (180, 2109)
Can't remove because there are no previous prediction
Removing (180, 2112)
Best is 1
Removing (201, 2494)
Best is 0


3

In [258]:
# Fill the frame gaps found above with interpolated predictions; the displayed
# value is the number of predictions inserted.
fill_gaps(annotations, inconsistencies['large_frame_gaps'])

Filling (201, 2460, 2464)
Filling (201, 2476, 2478)


4

In [259]:
# Reduce the annotations to a single position per frame and write the result.
# Output keys are the zero-padded position of the frame plus '.png'.
#   - no prediction in the frame: x = y = -1, conf = 0
#   - one prediction: its box centre and confidence
#   - several predictions: the first one with the highest confidence, provided
#     that confidence is above 0; otherwise the frame is treated as empty
final_annotations = {}
final_annotations_with_confs = {}
for i, preds in enumerate(annotations.values()):
    frame_key = f"{i:04d}.png"

    chosen = None
    if len(preds) == 1:
        chosen = preds[0]
    elif len(preds) > 1:
        top = max(preds, key=lambda candidate: candidate['confidence'])
        if top['confidence'] > 0:
            chosen = top

    if chosen is None:
        x, y, conf = -1, -1, 0
    else:
        x1, y1, x2, y2 = chosen['box'].values()
        x, y = get_center(x1, y1, x2, y2)
        conf = chosen['confidence']

    final_annotations[frame_key] = {"x": x, "y": y}
    final_annotations_with_confs[frame_key] = {"x": x, "y": y, 'conf': conf}

# The paths must be f-strings: without the prefix the files were literally
# named '{video_name}_fannotations1.json' and '{video_name}_video_ann.json'.
# Context managers make sure both files are flushed and closed.
with open(f'./{video_name}_fannotations1.json', 'w') as positions_file:
    json.dump(final_annotations, positions_file)
with open(f'./{video_name}_video_ann.json', 'w') as positions_with_conf_file:
    json.dump(final_annotations_with_confs, positions_with_conf_file)