## Apple Health Data to Elastic Processor

-----

## Dependencies and Libraries

In [1]:
from datetime import date, datetime, timedelta as td
import pytz
import numpy as np
import pandas as pd

import json

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from espandas import Espandas

-----

In [2]:
# instantiate elastic search
es = Elasticsearch(http_compress=True)

------

## Functions for Processing Dates and Timezones

In [3]:
# functions to convert UTC to Shanghai time zone and extract date/time elements
convert_tz = lambda x: x.to_pydatetime().replace(tzinfo=pytz.utc).astimezone(pytz.timezone('America/Los_Angeles'))
get_year = lambda x: convert_tz(x).year
get_month = lambda x: '{}-{:02}'.format(convert_tz(x).year, convert_tz(x).month) #inefficient
get_date = lambda x: '{}-{:02}-{:02}'.format(convert_tz(x).year, convert_tz(x).month, convert_tz(x).day) #inefficient
get_day = lambda x: convert_tz(x).day
get_hour = lambda x: convert_tz(x).hour
get_minute = lambda x: convert_tz(x).minute
get_day_of_week = lambda x: convert_tz(x).weekday()

---

## Import Steps into Elastic

In [4]:
steps = pd.read_csv("data/StepCount.csv")

In [5]:
len(steps)

193459

In [6]:
# parse out date and time elements as local time
steps['startDate'] = pd.to_datetime(steps['startDate'])
# parse to unix seconds since epoch
steps['timestamp'] = pd.to_datetime(steps['startDate']).astype(int) / 10**9

steps['dow'] = steps['startDate'].map(get_day_of_week)
steps['year'] = steps['startDate'].map(get_year)
steps['month'] = steps['startDate'].map(get_month)
steps['date'] = steps['startDate'].map(get_date)
steps['day'] = steps['startDate'].map(get_day)
steps['hour'] = steps['startDate'].map(get_hour)
steps['dow'] = steps['startDate'].map(get_day_of_week)

In [7]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
steps['weekday'] = steps['startDate'].dt.dayofweek.map(dayOfWeek)

In [8]:
steps['indexId'] = (steps.index + 100).astype(str)

In [9]:
# steps.info()
steps.head()

Unnamed: 0,sourceName,sourceVersion,device,type,unit,creationDate,startDate,endDate,value,timestamp,dow,year,month,date,day,hour,weekday,indexId
0,Mark’s Apple Watch,3.1.3,"<<HKDevice: 0x280cc6080>, name:Apple Watch, ma...",StepCount,count,2017-04-11 21:17:21 -0800,2017-04-11 20:59:26-08:00,2017-04-11 21:05:31 -0800,8,1491973000.0,1,2017,2017-04,2017-04-11,11,13,Tuesday,100
1,Mark’s Apple Watch,3.1.3,"<<HKDevice: 0x280cc6080>, name:Apple Watch, ma...",StepCount,count,2017-04-11 21:50:23 -0800,2017-04-11 21:43:54-08:00,2017-04-11 21:44:55 -0800,29,1491976000.0,1,2017,2017-04,2017-04-11,11,14,Tuesday,101
2,"“马克\的 iPhone""",10.3.1,"<<HKDevice: 0x280cc7d90>, name:iPhone, manufac...",StepCount,count,2017-04-11 21:52:49 -0800,2017-04-11 21:43:54-08:00,2017-04-11 21:44:48 -0800,25,1491976000.0,1,2017,2017-04,2017-04-11,11,14,Tuesday,102
3,"“马克\的 iPhone""",10.3.1,"<<HKDevice: 0x280cc7d90>, name:iPhone, manufac...",StepCount,count,2017-04-11 21:52:49 -0800,2017-04-11 21:51:44-08:00,2017-04-11 21:52:21 -0800,37,1491976000.0,1,2017,2017-04,2017-04-11,11,14,Tuesday,103
4,Mark’s Apple Watch,3.1.3,"<<HKDevice: 0x280cc6080>, name:Apple Watch, ma...",StepCount,count,2017-04-11 21:59:45 -0800,2017-04-11 21:50:01-08:00,2017-04-11 21:51:41 -0800,34,1491976000.0,1,2017,2017-04,2017-04-11,11,14,Tuesday,104


In [10]:
steps = steps.fillna(value='')

In [11]:
INDEX = 'steps'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    



{'acknowledged': True}

In [12]:
esp = Espandas()
esp.es_write(steps, INDEX, TYPE)



## Import HR into Elastic

In [13]:
resting = pd.read_csv("data/RestingHeartRate.csv")
len(resting)

2003

In [14]:
# parse out date and time elements as local time
resting['startDate'] = pd.to_datetime(resting['startDate'])

# parse to unix seconds since epoch
resting['timestamp'] = pd.to_datetime(resting['startDate']).astype(int) / 10**9

resting['dow'] = resting['startDate'].map(get_day_of_week)
resting['year'] = resting['startDate'].map(get_year)
resting['month'] = resting['startDate'].map(get_month)
resting['date'] = resting['startDate'].map(get_date)
resting['day'] = resting['startDate'].map(get_day)
resting['hour'] = resting['startDate'].map(get_hour)
resting['dow'] = resting['startDate'].map(get_day_of_week)

In [15]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
resting['weekday'] = resting['startDate'].dt.dayofweek.map(dayOfWeek)

In [16]:
resting['indexId'] = (resting.index + 100).astype(str)

In [17]:
resting = resting.fillna(value='')

In [18]:
INDEX = 'resting_hr'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    

{'acknowledged': True}

In [19]:
esp = Espandas()
esp.es_write(resting, INDEX, TYPE)

In [20]:
hr = pd.read_csv("data/HeartRate.csv")
len(hr)

759321

In [21]:
# hr.tail(10)

In [22]:
# parse out date and time elements as local time
hr['startDate'] = pd.to_datetime(hr['startDate'])

# parse to unix seconds since epoch
hr['timestamp'] = pd.to_datetime(hr['startDate']).astype(int) / 10**9

hr['dow'] = hr['startDate'].map(get_day_of_week)
hr['year'] = hr['startDate'].map(get_year)
hr['month'] = hr['startDate'].map(get_month)
hr['date'] = hr['startDate'].map(get_date)
hr['day'] = hr['startDate'].map(get_day)
hr['hour'] = hr['startDate'].map(get_hour)
hr['dow'] = hr['startDate'].map(get_day_of_week)

In [23]:
dayOfWeek={0:'Monday', 1:'Tuesday', 2:'Wednesday', 3:'Thursday', 4:'Friday', 5:'Saturday', 6:'Sunday'}
hr['weekday'] = hr['startDate'].dt.dayofweek.map(dayOfWeek)

In [24]:
hr['indexId'] = (hr.index + 100).astype(str)

In [25]:
hr = hr.fillna(value='')

In [26]:
INDEX = 'hr'
TYPE = 'record'

# Delete if already exists
if es.indices.exists(INDEX):
    es.indices.delete(INDEX)

# Create index
es.indices.create(INDEX)    
    
# Add mapping
with open('apple_health_elastic_mapping.json') as json_mapping:
    d = json.load(json_mapping)

# Create Customized Index Mappings     
es.indices.put_mapping(index=INDEX, doc_type=TYPE, body=d, include_type_name=True)    

{'acknowledged': True}

In [27]:
esp = Espandas()
esp.es_write(hr, INDEX, TYPE)