### Packages

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

### Configurations

In [2]:
TEST_DATA_ID = 'CHERRY04'
RESULT_PATH = f'./saved_exp_results/{TEST_DATA_ID}'

FPS = 25
VIDEO_CODEC = 'XVID'

### Data Operation

In [3]:
with open(f'tracks.txt', 'r') as f:
    data = f.readlines()

tracks = {}

for line in data:
    line = list(map(int, line.split()))
    if line[0] not in tracks.keys():
        tracks[line[0]] = []
    tracks[line[0]].append((line[1], tuple(line[2:])))

In [4]:
import matplotlib.pyplot as plt

class KalmanFilter:
    def __init__(self, process_variance, measurement_variance, estimation_error, initial_estimate):
        self.process_variance = process_variance
        self.measurement_variance = measurement_variance
        self.estimation_error = estimation_error
        self.estimate = initial_estimate

    def update(self, measurement):
        self.estimation_error += self.process_variance

        kalman_gain = self.estimation_error / (self.estimation_error + self.measurement_variance)
        self.estimate += kalman_gain * (measurement - self.estimate)
        self.estimation_error *= (1 - kalman_gain)
        
        return self.estimate

def smooth_track_with_kalman_filter(track, process_variance=1e-3, measurement_variance=1e-1):
    initial_position = track[0][1]
    kf_x = KalmanFilter(process_variance, measurement_variance, 1, initial_position[0])
    kf_y = KalmanFilter(process_variance, measurement_variance, 1, initial_position[1])
    
    smoothed_track = []
    for frame, position in track:
        smoothed_x = kf_x.update(position[0])
        smoothed_y = kf_y.update(position[1])
        smoothed_track.append((frame, (smoothed_x, smoothed_y)))
        
    return smoothed_track

def smooth_all_tracks(tracks, process_variance=1e-3, measurement_variance=1e-1):
    smoothed_tracks = {}
    for person_id, track in tracks.items():
        smoothed_tracks[person_id] = smooth_track_with_kalman_filter(track, process_variance, measurement_variance)
    return smoothed_tracks

smoothed_tracks = smooth_all_tracks(tracks, process_variance=1e-1, measurement_variance=1e-1)

In [5]:
def calculate_mean_velocity(tracks, fps):
    velocities = {}
    for person_id, track in tracks.items():
        total_diplacement = np.array(track[-1][1]) - np.array(track[0][1])
        total_time = len(track) / fps
        mean_velocity_x = total_diplacement[0] / total_time
        mean_velocity_y = total_diplacement[1] / total_time
        velocities[person_id] = (mean_velocity_x, mean_velocity_y)
    return velocities

mean_velocities = calculate_mean_velocity(smoothed_tracks, FPS)
processed_data = pd.DataFrame(list(mean_velocities.items()), columns=['Person ID', 'Mean Velocity'])x

In [6]:
frames = [i[0][0] for i in smoothed_tracks.values()]
poss = [(i[0][1][0],i[0][1][1]-100) for i in smoothed_tracks.values()]
processed_data['Starting Frame'] = frames
processed_data['Starting Frame'] += 1
processed_data['Initial Position'] = poss
processed_data.to_csv('processed_tracks.csv')

In [10]:
sum([i[-1][0] - i[0][0] for i in smoothed_tracks.values()])/len(smoothed_tracks.values())

136.06422018348624

### Simularity Calculation

In [7]:
def extract_track_segment(track, start_frame, num_frames):
    end_frame = start_frame + num_frames
    segment = [(frame, pos) for frame, pos in track if start_frame <= frame < end_frame]
    return segment

def compare_tracks_dtw(track1, track2):
    positions1 = [pos for _, pos in track1]
    positions2 = [pos for _, pos in track2]
    
    distance, _ = fastdtw(positions1, positions2, dist=euclidean)
    return distance

def compare_all_tracks(tracks1, tracks2, start_frame, num_frames):
    dtw_distances = {}
    for person_id in tracks1.keys():
        if person_id in tracks2:
            track1 = extract_track_segment(tracks1[person_id], start_frame, num_frames)
            track2 = extract_track_segment(tracks2[person_id], start_frame, num_frames)
            
            dtw_distance = compare_tracks_dtw(track1, track2)
            dtw_distances[person_id] = dtw_distance
    return dtw_distances

tracks1 = {
    0: [(512, (2, 3)), (513, (3, 4)), (515, (1, 2))],
    1: [(512, (1, 2)), (513, (2, 3)), (514, (3, 5))],
}

tracks2 = {
    0: [(512, (2, 3)), (513, (3, 4)), (515, (1, 2))],
    1: [(512, (1, 2)), (513, (2, 3)), (514, (3, 5))],
}

start_frame = 0
num_frames = 510

dtw_distances = compare_all_tracks(tracks1, tracks2, start_frame, num_frames)
print("DTW Distances:", dtw_distances)


DTW Distances: {0: 0, 1: 0}
