In [1]:
import os
import pandas as pd
import numpy as np
import pickle
import warnings
from tqdm import tqdm
from joblib import Parallel, delayed
from datetime import timedelta

##### Processing PPG Data from Parquet Files

<ul style="font-size: 0.8em;">
This code snippet processes Photoplethysmography (PPG) data stored in Parquet files. It extracts relevant data columns from each file, consolidates the information, and saves the structured data as pickle files. The key steps include:

  <li><strong>Reading Parquet Files:</strong> The script reads specific columns from each Parquet file to optimize performance.</li>
  <li><strong>Data Consolidation:</strong> Data from multiple files is concatenated into a single DataFrame, ensuring consistency and adding user identifiers.</li>
  <li><strong>Data Cleaning:</strong> The DataFrame is cleaned by removing duplicates and sorting by local time.</li>
  <li><strong>Grouping and Saving:</strong> The cleaned data is grouped by day and start time (<code>start_lt</code>), then each group is saved as a separate pickle file.</li>

This process is executed for each user directory in parallel, leveraging multiple cores for efficient processing.  
</ul>

In [None]:
from typing import List, Union
from tqdm.notebook import tqdm
from tqdm.auto import tqdm

def extract_ppg_df(udir: str, input_dir: Union[str, os.PathLike], output_dir: Union[str, os.PathLike]) -> None:
    """
    Consolidates one user's PPG Parquet files and writes one pickle per (day, start_lt) group.

    Args:
    - udir (str): User directory name; the text after the first '=' is used as the user identifier.
    - input_dir (Union[str, os.PathLike]): The input directory path where user directories are located.
    - output_dir (Union[str, os.PathLike]): The output directory path where processed data will be saved.

    The function reads the columns 'localtime', 'ppg1', 'day', 'start_lt', 'end_lt' and 'label' from every
    file in the user directory whose name ends with 'parquet', concatenates them, adds a 'user' column,
    drops duplicate rows, sorts by 'localtime', and pickles each (day, start_lt) group to
    ``<user>_<day>_<start_lt>_<label>.pickle`` where ``label`` is taken from the first row of the group.

    A user directory without any parquet file is skipped with a warning instead of raising, so that a
    single empty directory does not abort a parallel run over all users.
    """
    # Construct the full path to the user's directory
    udir_path = os.path.join(input_dir, udir)

    # Extract user identifier from directory name
    user = udir.split('=')[1]

    files = [file for file in os.listdir(udir_path) if file.endswith('parquet')]
    if not files:
        # pd.concat raises ValueError on an empty list; skip this user instead of crashing
        warnings.warn(f"No parquet files found in {udir_path}; skipping {udir}")
        return

    # Read only the needed columns from each Parquet file in the user's directory
    columns = ['localtime', 'ppg1', 'day', 'start_lt', 'end_lt', 'label']
    all_data = [
        pd.read_parquet(os.path.join(udir_path, file), columns=columns)
        for file in tqdm(files, desc=f"Processing parquet files for {udir}", leave=True)
    ]

    ppg_df = pd.concat(all_data, ignore_index=True)
    ppg_df['user'] = user

    # Clean the DataFrame by removing duplicates and sorting by localtime
    ppg_df = (ppg_df
              .drop_duplicates()
              .sort_values(by='localtime')
              .reset_index(drop=True))

    # One pickle per (day, start_lt) group; the group's first label goes into the file name
    for (day, start_lt), df in ppg_df.groupby(['day', 'start_lt']):
        label = df['label'].iloc[0]
        with open(os.path.join(output_dir, f"{user}_{day}_{start_lt}_{label}.pickle"), 'wb') as f:
            pickle.dump(df, f)

# Set input and output directory paths (both relative to the current working directory)
input_dir = os.path.join(os.getcwd(), 'act_eval_ppg')
output_dir = os.path.join(os.getcwd(), 'dfs_motion_ppg')
os.makedirs(output_dir, exist_ok=True)

user_dirs = [udir for udir in os.listdir(input_dir) if udir.startswith('user')]
# Process each user directory in parallel.
# The inner bar advances as tasks are handed to joblib, the outer bar as results come back.
# The previous description was a plain string containing "{udir}", which was shown literally.
list(
    tqdm(
        Parallel(return_as="generator", n_jobs=20)(
            delayed(extract_ppg_df)(udir, input_dir, output_dir)
            for udir in tqdm(user_dirs, desc="Dispatching user directories")
        ),
        total=len(user_dirs),
        desc="Completed user directories",
    )
)

