# HRV & T2DM Capstone Project
## Data Extraction

January-March 2025 | Paul Kalnins, ND (kalninsp@ohsu.edu)


In [1]:
!python --version

Python 3.8.20


In [26]:
import os
import numpy as np
import pandas as pd
import math
import json
import orjson
import csv
from datetime import datetime
import scipy
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm 
import wfdb
import neurokit2 as nk
from scipy import signal
import biosppy

from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from IPython.display import display, Markdown


# Check current working directory
print(os.getcwd())

# Test if the libraries are available
print("Numpy:", np.__version__)
print("Pandas:", pd.__version__)
print("Matplotlib:", matplotlib.__version__)
print("Biosspy:", biosppy.__version__)
print("WFDB:", wfdb.__version__)
print("orjson:", orjson.__version__)
print("Neurokit:", nk.__version__)
print("Scipy:", scipy.__version__)

/Users/pkalnins/Documents/Capstone/yr2_data/dataset
Numpy: 1.24.4
Pandas: 2.0.3
Matplotlib: 3.7.5
Biosspy: 2.1.2
WFDB: 4.1.2
orjson: 3.10.15
Neurokit: 0.2.10
Scipy: 1.10.1


***

## Data Extraction

### Data Sources 

- Clinical data
    - measurement.csv
    - observation.csv
    - condition_occurrence.csv
    
- Wearable Activity Monitory
    - heart_rate
    - respiratory_rate
    - stress
    
- Cardiac ECG



***

### Extraction of Variables 

Datasets:

- Measurements: "clinical_data/measurement.csv"
- Conditions: "clinincal_data/condition_occurrence.csv"
- Observations: "clinical_data/observation.csv"

The following class `DataExtractor()` has a method `run()` that extracts the relevant data, calculates various metrics, and writes them to the file "extracted_data/measurements.csv"

Calculations/Metrics Added: 

- HOMA-IR
- HOMA-IR Categories (insulin resistance > 2.5)
- Hepatic Steatosis Index (HSI) and categories
- Metabolic syndrome (MetS_cat) (0 == no, 1 == yes)
- Liver Fat Score (LFS)
- LFS category (FLS_cat) (0 == no, 1 == yes)
- FIB-4 Score and categories

