In [1]:
import numpy as np
import pandas as pd
import os
import glob

import matplotlib.pyplot as plt
import matplotlib as mpl

mpl.rcParams['lines.linewidth'] = 0.91
plt.style.use('seaborn-v0_8-whitegrid')
%matplotlib qt

from bokeh.plotting import figure, show
from bokeh.io import output_notebook
from bokeh.models import ColumnDataSource, HoverTool, Slider, Select
from bokeh.layouts import gridplot
from bokeh.models import Range1d
from bokeh.io import export_png
from bokeh.models import DatetimeTickFormatter

In [2]:
import numpy as np
from scipy.signal import cheby1, sosfiltfilt

def apply_resample(
    *, time, goal_fs=None, time_rs=None, data=(), indices=(), aa_filter=True, fs=None
):
    """
    Apply a resample to a set of data.

    Parameters
    ----------
    time : numpy.ndarray
        Array of original timestamps.
    goal_fs : float, optional
        Desired sampling frequency in Hz.  One of `goal_fs` or `time_rs` must be
        provided.
    time_rs : numpy.ndarray, optional
        Resampled time series to sample to. One of `goal_fs` or `time_rs` must be
        provided.
    data : tuple, optional
        Tuple of arrays to normally downsample using np.interpolation. Must match the
        size of `time`. Can handle `None` inputs, and will return an array of zeros
        matching the downsampled size.
    indices : tuple, optional
        Tuple of arrays of indices to downsample.
    aa_filter : bool, optional
        Apply an anti-aliasing filter before downsampling. Default is True. This
        is the same filter as used by :py:function:`scipy.signal.decimate`.
        See [1]_ for details. Ignored if upsampling.
    fs : {None, float}, optional
        Original sampling frequency in Hz. If `goal_fs` is an integer factor
        of `fs`, every nth sample will be taken, otherwise `np.np.interp` will be
        used. Leave blank to always use `np.np.interp`.

    Returns
    -------
    time_rs : numpy.ndarray
        Resampled time.
    data_rs : tuple, optional
        Resampled data, if provided.
    indices_rs : tuple, optional
        Resampled indices, if provided.

    References
    ----------
    .. [1] https://en.wikipedia.org/wiki/Downsampling_(signal_processing)
    """

    def resample(x, factor, t, t_rs):
        if (int(factor) == factor) and (factor > 1):
            # in case that t_rs is provided and ends earlier than t
            n = np.nonzero(t <= t_rs[-1])[0][-1] + 1
            return (x[: n : int(factor)],)
        else:
            if x.ndim == 1:
                return (np.interp(t_rs, t, x),)
            elif x.ndim == 2:
                xrs = np.zeros((t_rs.size, x.shape[1]), dtype=np.float64)
                for j in range(x.shape[1]):
                    xrs[:, j] = np.interp(t_rs, t, x[:, j])
                return (xrs,)

    if fs is None:
        # compute sampling frequency by hand
        fs = 1 / np.mean(np.diff(time[:5000]))

    if time_rs is None and goal_fs is None:
        raise ValueError("One of `time_rs` or `goal_fs` is required.")

    # get resampled time if necessary
    if time_rs is None:
        if int(fs / goal_fs) == fs / goal_fs and goal_fs < fs:
            time_rs = time[:: int(fs / goal_fs)]
        else:
            time_rs = np.arange(time[0], time[-1], 1 / goal_fs)
    else:
        goal_fs = 1 / np.mean(np.diff(time_rs[:5000]))
        # prevent t_rs from extrapolating
        time_rs = time_rs[time_rs <= time[-1]]

    # AA filter, if necessary
    if (fs / goal_fs) >= 1.0:
        sos = cheby1(8, 0.05, 0.8 / (fs / goal_fs), output="sos")
    else:
        aa_filter = False

    # resample data
    data_rs = ()

    for dat in data:
        if dat is None:
            data_rs += (None,)
        elif dat.ndim in [1, 2]:
            data_to_rs = sosfiltfilt(sos, dat, axis=0) if aa_filter else dat
            data_rs += resample(data_to_rs, fs / goal_fs, time, time_rs)
        else:
            raise ValueError("Data dimension exceeds 2, or data not understood.")

    # resampling indices
    indices_rs = ()
    for idx in indices:
        if idx is None:
            indices_rs += (None,)
        elif idx.ndim == 1:
            indices_rs += (
                np.around(np.interp(time[idx], time_rs, np.arange(time_rs.size))).astype(np.int_),
            )
        elif idx.ndim == 2:
            indices_rs += (np.zeros(idx.shape, dtype=np.int_),)
            for i in range(idx.shape[1]):
                indices_rs[-1][:, i] = np.around(
                    np.interp(
                        time[idx[:, i]], time_rs, np.arange(time_rs.size)
                    )  # cast to in on insert
                )

    ret = (time_rs,)
    if data_rs != ():
        ret += (data_rs,)
    if indices_rs != ():
        ret += (indices_rs,)

    return ret

In [3]:
from warnings import warn

from avro.datafile import DataFileReader
from avro.io import DatumReader
from numpy import (
    round,
    arange,
    vstack,
    ascontiguousarray,
    isclose,
    full,
    argmin,
    abs,
    nan,
    float64,
)