In [2]:
## Create a dataframe of file names of stored pickle files.

# output_dir = os.path.join(os.getcwd(), 'dfs_motion_ppg')
# NOTE(review): this absolute path differs from the cwd-based 'dfs_motion_ppg' used elsewhere in the
# notebook -- confirm both point at the same directory.
output_dir = "/data/mithun/dfs_motion_ppg"
PPG_available = []
for filename in os.listdir(output_dir):
    stem, ext = os.path.splitext(filename)
    if ext != '.pickle':
        # Ignore anything that is not an exported pickle (e.g. checkpoint folders, hidden files)
        continue
    # File names follow <user>_<day>_<start_lt>_<label>; split from the right so only the
    # last three underscores act as separators
    parts = stem.rsplit('_', 3)
    if len(parts) != 4:
        continue
    PPG_available.append(parts)

PPG_available_df = (pd.DataFrame(PPG_available, columns=['user', 'day', 'st_time', 'label'])
                    .astype({'user': str,
                             'day': 'datetime64[ns]',  # convert to datetime from object type
                             'st_time': 'datetime64[ns]', # convert to datetime from object type
                             'label': 'str'})
                    .sort_values(by=['user', 'day', 'st_time']))

### 

##### Split the 120 participants into train, val and test sets with a ratio of 70:15:15; the same participant split is also used during training. The test set was kept hidden during the self-supervised training phase to prevent data leakage during the downstream task of activity detection in the field. However, since we used logistic regression from scikit-learn for the activity detection downstream task, the train and val splits were merged during model building, while the test split was used for model evaluation.

In [3]:
# Fixed seed so the participant shuffle (and therefore the split) is reproducible
np.random.seed(42)

# Participant ids are the part after '=' in every directory whose name starts with 'user'
udirs = sorted(os.listdir(os.path.join(os.getcwd(), 'act_eval_ppg')))
all_ptcs = [udir.split("=")[1] for udir in udirs if udir.startswith('user')]
all_ptcs_shf = np.random.permutation(all_ptcs).tolist()

# 70 % train, 15 % val, remainder test
n = len(all_ptcs_shf)
train_ratio, val_ratio = 0.7, 0.15

train_end = int(n * train_ratio)
val_end = train_end + int(n * val_ratio)

train_ptcs, val_ptcs, test_ptcs = (
    all_ptcs_shf[:train_end],
    all_ptcs_shf[train_end:val_end],
    all_ptcs_shf[val_end:],
)

print(*(len(ptcs) for ptcs in (train_ptcs, val_ptcs, test_ptcs)))

84 18 18


In [4]:
## Per-user PPG statistics: each entry unpacks to (mean, standard deviation, clipping threshold)
# NOTE(review): pickle.load executes arbitrary code from the file -- only load trusted files.
with open("dict_user_ppg_mean_std_per.pickle", mode="rb") as f:
    dict_user_ppg_mean_std_per = pickle.load(f)

##### Exporting PPG Data to Numpy Files

<ul style="font-size: 0.8em;">
This code snippet processes and exports Photoplethysmography (PPG) data stored in pickle files to numpy format. It ensures that the data is correctly normalized and clipped according to user-specific statistics. Key steps include:

  <li><strong>File Reading and Sorting:</strong> The PPG data is read from pickle files and sorted by local time for consistency.</li>
  <li><strong>Interpolation:</strong> Windows holding at least 95% but fewer than the expected 2000 samples are resampled onto a 20 ms grid by linear interpolation; windows with fewer than 95% of the samples are skipped.</li>
  <li><strong>Clipping and Normalization:</strong> Data is clipped to user-specific thresholds and normalized using mean and standard deviation.</li>
  <li><strong>Saving to Numpy:</strong> The processed data is saved as a numpy file with a structured naming convention for easy identification.</li>

The process is applied to each file, optimizing data handling and storage for subsequent analysis and modeling.
</ul>

In [None]:
from typing import Dict, Tuple, Union
from tqdm.notebook import tqdm
from tqdm.auto import tqdm

