In [59]:
import numpy as np
import pandas as pd
from demoparser2.demoparser2 import DemoParser

# 64Hz = 64 ticks per second
SEGMENT_SECONDS = 5
SEGMENT_LENGTH = 64 * SEGMENT_SECONDS

BUTTONS = ["FORWARD", "LEFT", "RIGHT", "BACK", "is_walking"]


# ======================================================================================= #
# PIPELINE HELPERS

def extract_ticks_df(demo_parser: DemoParser):
    """Given a demo parser object, collect all keyboard and mouse dynamic information per tick for every player.

    Ticks where irregular game states occur are filtered out (e.g. warmup and freeze period, player deaths).

    :param demo_parser: DemoParser object.
    :return: DataFrame containing all tick information.
    """
    key_features = ["FORWARD", "LEFT", "RIGHT", "BACK", "is_walking"]
    mouse_features = ["yaw", "pitch"]
    ticks_df = demo_parser.parse_ticks([
        *key_features,
        *mouse_features,
        # "is_freeze_period",
        "is_warmup_period",
        "is_alive",
        "total_rounds_played",
        # "is_terrorist_timeout",
        # "is_ct_timeout"
    ])
    # ticks_df = ticks_df[~ticks_df.is_freeze_period]
    ticks_df = ticks_df[~ticks_df.is_warmup_period]
    # ticks_df = ticks_df[~ticks_df.is_terrorist_timeout]
    # ticks_df = ticks_df[~ticks_df.is_ct_timeout]
    ticks_df = ticks_df[ticks_df.is_alive]
    ticks_df = ticks_df.drop(columns=["is_warmup_period", "is_alive"])
    # ticks_df = ticks_df.drop(columns=["is_freeze_period", "is_terrorist_timeout", "is_ct_timeout"])
    ticks_df[key_features] = ticks_df[key_features].astype(int)
    return ticks_df


def extract_segments(player_round_df: pd.DataFrame):
    segments = []
    min_tick = player_round_df.iloc[0].tick
    max_tick = player_round_df.iloc[-1].tick
    for start_tick in range(min_tick, max_tick, SEGMENT_LENGTH):
        end_tick = start_tick + SEGMENT_LENGTH
        segment_df = player_round_df[(player_round_df.tick >= start_tick) & (player_round_df.tick < end_tick)]
        segment = extract_segment(segment_df)
        segments.append(segment)
        # break
    return segments


def extract_segment(segment_df: pd.DataFrame):
    n_keys_pressed = extract_n_keys_pressed(segment_df)
    # print(n_keys_pressed)

    key_hold_time = extract_key_hold_time(segment_df)
    # print(key_hold_time)

    key_transition_time = extract_key_transition_time(segment_df)
    # print(key_transition_time)

    # print(segment_df)
    # print(get_run_lengths(segment_df["FORWARD"]))


# ======================================================================================= #
# FEATURE EXTRACTION HELPERS


def extract_n_keys_pressed(segment_df: pd.DataFrame):
    """Extract the frequency that N buttons were pressed during the segment, normalized to sum to 1.

    For e.g. if the user holds FORWARD only during the entire segment, the feature vector will be:
    [0 1 0 0 0 0]

    :param segment_df: the segment.
    :return: the feature vector.
    """
    n_buttons = segment_df[BUTTONS].values.sum(axis=1)
    n_buttons, counts = np.unique(n_buttons, return_counts=True)
    counts = counts / counts.sum()
    feature = np.zeros(len(BUTTONS) + 1)
    for n_button, count in zip(n_buttons, counts):
        feature[n_button] = count
    return feature


def get_run_lengths(column: pd.Series):
    """Returns the lengths of consecutive runs of 1s in a binary column.

    :param column: pd.Series containing binary column.
    :return: list containing lengths of each run of consecutive 1s.
    """
    is_new_run = np.cumsum(column != column.shift())  # identify change points
    return column.groupby(is_new_run).apply(lambda g: len(g) if g.iloc[0] == 1 else None).dropna().astype(int).tolist()


def extract_key_hold_time(segment_df: pd.DataFrame):
    """Extract the average time that a single button is held during the segment, in seconds.

    For e.g. if the user holds FORWARD for the entire segment, then the feature vector will be:
    [K 0 0 0 0] (for however long K is in seconds)

    If they hold down FORWARD four times during the segment for a short duration each time, then the
    feature vector will contain an average of each hold's duration.

    :param segment_df: the segment.
    :return: the feature vector.
    """
    runs = {
        button: get_run_lengths(segment_df[button]) for button in BUTTONS
    }
    mean = np.array([
        np.mean(runs[button]) / 64  # convert to seconds
        if len(runs[button]) > 0
        else 0
        for button in BUTTONS
    ])
    return mean


def get_gaps_between_runs(column: pd.Series):
    """Returns the lengths of gaps between runs of 1s in a binary column.

    :param column: pd.Series containing binary column.
    :return: list containing lengths of each gap between runs of consecutive 1s.
    """
    is_new_run = np.cumsum(column != column.shift())  # identify change points
    run_groups = column.groupby(is_new_run).apply(lambda g: (len(g), g.iloc[0]))  # (length, value)

    # extract lengths of 0-runs that are between two 1-runs
    zero_runs = [run_groups.iloc[i][0] for i in range(1, len(run_groups) - 1)
                 if run_groups.iloc[i - 1][1] == 1 and run_groups.iloc[i][1] == 0 and run_groups.iloc[i + 1][1] == 1]
    return zero_runs


def extract_key_transition_time(segment_df: pd.DataFrame):
    """Extract the average transition time between consecutive key preses during the segment.

    For e.g. if the user holds FORWARD twice, with 1 second between, then the feature vector will be:
    [1 0 0 0 0] (for however long K is in seconds)

    If they are ADAD-strafing, then the feature vector will contain an average of the transition time
    between each consecutive A presses and D presses, something like this:
    [0 X Y 0 0]

    :param segment_df: the segment.
    :return: the feature vector.
    """
    gaps = {
        button: get_gaps_between_runs(segment_df[button]) for button in BUTTONS
    }
    mean = np.array([
        np.mean(gaps[button]) / 64  # convert to seconds
        if len(gaps[button]) > 0
        else SEGMENT_SECONDS
        for button in BUTTONS
    ])
    return mean


# ======================================================================================= #
# MAIN PIPELINE

def main():
    path = "/Users/ktz/msai/msthesis/res/blast-premier-fall-final-2024-g2-vs-spirit-bo3-keEog6FzQxxIbzN28Nh3S0/g2-vs-spirit-m1-dust2.dem"
    demo_parser = DemoParser(path)
    ticks_df = extract_ticks_df(demo_parser)

    for (player_id, game_round), player_round_df in ticks_df.groupby(["steamid", "total_rounds_played"]):
        # print(player_id, game_round)
        segments = extract_segments(player_round_df)
        # break


main()

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x107b879d0>>
Traceback (most recent call last):
  File "/Users/ktz/msai/msthesis/.venv/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


KeyboardInterrupt: 