In [127]:
class DataExtractor:
    def __init__(self):
        self.study_year = 2024
        self.output_folder = "extracted_data"
        self.output_file = "extracted_data/measurements.csv"
        self.measurements = "clinical_data/measurement.csv"
        self.observations = "clinical_data/observation.csv"
        self.conditions = "clinical_data/condition_occurrence.csv"
        
        self.measurement_dict = {
            3012888: "DBP", 3004249: "SBP", 2005200152: "waist_circum(cm)", 
            3036277: "height(cm)", 3025315: "weight(kg)", 2005200153: "hip_circum(cm)", 
            44809433: "WHR", 4245997: "BMI", 3004501: "glucose", 3004410: "hba1c", 
            3016244: "insulin", 3010084: "c_peptide", 3010156: "hs_crp", 
            3027114: "Tchol", 3022192: "TG", 3007070: "HDL", 3028288: "LDL", 
            3006923: "ALT", 3013721: "AST", 3024128: "bilirubin", 3035995: "alk_phos", 
            3024561: "albumin", 3021886: "globulin", 4288601: "ag_ratio", 
            3020630: "tot_protein", 3016723: "creatinine", 4112223: "BUN_creatinine", 
            3013682: "BUN", 3017250: "creatinine_urine", 3001802: "albumin_urine", 
            3029187: "nt_probnp", 40769783: "troponin_t", 3019550: "sodium", 
            3023103: "potassium", 3014576: "chloride", 3006906: "calcium", 
            3015632: "CO2", 2005200183: "RCB", 2005200182: "WCB", 3007461: "platelets", 
            2005200184: "hemoglobin", 3009542: "hematocrit", 3024731: "MCV", 
            3035941: "MCH", 3003338: "MCHC", 3002385: "RDW"
        }
        
        self.condition_dict = {
            433736: "obesity",
            37018196: "pre_DM",
            2005200547: "elev_hba1c",
            201826: "T2DM",
            316866: "HTN"
        }

        self.observations_dict = {
            4078999: "brthyy",
            2005200549: "use_insulin"
        }
    
    def extract_measurements(self):
        """
        Process clinical data to filter, map, pivot, and save extracted measurements.
        """
        os.makedirs(self.output_folder, exist_ok=True)
        output_file = os.path.join(self.output_folder, "measurements.csv")
        
        # Extract measurement_concept_ids and measurement names from the dictionary
        measurement_concept_ids = list(self.measurement_dict.keys())
        measurements = list(self.measurement_dict.values())
        
        # Load the clinical data
        df = pd.read_csv(self.measurements)
        
        # Filter rows based on measurement_concept_ids and create a copy
        filtered_df = df[df["measurement_concept_id"].isin(measurement_concept_ids)].copy()
        
        # Map measurement_concept_id to measurement_name
        filtered_df["measurement_name"] = filtered_df["measurement_concept_id"].map(self.measurement_dict)
        
        # Pivot the data
        pivot_df = filtered_df.pivot_table(
            index="person_id", 
            columns="measurement_name", 
            values="value_as_number", 
            aggfunc="mean"
        ).reset_index()
        
        # Round all numeric values to 3 decimal places
        pivot_df = pivot_df.round(3)
        
        # Reorder columns to match the order in the "measurements" list
        columns_order = ["person_id"] + measurements
        pivot_df = pivot_df.reindex(columns=columns_order)
        
        # Save the result to a new CSV file
        pivot_df.to_csv(self.output_file, index=False)
        print(f"Extracted measurements: complete")
    
    
    def extract_conditions(self):
        """
        Process condition data to map conditions to person IDs and merge with measurements.
        """
        measurements = pd.read_csv(self.output_file)
        condition_occurrence = pd.read_csv(self.conditions)
        
        measurements['person_id'] = measurements['person_id'].astype(str)
        condition_occurrence['person_id'] = condition_occurrence['person_id'].astype(str)
        
        # Extract person_id and condition_concept_id from condition_occurrence
        condition_occurrence = condition_occurrence[["person_id", "condition_concept_id"]]
        
        # Create an empty dataframe for extracted_conditions with the required structure
        extracted_conditions = measurements[["person_id"]].copy()
        for condition in self.condition_dict.values():
            extracted_conditions[condition] = 0  # Initialize with 0
        
        # Update extracted_conditions with values from condition_occurrence
        for concept_id, condition in self.condition_dict.items():
            condition_persons = condition_occurrence.loc[
                condition_occurrence["condition_concept_id"] == concept_id, "person_id"
            ]
            extracted_conditions.loc[
                extracted_conditions["person_id"].isin(condition_persons), condition
            ] = 1
        
        # Merge the extracted_conditions with the measurements dataframe
        merged_data = pd.merge(measurements, extracted_conditions, on="person_id", how="left")
        

        # Save the merged data back to the measurements file
        merged_data.to_csv(self.output_file, index=False)
        print(f"Extracted conditions: complete")

        
    def extract_observations(self):
        """
        Process observation data to filter, pivot, and merge with measurements data.
        """
        measurements = pd.read_csv(self.output_file)
        observations_data = pd.read_csv(self.observations)
        
        # Filter the observation data to include only the relevant observation_concept_id's
        filtered_observations = observations_data[
            observations_data["observation_concept_id"].isin(self.observations_dict.keys())
        ]
        
        # Pivot the filtered data
        pivoted_observations = filtered_observations.pivot_table(
            index="person_id",
            columns="observation_concept_id",
            values="value_as_number",
            aggfunc="first"
        )
        
        # Rename columns using the observations dictionary
        pivoted_observations.rename(columns=self.observations_dict, inplace=True)
        
        # Merge with the measurements file
        extracted_observations = measurements[["person_id"]].merge(
            pivoted_observations, on="person_id", how="left"
        )
        
        # Calculate age if "brthyy" exists
        if "brthyy" in extracted_observations.columns:
            extracted_observations["age"] = (self.study_year - extracted_observations["brthyy"]).astype(int)

            
        # Convert "use_insulin" to integer
        if "use_insulin" in extracted_observations.columns:
            extracted_observations["use_insulin"] = extracted_observations["use_insulin"].fillna(0).astype(int)
        
        # Only keep the columns "use_insulin" and "age"
        extracted_observations = extracted_observations[["person_id", "use_insulin", "age"]]
        
        # Merge the extracted observations with the measurements data
        merged_data = measurements.merge(extracted_observations, on="person_id", how="left")
        
        # Save the resulting dataframe back to the measurements file
        merged_data.to_csv(self.output_file, index=False)
        print(f"Extracted observations: complete")

    def run(self):
        """
        Run all steps of the data extraction and merging process.
        """
        self.extract_measurements()
        self.extract_conditions()
        self.extract_observations()
        print("All metrics added to 'extracted_data/measurements.csv'")