class ReadEmpaticaAvro():
    """
    Read Empatica data from an avro file.

    Parameters
    ----------
    trim_keys : {None, tuple}, optional
        Trim keys provided in the `predict` method. Default (None) will not do any trimming.
        Trimming of either start or end can be accomplished by providing None in the place
        of the key you do not want to trim. If provided, the tuple should be of the form
        (start_key, end_key). When provided, trim datetimes will be assumed to be in the
        same timezone as the data (ie naive if naive, or in the timezone provided).
    resample_to_accel : bool, optional
        Resample any additional data streams to match the accelerometer data stream.
        Default is True.
    """

    _file = "file"
    _time = "time"
    _acc = "acc"
    _gyro = "gyro"
    _mag = "magnet"
    _temp = "temperature"
    _days = "day_ends"

    def __init__(self, trim_keys=None, resample_to_bvp=True):
        
        self.trim_keys = trim_keys
        self.resample_to_bvp = resample_to_bvp

    def get_accel(self, raw_accel_dict, results_dict, key):
        """
        Get the raw acceleration data from the avro file record.

        Parameters
        ----------
        raw_accel_dict : dict
            The record from the avro file for a raw data stream.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        # sampling frequency
        fs = raw_accel_dict["samplingFrequency"]

        # timestamp start
        ts_start = raw_accel_dict["timestampStart"] / 1e6  # convert to seconds

        # imu parameters for scaling to actual values
        phys_min = raw_accel_dict["imuParams"]["physicalMin"]
        phys_max = raw_accel_dict["imuParams"]["physicalMax"]
        dig_min = raw_accel_dict["imuParams"]["digitalMin"]
        dig_max = raw_accel_dict["imuParams"]["digitalMax"]

        # raw acceleration data
        accel = ascontiguousarray(
            vstack((raw_accel_dict["x"], raw_accel_dict["y"], raw_accel_dict["z"])).T
        )

        # scale the raw acceleration data to actual values
        accel = (accel - dig_min) / (dig_max - dig_min) * (
            phys_max - phys_min
        ) + phys_min

        # create the timestamp array using ts_start, fs, and the number of samples
        time = arange(ts_start, ts_start + accel.shape[0] / fs, 1 / fs)[
            : accel.shape[0]
        ]

        if time.size != accel.shape[0]:
            raise ValueError("Time does not have enough samples for accel array")

        # use special names here so we can just update dictionary later for returning
        results_dict[key] = {self._time: time, "fs": fs, "values": accel}
    
    def get_bvp(self, raw_bvp_dict, results_dict, key):
        """
        Get the raw blood volume pulse data from the avro file record.

        Parameters
        ----------
        raw_bvp_dict : dict
            The record from the avro file for a raw data stream.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        # sampling frequency
        fs = round(raw_bvp_dict["samplingFrequency"], decimals=3)
        # timestamp start
        ts_start = raw_bvp_dict["timestampStart"] / 1e6 # convert to seconds

        # raw bvp data
        bvp = ascontiguousarray(raw_bvp_dict["values"])

        # timestamp array
        time = arange(ts_start, ts_start + bvp.size / fs, 1 / fs)[: bvp.shape[0]]

        if time.size != bvp.shape[0]:
            raise ValueError("Time does not have enough samples for bvp array")
        
        results_dict[key] = {self._time: time, "fs": fs, "bvp": bvp}

    def get_eda(self, raw_dict, results_dict, key):
        """
        Get the raw electrodermal activity data from the avro file record.

        Parameters
        ----------
        raw_dict : dict
            The record from the avro file for a raw data stream.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        if not raw_dict["values"]:
            return

        # sampling frequency
        fs = round(raw_dict["samplingFrequency"], decimals=3)
        # timestamp start
        ts_start = raw_dict["timestampStart"] / 1e6

        # raw eda data
        eda = ascontiguousarray(raw_dict["values"])

        # timestamp array
        time = arange(ts_start, ts_start + eda.size / fs, 1 / fs)[: eda.shape[0]]

        if time.size != eda.shape[0]:
            raise ValueError("Time does not have enough samples for eda array")
        
        results_dict[key] = {"time_eda": time, "fs_eda": fs, "eda": eda}

    def get_temp(self, raw_dict, results_dict, key):
        """
        Get the raw temperature data from the avro file record.

        Parameters
        ----------
        raw_dict : dict
            The record from the avro file for a raw data stream.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        if not raw_dict["values"]:
            return

        # sampling frequency
        fs = round(raw_dict["samplingFrequency"], decimals=3)
        # timestamp start
        ts_start = raw_dict["timestampStart"] / 1e6

        # raw temperature data
        temp = ascontiguousarray(raw_dict["values"])

        # timestamp array
        time = arange(ts_start, ts_start + temp.size / fs, 1 / fs)[: temp.shape[0]]

        if time.size != temp.shape[0]:
            raise ValueError("Time does not have enough samples for temp array")
        
        results_dict[key] = {"time_temp": time, "fs_temp": fs, "temp": temp}

    def get_values_1d(self, raw_dict, results_dict, key):
        """
        Get the raw 1-dimensional values data from the avro file record.
        i.e, PPG, EDA, and temperature

        Parameters
        ----------
        raw_dict : dict
            The record from the avro file for a raw 1-dimensional values data stream.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        if not raw_dict["values"]:
            return

        # sampling frequency
        fs = round(raw_dict["samplingFrequency"], decimals=3)
        # timestamp start
        ts_start = raw_dict["timestampStart"] / 1e6  # convert to seconds

        # raw values data
        values = ascontiguousarray(raw_dict["values"])

        # timestamp array
        time = arange(ts_start, ts_start + values.size / fs, 1 / fs)[: values.shape[0]]

        if time.size != values.shape[0]:
            raise ValueError(f"Time does not have enough samples for {key} array")

        results_dict[key] = {self._time: time, "fs": fs, "values": values}

    @staticmethod
    def get_systolic_peaks(raw_dict, results_dict, key):
        """
        Get the systolic peaks data from the avro file record.

        Parameters
        ----------
        raw_dict : dict
            The record from the avro file for systolic peaks data.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        if not raw_dict["peaksTimeNanos"]:
            return

        peaks = (
            ascontiguousarray(raw_dict["peaksTimeNanos"]) / 1e9
        )  # convert to seconds

        results_dict[key] = {"values": peaks}

    def get_steps(self, raw_dict, results_dict, key):
        """
        Get the raw steps data from the avro file record.

        Parameters
        ----------
        raw_dict : dict
            The record from the avro file for raw steps data.
        results_dict : dict
            Dictionary where the results will go.
        key : str
            Name for the results in `results_dict`.
        """
        if not raw_dict["values"]:
            return

        # sampling frequency
        fs = round(raw_dict["samplingFrequency"], decimals=3)

        # timestamp start
        ts_start = raw_dict["timestampStart"] / 1e6  # convert to seconds

        # raw steps data
        steps = ascontiguousarray(raw_dict["values"])

        # timestamp array
        time = arange(ts_start, ts_start + steps.size / fs, 1 / fs)[: steps.size]

        if time.size != steps.size:
            raise ValueError("Time does not have enough samples for steps array")

        results_dict[key] = {self._time: time, "fs": fs, "values": steps}

    def handle_resampling(self, streams):
        """
        Handle resampling of data streams. Data will be resampled to match the
        BVP (Blood Volume Pulse) data stream.
        """
        if "bvp" not in streams:
            raise ValueError("BVP data stream is missing, cannot resample.")
        
        # Remove BVP data stream
        bvp_dict = streams.pop("bvp")
        # Remove Temp data stream
        temp_dict = streams.pop("temperature")
        # Remove EDA data stream
        eda_dict = streams.pop("eda")

        # Remove keys that cannot be resampled
        rs_streams = {d: streams.pop(d) for d in ["systolic_peaks", "steps"] if d in streams}
        
        for name, stream in streams.items():
            if stream["values"] is None:
                continue
            
            # Check that the stream doesn't start significantly later than BVP
            # if (dt := (stream["time"][0] - bvp_dict["time"][0])) > 1:
            #     warn(
            #         f"Data stream {name} starts more than 1 second ({dt}s) after "
            #         f"the BVP stream. Data will be filled with the first (and "
            #         f"last) value as needed."
            #     )
            
            # Check if resampling is needed
            if isclose(stream["fs"], bvp_dict["fs"], atol=1e-3):
                new_shape = list(stream["values"].shape)
                new_shape[0] = bvp_dict["bvp"].shape[0]
                rs_streams[name] = full(new_shape, nan, dtype=float64)
                i1 = argmin(abs(bvp_dict["time"] - stream["time"][0]))
                i2 = i1 + stream["time"].size
                rs_streams[name][i1:i2] = stream["values"][: stream["values"].shape[0] - (i2 - bvp_dict["time"].size)]
                rs_streams[name][:i1] = stream["values"][0]
                rs_streams[name][i2:] = stream["values"][-1]
                continue
            
            # Resample the stream to match BVP
            _, (stream_rs,) = apply_resample(
                time=stream["time"],
                time_rs=bvp_dict["time"],
                data=(stream["values"],),
                aa_filter=True,
                fs=stream["fs"],
            )
            rs_streams[name] = stream_rs
        
        rs_streams.update(bvp_dict)
        rs_streams.update(temp_dict)
        rs_streams.update(eda_dict)
        return rs_streams

    def get_datastreams(self, raw_record):
        """
        Extract the various data streams from the raw avro file record.
        """
        fn_map = {
            "accelerometer": ("acc", self.get_accel),
            "eda": ("eda", self.get_eda),
            "temperature": ("temperature", self.get_temp),
            "bvp": ("bvp", self.get_bvp),
            "systolicPeaks": ("systolic_peaks", self.get_systolic_peaks),
            "steps": ("steps", self.get_steps),
        }

        raw_data_streams = {}
        for full_name, (stream_name, fn) in fn_map.items():
            fn(raw_record[full_name], raw_data_streams, stream_name)
        
        if self.resample_to_bvp:
            data_streams = self.handle_resampling(raw_data_streams)
        else:
            data_streams = raw_data_streams.pop("bvp")
            data_streams.update(raw_data_streams)
        
        return data_streams

    def read(self, *, file, tz_name=None, **kwargs):
        """
        Read the input .avro file.

        Parameters
        ----------
        file : {path-like, str}
            The path to the input file.
        tz_name : {None, optional}
            IANA time-zone name for the recording location. If not provided, timestamps
            will represent local time naively. This means they will not account for
            any time changes due to Daylight Saving Time.

        Returns
        -------
        results : dict
            Dictionary containing the data streams from the file. See Notes
            for different output options.

        Notes
        -----
        There are two output formats, based on if `resample_to_accel` is True or False.
        If True, all available data streams except for `systolic_peaks` and `steps`
        are resampled to match the accelerometer data stream, which results in their
        values being present in the top level of the `results` dictionary, ie
        `results['gyro']`, etc.

        If False, everything except accelerometer will be present in dictionaries
        containing the keys `time`, `fs`, and `values`, and the top level will be these
        dictionaries plus the accelerometer data (keys `time`, `fs`, and `accel`).

        `systolic_peaks` will always be a dictionary of the form `{'systolic_peaks': array}`.
        """

        reader = DataFileReader(open(file, "rb"), DatumReader())
        records = []
        for record in reader:
            records.append(record)
        reader.close()

        # get the timezone offset
        tz_offset = records[0]["timezone"]  # in seconds

        # as needed, deviceSn, deviceModel

        # get the data streams
        results = self.get_datastreams(records[0]["rawData"])

        # update the timestamps to be local. Do this as we don't have an actual
        # timezone from the data.
        if tz_name is None:
            results["time"] += tz_offset
            results["time_temp"] += tz_offset
            results["time_eda"] += tz_offset

            for k in results:
                if k == "time":
                    continue
                if (
                    isinstance(results[k], dict)
                    and "time" in results[k]
                    and results[k]["time"] is not None
                ):
                    results[k]["time"] += tz_offset
        # do nothing if we have the time-zone name, the timestamps are already
        # UTC

        # adjust systolic_peaks
        if "systolic_peaks" in results:
            results["systolic_peaks"]["values"] += tz_offset

        return results