def export_numpy_file(
    file_path: str,
    dfs_dirs: Union[str, os.PathLike],
    output_np_dir: Union[str, os.PathLike],
    dict_user_ppg_mean_std_per: Dict[str, Tuple[float, float, float]],
    clip: bool,
    norm: bool
) -> None:
    """
    Converts one pickled PPG window into a numpy file, optionally clipped and normalized per user.

    Args:
    - file_path (str): Name of the pickle file (relative to ``dfs_dirs``) containing the PPG DataFrame.
    - dfs_dirs (Union[str, os.PathLike]): Directory containing the pickle files.
    - output_np_dir (Union[str, os.PathLike]): Directory to save the output numpy files.
    - dict_user_ppg_mean_std_per (Dict[str, Tuple[float, float, float]]): Maps a user id to
      (mean, standard deviation, clipping threshold).
    - clip (bool): Whether to cap values above the user's clipping threshold at that threshold.
    - norm (bool): Whether to subtract the user's mean and divide by the user's standard deviation.

    Behaviour:
    - Windows with fewer than 95% of the expected 2000 samples are skipped: nothing is written.
    - Windows with 2000 or more samples keep every second one of the first 2000 samples.
    - Windows in between are linearly interpolated onto a 20 ms grid from the first 'start_lt'
      (inclusive) to the last 'end_lt' (exclusive); grid points outside the recorded time range
      take the nearest recorded value.
    - The result is saved as ``<user>_<day>_<start_lt>.npy`` in ``output_np_dir``.

    Normalization runs on a private floating-point copy, so integer-typed or read-only sample
    buffers no longer make the in-place arithmetic fail when ``clip`` is False.
    """
    samples_per_window = 2000  # 100 Hz; 20 seconds = 20 * 100 = 2000 datapoints
    min_coverage = 0.95        # minimum fraction of samples_per_window required to export

    # Load PPG data from a pickle file
    with open(os.path.join(dfs_dirs, file_path), 'rb') as f:
        ppg_df = pickle.load(f)

    # Sort the DataFrame by localtime to ensure chronological order
    ppg_df = ppg_df.sort_values(by=['localtime'])
    user = ppg_df['user'].iloc[0]
    day = ppg_df['day'].iloc[0]
    st_win = ppg_df['start_lt'].iloc[0]

    # Retrieve user-specific statistics for normalization and clipping
    user_ppg_mean, user_ppg_std, clip_tr = dict_user_ppg_mean_std_per[user]

    # Too few samples: skip this window entirely
    if len(ppg_df) / samples_per_window < min_coverage:
        return

    if len(ppg_df) >= samples_per_window:
        # Enough samples: keep every second one of the first 2000
        ppg_win_data = ppg_df['ppg1'].values[:samples_per_window][::2]
    else:
        # Some samples are missing: resample onto a regular 20 ms grid
        et = ppg_df['end_lt'].iloc[-1]
        new_ts = pd.date_range(start=st_win, end=et, freq='20ms', inclusive='left')
        orig_ts = pd.to_datetime(ppg_df['localtime'].values)

        # Express timestamps as seconds since the epoch so they can be fed to np.interp
        epoch = pd.Timestamp(0)
        orig_secs = np.asarray((orig_ts - epoch).total_seconds(), dtype=float)
        new_secs = np.asarray((new_ts - epoch).total_seconds(), dtype=float)

        # One vectorized call: linear interpolation between the neighbouring recorded samples,
        # clamped to the first/last recorded value outside the recorded range
        ppg_win_data = np.interp(new_secs, orig_secs, ppg_df['ppg1'].to_numpy(dtype=float))

    if clip:
        # Only the upper bound is enforced
        ppg_win_data = np.where(ppg_win_data > clip_tr, clip_tr, ppg_win_data)
    if norm:
        # Keep an existing floating dtype, promote anything else to float64, and always copy so
        # the in-place arithmetic neither fails on integer data nor writes into the DataFrame
        if np.issubdtype(ppg_win_data.dtype, np.floating):
            target_dtype = ppg_win_data.dtype
        else:
            target_dtype = np.float64
        ppg_win_data = ppg_win_data.astype(target_dtype, copy=True)
        ppg_win_data -= user_ppg_mean
        ppg_win_data /= user_ppg_std

    # Construct the file name and save path
    file_name = f"{user}_{day}_{st_win}.npy"
    save_path = os.path.join(output_np_dir, file_name)
    np.save(save_path, ppg_win_data)

