# USB Protocol Analysis

Use the dropdown below to select a specific capture file (`source_file`) to analyze. Select 'All' to view aggregated data from all captures.


In [1]:
import polars as pl
import ipywidgets as widgets
from ipywidgets import VBox, Dropdown, Output, HTML
from IPython.display import display
import sys
from pathlib import Path
import warnings

warnings.filterwarnings('ignore', category=UserWarning, module='polars')


# --- Path Setup ---
# To make imports robust, we'll find the project root and add the scripts directory to the path.
# The project root is expected to contain the 'pyproject.toml' file.
try:
    # Start from the current working directory and go up until we find the root
    project_root = Path.cwd()
    while not (project_root / 'pyproject.toml').exists():
        if project_root == project_root.parent:
            raise FileNotFoundError("Reached filesystem root, 'pyproject.toml' not found.")
        project_root = project_root.parent
    
    analysis_scripts_path = project_root / 'analysis' / 'scripts'
    if str(analysis_scripts_path) not in sys.path:
        sys.path.insert(0, str(analysis_scripts_path))
    
    print(f"✅ Project root found at: {project_root}")
    print(f"✅ Added to sys.path: {analysis_scripts_path}")

except FileNotFoundError:
    print("❌ Could not determine project root. Imports may fail.")
    print(f"   Current working directory: {Path.cwd()}")


# --- Import and Data Load ---
try:
    import helpers
    print("✅ Successfully imported helpers.py")
except ImportError as e:
    print("❌ Could not import helpers.py. This might be due to a missing dependency in the script itself.")
    print(f"   Detailed error: {e}")
    helpers = None

# Load data
df = None
if helpers:
    try:
        # The master dataset is at the project root.
        df = helpers.load_master_dataset(project_root / 'usb_master_dataset.parquet')
    except FileNotFoundError as e:
        print(f"❌ {e}")


✅ Project root found at: /home/okhsunrog/code/km003c-protocol-research
✅ Added to sys.path: /home/okhsunrog/code/km003c-protocol-research/analysis/scripts
✅ Successfully imported helpers.py
✅ Loaded 12,008 USB packets from /home/okhsunrog/code/km003c-protocol-research/usb_master_dataset.parquet


In [2]:
if df is not None and not df.is_empty():
    # Get unique source files from the 'source_file' column
    source_files = ['All'] + sorted(df['source_file'].unique().to_list())

    # --- Create Widgets ---
    title = HTML("<h2>USB Transaction Analysis</h2>")
    
    source_dropdown = Dropdown(
        options=source_files,
        value='All',
        description='Source File:',
        style={'description_width': 'initial'},
        layout={'width': 'max-content'}
    )
    
    enum_checkbox = widgets.Checkbox(
        value=True,
        description='Hide Enumeration',
        disabled=False,
        indent=False
    )
    
    output_area = Output()

    # --- Define Handler ---
    def on_change(change):
        """Callback to update analysis when a control value changes."""
        with output_area:
            output_area.clear_output(wait=True)
            
            selected_file = source_dropdown.value
            hide_enum = enum_checkbox.value
            
            if selected_file == 'All':
                filtered_df = df
                print(f"📊 Showing transactions for ALL source files.")
            else:
                filtered_df = df.filter(pl.col('source_file') == selected_file)
                print(f"📊 Showing transactions for: {selected_file}")

            # Display stats for the selected data
            if not filtered_df.is_empty():
                # Process and display transactions
                transactions = helpers.get_transactions(filtered_df, filter_out_enumeration=hide_enum)
                helpers.print_transaction_log(transactions, limit=50) # Limit to 50 for notebook display
            else:
                print("No data available for this selection.")

    # --- Wire up and Display ---
    source_dropdown.observe(on_change, names='value')
    enum_checkbox.observe(on_change, names='value')

    # Display widgets
    controls = widgets.HBox([source_dropdown, enum_checkbox])
    display(VBox([title, controls, output_area]))

    # Trigger initial display
    on_change(None)

elif df is not None and df.is_empty():
    print("Dataset is loaded but empty. No analysis to display.")
else:
    print("Dataset could not be loaded. Cannot build UI.")


VBox(children=(HTML(value='<h2>USB Transaction Analysis</h2>'), HBox(children=(Dropdown(description='Source Fi…

In [2]:
# Process transactions, filtering out the standard USB enumeration
# To see ALL transactions including enumeration, set filter_out_enumeration=False
transactions = get_transactions(session_df, filter_out_enumeration=True)

# Display the transaction log (will show all by default)
print_transaction_log(transactions)


Found 194 logical transactions. Displaying 194:
--------------------------------------------------------------------------------------------------------------
#   Timestamp (s) Duration (ms) | Type            | Submit Details                 | Complete Details
--------------------------------------------------------------------------------------------------------------
1   5.519038             3.920 | HID-Class Command | H->D   7b - ...                | H->D   0b - ...
2   5.526973             2.978 | HID-Class Command | H->D   0b - ...                | H->D   0b - ...
3   5.529960             3.994 | Other Control   | D->H   0b - ...                | D->H  24b - ...
4   13.328214            3.782 | Vendor Command (0x32) | D->H   0b - ...                | D->H 170b - ...
5   13.457881            2.121 | HID-Class Command | D->H   0b - ...                | D->H   7b - ...
6   13.463666            1.337 | HID-Class Command | H->D   0b - ...                | H->D   0b - ...
7   13.465317 