In [128]:
# Instantiate Object from the Class and run
# The following dictionaries are variables extracted from each dataset

extractor = DataExtractor()
extractor.run()


Extracted measurements: complete
Extracted conditions: complete
Extracted observations: complete
All metrics added to 'extracted_data/measurements.csv'


***

### Extract Garmin Watch Data


In [129]:
import pandas as pd
from datetime import datetime

class WearableDataProcessor:
    def __init__(self):
        self.base_dir = "wearable_activity_monitor"
        self.manifest_file = "wearable_activity_monitor/manifest.tsv"
        self.out_file = "extracted_data/measurements.csv"

    def load_json_data(self, filepath):
        if not os.path.isfile(filepath):
            return None
        with open(filepath, 'rb') as file:
            return orjson.loads(file.read())
    
    def safe_filepath(self, value):
        return "" if pd.isna(value) or value in [None, "None", "none", ""] else value.lstrip('/')


    def process_stress_json(self, data):
        stress_values = []
        stress_dates = set()  # To store unique dates

        for entry in data["body"]["stress"]:
            stress_value = entry["stress"]["value"]
            if stress_value > 0:
                stress_values.append(stress_value)
                date = entry["effective_time_frame"]["date_time"][:10]  # Extract date (yyyy-mm-dd)
                stress_dates.add(date)

        if len(stress_dates) > 2:  # Check if there are more than 2 unique days
            return round(sum(stress_values) / len(stress_values), 2) if stress_values else None
        else:
            return None  # Return NA if there are fewer than 2 days


    def process_heart_rate(self, data):
        hr_rates = []
        hr_dates = set()

        for entry in data.get("body", {}).get("heart_rate", []):
            hr_rate = entry["heart_rate"]["value"]
            if hr_rate > 0:
                hr_rates.append(hr_rate)
                date = entry["effective_time_frame"]["date_time"][:10]  # Extract date (yyyy-mm-dd)
                hr_dates.add(date)

        if len(hr_dates) > 2:  # Only compute average if there are more than 2 unique days
            return round(np.mean(hr_rates), 2) if hr_rates else None
        else:
            return None  # Return NA if fewer than 2 days of data


    def process_respiratory_rate(self, data):
        rr_values = []
        rr_dates = set()  # To store unique dates

        for entry in data.get("body", {}).get("breathing", []):
            rr_value = entry["respiratory_rate"]["value"]
            if rr_value > 0:  # Only consider valid respiratory rates
                rr_values.append(rr_value)
                date = entry["effective_time_frame"]["date_time"][:10]  # Extract date (yyyy-mm-dd)
                rr_dates.add(date)

        if len(rr_dates) > 2:  # Only compute average if there are more than 2 unique days
            return round(np.mean(rr_values), 2) if rr_values else None
        else:
            return None  # Return NA if fewer than 2 days of data


    def calculate_prq(self, avg_hr, avg_rr):
        if not avg_hr or not avg_rr:
            return None
        return round((avg_hr / avg_rr), 2)
    
    def get_sleep_metrics(self, data):
        # Initialize dictionary to track the duration of each sleep stage
        sleep_stages = {"light": 0, "deep": 0, "awake": 0, "rem": 0}
        sleep_dates = set()  # To store unique dates

        for entry in data.get("body", {}).get("sleep", []):
            sleep_stage = entry["sleep_stage_state"]

            # Check if the sleep_stage is one of the recognized stages
            if sleep_stage not in sleep_stages:
                continue  # Skip any unknown sleep stages

            start_time = entry["sleep_stage_time_frame"]["time_interval"]["start_date_time"]
            end_time = entry["sleep_stage_time_frame"]["time_interval"]["end_date_time"]

            # Calculate the duration of each sleep stage (in minutes)
            start_dt = datetime.fromisoformat(start_time[:-1])  # Convert from ISO format
            end_dt = datetime.fromisoformat(end_time[:-1])
            duration = (end_dt - start_dt).total_seconds() / 60  # Duration in minutes

            sleep_stages[sleep_stage] += duration
            date = start_time[:10]  # Extract date (yyyy-mm-dd)
            sleep_dates.add(date)

        if len(sleep_dates) > 2:  # Only compute averages if there are more than 2 unique days
            # Total sleep time is the sum of "light", "deep", and "rem" stages
            total_sleep_time = sleep_stages["light"] + sleep_stages["deep"] + sleep_stages["rem"]
            # Total time in bed is the sum of "light", "deep", "awake", and "rem" stages
            total_time_in_bed = sleep_stages["light"] + sleep_stages["deep"] + sleep_stages["awake"] + sleep_stages["rem"]

            # Calculate Sleep Efficiency Ratio (SER)
            ser = (total_sleep_time / total_time_in_bed) * 100 if total_time_in_bed > 0 else None
            return round(ser, 2)  # Return the Sleep Efficiency Ratio as a percentage
        else:
            return None  # Return NA if fewer than 2 days of data




    def process_participant(self, row): 
        person_id = row["participant_id"]
        stress_file = self.safe_filepath(row["stress_level_filepath"])
        hr_file = self.safe_filepath(row["heartrate_filepath"])
        rr_file = self.safe_filepath(row["respiratory_rate_filepath"])
        sleep_file = self.safe_filepath(row["sleep_filepath"])

        stress_data = self.load_json_data(stress_file) if stress_file else None
        hr_data = self.load_json_data(hr_file) if hr_file else None
        rr_data = self.load_json_data(rr_file) if rr_file else None
        sleep_data = self.load_json_data(sleep_file) if sleep_file else None

        avg_stress = self.process_stress_json(stress_data) if stress_data else None
        avg_hr = self.process_heart_rate(hr_data) if hr_data else None
        avg_rr = self.process_respiratory_rate(rr_data) if rr_data else None
        avg_prq = self.calculate_prq(avg_hr, avg_rr)

        ser = self.get_sleep_metrics(sleep_data) if sleep_data else None

        # Updated metrics to only include SER
        metrics = {
            "stress_gv5": avg_stress,
            "HR_gv5": avg_hr,
            "RR_gv5": avg_rr,
            "PRQ_gv5": avg_prq,
            "SER": ser,  
        }
        return person_id, metrics


        
    def process_wearable_data(self):
        # Load the existing measurements file
        measurements_file = "extracted_data/measurements.csv"
        measurements_df = pd.read_csv(measurements_file)

        # Load the manifest file
        manifest = pd.read_csv(self.manifest_file, sep='\t')

        # Convert manifest to list of dictionaries for parallel processing
        manifest_records = manifest.to_dict("records")

        # Process participants in parallel with a progress bar
        with ThreadPoolExecutor() as executor:
            results = list(tqdm(executor.map(self.process_participant, manifest_records), total=len(manifest_records), desc="Processing Wearable Data"))

        # Convert results list to DataFrame
        results_dict = {person_id: metrics for person_id, metrics in results}  # Convert list of tuples to dict
        new_data_df = pd.DataFrame.from_dict(results_dict, orient="index").reset_index()
        new_data_df.rename(columns={"index": "person_id"}, inplace=True)

        # Merge with the existing measurements file on 'person_id'
        updated_df = measurements_df.merge(new_data_df, on="person_id", how="left")

        # Save the updated file
        updated_df.to_csv(measurements_file, index=False)
        print(f"Output saved to {measurements_file}")