In [4]:
empatica_reader = ReadEmpaticaAvro()

### Save Acc for GGIR
~ 2 min for reading and concatenating into a list

**With Pandas**

- ~ 2.30 min for transforming it into a pd.DataFrame
- ~ 1.30 min for saving to csv

In [5]:
import numpy as np
import pandas as pd
import os
import glob

#### Change the paths below to the location of the data on your machine ####
data_path = "/Users/augenpro/Documents/Empatica/data_sara/data/participant_data/"
save_data_path = "/Users/augenpro/Documents/Empatica/data_sara/data/GGIR_input/"
#### Change the subject ID and device ID below to the subject and device you want to process ####
sub_ID = "00007"
device_ID = "3YK3J151VJ"

days = sorted(os.listdir(data_path))
days = [day for day in days if day[0] != "."] # remove hidden files (needed for MacOS users)

acc = []
ppg = []
temp = []
time = []
time_temp = []

for i, day in enumerate(days):
    
    print(f"Processing day {i+1}/{len(days)}")

    folder_day = data_path + day + "/" + sub_ID + "-" + device_ID + "/raw_data/v6"

    avro_files = sorted(glob.glob(folder_day + "/*.avro"))

    for avro_file in avro_files:
        
        data = empatica_reader.read(file=avro_file)

        acc.extend(data["acc"])

        ppg.extend(data["bvp"])

        temp.extend(data["temp"])
        time_temp.extend(data["time_temp"])

        time.extend(data["time"])

