### Load packages

In [1]:
import pandas as pd
import json
from datetime import datetime
import logging
from functools import reduce
import numpy as np


import sys
import os
sys.path.append("../system_files")
import aggregate_funcs.bpm_features as bpm_features

### Laod test dataset

In [2]:
sql_script="""
WITH
  bpm_stream AS(
  SELECT
    pim_id,
    heart_rate_datetime_loc AS datetime_local,
    bpm
  FROM
    `research-01-217611.odp_level2.fitbit_heart_rate`
  WHERE
    DATE(heart_rate_datetime_loc) > DATE_SUB('2021-05-20', INTERVAL 2 DAY)
    AND DATE(heart_rate_datetime_loc) <= '2021-05-20'
    AND bpm >= 40 ),
  /*
  --------------------- sleep events --------------------------------------------
  */
  sleep_stages AS(
  SELECT
    DISTINCT pim_id,
    sleep_datetime_loc AS sleep_stage_start_loc,
    CAST(TIMESTAMP_ADD( CAST(sleep_datetime_loc AS TIMESTAMP), INTERVAL (sleep_duration -1) SECOND) AS DATETIME) AS sleep_stage_end_loc,
    sleep_stage
  FROM
    odp_level2.fitbit_sleep_detail ),
  sleep_start_end AS (
  SELECT
    pim_id,
    sleep_date_loc,
    start_time_loc AS sleep_start_time,
    CAST(TIMESTAMP_ADD( CAST(start_time_loc AS TIMESTAMP), INTERVAL 10 MINUTE )AS DATETIME) AS sleep_start_time_10mins,
    end_time_loc AS sleep_end_time,
    CAST(TIMESTAMP_ADD( CAST(end_time_loc AS TIMESTAMP), INTERVAL 10 MINUTE )AS DATETIME) AS sleep_end_time_10mins,
  FROM
    `research-01-217611.odp_level2.fitbit_sleeps`
  WHERE
    /* 
	today's complete sleep data may be available tomorrow
	Example, a person sleeps from 9PM (today) to 5AM tomorrow (local time),
	his/her data will possibly not be complete yet today's data upload, 
	so we summarize his/her sleep cycles from yesterday's sleep details
	(Example:  ds  = '2021-04-27'
	   sleep_date_loc: '2021-04-26'
	   start_time_loc: '2021-04-26T23:00:00
	   end_time_loc:   '2021-04-27T04:00:00
	*/ sleep_date_loc = DATE_SUB('2021-05-20', INTERVAL 1 DAY)),
  combine_sleep_info AS (
  SELECT
    a.pim_id,
    a.sleep_date_loc,
    a.sleep_start_time,
    a.sleep_end_time,
    b.sleep_stage_start_loc,
    b.sleep_stage_end_loc,
    b.sleep_stage
  FROM
    sleep_start_end a
  LEFT JOIN
    sleep_stages b
  ON
    a.pim_id = b.pim_id
  WHERE
    (b.sleep_stage IS NOT NULL)
    AND (b.sleep_stage_start_loc BETWEEN a.sleep_start_time
      AND a.sleep_end_time)),

  /*
  --------------------- meal events --------------------------------------------
  */
  meal_events AS (
	SELECT
	  CAST(pim_id AS STRING) AS pim_id,
	  start_datetime_local AS meal_start_time,
	  CAST(TIMESTAMP_ADD( CAST(start_datetime_local AS TIMESTAMP), INTERVAL 10 MINUTE )AS DATETIME) AS meal_start_time_10mins,
	  peak_datetime_local  AS peak_start_time,
	  CAST(TIMESTAMP_ADD( CAST(peak_datetime_local AS TIMESTAMP), INTERVAL 10 MINUTE )AS DATETIME) AS peak_start_time_10mins,
	  return_to_baseline_datetime_local  AS baseline_start_time,
	  CAST(TIMESTAMP_ADD( CAST(return_to_baseline_datetime_local AS TIMESTAMP), INTERVAL 10 MINUTE )AS DATETIME) AS baseline_start_time_10mins
	FROM 
	  `research-01-217611.odp_level2_feature_store.meal_events` 
    WHERE 	 
	  DATE(start_datetime_local) = '2021-05-20' )

  (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.sleep_stage_start_loc AS start_time,
    b.sleep_stage_end_loc AS end_time,
    DATETIME_DIFF(b.sleep_stage_end_loc,
      b.sleep_stage_start_loc,
      MINUTE) AS duration,
    LOWER(b.sleep_stage) AS event,
  FROM
    bpm_stream a
  LEFT JOIN
    combine_sleep_info b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.sleep_stage_start_loc
    AND b.sleep_stage_end_loc)
UNION ALL (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.sleep_start_time AS start_time,
    b.sleep_start_time_10mins AS end_time,
    DATETIME_DIFF(b.sleep_start_time_10mins,
      b.sleep_start_time,
      MINUTE) AS duration,
    'bedtime_start' AS event
  FROM
    bpm_stream a
  LEFT JOIN
    sleep_start_end b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.sleep_start_time
    AND b.sleep_start_time_10mins)

UNION ALL (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.sleep_end_time AS start_time,
    b.sleep_end_time_10mins AS end_time,
    DATETIME_DIFF(b.sleep_end_time_10mins,
      b.sleep_end_time,
      MINUTE) AS duration,
    'bedtime_end' AS event
  FROM
    bpm_stream a
  LEFT JOIN
    sleep_start_end b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.sleep_end_time
    AND b.sleep_end_time_10mins)

UNION ALL (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.meal_start_time as start_time,
    b.meal_start_time_10mins as end_time,
DATETIME_DIFF(b.meal_start_time_10mins,
      b.meal_start_time,
      MINUTE) AS duration,
    'meal_start' AS event
  FROM
    bpm_stream a
  LEFT JOIN
    meal_events b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.meal_start_time
    AND b.meal_start_time_10mins )

 UNION ALL (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.peak_start_time as start_time,
    b.peak_start_time_10mins as end_time,
	DATETIME_DIFF(b.peak_start_time_10mins,
      b.peak_start_time,
      MINUTE) AS duration,
    'peak_postpandrial_start' AS event
  FROM
    bpm_stream a
  LEFT JOIN
    meal_events b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.peak_start_time
    AND b.peak_start_time_10mins)

 UNION ALL (
  SELECT
    DISTINCT PARSE_DATE("%F",
      '2021-05-20') AS ds,
    a.pim_id as participant_id,
    a.datetime_local,
    a.bpm,
    b.baseline_start_time as start_time,
    b.baseline_start_time_10mins as end_time,
	DATETIME_DIFF(b.baseline_start_time_10mins,
      b.baseline_start_time,
      MINUTE) AS duration,
    'meal_return_baseline' AS event
  FROM
    bpm_stream a
  LEFT JOIN
    meal_events b
  ON
    a.pim_id = b.pim_id
  WHERE
    a.datetime_local BETWEEN b.peak_start_time
    AND b.peak_start_time_10mins)
"""