***

### HRV Metrics Extraction

Metrics of interest: 

- Time-domain: SDNN, RMSSD
- Frequency-domain: LF, HF, LF/HF ratio
- Non-linear: sd_1, sd_2, sd_1/sd_2 ratio

Practical Metrics to Gather (due to short ~11 sec ECG readings):

- Time-domain: SDNN, RMSSD
- Also: ln(RMSSD normalized to heart rate)

Steps: 

- Install needed libraries (wfdb, biosppy, pyhrv, numpy, matplotlib)
- Load WFDB files using wfdb
- Extract RR intervals using biosppy
- Calculate HRV metrics using pyhrv (time-domain, frequency-domain, non-linear)
- Visualize and save results 

Notes: 

- ECG samples are only 11 seconds long, with some artifact at the end; I compared RR-intervals from heartpy, neurokit2, and biosppy.  Only biosppy correctly removes the artifact.
- Since the ECG samples are only 11 seconds long, I will only use SDNN and RMSSD as metrics, as these have some meaning for such short time intervals.  


In [130]:
class HRVProcessor:
    def __init__(self):
        self.lead = 10
        self.trim_duration = 0.5
        self.manifest_file = "cardiac_ecg/manifest.tsv"
        self.measurements_file = "extracted_data/measurements.csv"
    
    @staticmethod
    def round_or_none(value): 
        return round(value, 2) if value is not None else None
    
    def extract_hrv_nk(self, ecg_file):
        # Load ECG record
        record = wfdb.rdrecord(ecg_file)
        ecg_signal = record.p_signal[:, self.lead]  # Select specified lead
        fs = record.fs  # Sampling frequency

        # Trim and clean the signal to remove artifacts
        samples_to_trim = int(self.trim_duration * fs)
        ecg_signal_trimmed = ecg_signal[samples_to_trim:-samples_to_trim]
        ecg_cleaned = nk.ecg_clean(ecg_signal_trimmed, sampling_rate=fs)

        # Detect & extract R-peaks using NeuroKit2 | convert to milliseconds
        rpeaks, _ = nk.ecg_peaks(ecg_cleaned, sampling_rate=fs)
        rpeaks_samples = np.where(rpeaks['ECG_R_Peaks'])[0]
        rpeaks_ms = [idx * 1000 / fs for idx in rpeaks_samples]

        # Compute NN intervals (in milliseconds)
        nni = np.diff(rpeaks_ms)

        if len(nni) > 1:
            # Calculate HRV metrics
            sdnn = np.std(nni, ddof=1)
            diff_nni = np.diff(nni)
            rmssd = np.sqrt(np.mean(diff_nni ** 2)) if len(diff_nni) > 0 else None
            
            # Calculate Heart Rate (HR)
            mean_nni = np.mean(nni)
            hr_ecg = 60000/mean_nni if mean_nni > 0 else None
            
            # Compute RMSSD normalized to HR, and ln(RMSSD_norm)
            rmssd_norm = rmssd / hr_ecg if rmssd and hr_ecg else None
            rmssd_norm_ln = np.log(rmssd_norm) if rmssd_norm and rmssd_norm > 0 else None

            # Compute SDNN normalized to HR, and ln(SDNN_norm)
            sdnn_norm = sdnn / hr_ecg if sdnn and hr_ecg else None
            sdnn_norm_ln = np.log(sdnn_norm) if sdnn_norm and sdnn_norm > 0 else None
            
        else:
            sdnn = rmssd = hr_ecg = rmssd_norm = rmssd_norm_ln = sdnn_norm = sdnn_norm_ln = None

        return {
            'sdnn': self.round_or_none(sdnn),
            'rmssd': self.round_or_none(rmssd), 
            'hr_ecg': self.round_or_none(hr_ecg), 
            'cSDNN': self.round_or_none(sdnn_norm_ln),  
            'cRMSSD': self.round_or_none(rmssd_norm_ln),       
        }
    
    def process_participant(self, row): 
        index, row = row
        participant_id = row.get('participant_id')
        hr_ecg_manifest = row.get('Rate', np.nan)
        base_filepath = os.path.splitext(row['wfdb_hea_filepath'])[0].lstrip('/')
        
        try: 
            hrv_metrics = self.extract_hrv_nk(base_filepath)
            if hrv_metrics: 
                hrv_metrics['person_id'] = participant_id
                return hrv_metrics
            
        except Exception as e:
            print(f"Error processing {base_filepath}: {e}")
            return None

    def process_hrv_metrics(self):
        manifest_df = pd.read_csv(self.manifest_file, sep='\t') # Load manifest file
        all_hrv_metrics = []

        # Create a progress bar and ThreadPoolExecutor
        with ThreadPoolExecutor() as executor:
            results = list(tqdm(executor.map(self.process_participant, manifest_df.iterrows()), total=len(manifest_df), desc="Processing HRV metrics"))

        # Filter out None results (in case of errors) and convert to DataFrame
        all_hrv_metrics = [result for result in results if result is not None]
        hrv_df = pd.DataFrame(all_hrv_metrics)

        if not hrv_df.empty:
            hrv_df.set_index('person_id', inplace=True)
            existing_df = pd.read_csv(self.measurements_file, index_col='person_id')
            updated_df = existing_df.merge(hrv_df, left_index=True, right_index=True, how="left")
            updated_df.to_csv(self.measurements_file, index=True)
            print(f"ECG metrics saved to {self.measurements_file}")


