In [38]:
import h5py
import numpy as np
import time
from collections import deque
from tqdm import tqdm

In [39]:
def create_weight_list(length, decay_factor=1.5):
    """
    Build a list of geometrically decaying weights that sums to 1.

    Entry ``i`` is proportional to ``decay_factor ** -i``, so for
    ``decay_factor > 1`` the first entry carries the largest weight and every
    following entry is smaller by a factor of ``decay_factor``.

    Args:
        length (int): Number of weights to produce. Zero or a negative value
            yields an empty list.
        decay_factor (float): Ratio between consecutive weights. Higher values
            make the weights drop faster.

    Returns:
        list: ``length`` weights summing to 1 (up to floating-point rounding).
    """
    if length <= 0:
        return []

    # Use a negative exponent rather than 1 / (decay_factor ** i): the latter
    # raises OverflowError as soon as decay_factor ** i leaves the float range
    # (beyond roughly i = 1750 for the default 1.5), whereas the negative
    # power simply underflows to 0.0 for the far tail of a long list.
    weights = [decay_factor ** -i for i in range(length)]

    # Normalize so the weights sum to 1
    total_weight = sum(weights)
    return [w / total_weight for w in weights]

def _smooth(level, history, time_stamp = None, smooth_seconds=1):
    """
    Append a prediction to a sliding time window and return its weighted average.

    Args:
        level: Newest prediction. ``None`` entries stay in the window but are
            left out of the average.
        history (collections.deque): Window of ``(time, prediction)`` tuples,
            oldest first. It is modified in place.
        time_stamp: Time of ``level``; anything ``float()`` can convert. When
            it is ``None`` or cannot be converted, the wall-clock
            ``time.time()`` is used instead.
        smooth_seconds (float): Window length. Entries whose time is more than
            this far behind the newest entry are dropped.

    Returns:
        tuple: ``(history, avg_prediction)``. ``avg_prediction`` is the output
        of ``np.convolve(..., mode='valid')`` on two equally long sequences
        (a one-element array for scalar predictions), or ``0`` when the window
        holds no usable prediction.
    """
    # Resolve the time of this sample
    current_time = None
    if time_stamp is not None:
        try:
            current_time = float(time_stamp)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric stamp (e.g. a "frame_12" key): fall back to the wall
            # clock below. Only conversion errors are caught so that
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            current_time = None
    if current_time is None:
        current_time = time.time()

    # Add the new prediction with its timestamp
    history.append((current_time, level))

    # Remove entries older than `smooth_seconds`
    cutoff = current_time - smooth_seconds
    while history and history[0][0] < cutoff:
        history.popleft()

    predictions = [pred for _, pred in history]
    weights = create_weight_list(len(predictions))
    kept = [(pi, wi) for pi, wi in zip(predictions, weights) if pi is not None]

    if not kept:
        # Empty window, or every prediction in it is None. Previously the
        # latter case crashed when unpacking zip(*[]).
        return history, 0

    kept_predictions, kept_weights = zip(*kept)

    if len(kept) != len(predictions):
        # Dropping None entries removed part of the weight mass; rescale so the
        # remaining weights sum to 1 again and the average is not biased low.
        weight_sum = sum(kept_weights)
        if weight_sum > 0:
            kept_weights = tuple(wi / weight_sum for wi in kept_weights)

    # NOTE(review): np.convolve flips its second argument, and the weights are
    # reversed here as well, so this is the plain dot product
    # sum(prediction[k] * weight[k]). Because `history` is ordered oldest
    # first, the largest weight lands on the OLDEST sample in the window.
    # Confirm this is intended; pass the weights un-reversed to favour the
    # newest sample instead.
    avg_prediction = np.convolve(kept_predictions, kept_weights[::-1], mode='valid')

    return history, avg_prediction

In [80]:
# Sliding windows used by _smooth; re-created here so re-running the cell
# starts from an empty history.
yolo_smoothing_queue = deque()
convnext_smoothing_queue = deque()

typoo = 'rotated'    # recording variant, part of the file name
smooth_seconds = 1   # length of the smoothing window passed to _smooth
clip_val = 2.5       # maximum allowed change between consecutive frames
# load data from file
file_path = f'../../data/predictions/new_run/recording_{typoo}_all_frames_processed.hdf5'