In [3]:
test=pd.read_gbq(sql_script,\
                             'research-01-217611', dialect='standard')

In [4]:
test.head()

Unnamed: 0,ds,participant_id,datetime_local,bpm,start_time,end_time,duration,event
0,2021-05-20,10735381,2021-05-20 19:02:00,108,2021-05-20 22:30:00,2021-05-20 22:40:00,10,meal_return_baseline
1,2021-05-20,158912,2021-05-20 17:08:00,108,2021-05-20 17:25:00,2021-05-20 17:35:00,10,meal_return_baseline
2,2021-05-20,11424164,2021-05-20 20:46:00,116,2021-05-20 23:30:00,2021-05-20 23:40:00,10,meal_return_baseline
3,2021-05-20,16748904,2021-05-20 19:10:00,112,2021-05-20 20:15:00,2021-05-20 20:25:00,10,meal_return_baseline
4,2021-05-20,32727,2021-05-20 18:40:00,109,2021-05-20 19:00:00,2021-05-20 19:10:00,10,meal_return_baseline


### Add utilities functions

In [5]:
def preprocess(df):
    '''
    Preprocessing function
    return "datetime_local" as the index of the dataframe
    '''
    # Add if needed along the way
    #df = sleeps.preprocess.clean_pipeline(df, method='linear',
    #                                      verbose=True,
    #                                      print_func=logging.info,
    #                                      min_bpm = 40, max_bpm = 200)
    df=df.drop_duplicates()
    return df.set_index(pd.DatetimeIndex(df['datetime_local'])).sort_index()

def rename_cols(df,newcols):
    '''
    Rename the columns of a dataframe
    df: pandas dataframe
    newcols = dictionary containing old column names and
              new column names
    '''
    return df.rename(columns = newcols)

def aggregate_table(df,agg_col,group_col,metrics):
    '''
    Aggregate the values of the dataframe column based from
        the defined metrics
    df: pandas dataframe
    agg_col(str): column name that will be aggregated
    group_col (str/list): determine the columns for the groupby
    metrics: list of aggregation methods
    '''
 
    if isinstance(group_col, str):
        df =  df[[group_col,agg_col]].groupby(group_col).agg(metrics)
    else:
        df = df[group_col + [agg_col]].groupby(group_col).agg(metrics)
    #flatten multilevel index from aggregation
    if isinstance(df.keys(), pd.core.indexes.multi.MultiIndex):
        df.columns = df.columns.droplevel()
    return df