In [5]:
import os
from typing import Union

def create_data_directories(base_dir: Union[str, os.PathLike]) -> None:
    """
    Builds the directory tree used to store the exported numpy files.

    Parameters:
    - base_dir (Union[str, os.PathLike]): Root of the tree, joined onto the current working
      directory. Inside it, 'train', 'val' and 'test' are created, each holding a 'noise'
      and a 'noisefree' folder.

    Directories that already exist are left untouched.
    """
    # Root directory first
    root = os.path.join(os.getcwd(), base_dir)
    os.makedirs(root, exist_ok=True)

    # Every <split>/<category> combination below the root
    leaf_dirs = [
        os.path.join(root, split_name, category_name)
        for split_name in ('train', 'val', 'test')
        for category_name in ('noise', 'noisefree')
    ]
    for leaf in leaf_dirs:
        os.makedirs(leaf, exist_ok=True)

In [None]:
# Build the train/val/test x noise/noisefree folder tree under ./motion_ppg_np
create_data_directories(base_dir='motion_ppg_np')
# Folder holding the pickle files with PPG data
dfs_dirs = os.path.join(os.getcwd(), 'dfs_motion_ppg')

##### Processing PPG Data for Machine Learning

<ul style="font-size: 0.8em;">
This function processes Photoplethysmography (PPG) data for different participant groups and labels, exporting them as numpy files and loading them for use in machine learning tasks. It handles various steps including filtering, file path generation, parallel processing, and data labeling. Key steps include:

  <li><strong>Data Filtering:</strong> Selects data for specific participants and labels, ensuring relevant subsets are processed.</li>
  <li><strong>File Path Construction:</strong> Dynamically generates file paths based on metadata, facilitating organized data handling.</li>
  <li><strong>Parallel Processing:</strong> Utilizes parallel computing to efficiently process multiple files simultaneously.</li>
  <li><strong>Data Export and Load:</strong> Saves processed data as numpy files and loads them into arrays with corresponding labels for classification tasks.</li>

This approach standardizes the processing pipeline, making it adaptable for different datasets and labels, ensuring efficient data handling for machine learning applications.
</ul>

In [None]:
from typing import List, Dict, Tuple, Union
from tqdm import tqdm

def process_ppg_data(
    PPG_available_df: pd.DataFrame,
    participants: List[str],
    label: str,
    output_subdir: str,
    dfs_dirs: Union[str, os.PathLike],
    dict_user_ppg_mean_std_per: Dict[str, Tuple[float, float, float]],
    clip: bool = True,
    norm: bool = True,
    n_jobs: int = 30
) -> Tuple[List[np.ndarray], List[int]]:
    """
    Exports the PPG windows of the given participants and label to numpy files, then loads them back.

    Args:
    - PPG_available_df (pd.DataFrame): Metadata with columns 'user', 'day', 'st_time' and 'label',
      one row per available pickle file.
    - participants (List[str]): List of participant identifiers.
    - label (str): Label for the type of data ('noise' or 'noisefree').
    - output_subdir (str): Subdirectory under 'motion_ppg_np' for saving processed data ('test', 'val', 'train').
    - dfs_dirs (Union[str, os.PathLike]): Directory containing the pickled DataFrame files.
    - dict_user_ppg_mean_std_per (Dict[str, Tuple[float, float, float]]): Dictionary with user-specific statistics.
    - clip (bool): Whether to apply clipping to PPG data.
    - norm (bool): Whether to normalize PPG data using user-specific statistics.
    - n_jobs (int): Number of parallel jobs for processing.

    Returns:
    - Tuple[List[np.ndarray], List[int]]: A list with one array per '.npy' file found in
      ``<cwd>/motion_ppg_np/<output_subdir>/<label>`` (in sorted file-name order) and a list of the
      same length filled with 1 for 'noise' and 0 for any other label. Both are plain Python lists.

    Note:
    - Every '.npy' file in the output directory is loaded, including files left there by earlier runs.
    """
    # Filter DataFrame for the specified participants and label
    filtered_df = PPG_available_df.loc[(PPG_available_df['user'].isin(participants)) &
                                       (PPG_available_df['label'] == label)]

    # Rebuild the pickle file names from the metadata
    file_paths = [
        f"{row['user']}_{row['day'].date()}_{row['st_time']}_{row['label']}.pickle"
        for _, row in filtered_df.iterrows()
    ]

    # Define output directory for processed numpy files and make sure it exists before workers save into it
    output_np_dir = os.path.join(os.getcwd(), 'motion_ppg_np', output_subdir, label)
    os.makedirs(output_np_dir, exist_ok=True)

    # Process each file and save as a numpy file
    list(
        tqdm(
            Parallel(return_as="generator", n_jobs=n_jobs)(
                delayed(export_numpy_file)(file_path, dfs_dirs, output_np_dir, dict_user_ppg_mean_std_per, clip, norm)
                for file_path in tqdm(file_paths, desc="Processing pickle files")
            ),
            total=len(file_paths)
        )
    )

    # Load the processed numpy files; sorted for a reproducible order, '.npy' only to skip stray entries
    ppg_X = [
        np.load(os.path.join(output_np_dir, file))
        for file in sorted(os.listdir(output_np_dir))
        if file.endswith('.npy')
    ]

    # Generate labels for the data
    ppg_y = [1 if label == 'noise' else 0] * len(ppg_X)  # 1 for noise, 0 for noisefree

    return ppg_X, ppg_y

