## Apple Health Data to Elastic Processor

-----

## Dependencies and Libraries

In [3]:
from datetime import date, datetime, timedelta as td
import pytz
import numpy as np
import pandas as pd

import json

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from espandas import Espandas

-----

In [4]:
# instantiate elastic search
es = Elasticsearch(http_compress=True)

------

## Functions for Processing Dates and Timezones

In [5]:
# functions to convert UTC to Shanghai time zone and extract date/time elements
convert_tz = lambda x: x.to_pydatetime().replace(tzinfo=pytz.utc).astimezone(pytz.timezone('Europe/Oslo'))
get_year = lambda x: convert_tz(x).year
get_month = lambda x: '{}-{:02}'.format(convert_tz(x).year, convert_tz(x).month) #inefficient
get_date = lambda x: '{}-{:02}-{:02}'.format(convert_tz(x).year, convert_tz(x).month, convert_tz(x).day) #inefficient
get_day = lambda x: convert_tz(x).day
get_hour = lambda x: convert_tz(x).hour
get_minute = lambda x: convert_tz(x).minute
get_day_of_week = lambda x: convert_tz(x).weekday()

---

## Import Steps into Elastic

In [6]:
steps = pd.read_csv("data/StepCount.csv")

In [7]:
len(steps)

41097

In [8]:
# parse out date and time elements as local time
steps['startDate'] = pd.to_datetime(steps['startDate'])
# parse to unix seconds since epoch
steps['timestamp'] = pd.to_datetime(steps['startDate']).astype(int) / 10**9

steps['dow'] = steps['startDate'].map(get_day_of_week)
steps['year'] = steps['startDate'].map(get_year)
steps['month'] = steps['startDate'].map(get_month)
steps['date'] = steps['startDate'].map(get_date)
steps['day'] = steps['startDate'].map(get_day)
steps['hour'] = steps['startDate'].map(get_hour)
steps['dow'] = steps['startDate'].map(get_day_of_week)

In [9]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
steps['weekday'] = steps['startDate'].dt.dayofweek.map(dayOfWeek)

In [10]:
steps['indexId'] = (steps.index + 100).astype(str)

In [11]:
# steps.info()
steps.head()

Unnamed: 0,sourceName,sourceVersion,device,type,unit,creationDate,startDate,endDate,value,timestamp,dow,year,month,date,day,hour,weekday,indexId
0,Seshathiri’s iPhone,13.1.3,"<<HKDevice: 0x283881450>, name:iPhone, manufac...",StepCount,count,2019-10-17 23:01:40 +0530,2019-10-17 01:18:08+05:30,2019-10-17 01:18:14 +0530,13,1571255000.0,3,2019,2019-10,2019-10-17,17,3,Thursday,100
1,Seshathiri’s iPhone,13.1.3,"<<HKDevice: 0x283881450>, name:iPhone, manufac...",StepCount,count,2019-10-18 23:38:29 +0530,2019-10-18 23:27:26+05:30,2019-10-18 23:27:29 +0530,13,1571421000.0,5,2019,2019-10,2019-10-19,19,1,Friday,101
2,Seshathiri’s iPhone,13.1.3,"<<HKDevice: 0x283881450>, name:iPhone, manufac...",StepCount,count,2019-10-18 23:57:27 +0530,2019-10-18 23:46:47+05:30,2019-10-18 23:56:47 +0530,272,1571423000.0,5,2019,2019-10,2019-10-19,19,1,Friday,102
3,Seshathiri’s iPhone,13.1.3,"<<HKDevice: 0x283881450>, name:iPhone, manufac...",StepCount,count,2019-10-19 00:07:50 +0530,2019-10-18 23:56:47+05:30,2019-10-19 00:00:55 +0530,290,1571423000.0,5,2019,2019-10,2019-10-19,19,1,Friday,103
4,Seshathiri’s iPhone,13.1.3,"<<HKDevice: 0x283881450>, name:iPhone, manufac...",StepCount,count,2019-10-19 00:22:01 +0530,2019-10-19 00:11:11+05:30,2019-10-19 00:17:22 +0530,72,1571424000.0,5,2019,2019-10,2019-10-19,19,2,Saturday,104


In [12]:
steps = steps.fillna(value='')

In [13]:
INDEX = 'steps'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    

ConnectionError: ConnectionError(<urllib3.connection.HTTPConnection object at 0x0000022E8D23B5C8>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it) caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x0000022E8D23B5C8>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it)

In [None]:
esp = Espandas()
esp.es_write(steps, INDEX, TYPE)

## Import HR into Elastic

In [14]:
resting = pd.read_csv("data/RestingHeartRate.csv")
len(resting)

436

In [15]:
# parse out date and time elements as local time
resting['startDate'] = pd.to_datetime(resting['startDate'])

# parse to unix seconds since epoch
resting['timestamp'] = pd.to_datetime(resting['startDate']).astype(int) / 10**9

resting['dow'] = resting['startDate'].map(get_day_of_week)
resting['year'] = resting['startDate'].map(get_year)
resting['month'] = resting['startDate'].map(get_month)
resting['date'] = resting['startDate'].map(get_date)
resting['day'] = resting['startDate'].map(get_day)
resting['hour'] = resting['startDate'].map(get_hour)
resting['dow'] = resting['startDate'].map(get_day_of_week)

In [16]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
resting['weekday'] = resting['startDate'].dt.dayofweek.map(dayOfWeek)

In [17]:
resting['indexId'] = (resting.index + 100).astype(str)

In [18]:
resting = resting.fillna(value='')

In [19]:
INDEX = 'resting_hr'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    

ConnectionError: ConnectionError(<urllib3.connection.HTTPConnection object at 0x0000022E8C00A088>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it) caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x0000022E8C00A088>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it)

In [None]:
esp = Espandas()
esp.es_write(resting, INDEX, TYPE)

In [20]:
hr = pd.read_csv("data/HeartRate.csv")
len(hr)

759321

In [21]:
# hr.tail(10)

In [22]:
# parse out date and time elements as local time
hr['startDate'] = pd.to_datetime(hr['startDate'])

# parse to unix seconds since epoch
hr['timestamp'] = pd.to_datetime(hr['startDate']).astype(int) / 10**9

hr['dow'] = hr['startDate'].map(get_day_of_week)
hr['year'] = hr['startDate'].map(get_year)
hr['month'] = hr['startDate'].map(get_month)
hr['date'] = hr['startDate'].map(get_date)
hr['day'] = hr['startDate'].map(get_day)
hr['hour'] = hr['startDate'].map(get_hour)
hr['dow'] = hr['startDate'].map(get_day_of_week)

In [23]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
hr['weekday'] = hr['startDate'].dt.dayofweek.map(dayOfWeek)

In [24]:
hr['indexId'] = (hr.index + 100).astype(str)

In [25]:
hr = hr.fillna(value='')

In [26]:
INDEX = 'hr'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    

{'acknowledged': True}

In [27]:
esp = Espandas()
esp.es_write(hr, INDEX, TYPE)