In [6]:
def join_tables(df_list):
    '''
    Join multiple tables by `participant_id`
    df_list = list of pandas dataframe with "participant_id" col
    '''
    df_temp = reduce(
        lambda  left,right: pd.merge(
            left,right,on=['participant_id'],
            how='outer'
        ), 
        df_list)
        
    return df_temp

In [52]:
def calc_event_metrics(period_events):
    '''
    Calculate the event bpm values 
    period_events : dataframe containing the sleep stages bpm values
    '''
    bpm_metrics = [ 
        bpm_features.mean_bpm,
        bpm_features.sd_bpm,
        bpm_features.min_bpm,
        bpm_features.max_bpm,
        bpm_features.count_
    ]
    
    stage_df = pd.DataFrame(
        period_events['participant_id'].unique(),
        columns = ["participant_id"]
    )
    
    stage_df_long = pd.DataFrame({'participant_id': ['-1'],
                                  'start_time': [None],
                                  'end_time': [None],
                                  'mean_bpm': [None],
                                  'sd_bpm': [None],
                                  'min_bpm': [None],
                                  'max_bpm':[None],                                  
                                  'event': 'fake'}).set_index(['participant_id','start_time','end_time'])
    
    stage_df_wide = pd.DataFrame(
        period_events['participant_id'].unique(),
        columns = ["participant_id"]
    )
    
    event_stages = [
        'wake', 'light', 'deep', 'rem',
        'awake','asleep','restless',
        'bedtime_start','bedtime_end',
        'meal_start','peak_postprandial_start',
        'meal_return_baseline'
    ]
    
    for key in event_stages:
        stage_bpm = aggregate_table(
            period_events[period_events.event == key],
            "bpm",
            ["participant_id","start_time","end_time"],
            bpm_metrics
        )
        stage_bpm["event"] = key      
        stage_df_long = stage_df_long.append(stage_bpm)
        
        stage_bpm = aggregate_table(
            period_events[period_events.event == key],
            "bpm",
            "participant_id",
            bpm_metrics
        )
        new_name = {
            'mean_bpm': 'mean_bpm_' + key,
            'sd_bpm': 'sd_bpm_' + key,
            'max_bpm':'max_bpm_' + key,
            'min_bpm': 'min_bpm_' + key,
            'count_' : 'count_' + key
            }
        stage_bpm = rename_cols(stage_bpm, new_name)
        stage_df_wide = join_tables([stage_df_wide,stage_bpm])
        
   

    stage_df_long = stage_df_long.replace({np.nan: None})
    stage_df_long = stage_df_long.reset_index()
    stage_df_long = stage_df_long.set_index("participant_id")
    #Json format doesnt accept Timestamp
    stage_df_long["start_time"] = stage_df_long["start_time"].astype(str)
    stage_df_long["end_time"] = stage_df_long["end_time"].astype(str)
    
    stage_df_wide = stage_df_wide.replace({np.nan: None})
    stage_df_wide = stage_df_wide.set_index("participant_id")

        
    return stage_df_long, stage_df_wide

In [38]:
def generate_event_summary(event_tbl):
    '''
    Main method to summarize bpm features of each daily event
    Note: We only have sleep stages data for now; we'll incorporate meal events
    '''

    event_agg = calc_event_metrics(event_tbl)
    
    return event_agg

### Generate test results

In [39]:
event_tbl = preprocess(test)

In [40]:
event_tbl.dtypes

ds                datetime64[ns]
participant_id            object
datetime_local    datetime64[ns]
bpm                        int64
start_time        datetime64[ns]
end_time          datetime64[ns]
duration                   int64
event                     object
dtype: object

In [41]:
data_summary = generate_event_summary(event_tbl)

In [49]:
data_summary[0]

Unnamed: 0_level_0,start_time,end_time,mean_bpm,sd_bpm,min_bpm,max_bpm,event,count_
participant_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
-1,NaT,NaT,,,,,fake,
10742283,2021-05-19 01:58:30,2021-05-19 02:03:59,68,2.28035,64.0,71.0,wake,5
10742283,2021-05-19 02:53:00,2021-05-19 02:56:59,74.25,2.27761,72.0,78.0,wake,4
10742283,2021-05-19 04:40:00,2021-05-19 04:45:59,77.3333,1.88562,74.0,80.0,wake,6
10767560,2021-05-19 02:57:00,2021-05-19 03:22:59,66,4.0762,60.0,73.0,wake,26
...,...,...,...,...,...,...,...,...
99754,2021-05-20 17:30:00,2021-05-20 17:40:00,80.7273,1.13545,78.0,82.0,meal_return_baseline,11
99754,2021-05-20 22:45:00,2021-05-20 22:55:00,88.0909,3.39664,82.0,95.0,meal_return_baseline,11
99810,2021-05-20 11:15:00,2021-05-20 11:25:00,84.3636,1.55346,82.0,87.0,meal_return_baseline,11
99810,2021-05-20 16:15:00,2021-05-20 16:25:00,91.7273,3.35995,88.0,100.0,meal_return_baseline,11