In [14]:
# Build the training split for activity detection in the field: one pass per label
train_X_noise, train_y_noise = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=train_ptcs,
    label='noise',
    output_subdir='train',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

train_X_noisefree, train_y_noisefree = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=train_ptcs,
    label='noisefree',
    output_subdir='train',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

# Noise-free windows first, then noisy ones; features get a singleton middle axis
train_X = np.array(train_X_noisefree + train_X_noise)
train_X = train_X.reshape((len(train_X), 1, -1))
train_y = np.array(train_y_noisefree + train_y_noise)

In [15]:
# Shapes of the training features and labels
(train_X.shape, train_y.shape)

((2597670, 1, 1000), (2597670,))

In [10]:
# Build the validation split for activity detection in the field: one pass per label
val_X_noise, val_y_noise = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=val_ptcs,
    label='noise',
    output_subdir='val',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

val_X_noisefree, val_y_noisefree = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=val_ptcs,
    label='noisefree',
    output_subdir='val',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

# Noise-free windows first, then noisy ones; features get a singleton middle axis
val_X = np.array(val_X_noisefree + val_X_noise)
val_X = val_X.reshape((len(val_X), 1, -1))
val_y = np.array(val_y_noisefree + val_y_noise)

In [11]:
# Shapes of the validation features and labels
(val_X.shape, val_y.shape)

((499345, 1, 1000), (499345,))

In [12]:
# Build the test split for activity detection in the field: one pass per label
test_X_noise, test_y_noise = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=test_ptcs,
    label='noise',
    output_subdir='test',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

test_X_noisefree, test_y_noisefree = process_ppg_data(
    PPG_available_df=PPG_available_df,
    participants=test_ptcs,
    label='noisefree',
    output_subdir='test',
    dfs_dirs=dfs_dirs,
    dict_user_ppg_mean_std_per=dict_user_ppg_mean_std_per,
)

# Noise-free windows first, then noisy ones; features get a singleton middle axis
test_X = np.array(test_X_noisefree + test_X_noise)
test_X = test_X.reshape((len(test_X), 1, -1))
test_y = np.array(test_y_noisefree + test_y_noise)

In [13]:
# Shapes of the test features and labels
(test_X.shape, test_y.shape)

((462414, 1, 1000), (462414,))

##### Save train, val and test numpy files to be used for model building for the downstream task of activity detection in the field 

In [16]:
# Target folder for the arrays consumed by the downstream activity-detection model
model_data_dir = os.path.join(os.getcwd(), "downstream", "stnst", "ppg_clp_norm")
# exist_ok avoids the check-then-create race of a separate os.path.exists() test
os.makedirs(model_data_dir, exist_ok=True)

# (file name, array) pairs, written in the same order as before: features first, then labels
for file_name, _split_array in (
    ("train_X_ppg.npy", train_X),
    ("val_X_ppg.npy", val_X),
    ("test_X_ppg.npy", test_X),
    ("train_y_stationary_nonstationary.npy", train_y),
    ("val_y_stationary_nonstationary.npy", val_y),
    ("test_y_stationary_nonstationary.npy", test_y),
):
    np.save(os.path.join(model_data_dir, file_name), _split_array)