***

### Function to Run all Data Extraction Sequentially

In [131]:
def process_data():
    
    # Step 1: Extract measurements, conditions, and observations
    extractor = DataExtractor()
    extractor.run()
    
    # Step 2: Process wearable data
    processor = WearableDataProcessor()
    processor.process_wearable_data()
    
    # Step 3: Process HRV metrics
    hrv_processor = HRVProcessor()
    hrv_processor.process_hrv_metrics()

    print("Data processing complete.")


In [132]:
# Run this function to process all data
process_data()


Extracted measurements: complete
Extracted conditions: complete
Extracted observations: complete
All metrics added to 'extracted_data/measurements.csv'


Processing Wearable Data: 100%|███████████████| 905/905 [50:37<00:00,  3.36s/it]


Output saved to extracted_data/measurements.csv


Processing HRV metrics: 100%|███████████████| 1059/1059 [06:09<00:00,  2.87it/s]


ECG metrics saved to extracted_data/measurements.csv
Data processing complete.


***

### Function to Extract Final Variables, Filter, and Save to Master File for Analysis

In [42]:
class CreateFinalData(): 
    
    def __init__(self):
        self.input_file = "extracted_data/measurements.csv"
        self.output_file = "extracted_data/final_data.csv"
    
        self.columns_to_keep = [
            "age", "BMI", "WHR", "waist_circum(cm)",
            "SBP", "DBP", 
            "glucose", "hba1c", "insulin", "hs_crp", 
            "Tchol", "TG", "HDL", "LDL", "bilirubin", "alk_phos", 
            "nt_probnp", "troponin_t",  
            "BUN", "creatinine", 
            "obesity", "pre_DM", "elev_hba1c", "T2DM", "use_insulin", "HTN", 
            "stress_gv5", "HR_gv5", "RR_gv5", "PRQ_gv5", "SER",  
            "sdnn", "rmssd", "cRMSSD", "cSDNN", "hr_ecg",
            "MetS_cat", "SAFE", "SAFE_cat"
        ]
    
    def load_and_preprocess(self): 
        df = pd.read_csv(self.input_file)
        
        cols_to_numeric = [
            "DBP", "SBP", "WHR", "BMI", 
            "glucose", "hba1c", "insulin", "hs_crp",  
            "Tchol", "TG", "HDL", "LDL", 
            "BUN", "creatinine", "age",
            "stress_gv5", "HR_gv5", "RR_gv5", "PRQ_gv5", "SER",  
            "sdnn", "rmssd", "cRMSSD", "cSDNN", "hr_ecg"
        ]
        for col in cols_to_numeric:
            df[col] = pd.to_numeric(df[col], errors='coerce')  
        
        return df
    
    def calculate_metabolic_syndrome(self, df):
        df['MetS_cat'] = (
            (df['waist_circum(cm)'] >= 95).astype(int) +
            (df['TG'] >= 150).astype(int) +
            (df['HDL'] < 45).astype(int) +
            (df['SBP'] >= 130).astype(int) +
            (df['glucose'] >= 100).astype(int)
        ) >= 3
        df['MetS_cat'] = df['MetS_cat'].astype(int)
        
        return df
    
    def calculate_fibrosis_metrics(self, df):
        log_vars = ['AST', 'ALT', 'globulin', 'platelets']
        
        for var in log_vars:
            df[f'Ln_{var}'] = np.log(df[var].replace(0, 0.0001))
            
        # Create the 'DM_lab' column based on HbA1c
        #df['DM_lab'] = np.where(df['hba1c'] >= 6.5, 1, 0) 
        
        # SAFE Score
        df['BMI_capped'] = df['BMI'].clip(upper=40)
        df['SAFE'] = (
            (2.97 * df['age']) +
            (5.99 * df['BMI_capped']) +
            (62.85 * df['T2DM']) +
            (154.85 * df['Ln_AST']) -
            (58.23 * df['Ln_ALT']) +
            (195.48 * df['Ln_globulin']) -
            (141.61 * df['Ln_platelets']) - 75
        )
        df['SAFE_cat'] = np.nan
        df.loc[df['SAFE'] < 0.0, 'SAFE_cat'] = 0
        df.loc[df['SAFE'] >= 0.00, 'SAFE_cat'] = 1
        
        return df
    
    def clean_and_filter_data(self, df):
        # Replace 0 values with nan
        df['SER'] = df['SER'].replace(0, np.nan)
        df['hba1c'] = df['hba1c'].replace(0, np.nan)
        
        # Replace values > 10 with nan
        #df['hs_crp'] = df['hs_crp'].apply(lambda x: np.nan if x > 10 else x)
        
        # Apply log transformation to hs_crp
        #df['ln_hs_crp'] = np.where(df['hs_crp'] > 0, np.log(df['hs_crp']), np.nan)

        # Round selected metrics
        df = df.round({'SAFE': 2, 'TyG_index': 2})
        
        return df
    
    def calculate_metrics(self):
        df = self.load_and_preprocess()
        df = self.calculate_metabolic_syndrome(df)
        df = self.calculate_fibrosis_metrics(df)
        df = self.clean_and_filter_data(df)
        
        return df
    
    def filter_save_data(self):
        df = self.calculate_metrics()
        
        num_obs_before = len(df)
        print(f"Total observations before filtering: {num_obs_before}")
        
        #df_filtered = df[df['stress_gv5'].notna()] # Remove rows without sensor data
        df_filtered = df[["person_id"] + self.columns_to_keep] # Keep only selected columns
        df_filtered.set_index("person_id", inplace=True)
        df_filtered.to_csv(self.output_file)
        
        num_obs_after = len(df_filtered)
        print(f"Filtered dataset saved to {self.output_file}")
        print(f"Total observations after filtering: {num_obs_after}")
        print(f"Participants lacking sensor data: {num_obs_before - num_obs_after}")


