This notebook extracts GPMF (GoPro Metadata Format) data from GoPro footage.
The relevant data streams to extract are:
ACCL: accelerometer data (in 3 dimensions, in m/s^2),
GYRO: gyroscope data (in 3 dimensions, in radians/sec), and
MAGN: magnetometer data (in microtesla), used to correct for drift in the sensor data.

This is done using telemetry_parser, a library from GitHub that is written in Rust but exposed as a Python module.
We build a wheel so that this module can be installed with 'pip install' for the preprocessing extractor.

In [24]:
import telemetry_parser
import pandas as pd
import numpy as np
from config import config
from pprint import pprint

In [None]:
# Open the two sample recordings. `tp` and `tp_raf` are notebook-level names
# that later cells read.
tp = telemetry_parser.Parser(str(config.DATA_DIR / 'GH010045.MP4'))

tp_raf = telemetry_parser.Parser(str(config.DATA_DIR / 'raf_frames.MP4'))

# Show which camera and model the parser detected for the first recording.
print('Camera: ', tp.camera)
print('Model: ', tp.model)

# # return all telemetry as an array of dicts
# print('Telemetry', tp.telemetry())

# format the values with units etc
print('Telemetry formatted')

# Pick one telemetry entry to inspect; `single_sample` is reused by the next cell.
single_sample = tp.telemetry(human_readable = True)[19]     # NOTE(review): index presumably counts seconds into the video -- TODO confirm

# Debugging aids: the type of one entry and its raw accelerometer payload.
print(type(single_sample))
print(single_sample['Accelerometer']['Data'])
# pprint(single_sample)
# print(tuple(bytes.fromhex(key[2:]).decode('ASCII') for key in single_sample['Default'].keys() if key.startswith('0x')))

# # return only gyro and accel with timestamps, normalized to a single orientation and scaled to deg/s and m/s2
# print('Normalized IMU', tp.normalized_imu())

We want a function that loops through the parser output one second at a time. It uses a regex to extract the relevant accelerometer (ACCL) information and stores it in a pandas DataFrame. We hardcode the timestamps, as these are not provided.

In [None]:
# Parser
import re

def parse_IMU(file_contents):               # str to list[dict[str, float]]
    """Extract every ``Vector3 { x: .., y: .., z: .. }`` triple from a string.

    Args:
        file_contents: Text to scan, e.g. the human-readable ``'Data'`` payload
            of an accelerometer or gyroscope telemetry entry.

    Returns:
        A list with one ``{"x": float, "y": float, "z": float}`` dict per
        ``Vector3`` found, in order of appearance. Empty when nothing matches.
    """
    # A number is an optional minus sign, digits, an optional fraction and an
    # optional exponent. Accepting the exponent matters: without it a value
    # such as ``1e-05`` makes the whole Vector3 fail to match, so that sample
    # would be dropped silently.
    number = r"(-?\d+\.?\d*(?:[eE][-+]?\d+)?)"
    pattern = (
        r"Vector3\s*\{\s*x:\s*" + number
        + r",\s*y:\s*" + number
        + r",\s*z:\s*" + number
        + r"\s*\}"
    )
    return [
        {"x": float(x), "y": float(y), "z": float(z)}
        for x, y, z in re.findall(pattern, file_contents)
    ]

# Parse the IMU data of the single sample selected in the previous cell.

print(type(single_sample['Accelerometer']['Data']))   # debugging aid: type of the raw payload handed to parse_IMU

parsed_data_accl = parse_IMU((single_sample['Accelerometer']['Data']))
parsed_data_gyro = parse_IMU((single_sample['Gyroscope']['Data']))

print(type(parsed_data_accl))           # debugging aid: type returned by parse_IMU

# ---------------------------------------------------------------------------

# test if all the lists are the same length

# for i in range(len(tp.telemetry(human_readable = True)) - 1):
#     single_sample = tp.telemetry(human_readable = True)[i+1]
#     parsed_data_accl = parse_IMU((single_sample['Accelerometer']['Data']))
#     parsed_data_gyro = parse_IMU((single_sample['Gyroscope']['Data']))     
#     if len(parsed_data_accl) != len(parsed_data_gyro):
#         print(len(parsed_data_accl))
#         print('and')
#         print(len(parsed_data_gyro))


# Building the DataFrame

In [None]:
# version Pre-Raf

import numpy as np
import pandas as pd

def parser_to_df(list_of_dicts):
    """Build a DataFrame with one row per dict in ``list_of_dicts``."""
    return pd.DataFrame(list_of_dicts)

# Build the per-sample IMU table from the human-readable telemetry.

kolommen = ['TIMESTAMP', 'ACCL_x', 'ACCL_y', 'ACCL_z', 'GYRO_x', 'GYRO_y', 'GYRO_z']

