In [3]:
import pandas as pd
import numpy as np
from pathlib import Path

def process_annotations(annotation_path):
    """
    Process sleep lab annotation CSV file into a structured DataFrame

    Every usable cell of the 'Event-Exportdatei' column is expected to read
    ``Start: <time>; Duration [ms]: <n>; Event: <name>; (<type>)`` with an
    optional ``; Stage: <stage>`` suffix. Rows that do not follow this layout
    end up without a duration and are discarded, as are exact duplicates.

    Parameters:
    annotation_path : str or Path
        Location of the annotation CSV export.

    Returns:
    pd.DataFrame
    Processed DataFrame with columns: Start, Duration [ms], Event, Type, Stage
    (preceded by any other columns of the file that are neither
    'Unnamed: 1'/'Unnamed: 2' nor 'Event-Exportdatei'). The extracted values
    are strings; Stage is missing when a row has no '; Stage: ...' suffix.
    The row index of the CSV is kept, it is not reset.

    Raises:
    KeyError
        If the file has no 'Event-Exportdatei' column.
    """
    pattern = r'Start: (.*?); Duration \[ms\]: (.*?); Event: (.*?); \((.*?)\)(?:; Stage: (.*))?'
    field_names = ['Start', 'Duration [ms]', 'Event', 'Type', 'Stage']

    raw = pd.read_csv(Path(annotation_path).as_posix())
    # The padding columns are not present in every export; errors='ignore'
    # keeps their absence from aborting the whole load.
    raw = raw.drop(columns=['Unnamed: 1', 'Unnamed: 2'], errors='ignore').dropna()

    # One capture group per output field; cells that do not match give all-NaN rows.
    parsed = raw['Event-Exportdatei'].str.extract(pattern)
    parsed.columns = field_names

    return (
        raw.drop(columns='Event-Exportdatei')
        .join(parsed)
        .drop_duplicates()
        # A missing duration means the cell was not an annotation line.
        .dropna(subset=['Duration [ms]'])
    )

def convert_duration_to_indices(df, params):
    """
    Express each annotation duration as a number of samples.

    The 'Duration [ms]' values are read as floats, converted to seconds,
    multiplied by the sampling rate and truncated to integers.

    Parameters:
    df: DataFrame with 'Duration [ms]' column
    params: Dictionary containing parameters including 'fps' key

    Returns:
    The same DataFrame object (modified in place) with the new column
    'Duration_indices'
    """
    samples_per_second = params['fps']
    duration_ms = df['Duration [ms]'].astype(float)
    sample_counts = duration_ms / 1000 * samples_per_second
    # astype(int) truncates the fractional part rather than rounding.
    df['Duration_indices'] = sample_counts.astype(int)
    return df



def calculate_end_times(df):
    """
    Add an 'End' column holding start time plus duration.

    Durations are rounded to whole seconds before being added, and the result
    is formatted as a time of day only, so an end time past midnight wraps
    around (e.g. 23:59:50 + 20 s -> 00:00:10).

    Parameters:
    df : pd.DataFrame
        DataFrame with 'Start' column ('%H:%M:%S' time strings) and
        'Duration [ms]' column

    Returns:
    pd.DataFrame
        The same DataFrame object (modified in place) with the new 'End'
        column containing '%H:%M:%S' strings
    """
    clock_format = '%H:%M:%S'
    began = pd.to_datetime(df['Start'], format=clock_format)
    whole_seconds = (df['Duration [ms]'].astype(float) / 1000).round().astype(int)
    finished = began + pd.to_timedelta(whole_seconds, unit='s')
    df['End'] = finished.dt.strftime(clock_format)
    return df