Processing day 1/8
Processing day 2/8
Processing day 3/8
Processing day 4/8
Processing day 5/8
Processing day 6/8
Processing day 7/8
Processing day 8/8


## Detect non-wear

Convert to `pd.DataFrame`` - why sort index? Sometimes, between successive 30 min files, there is a negative difference between timestamps (i.e., the next files starts before the end of the previous one). Sort index re-orders the time in increasing order - this has almost no effect on the signals (especially ACC and Temp.) For PPG is a bit different but still negligible (see image on ppt)

In [9]:
acc_df = pd.DataFrame(acc, columns=["x", "y", "z"], index = pd.to_datetime(time, unit="s")).sort_index()
ppg_df = pd.DataFrame(ppg, columns=["ppg"], index = pd.to_datetime(time, unit="s")).sort_index()
temp_df = pd.DataFrame(temp, columns=["temp"], index = pd.to_datetime(time_temp, unit="s")).sort_index()

In [10]:
# I need to divide it into portions when the device was in charge

t_charge_end = acc_df.index[acc_df.index.to_series().diff().dt.total_seconds() > 60*10]
t_charge_start = acc_df.index[np.where(acc_df.index.to_series().diff().dt.total_seconds() > 60*10)[0]-1]
t_charge = pd.DataFrame({"start": t_charge_start, "end": t_charge_end})

good_portions = pd.DataFrame(columns=["start", "end"])
good_portions["start"] = t_charge["end"].iloc[:-1].reset_index(drop=True)
good_portions["end"] = t_charge["start"].iloc[1:].reset_index(drop=True)
start_first_charge = t_charge["start"].iloc[0]
end_last_charge = t_charge["end"].iloc[-1]

# Segment the data into portions when the device was not in charge and perform nonwear detection
acc_df_portions = [acc_df[:start_first_charge]]
ppg_df_portions = [ppg_df[:start_first_charge]]
temp_df_portions = [temp_df[:start_first_charge]]

for i, row in good_portions.iterrows():

    if row["end"] - row["start"] < pd.Timedelta("10 min"): # if the portion is less than 10 minutes, skip it
        continue

    acc_df_portions.append(acc_df[row["start"]:row["end"]])
    ppg_df_portions.append(ppg_df[row["start"]:row["end"]])
    temp_df_portions.append(temp_df[row["start"]:row["end"]])

acc_df_portions.append(acc_df[end_last_charge:])
ppg_df_portions.append(ppg_df[end_last_charge:])
temp_df_portions.append(temp_df[end_last_charge:])

In [11]:
import sys
sys.path.append("..")

In [12]:
from nonwear.DETACH import nimbaldetach

In [None]:
# for each of them, perform NW detection
acc_df_cleaned = []
temp_df_cleaned = []
ppg_df_cleaned = []
for i, (acc, temp, ppg) in enumerate(zip(acc_df_portions, temp_df_portions, ppg_df_portions)):

    start_stop_nw, _ = nimbaldetach(acc['x'].values, acc['y'].values, acc['z'].values, temp["temp"].values, accel_freq=64, temperature_freq=1, quiet=True)

    # Remove non-wear periods
    for i, row in start_stop_nw.iterrows():
        datetime_start_nw = acc.index[row["Start Datapoint"]]
        datetime_end_nw = acc.index[row["End Datapoint"]]
        acc.loc[datetime_start_nw:datetime_end_nw] = np.nan
        temp.loc[datetime_start_nw:datetime_end_nw] = np.nan
        ppg.loc[datetime_start_nw:datetime_end_nw] = np.nan

    acc_portion = acc.dropna()
    temp_portion = temp.dropna()
    ppg_portion = ppg.dropna()

    # plt.figure(figsize=(19, 11))
    # plt.subplot(2, 1, 1)
    # plt.plot(acc_portion.resample("20s").mean(), label="acc")
    # plt.legend()
    # plt.subplot(2, 1, 2, sharex = plt.subplot(2, 1, 1))
    # plt.plot(temp_portion.resample("30 s").mean(), label="temp")
    # plt.legend()

    acc_df_cleaned.append(acc_portion)
    temp_df_cleaned.append(temp_portion)
    ppg_df_cleaned.append(ppg_portion)

acc_df_cleaned = pd.concat(acc_df_cleaned)
temp_df_cleaned = pd.concat(temp_df_cleaned)
ppg_df_cleaned = pd.concat(ppg_df_cleaned)

In [None]:
plt.figure(figsize=(19, 11))
plt.subplot(2, 1, 1)
plt.plot(acc_df_cleaned, label="acc")
plt.legend()
plt.subplot(2, 1, 2, sharex = plt.subplot(2, 1, 1))
plt.plot(temp_df_cleaned, label="temp")
# plt.legend()

<matplotlib.legend.Legend at 0x359095f10>

  el.exec() if hasattr(el, "exec") else el.exec_()


In [15]:
plt.figure(figsize=(19, 11))
plt.subplot(2, 1, 1)
plt.plot(acc_df_cleaned.values, label="acc")
plt.legend()
plt.subplot(2, 1, 2, sharex = plt.subplot(2, 1, 1))
plt.plot(temp_df_cleaned.values, label="temp")
# plt.legend()

[<matplotlib.lines.Line2D at 0x3875d5250>]

  el.exec() if hasattr(el, "exec") else el.exec_()


In [38]:
# Save to csv for GGIR
acc_df.to_csv(save_data_path + "/acc_new.csv") 

In [35]:
plt.figure(figsize=(19, 11))
plt.subplot(3, 1, 1)
plt.plot(acc_df.resample("30s").mean())
plt.subplot(3, 1, 2, sharex = plt.subplot(3, 1, 1))
plt.plot(temp_df.resample("30s").mean())
plt.subplot(3, 1, 3, sharex = plt.subplot(3, 1, 1))
plt.plot(ppg_df)

[<matplotlib.lines.Line2D at 0x31b6153d0>]

In [36]:
plt.figure(figsize=(19, 11))
plt.subplot(2, 1, 1)
plt.plot(acc_df.values)
plt.subplot(2, 1, 2, sharex = plt.subplot(2, 1, 1))
plt.plot(ppg_df.values)

[<matplotlib.lines.Line2D at 0x43ae282f0>]

In [16]:
# Save to parquet
acc_df_cleaned.to_parquet(save_data_path + "/acc.parquet")
temp_df_cleaned.to_parquet(save_data_path + "/temp.parquet")
ppg_df_cleaned.to_parquet(save_data_path + "/ppg.parquet")

## Load GGIR output

In [11]:
output_GGIR_path = "/Users/augenpro/Documents/Empatica/data_sara/data/GGIR_output/output_GGIR_input/results/QC/"

output_GGIR = pd.read_csv(output_GGIR_path + "part4_nightsummary_sleep_full.csv")

SPT = []

for i, day_row in output_GGIR.iterrows():
    # Stupid thing to get the correct datetime for segmenting signals into day and night (but no alternatives I guess)
    if output_GGIR["sleeponset_ts"].iloc[0][0] == '0':
        sleep_onset = pd.to_datetime(str(pd.to_datetime(day_row["calendar_date"]).date() + pd.Timedelta("1d")) + " " + day_row["sleeponset_ts"])
    else:
        sleep_onset = pd.to_datetime(pd.to_datetime(day_row["calendar_date"]).date() + " " + day_row["sleeponset_ts"])

    wake_onset = pd.to_datetime(str(pd.to_datetime(day_row["calendar_date"]).date() + pd.Timedelta("1d")) + " " + day_row["wakeup_ts"])

    SPT.append((sleep_onset, wake_onset))

start_end_sleep = np.array(SPT).reshape(-1, 2)

In [12]:
start_end_sleep[1,0] = pd.Timestamp('2024-05-22 00:15:30')
start_end_sleep[2,0] = pd.Timestamp('2024-05-23 00:15:30')
# remove the fourth night
start_end_sleep = np.delete(start_end_sleep, 3, axis=0)
start_end_sleep

array([[Timestamp('2024-05-21 01:02:45'),
        Timestamp('2024-05-21 08:07:30')],
       [Timestamp('2024-05-22 00:15:30'),
        Timestamp('2024-05-22 05:59:55')],
       [Timestamp('2024-05-23 00:15:30'),
        Timestamp('2024-05-23 06:10:10')],
       [Timestamp('2024-05-25 01:03:10'),
        Timestamp('2024-05-25 06:07:55')],
       [Timestamp('2024-05-26 23:48:50'),
        Timestamp('2024-05-26 09:49:05')],
       [Timestamp('2024-05-27 23:39:40'),
        Timestamp('2024-05-27 06:14:20')]], dtype=object)

In [13]:
start_end_sleep[3,0] = pd.Timestamp('2024-05-24 00:30:30')
start_end_sleep[3,1] = pd.Timestamp('2024-05-24 07:40:30')
start_end_sleep[4,0] = pd.Timestamp('2024-05-26 00:00:30')
start_end_sleep[4,1] = pd.Timestamp('2024-05-26 09:30:30')
start_end_sleep[5,0] = pd.Timestamp('2024-05-26 23:45:30')
start_end_sleep[5,1] = pd.Timestamp('2024-05-27 06:05:30')

start_end_sleep

array([[Timestamp('2024-05-21 01:02:45'),
        Timestamp('2024-05-21 08:07:30')],
       [Timestamp('2024-05-22 00:15:30'),
        Timestamp('2024-05-22 05:59:55')],
       [Timestamp('2024-05-23 00:15:30'),
        Timestamp('2024-05-23 06:10:10')],
       [Timestamp('2024-05-24 00:30:30'),
        Timestamp('2024-05-24 07:40:30')],
       [Timestamp('2024-05-26 00:00:30'),
        Timestamp('2024-05-26 09:30:30')],
       [Timestamp('2024-05-26 23:45:30'),
        Timestamp('2024-05-27 06:05:30')]], dtype=object)

## Extract night HRV

In [2]:
import sys
sys.path.append("../")

In [11]:
parquet_path = "/Users/augenpro/Documents/Empatica/data_sara/data/GGIR_input/"

ppg_df = pd.read_parquet(parquet_path + "ppg.parquet")
temp_df = pd.read_parquet(parquet_path + "temp.parquet")
acc_df = pd.read_parquet(parquet_path + "acc.parquet")
nights = np.load(parquet_path + "SPT_window_GGIR.npy", allow_pickle=True)

In [4]:
nights = pd.DataFrame(nights, columns=["start", "end"])
nights

days = pd.DataFrame(columns=["start", "end"])
days["start"] = nights["end"].iloc[:-1].reset_index(drop=True)
days["end"] = nights["start"].iloc[1:].reset_index(drop=True)
start_first_day = ppg_df.index[0]
end_last_day =  ppg_df.index[-1]

days

Unnamed: 0,start,end
0,2024-05-21 08:07:30,2024-05-22 00:15:30
1,2024-05-22 05:59:55,2024-05-23 00:15:30
2,2024-05-23 06:10:10,2024-05-24 00:30:30
3,2024-05-24 07:40:30,2024-05-26 00:00:30
4,2024-05-26 09:30:30,2024-05-26 23:45:30


In [49]:
# Segment night data
plt.figure(figsize=(19, 11))
plt.plot(acc_df.resample("1s").mean())
for start_sleep, end_sleep in start_end_sleep:
    plt.axvspan(start_sleep, end_sleep, color="red", alpha=0.2)

In [5]:
def compute_acc_SMV(acc_df):
    return np.sqrt(acc_df["x"]**2 + acc_df["y"]**2 + acc_df["z"]**2)

In [None]:
from sleep.detect_acc_bursts import *

In [None]:
from heart_rate.ppg_utils import MSPTDfast
from heart_rate.ppg_utils import signal_fixpeaks
from heart_rate.heart_rate_fragmentation import compute_HRF

**RMSSD** is calculated on 5-minute windows as extensively done in the literature (e.g., https://pmc.ncbi.nlm.nih.gov/articles/PMC10566244/). Usually, consecutive windows are used, but since my computation is quite fast I am now opting for an overlap of 4 minutes (step of 1 min) between windows to increase the granularity. Moreover, in this way, I do not lose the last part of my quite portion (I loose at max 1 min). This choice can be discussed (PS I tried with 1s step and results seem interesing....)

**SDNN** I don't remember what Silvani said it's better to do (I think 5 min windows also for this)

For **heart rate fragmentation**, the longer the window the better (https://physionet.org/content/heart-rate-fragmentation-code/1.0.0/)



In [10]:
nights

Unnamed: 0,start,end
0,2024-05-21 01:02:45,2024-05-21 08:07:30
1,2024-05-22 00:15:30,2024-05-22 05:59:55
2,2024-05-23 00:15:30,2024-05-23 06:10:10
3,2024-05-24 00:30:30,2024-05-24 07:40:30
4,2024-05-26 00:00:30,2024-05-26 09:30:30
5,2024-05-26 23:45:30,2024-05-27 06:05:30


In [16]:
threshold_bursts = 35
window_length = pd.Timedelta("5 min")  # window length
window_step = pd.Timedelta("1 min")  # window step

HRV = []  # Reset HRV storage

ibi_quiet_all = []

for i, (start_sleep, end_sleep) in enumerate(nights):  # for each night

    acc_night = compute_acc_SMV(acc_df.loc[start_sleep:end_sleep])
    ppg_night = ppg_df.loc[start_sleep:end_sleep]

    # Detect wrist accelerometer bursts
    bursts = detect_bursts(acc_night, sampling_rate=64, alfa=threshold_bursts/1000)

    # Extract quiet periods (no movement of the wrist)
    quiet_periods = pd.DataFrame()
    quiet_periods["start"] = bursts["end"].iloc[:-1].reset_index(drop=True)
    quiet_periods["end"] = bursts["start"].iloc[1:].reset_index(drop=True)

    for _, quiet_period in quiet_periods.iterrows():  # for each quiet period

        duration_quiet_period = quiet_period["end"] - quiet_period["start"]

        if duration_quiet_period < window_length:  # If the whole period is shorter than 5 min, skip it
            continue
            
        acc_quiet = acc_night.loc[quiet_period["start"]:quiet_period["end"]]
        ppg_quiet = ppg_night.loc[quiet_period["start"]:quiet_period["end"]]
         # Extract systolic peaks from the quiet PPG signal
        _, peaks = MSPTDfast(ppg_quiet["ppg"].values, sampling_rate=64)
        t_peaks = ppg_quiet.index.to_series().values[peaks]
        ibi = np.diff(t_peaks).astype('timedelta64[ns]').astype('float64') / 1e9  # seconds
        ibi = np.insert(ibi, 0, np.mean(ibi[1:10]), axis=0)  # Set first value as mean of next 10
        ibi = pd.Series(ibi, index=t_peaks)

        # Kubios artifact correction
        artifacts, env_diff_corrected = signal_fixpeaks(ibi.values, 64, iterative=False)
        artifacts_all = np.concatenate((artifacts["ectopic"], artifacts["missed"], artifacts["extra"], artifacts["longshort"]))
        ibi[ibi.index[artifacts_all.astype(int)]] = np.nan
        ibi_clean = ibi.interpolate(method="linear")

        # Generate overlapping windows of 5 minutes with 30-second overlap
        current_start = quiet_period["start"]
        
        while current_start + window_length <= quiet_period["end"]:

            current_end = current_start + window_length

            ibi_window = ibi_clean.loc[current_start:current_end]

            # HRV Features
            ppi = ibi_window.values * 1000  # Convert to ms
            diff_ppi = np.diff(ppi)

            rmssd = np.sqrt(np.mean(diff_ppi**2))  # RMSSD
            sdnn = np.std(ppi, ddof=1)  # SDNN
            PIP = compute_HRF(ppi)  # Custom HRF computation

            HRV.append({
                "day": i+1,
                "time": current_start + window_length / 2,
                "rmssd": rmssd, 
                "sdnn": sdnn, 
                "PIP": PIP
            })

            current_start += window_step  # Move to next overlapping window

        ibi_quiet_all.append(ibi_clean)

HRV_df = pd.DataFrame(HRV)
ibi_quiet_df = pd.concat(ibi_quiet_all)

In [None]:
HRV_df = pd.DataFrame(HRV)

# plt.figure(figsize=(19, 11))
# plt.subplot(2,1,1)
# plt.scatter(HRV_df["time"], HRV_df["rmssd"], label="RMSSD")
# plt.subplot(2,1,2, sharex=plt.subplot(2,1,1))
# plt.scatter(ibi_quiet_df.index, ibi_quiet_df)

<matplotlib.collections.PathCollection at 0x142eca9c0>

2025-02-12 18:32:46.520 python[40896:1407279] +[IMKClient subclass]: chose IMKClient_Modern


In [17]:
output_notebook()

p1 = figure(x_axis_type="datetime", title="IBI", width=1100, height=300)
p1.line(ibi_quiet_df.index, ibi_quiet_df.values, line_width=2)
p1.title.text_font_size = "16pt"
p1.yaxis.axis_label = "IBI (s)"
p1.xaxis.axis_label_text_font_size = "16pt"
p1.yaxis.axis_label_text_font_size = "16pt"
p1.xaxis.major_label_text_font_size = "16pt"
p1.yaxis.major_label_text_font_size = "16pt"
p1.xaxis.formatter=DatetimeTickFormatter(
        hours="%H:%M",
        days="%d %B",
        months="%d %B",
        years="%d %B",
    )
p1.xaxis.major_label_orientation = np.pi/4

HRV_df.index = pd.to_datetime(HRV_df["time"])
# HRV_df.drop(columns=["time"], inplace=True)
HRV_df_plot = HRV_df.resample("1min").mean()

p2 = figure(x_axis_type="datetime", title="rmssd", width=1100, height=300, x_range=p1.x_range)
p2.line(HRV_df_plot["time"], HRV_df_plot["rmssd"], legend_label="rmssd", line_width=2, color="blue")
p2.scatter(HRV_df_plot["time"], HRV_df_plot["rmssd"], legend_label="rmssd", color="blue", size = 11)
# Horizontal line at the median, dashed
p2.line([HRV_df_plot["time"].iloc[0], HRV_df_plot["time"].iloc[-1]], [np.median(HRV_df_plot["rmssd"]), np.median(HRV_df_plot["rmssd"])], 
        line_width=2, color="black", line_dash="dashed", legend_label="Median")
p2.title.text_font_size = "16pt"
p2.legend.location = "top_left"
p2.legend.click_policy="hide"
p2.yaxis.axis_label = "rmssd (ms)"
p2.xaxis.axis_label_text_font_size = "16pt"
p2.yaxis.axis_label_text_font_size = "16pt"
p2.xaxis.major_label_text_font_size = "16pt"
p2.yaxis.major_label_text_font_size = "16pt"
# p2.y_range = Range1d(20, 110)
p2.xaxis.formatter=DatetimeTickFormatter(
        hours="%H:%M",
        days="%d %B",
        months="%d %B",
        years="%d %B",
    )
p2.xaxis.major_label_orientation = np.pi/4


# Show the plot
show(gridplot([[p1], [p2]]))

In [115]:
HRV_df

Unnamed: 0,day,window_start,window_end,rmssd,sdnn,PIP,middle
0,1,2024-05-21 01:13:01.088835001,2024-05-21 01:18:01.088835001,51.686506,64.647549,0.684825,2024-05-21 01:15:31.088835001
1,1,2024-05-21 01:13:31.088835001,2024-05-21 01:18:31.088835001,56.308176,62.170712,0.683794,2024-05-21 01:16:01.088835001
2,1,2024-05-21 01:14:01.088835001,2024-05-21 01:19:01.088835001,65.587052,74.812008,0.681275,2024-05-21 01:16:31.088835001
3,1,2024-05-21 01:14:31.088835001,2024-05-21 01:19:31.088835001,68.600474,79.210685,0.677419,2024-05-21 01:17:01.088835001
4,1,2024-05-21 01:15:01.088835001,2024-05-21 01:20:01.088835001,71.855343,77.171793,0.680328,2024-05-21 01:17:31.088835001
...,...,...,...,...,...,...,...
2368,6,2024-05-27 05:43:28.464230061,2024-05-27 05:48:28.464230061,30.272144,46.502956,0.663082,2024-05-27 05:45:58.464230061
2369,6,2024-05-27 05:43:58.464230061,2024-05-27 05:48:58.464230061,29.064951,48.197264,0.666667,2024-05-27 05:46:28.464230061
2370,6,2024-05-27 05:44:28.464230061,2024-05-27 05:49:28.464230061,27.299414,53.358816,0.699301,2024-05-27 05:46:58.464230061
2371,6,2024-05-27 05:49:44.401791096,2024-05-27 05:54:44.401791096,42.737534,52.782592,0.621528,2024-05-27 05:52:14.401791096


In [None]:
plt.figure(figsize=(19, 11))

days = HRV_df["day"].unique()

plt.figure(figsize=(19, 11))
plt.subplot(2, 1, 1)
for day in days:
    HRV_day = HRV_df[HRV_df["day"] == day]
    plt.plot(HRV_day["middle"], HRV_day["rmssd"], '-o', label=f"day {day}")

plt.legend()
plt.subplot(2, 1, 2, sharex = plt.subplot(2, 1, 1))
plt.plot(ibi_quiet_df)

[<matplotlib.lines.Line2D at 0x451162b70>]

: 

In [14]:
pd.to_datetime(str(acc_df.index[0].date()) + " " + output_GGIR["sleeponset_ts"].iloc[0])

Timestamp('2024-05-20 01:02:45')

In [None]:
output_GGIR_path = "/Users/augenpro/Documents/Empatica/data_sara/data/GGIR_output/"

output_GGIR = pd.read_csv(output_GGIR_path + "part4_nightsummary_sleep_full.csv")

tst = output_GGIR["SleepDurationInSpt"].values[0]

waso = output_GGIR["WASO"].values[0]

SE = output_GGIR["SE"]

SE = tst / (tst + waso)

# Sleep and wake onsets
if output_GGIR["sleeponset_ts"].iloc[0][0] == '0':
    sleep_onset = pd.to_datetime(str(acc_norm.index[0].date() + pd.Timedelta("1d")) + " " + output_GGIR["sleeponset_ts"].iloc[0])
else: 
    sleep_onset = pd.to_datetime(str(acc_norm.index[0].date())  + " " + output_GGIR["sleeponset_ts"].iloc[0])

wake_onset = pd.to_datetime(str(acc_norm.index[0].date() + pd.Timedelta("1d")) + " " + output_GGIR["wakeup_ts"].iloc[0])

In [16]:
acc.head()

Unnamed: 0,x,y,z
2024-05-20 13:02:55.531980990,-0.068359,0.408691,0.931152
2024-05-20 13:02:55.547605991,-0.068359,0.40332,0.930176
2024-05-20 13:02:55.563230991,-0.071289,0.397949,0.919434
2024-05-20 13:02:55.578855991,-0.069824,0.400879,0.925781
2024-05-20 13:02:55.594480991,-0.064453,0.409668,0.92334


In [17]:
acc.index.is_monotonic_increasing

True