# Parse the telemetry once and reuse it: the entry count and the entries must
# come from the same recording, and re-parsing inside the loop is wasted work.
# NOTE(review): this uses `tp`, the parser the rows were already read from;
# switch this line to `tp_raf` if that recording was the intended input.
telemetry_samples = tp.telemetry(human_readable=True)
length = len(telemetry_samples) - 1  # entry 0 is skipped; presumably one entry per second of video -- TODO confirm
print(length)

# Collect plain rows and build the DataFrame once at the end, which is much
# cheaper than appending to a DataFrame row by row.
all_rows = []

for i in range(length):
    temp = telemetry_samples[i + 1]

    # Parse accelerometer and gyroscope data (each a list of x/y/z dicts).
    temp_accl = parse_IMU(temp['Accelerometer']['Data'])
    temp_gyro = parse_IMU(temp['Gyroscope']['Data'])

    # Rows pair accelerometer and gyroscope readings by position, so an entry
    # is only usable when both streams hold the same, non-zero number of
    # samples (197 each was observed for the sample videos).
    n_samples = len(temp_accl)
    if n_samples == 0 or n_samples != len(temp_gyro):
        print(f'Skipping telemetry entry {i + 1}: {n_samples} ACCL vs {len(temp_gyro)} GYRO samples')
        continue

    # Timestamps are not provided here, so spread the samples evenly over
    # the one-second interval [i, i + 1).
    timestamp_temp = i + np.arange(n_samples) / n_samples

    for ts, accl, gyro in zip(timestamp_temp, temp_accl, temp_gyro):
        all_rows.append([ts, accl['x'], accl['y'], accl['z'], gyro['x'], gyro['y'], gyro['z']])

imu_data_df = pd.DataFrame(all_rows, columns=kolommen)
imu_data_df.to_csv(config.DATA_DIR / 'sample_imu.csv')

display(imu_data_df.head(200))
display(imu_data_df.tail(200))

In [None]:
# version post-Raf

import telemetry_parser
import pandas as pd
import numpy as np
from config import config
from pprint import pprint

# Parser for the sample video; build_df() in this cell reads this notebook-level `tp`.
tp = telemetry_parser.Parser(str(config.DATA_DIR / 'GH010045.MP4'))

def build_df():
    """Turn the normalized IMU samples of the notebook-level parser ``tp`` into a DataFrame.

    Prints the number of samples, writes the table (without its index) to
    ``config.DATA_DIR / 'normalized_imu_data.csv'`` and returns it.

    Returns:
        DataFrame with the columns TIMESTAMP (``timestamp_ms`` divided by
        1000), ACCL_x/y/z and GYRO_x/y/z, one row per sample.
    """
    column_names = ['TIMESTAMP', 'ACCL_x', 'ACCL_y', 'ACCL_z', 'GYRO_x', 'GYRO_y', 'GYRO_z']

    samples = tp.normalized_imu()
    print(len(samples))

    rows = []
    for sample in samples:
        seconds = sample['timestamp_ms'] / 1000  # milliseconds -> seconds
        ax, ay, az = sample['accl']
        gx, gy, gz = sample['gyro']
        rows.append([seconds, ax, ay, az, gx, gy, gz])

    table = pd.DataFrame(rows, columns=column_names)

    # Persist the table next to the other data files.
    table.to_csv(config.DATA_DIR / 'normalized_imu_data.csv', index=False)

    return table

imu_data_df = build_df()

# Show the first and the last 200 rows of the table.
display(imu_data_df.head(200), imu_data_df.tail(200))


# the master function that does everything

In [32]:
#final version

import telemetry_parser
import pandas as pd
import numpy as np
from config import config
from pprint import pprint
from pathlib import Path

def extract_imu_data(file: Path, save_csv: bool = True, csv_path=None):
    """Extract the normalized IMU samples of a video file into a DataFrame.

    Args:
        file: Path of the video handed to ``telemetry_parser.Parser``.
        save_csv: When True (the default) the table is also written to disk
            without its index.
        csv_path: Destination of the CSV file. Defaults to
            ``config.DATA_DIR / 'normalized_imu_data.csv'``; pass a different
            path per video so that consecutive calls do not overwrite each
            other's output.

    Returns:
        DataFrame with the columns TIMESTAMP (``timestamp_ms`` divided by
        1000), ACCL_x/y/z and GYRO_x/y/z, one row per sample returned by
        ``normalized_imu()``.
    """
    tp = telemetry_parser.Parser(str(file))
    kolommen = ['TIMESTAMP', 'ACCL_x', 'ACCL_y', 'ACCL_z', 'GYRO_x', 'GYRO_y', 'GYRO_z']

    all_rows = []

    # One row per IMU sample (each sample carries its own timestamp).
    for sample in tp.normalized_imu():
        timestamp_s = sample['timestamp_ms'] / 1000  # milliseconds -> seconds

        # Each reading is expected to unpack into exactly three components.
        accl_x, accl_y, accl_z = sample['accl']
        gyro_x, gyro_y, gyro_z = sample['gyro']

        all_rows.append([timestamp_s, accl_x, accl_y, accl_z, gyro_x, gyro_y, gyro_z])

    imu_data_df = pd.DataFrame(all_rows, columns=kolommen)

    if save_csv:
        if csv_path is None:
            csv_path = config.DATA_DIR / 'normalized_imu_data.csv'
        imu_data_df.to_csv(csv_path, index=False)

    return imu_data_df