def extract_recording_date(annotation_path):
    """Extract recording date from annotations CSV file.

    Only the first 50 data rows are read. The date is taken from the first
    column of the row directly below the row whose first column is exactly
    '[Rec. date:]'.

    Args:
        annotation_path (str|Path): Path to annotations file
    Returns:
        pd.Timestamp: Recording date, parsed from a DD.MM.YYYY string
    Raises:
        ValueError: If the '[Rec. date:]' marker is missing, if no row follows
            it within the rows read, or if the value is not a DD.MM.YYYY date.
    """
    header_rows = pd.read_csv(annotation_path, nrows=50)
    first_column = header_rows.iloc[:, 0]

    # Work with positions, not index labels, because the value is read by position.
    marker_positions = (first_column == '[Rec. date:]').to_numpy().nonzero()[0]
    if marker_positions.size == 0:
        raise ValueError("Recording date not found in file")

    value_position = int(marker_positions[0]) + 1
    if value_position >= len(first_column):
        raise ValueError("Recording date marker found but no date value follows it")

    date_str = first_column.iloc[value_position]
    if isinstance(date_str, str):
        # Tolerate stray spaces around the exported date.
        date_str = date_str.strip()
    return pd.to_datetime(date_str, format='%d.%m.%Y')

In [4]:
# Study name passed to fetch_study_sessions, where it is matched against Study.Name.
study = 'Sleep Lab'
from aws_manager import S3Manager, DBManagerSystem
import yaml
import pandas as pd

def fetch_motion_results(db_manager, session_id):
    """Return all MotionResult rows of one session as a DataFrame.

    Args:
        db_manager: Object whose ``engine`` attribute is handed to ``pd.read_sql``.
        session_id: Value interpolated as-is into the ``SessionID = ...`` filter.
            NOTE(review): the SQL text is built by string formatting, so only
            pass trusted numeric IDs.
    Returns:
        pd.DataFrame: Result of the query.
    """
    return pd.read_sql(
        f"SELECT * FROM MotionResult WHERE SessionID = {session_id}",
        db_manager.engine,
    )

def fetch_study_sessions(db_manager, study):
    """Return every Session row of a study, with the patient's name and enrollment time.

    Session is joined to Patient (Session.PatientID = Patient.ID) and Patient
    to Study (Patient.StudyID = Study.ID); rows are kept when Study.Name
    equals ``study``.

    Args:
        db_manager: Object whose ``engine`` attribute is handed to ``pd.read_sql``.
        study: Study name to filter on.
    Returns:
        pd.DataFrame: Patient.PatientStudyName, Patient.EnrollmentTime and all
        Session columns.
    """
    # Double single quotes so a name such as "O'Brien Lab" stays inside the SQL
    # string literal instead of terminating it and breaking the statement.
    # NOTE(review): this is literal escaping only; if study names can come from
    # untrusted input, switch to bound parameters for the driver in use.
    study_literal = str(study).replace("'", "''")
    query = f"""
    SELECT Patient.PatientStudyName, Patient.EnrollmentTime,
    Session.* FROM Session 
    JOIN Patient ON Session.PatientID = Patient.ID 
    JOIN Study ON Patient.StudyID = Study.ID 
    WHERE Study.Name = '{study_literal}'
    """
    return pd.read_sql(query, db_manager.engine)

def find_session_id(recording_date, sessions=None):
    """Return the ID of the single session that started on the recording date.

    Args:
        recording_date: Object with a ``.date()`` method (e.g. pd.Timestamp).
        sessions (pd.DataFrame, optional): Frame with 'StartTime' and 'ID'
            columns. When omitted, the notebook-level ``session_df`` global is
            used, which must already have been defined by an earlier cell.
    Returns:
        The 'ID' value of the matching row, as a Python scalar.
    Raises:
        ValueError: If zero or more than one session started on that date.
    """
    if sessions is None:
        # Fallback for existing calls that rely on the notebook global.
        sessions = session_df

    target_day = recording_date.date()
    # to_datetime leaves datetime columns untouched and parses string columns.
    start_days = pd.to_datetime(sessions['StartTime']).dt.date
    matching_ids = sessions.loc[start_days == target_day, 'ID']

    if len(matching_ids) != 1:
        raise ValueError(
            f"Expected exactly one session starting on {target_day}, "
            f"found {len(matching_ids)}"
        )
    return matching_ids.item()