In [25]:
def process_data():
    
    # Step 1: Extract measurements, conditions, and observations
    extractor = DataExtractor()
    extractor.run()
    
    # Step 2: Process wearable data
    processor = WearableDataProcessor()
    processor.process_wearable_data()
    
    # Step 3: Process HRV metrics
    hrv_processor = HRVProcessor()
    hrv_processor.process_hrv_metrics()
    
    # Step 4: Create metrics and filter data
    filtering = CreateFinalData()
    filtering.filter_save_data()
    
    print("Data processing complete.")

In [26]:
process_data()

Extracted measurements: complete
Extracted conditions: complete
Extracted observations: complete
All metrics added to 'extracted_data/measurements.csv'


Processing Wearable Data: 100%|███████████████| 905/905 [21:57<00:00,  1.46s/it]


Output saved to extracted_data/measurements.csv


Processing HRV metrics: 100%|███████████████| 1059/1059 [03:03<00:00,  5.78it/s]

ECG metrics saved to extracted_data/measurements.csv
Filtered dataset saved to extracted_data/filtered_data.csv
Total observations after filtering: 469
Data processing complete.





In [53]:
# Run this function to extract final variables, filter, and save them to file

filtering = CreateFinalData()
filtering.filter_save_data()

Total observations before filtering: 1077
Filtered dataset saved to extracted_data/final_data.csv
Total observations after filtering: 1077
Participants lacking sensor data: 0


