In [None]:
"""
Mobility-Immobility Analysis with Speed Thresholds

This script analyzes time-series speed data to identify periods of mobility and immobility
based on a speed threshold (0.02m/s). It processes the data to merge consecutive windows of the same
speed state that are shorter than a specified duration (less than 0.5s), calculates statistics for each window,
and visualizes the results.
"""

import pandas as pd
import plotly.graph_objects as go
import numpy as np
import os
from typing import List, Tuple, Union

def load_data(file_path: str, time_col: str = "time", speed_col: str = "speed") -> pd.DataFrame:
    """
    Load time and speed data from a CSV file with error handling.
    
    Args:
        file_path (str): Path to the CSV file
        time_col (str): Name of the column containing time data
        speed_col (str): Name of the column containing speed data
        
    Returns:
        pd.DataFrame: DataFrame containing the loaded data
    """
    try:
        df = pd.read_csv(file_path)
        if time_col not in df.columns or speed_col not in df.columns:
            raise KeyError(f"Required columns '{time_col}' or '{speed_col}' not found in the file.")
        return df
    except FileNotFoundError:
        raise FileNotFoundError(f"The file {file_path} does not exist.")

def identify_speed_windows(time: pd.Series, speed: pd.Series, 
                          speed_threshold: float = 0.02) -> List[List[Tuple[float, float]]]:
    """
    Identify windows of continuous speed states (low or high) based on a threshold.
    
    Args:
        time (pd.Series): Series containing time data
        speed (pd.Series): Series containing speed data
        speed_threshold (float): Threshold to distinguish between low and high speed (m/s)
        
    Returns:
        List[List[Tuple[float, float]]]: List of windows, where each window is a list of (time, speed) tuples
    """
    # Create masks for low and high speed
    low_speed_mask = (speed >= 0) & (speed <= speed_threshold)
    high_speed_mask = speed > speed_threshold
    
    # Initialize variables
    all_windows = []
    current_window = []
    current_state = None  # None, 'low', or 'high'
    
    for i in range(len(time)):
        # Determine the current state
        state = 'low' if low_speed_mask.iloc[i] else 'high'
        
        # Handle transitions between states
        if current_state is None:  # Start of the first window
            current_state = state
            current_window.append((time.iloc[i], speed.iloc[i]))
        elif state == current_state:  # Continue the current window
            current_window.append((time.iloc[i], speed.iloc[i]))
        else:  # Transition to a new state
            all_windows.append(current_window)
            current_window = [(time.iloc[i], speed.iloc[i])]
            current_state = state
    
    # Add the last window if it exists
    if current_window:
        all_windows.append(current_window)
    
    return all_windows

def merge_short_windows(windows: List[List[Tuple[float, float]]], 
                       min_duration: float = 0.5) -> List[List[Tuple[float, float]]]:
    """
    Merge consecutive windows that are shorter than the specified duration.
    
    Args:
        windows (List[List[Tuple[float, float]]]): List of windows to merge
        min_duration (float): Minimum duration for a window (shorter windows will be merged)
        
    Returns:
        List[List[Tuple[float, float]]]: List of merged windows
    """
    if not windows:
        return []
    
    merged_windows = [windows[0]]
    
    for window in windows[1:]:
        last_window = merged_windows[-1]
        
        # Calculate duration of the current window
        window_duration = window[-1][0] - window[0][0]
        
        if window_duration <= min_duration:
            # Merge with the previous window
            last_window.extend(window)
        else:
            # Add as a new window
            merged_windows.append(window)
    
    return merged_windows

def merge_windows_with_gaps(windows: List[List[Tuple[float, float]]], 
                           min_gap: float = 0.5) -> List[List[Tuple[float, float]]]:
    """
    Merge windows that have small gaps between them.
    
    Args:
        windows (List[List[Tuple[float, float]]]): List of windows to merge
        min_gap (float): Maximum gap between windows to be merged
        
    Returns:
        List[List[Tuple[float, float]]]: List of merged windows
    """
    if not windows:
        return []
    
    merged_windows = [windows[0]]
    
    for window in windows[1:]:
        last_window = merged_windows[-1]
        
        # Calculate the gap between windows
        gap = window[0][0] - last_window[-1][0]
        
        if gap <= min_gap:
            # Merge the windows
            last_window.extend(window)
        else:
            # Add as a new window
            merged_windows.append(window)
    
    return merged_windows

def calculate_window_statistics(windows: List[List[Tuple[float, float]]], 
                               speed_threshold: float = 0.02) -> Tuple:
    """
    Calculate statistics (average speed, duration) for each window.
    
    Args:
        windows (List[List[Tuple[float, float]]]): List of windows
        speed_threshold (float): Threshold to distinguish between low and high speed (m/s)
        
    Returns:
        Tuple: Contains (high_speed_averages, low_speed_averages, low_speed_durations, high_speed_durations)
    """
    # Separate windows into low and high speed groups
    low_speed_windows = [w for w in windows if w[0][1] <= speed_threshold]
    high_speed_windows = [w for w in windows if w[0][1] > speed_threshold]
    
    # Calculate average speed for high-speed windows
    high_speed_averages = [
        (np.mean([s for t, s in window]), window) for window in high_speed_windows
    ]
    
    # Calculate average speed for low-speed windows
    low_speed_averages = [
        (np.mean([s for t, s in window]), window) for window in low_speed_windows
    ]
    
    # Calculate durations for low-speed windows
    low_speed_durations = [
        (window[0][0], window[-1][0], window[-1][0] - window[0][0]) for window in low_speed_windows
    ]
    
    # Calculate durations for high-speed windows
    high_speed_durations = [
        (window[0][0], window[-1][0], window[-1][0] - window[0][0]) for window in high_speed_windows
    ]
    
    return high_speed_averages, low_speed_averages, low_speed_durations, high_speed_durations