def fetch_radar_signals(db_manager, session_id):
    query = f"""
    select RadarSignal.*, RespirationHrResults.* 
    from RadarSignal 
    JOIN RespirationHrResults ON RespirationHrResults.SessionID = RadarSignal.SessionID
    AND RespirationHrResults.SessionSignalID = RadarSignal.SessionSignalID
    where RadarSignal.SessionID = {session_id}
    """
    return pd.read_sql(query, db_manager.engine)


def load_config(path='parameters.yaml'):
    """Read a YAML configuration file and return its parsed content.

    Args:
        path (str|Path): File to read. Defaults to 'parameters.yaml', resolved
            against the current working directory.
    Returns:
        The object produced by ``yaml.safe_load`` for the file.
    """
    # Explicit UTF-8 so parsing does not depend on the platform's default encoding.
    with open(path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

# Parse parameters.yaml once and take both sections from the same snapshot,
# instead of reading and parsing the file separately for each section.
_cloud_config = load_config()['Cloud']
S3_CONFIG = _cloud_config['s3']
DB_CONFIG = _cloud_config['DB_system']

# Credentials come from the configuration file; they are not hard-coded here.
s3_manager = S3Manager(
    access_key_id=S3_CONFIG['access_key_id'],
    secret_access_key=S3_CONFIG['secret_access_key'],
    region=S3_CONFIG['region'],
    bucket_name=S3_CONFIG['bucket_name']
)

db_manager = DBManagerSystem(
    host=DB_CONFIG['host'],
    user=DB_CONFIG['user'],
    password=DB_CONFIG['password'],
    database=DB_CONFIG['database']
)

In [5]:
def extract_signal_id(object):
    """Read a signal ID out of the text representation of ``object``.

    ``str(object)`` is taken and only its first line is inspected. If that
    line contains "SessionSignalID" and has more than one whitespace-separated
    token, the last token is returned; in every other case the result is
    "Unknown".
    NOTE(review): this looks designed for the printed form of a pandas Series
    whose labels are 'SessionSignalID'; a plain scalar ID yields "Unknown".
    Confirm against the caller.

    Returns:
        str: The extracted token or "Unknown".
    """
    first_line = str(object).partition('\n')[0]
    if "SessionSignalID" not in first_line:
        return "Unknown"
    tokens = first_line.split()
    # A lone label without a value gives nothing to return.
    return tokens[-1] if len(tokens) > 1 else "Unknown"

def divide_annotations_by_signals(annotations_df, signals_df, recording_date, fps=200, labeler_id='SML'):
    """Divide annotations into different radar signals based on start and end times.

    Annotation start times are time-of-day strings; hours 12-23 are placed on
    ``recording_date`` and all other hours on the following day. An annotation
    with a missing or non-numeric duration is treated as zero-length (End equals
    Start). Each annotation is compared with every signal; annotations that
    overlap no signal are left out of the result.

    Args:
        annotations_df (pd.DataFrame): DataFrame with annotations
            ('Start', 'Duration [ms]' and 'Event' columns are used)
        signals_df (pd.DataFrame): DataFrame with signal start and end times
            ('SessionSignalID', 'StartTime' and 'EndTime' columns are used)
        recording_date (datetime): Date when the recording started
        fps (int): Frames per second (default is 200)
        labeler_id (str): ID of the labeler
    Returns:
        pd.DataFrame: One row per matched annotation with the columns
        FirstSignalID, LastSignalID, EventTypeID, StartIndex, EndIndex,
        TimeStamp and LabelerID. StartIndex is the sample offset of the
        annotation start inside the first overlapping signal, EndIndex the
        sample offset of the annotation end inside the last overlapping signal.
        The columns are present even when no annotation matched.
    """
    result_columns = ['FirstSignalID', 'LastSignalID', 'EventTypeID',
                      'StartIndex', 'EndIndex', 'TimeStamp', 'LabelerID']

    # Work on copies so the caller's frames are not modified.
    annotations = annotations_df.copy()
    signals = signals_df.copy()

    annotations['Duration [ms]'] = pd.to_numeric(annotations['Duration [ms]'], errors='coerce')
    # An explicit dtype keeps the column datetime-typed even with zero rows.
    annotations['Start'] = pd.Series(
        [_annotation_start_datetime(time_str, recording_date)
         for time_str in annotations['Start'].astype(str)],
        index=annotations.index,
        dtype='datetime64[ns]',
    )
    # Handle NaN durations - a zero offset sets End equal to Start (crosshair).
    annotations['End'] = annotations['Start'] + pd.to_timedelta(
        annotations['Duration [ms]'].fillna(0), unit='ms')

    signals['StartTime'] = pd.to_datetime(signals['StartTime'])
    signals['EndTime'] = pd.to_datetime(signals['EndTime'])

    # The signal ID and time window do not depend on the annotation, so they
    # are extracted once instead of once per annotation/signal pair.
    signal_windows = [
        (extract_signal_id(signal.SessionSignalID), signal['StartTime'], signal['EndTime'])
        for _, signal in signals.iterrows()
    ]

    results = []
    for start, end, event in zip(annotations['Start'], annotations['End'], annotations['Event']):
        overlaps = []
        for signal_id, signal_start, signal_end in signal_windows:
            if end <= signal_start or start >= signal_end:
                continue  # No overlap
            overlaps.append((signal_id, max(start, signal_start), min(end, signal_end), signal_start))
        if not overlaps:
            continue

        overlaps.sort(key=lambda item: item[1])  # chronological by overlap start
        first_id, first_overlap_start, _, first_signal_start = overlaps[0]
        last_id, _, last_overlap_end, last_signal_start = overlaps[-1]

        # For a crosshair, overlap start and end coincide, so the same two
        # formulas cover both zero-length and ranged annotations.
        results.append({
            'FirstSignalID': first_id,
            'LastSignalID': last_id,
            'EventTypeID': event,
            'StartIndex': int((first_overlap_start - first_signal_start).total_seconds() * fps),
            'EndIndex': int((last_overlap_end - last_signal_start).total_seconds() * fps),
            'TimeStamp': start,
            'LabelerID': labeler_id})

    # Fixing the columns keeps downstream column access working on an empty result.
    divided_df = pd.DataFrame(results, columns=result_columns)
    print(f"Total annotations processed: {len(divided_df)}")

    return divided_df


def _annotation_start_datetime(time_str, recording_date):
    """Combine a time-of-day string with the calendar day it belongs to."""
    hours = int(time_str.split(':')[0])
    if 12 <= hours <= 23:  # Evening times
        date_to_use = recording_date
    else:  # Morning times fall on the day after the recording started
        date_to_use = recording_date + pd.Timedelta(days=1)
    return pd.to_datetime(f"{date_to_use.strftime('%Y-%m-%d')} {time_str}")

In [6]:
params = {'fps': 200}
# NOTE(review): machine-specific absolute path; point it at the annotation
# export on the machine running the notebook. Defined once so the annotation
# load and the recording-date lookup always read the same file.
ANNOTATION_PATH = '/Users/avitalv/Downloads/0010.csv'

annotations_df = process_annotations(ANNOTATION_PATH)
annotations_df = convert_duration_to_indices(annotations_df, params)
annotations_df = calculate_end_times(annotations_df)
recording_date = extract_recording_date(ANNOTATION_PATH)
# session_df must exist before find_session_id runs: that function reads it as a global.
session_df = fetch_study_sessions(db_manager, study)
session_id = find_session_id(recording_date)
signals_df = fetch_radar_signals(db_manager, session_id)

In [None]:
# Pass the configured sampling rate explicitly so the index computation stays
# in step with params['fps'] used for the duration conversion.
divided_df = divide_annotations_by_signals(annotations_df, signals_df, recording_date, fps=params['fps'])

In [None]:
# Show the annotations whose first and last overlapping signal are the same one.
divided_df.loc[divided_df['FirstSignalID'].eq(divided_df['LastSignalID'])]