In [61]:
df = pd.read_csv("extracted_data/final_data.csv")
print("Total number of participants:", len(df))

columns_to_check = ['WHR', 'waist_circum(cm)', 'stress_gv5', 'PRQ_gv5', 'SER', 'sdnn', 'rmssd', 'SAFE']
filtered_df = df.dropna(subset=columns_to_check)

# Filter HRV values
before_sdnn_filter = len(filtered_df)
hrv_threshold = 100
filtered_df = filtered_df[(filtered_df['sdnn'] <= hrv_threshold) & (filtered_df['rmssd'] <= hrv_threshold)]

removed_due_to_sdnn = before_sdnn_filter - len(filtered_df)
print(f"\nRows removed due to sndd > 150 ms: {removed_due_to_sdnn}")

filtered_df.to_csv("extracted_data/filtered_data.csv", index=False)

summary = pd.DataFrame({
    "Original_total": df.count(), 
    "Missing": df.isnull().sum(), 
    "Filtered_total": filtered_df.count(), 
    "Filtered_missing": filtered_df.isnull().sum()
})

print("\nObservations per column:")
print(summary)

summary.to_csv("filtered_summary.csv")

Total number of participants: 1077

Rows removed due to sndd > 150 ms: 71

Observations per column:
                  Original_total  Missing  Filtered_total  Filtered_missing
person_id                   1077        0             740                 0
age                         1077        0             740                 0
BMI                         1075        2             740                 0
WHR                         1072        5             740                 0
waist_circum(cm)            1074        3             740                 0
SBP                         1077        0             740                 0
DBP                         1077        0             740                 0
glucose                     1032       45             740                 0
hba1c                       1022       55             734                 6
insulin                     1030       47             738                 2
hs_crp                      1032       45             740       