def visualize_speed_data(time: pd.Series, speed: pd.Series, 
                        low_speed_windows: List[List[Tuple[float, float]]],
                        high_speed_windows: List[List[Tuple[float, float]]],
                        speed_threshold: float = 0.02) -> go.Figure:
    """
    Create a visualization of speed vs. time with highlighted regions for different speed states.
    
    Args:
        time (pd.Series): Series containing time data
        speed (pd.Series): Series containing speed data
        low_speed_windows (List[List[Tuple[float, float]]]): List of low-speed windows
        high_speed_windows (List[List[Tuple[float, float]]]): List of high-speed windows
        speed_threshold (float): Threshold used to distinguish between low and high speed (m/s)
        
    Returns:
        go.Figure: Plotly figure object
    """
    # Create Plotly figure
    fig = go.Figure()
    
    # Add speed trace
    fig.add_trace(go.Scatter(x=time, y=speed, mode='lines', name='Speed (m/s)', line=dict(color='black')))
    
    # Add background color for low-speed regions
    for window in low_speed_windows:
        start, end = window[0][0], window[-1][0]
        fig.add_vrect(x0=start, x1=end, fillcolor='red', opacity=0.2, layer='below', line_width=0)
    
    # Add background color for high-speed regions
    for window in high_speed_windows:
        start, end = window[0][0], window[-1][0]
        fig.add_vrect(x0=start, x1=end, fillcolor='green', opacity=0.2, layer='below', line_width=0)
    
    # Add threshold line
    fig.add_hline(y=speed_threshold, line_dash="dash", line_color="blue", 
                  annotation_text=f"Speed Threshold ({speed_threshold} m/s)")
    
    # Update layout
    fig.update_layout(
        title="Speed vs. Time with Activity Regions",
        xaxis_title="Time (s)",
        yaxis_title="Speed (m/s)",
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
    )
    
    return fig

def save_results(output_file: str, high_speed_averages: List[Tuple], 
                low_speed_averages: List[Tuple], low_speed_durations: List[Tuple], 
                high_speed_durations: List[Tuple]) -> None:
    """
    Save the processed data to a CSV file.
    
    Args:
        output_file (str): Path to the output CSV file
        high_speed_averages (List[Tuple]): List of tuples containing (avg_speed, window) for high-speed windows
        low_speed_averages (List[Tuple]): List of tuples containing (avg_speed, window) for low-speed windows
        low_speed_durations (List[Tuple]): List of tuples containing (start_time, end_time, duration) for low-speed windows
        high_speed_durations (List[Tuple]): List of tuples containing (start_time, end_time, duration) for high-speed windows
    """
    output_data = []
    
    # Add high-speed windows data
    for (avg_speed, _), (start_time, end_time, duration) in zip(high_speed_averages, high_speed_durations):
        output_data.append([start_time, end_time, avg_speed, duration, "High Speed"])
    
    # Add low-speed windows data with average speed
    for (avg_speed, _), (start_time, end_time, duration) in zip(low_speed_averages, low_speed_durations):
        output_data.append([start_time, end_time, avg_speed, duration, "Low Speed"])
    
    # Create DataFrame and save to CSV
    result_df = pd.DataFrame(output_data, columns=["Start Time", "End Time", "Average Speed", "Duration", "Speed State"])
    result_df.to_csv(output_file, index=False)
    print(f"Processed data saved to: {output_file}")

def main():
    """Main function to execute the mobility-immobility analysis."""
    # Configuration parameters
    file_path = "path_to_your_file.csv"  # Change to your actual file path
    time_col = "time"
    speed_col = "speed"
    speed_threshold = 0.02  # m/s
    min_duration = 0.5  # seconds
    
    try:
        # Load data
        df = load_data(file_path, time_col, speed_col)
        time = df[time_col]
        speed = df[speed_col]
        
        # Identify speed windows
        all_windows = identify_speed_windows(time, speed, speed_threshold)
        
        # Merge short windows
        merged_windows = merge_short_windows(all_windows, min_duration)
        
        # Separate windows into low and high speed groups
        low_speed_windows = [w for w in merged_windows if w[0][1] <= speed_threshold]
        high_speed_windows = [w for w in merged_windows if w[0][1] > speed_threshold]
        
        # Merge windows with small gaps
        low_speed_windows = merge_windows_with_gaps(low_speed_windows, min_duration)
        high_speed_windows = merge_windows_with_gaps(high_speed_windows, min_duration)
        
        # Calculate window statistics (now includes low-speed averages)
        high_speed_averages, low_speed_averages, low_speed_durations, high_speed_durations = calculate_window_statistics(
            merged_windows, speed_threshold
        )
        
        # Visualize data
        fig = visualize_speed_data(time, speed, low_speed_windows, high_speed_windows, speed_threshold)
        fig.show()
        
        # Save results (now includes low-speed averages)
        output_file = os.path.join(os.path.dirname(file_path), "processed_file.csv")  # Change to your actual file name
        save_results(output_file, high_speed_averages, low_speed_averages, low_speed_durations, high_speed_durations)
        
    except Exception as e:
        print(f"An error occurred: {str(e)}")

if __name__ == "__main__":
    main()

Processed data saved to: C:/Users/Labo/Documents/Inscopix_Projects/160-179-182-DT-LT_circular-rectangular-mazes/Analysis/Circular Arena/DLC-Speed\160-Circular-Arena-DLC-XY-speed_vs_time-mobility_data-1.csv