In [47]:
data_long=data_summary[0][data_summary[0].event!='fake']

In [48]:
data_long

Unnamed: 0_level_0,start_time,end_time,mean_bpm,sd_bpm,min_bpm,max_bpm,event,count_
participant_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
10742283,2021-05-19 01:58:30,2021-05-19 02:03:59,68,2.28035,64.0,71.0,wake,5
10742283,2021-05-19 02:53:00,2021-05-19 02:56:59,74.25,2.27761,72.0,78.0,wake,4
10742283,2021-05-19 04:40:00,2021-05-19 04:45:59,77.3333,1.88562,74.0,80.0,wake,6
10767560,2021-05-19 02:57:00,2021-05-19 03:22:59,66,4.0762,60.0,73.0,wake,26
10767560,2021-05-19 03:30:00,2021-05-19 03:41:29,63.25,1.29904,61.0,66.0,wake,12
...,...,...,...,...,...,...,...,...
99754,2021-05-20 17:30:00,2021-05-20 17:40:00,80.7273,1.13545,78.0,82.0,meal_return_baseline,11
99754,2021-05-20 22:45:00,2021-05-20 22:55:00,88.0909,3.39664,82.0,95.0,meal_return_baseline,11
99810,2021-05-20 11:15:00,2021-05-20 11:25:00,84.3636,1.55346,82.0,87.0,meal_return_baseline,11
99810,2021-05-20 16:15:00,2021-05-20 16:25:00,91.7273,3.35995,88.0,100.0,meal_return_baseline,11


In [50]:
data_summary[1].head()

Unnamed: 0_level_0,mean_bpm_wake,sd_bpm_wake,min_bpm_wake,max_bpm_wake,count_wake,mean_bpm_light,sd_bpm_light,min_bpm_light,max_bpm_light,count_light,...,mean_bpm_peak_postprandial_start,sd_bpm_peak_postprandial_start,min_bpm_peak_postprandial_start,max_bpm_peak_postprandial_start,count_peak_postprandial_start,mean_bpm_meal_return_baseline,sd_bpm_meal_return_baseline,min_bpm_meal_return_baseline,max_bpm_meal_return_baseline,count_meal_return_baseline
participant_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
133979,,,,,,,,,,,...,,,,,,77.9545,3.0223,70.0,83.0,22.0
36579,82.4091,3.76176,77.0,91.0,44.0,79.6489,1.6466,76.0,87.0,393.0,...,,,,,,89.9091,3.46291,84.0,96.0,22.0
43638,,,,,,,,,,,...,,,,,,84.5909,6.4643,78.0,103.0,22.0
121339,,,,,,,,,,,...,,,,,,,,,,
186206,,,,,,,,,,,...,,,,,,,,,,


In [16]:
batch_time = datetime.utcnow().isoformat('T')
for ppt, data in data_summary[1].iterrows():
    ppt_dict = data.to_dict()
    ppt_dict['pim_id'] = ppt
    #ppt_dict['ds'] = date
    ppt_dict['batch_time'] = batch_time   

In [17]:
ppt_dict

{'mean_bpm_wake': None,
 'sd_bpm_wake': None,
 'min_bpm_wake': None,
 'max_bpm_wake': None,
 'count_wake': None,
 'mean_bpm_light': None,
 'sd_bpm_light': None,
 'min_bpm_light': None,
 'max_bpm_light': None,
 'count_light': None,
 'mean_bpm_deep': None,
 'sd_bpm_deep': None,
 'min_bpm_deep': None,
 'max_bpm_deep': None,
 'count_deep': None,
 'mean_bpm_rem': None,
 'sd_bpm_rem': None,
 'min_bpm_rem': None,
 'max_bpm_rem': None,
 'count_rem': None,
 'mean_bpm_awake': None,
 'sd_bpm_awake': None,
 'min_bpm_awake': None,
 'max_bpm_awake': None,
 'count_awake': None,
 'mean_bpm_asleep': None,
 'sd_bpm_asleep': None,
 'min_bpm_asleep': None,
 'max_bpm_asleep': None,
 'count_asleep': None,
 'mean_bpm_restless': None,
 'sd_bpm_restless': None,
 'min_bpm_restless': None,
 'max_bpm_restless': None,
 'count_restless': None,
 'mean_bpm_bedtime_start': None,
 'sd_bpm_bedtime_start': None,
 'min_bpm_bedtime_start': None,
 'max_bpm_bedtime_start': None,
 'count_bedtime_start': None,
 'mean_bpm_bedti