In [None]:
# Run the extractor on the sample clip. The path is resolved against the
# configured data directory, like the other inputs in this notebook, so the
# cell does not depend on the current working directory.
display(extract_imu_data(config.DATA_DIR / 'raf_frames.MP4'))

# function to work with user input (start frame, end frame and label)

In [None]:
# Frame numbers are assumed to be integers between 0 and (length of the video in seconds) x fps.
# For the 531-second sample video at 30 fps that is 0 to 15930.

def retrieve_frame_number(frame: int, fps: int):
    """Split an absolute frame number into its second and its frame within that second.

    Args:
        frame: Absolute frame number into the video, starting at 0.
        fps: Frames per second of the video (30, 60, ...).

    Returns:
        ``(frame_relative, seconde_teller)``: ``seconde_teller`` is the number
        of whole seconds that precede the frame and ``frame_relative`` is the
        frame's position within that second (``0 <= frame_relative < fps``).
        Returns ``None`` after printing a message when ``frame`` is negative.
    """
    if frame < 0:
        print('invalid frame number')
        return None

    # Whole seconds and the remainder in one step. A manual countdown loop is
    # easy to get wrong here: ``x =- y`` / ``x =+ y`` assign ``-y`` / ``+y``
    # instead of subtracting or adding.
    seconde_teller, frame_relative = divmod(frame, fps)
    return frame_relative, seconde_teller
    
def find_timestamp(frame_relative: int, seconde_teller: int, fps: int, samples_per_second: int = 197):
    """Snap a frame position to the nearest point of an evenly spaced per-second grid.

    Args:
        frame_relative: Position of the frame within its second.
        seconde_teller: Whole seconds that precede the frame.
        fps: Frames per second of the video.
        samples_per_second: Number of grid points spread over the second,
            both ends (0 and 1) included. Defaults to 197, the number of IMU
            samples per second observed for the sample videos.

    Returns:
        ``seconde_teller`` plus the grid point closest to
        ``frame_relative / fps``.
    """
    temp_linspace = np.linspace(0, 1, samples_per_second)
    temp_frame_fraction = frame_relative / fps

    # Index of the grid point with the smallest distance to the frame's fraction of the second.
    nearest = np.abs(temp_linspace - temp_frame_fraction).argmin()
    return seconde_teller + temp_linspace[nearest]
    
def create_df():
    """Return a TIMESTAMP/LABEL table aligned with the notebook-level ``imu_data_df``.

    TIMESTAMP is copied from ``imu_data_df['TIMESTAMP']`` and LABEL is set to
    None for every row.
    """
    labels_df = pd.DataFrame(columns=['TIMESTAMP', 'LABEL'])
    labels_df['TIMESTAMP'] = imu_data_df['TIMESTAMP']
    labels_df['LABEL'] = None
    return labels_df

def label_timestamps(frame_start: int, frame_end: int, label: str, fps: int):
    """Label every IMU timestamp that falls between two video frames.

    Args:
        frame_start: Absolute frame number where the labelled span begins.
        frame_end: Absolute frame number where the labelled span ends.
        label: Value written to the LABEL column for timestamps in the span.
        fps: Frames per second of the video.

    Returns:
        The table produced by ``create_df()`` with LABEL set to ``label`` for
        rows where ``timestamp_start <= TIMESTAMP < timestamp_end``; all other
        rows keep the LABEL that ``create_df()`` gave them.
    """
    start_frame, start_sec = retrieve_frame_number(frame_start, fps)
    end_frame, end_sec = retrieve_frame_number(frame_end, fps)

    timestamp_start = find_timestamp(start_frame, start_sec, fps)
    timestamp_end = find_timestamp(end_frame, end_sec, fps)

    dataframe = create_df()

    # Select rows with a boolean mask rather than by loop position, so the
    # result does not depend on the frame having a default 0..n-1 index.
    in_span = (dataframe['TIMESTAMP'] >= timestamp_start) & (dataframe['TIMESTAMP'] < timestamp_end)
    dataframe.loc[in_span, 'LABEL'] = label
    return dataframe

# Quick check: label the span between frames 2 and 3 at 30 fps as 'test' and show the first 50 rows.
display(label_timestamps(2,3, 'test', 30).head(50))