# Results keyed by the HDF5 top-level key of each frame
yolo_smooth_dict = {}
convnext_smooth_dict = {}
yolo_clipped_dict = {}
convnext_clipped_dict = {}

with h5py.File(file_path, 'r') as hf:
    timestamps = list(hf.keys())

    # Keys are either "frame_<index>" or numeric strings; order them
    # numerically. The `timestamps and` guard keeps a file without any keys
    # from raising IndexError on timestamps[0].
    if timestamps and "frame" == timestamps[0].split("_")[0]:
        timestamps = sorted(timestamps, key=lambda x: int(x.split('_')[1]))
    else:
        timestamps = sorted(timestamps, key=lambda x: float(x))

    prev_timestamp = None

    for timestamp in tqdm(timestamps):
        yolo_prediction = hf[timestamp]['yolo']['percent'][...]
        convnext_prediction = hf[timestamp]['convnext']['percent'][...]

        # NOTE(review): for "frame_<index>" keys float(timestamp) fails inside
        # _smooth, which then falls back to the wall clock, so the smoothing
        # window depends on how fast this loop runs. Confirm whether such keys
        # should be converted to seconds first.
        yolo_smoothing_queue, smoothed_yolo = _smooth(yolo_prediction, yolo_smoothing_queue, timestamp, smooth_seconds)
        convnext_smoothing_queue, smoothed_convnext = _smooth(convnext_prediction, convnext_smoothing_queue, timestamp, smooth_seconds)

        yolo_smooth_dict[timestamp] = smoothed_yolo
        convnext_smooth_dict[timestamp] = smoothed_convnext

        # Clip each raw prediction to within +/- clip_val of the previous
        # frame's clipped value; the first frame is taken as is.
        if prev_timestamp is not None:
            yolo_clipped = np.clip(yolo_prediction, a_min=yolo_clipped_dict[prev_timestamp] - clip_val, a_max=yolo_clipped_dict[prev_timestamp] + clip_val)
            convnext_clipped = np.clip(convnext_prediction, a_min=convnext_clipped_dict[prev_timestamp] - clip_val, a_max=convnext_clipped_dict[prev_timestamp] + clip_val)
        else:
            yolo_clipped = yolo_prediction
            convnext_clipped = convnext_prediction

        yolo_clipped_dict[timestamp] = yolo_clipped
        convnext_clipped_dict[timestamp] = convnext_clipped

        prev_timestamp = timestamp

        

100%|██████████| 25139/25139 [00:11<00:00, 2195.52it/s]


In [81]:
# Now we open the smaller files and write the smoothed predictions
data_path = f'../../data/predictions/new_run/recording_{typoo}_all_frames_processed_combined.hdf5'

# Sub-group name -> per-timestamp values to store under it. The order matches
# the order in which the sub-groups are written for every timestamp.
derived_outputs = {
    f'yolo_smooth_{smooth_seconds}': yolo_smooth_dict,
    f'convnext_smooth_{smooth_seconds}': convnext_smooth_dict,
    f'yolo_clipped_{clip_val}': yolo_clipped_dict,
    f'convnext_clipped_{clip_val}': convnext_clipped_dict,
}

with h5py.File(data_path, 'r+') as hf:
    timestamps = list(hf.keys())
    # Keys are either "frame_<index>" or numeric strings; order them
    # numerically. The `timestamps and` guard keeps a file without any keys
    # from raising IndexError on timestamps[0].
    if timestamps and "frame" == timestamps[0].split("_")[0]:
        timestamps = sorted(timestamps, key=lambda x: int(x.split('_')[1]))
    else:
        timestamps = sorted(timestamps, key=lambda x: float(x))

    for timestamp in tqdm(timestamps):
        # define the group for the timestamp
        group = hf[timestamp]

        for group_name, values_by_timestamp in derived_outputs.items():
            # Look the value up first: a timestamp missing from the computed
            # results then raises KeyError before anything already stored in
            # the file is deleted.
            value = values_by_timestamp[timestamp]

            # Replace a sub-group left over from an earlier run
            if group_name in group:
                del group[group_name]

            # create the sub-group and write the derived prediction
            derived_group = group.create_group(group_name)
            derived_group.create_dataset('percent', data=value)

100%|██████████| 733/733 [00:00<00:00, 1337.29it/s]
