In [8]:
import io
import json
import logging
import os
import tempfile
import urllib.parse

import boto3
import pandas as pd

In [9]:
def read_s3_trigger(trigger_event):
    """Return the bucket name and object key from a saved S3 event notification.

    Parameters
    ----------
    trigger_event : str or path-like
        Path to a JSON file holding an S3 event notification payload.

    Returns
    -------
    tuple[str, str]
        ``(bucket, key)`` taken from the first record in ``Records``. The key is
        URL-decoded (``%3A`` -> ``:``, ``+`` -> space) because S3 event
        notifications deliver object keys URL-encoded, and the encoded form
        does not name the object for ``get_object``/``upload_file``.
    """
    logging.info('Reading s3 event trigger')
    # `with` guarantees the file handle is closed once the JSON is parsed.
    with open(trigger_event, 'r') as fh:
        s3_event = json.load(fh)

    s3_record = s3_event.get("Records")[0].get('s3')
    s3_bucket = s3_record.get('bucket').get('name')
    s3_key = urllib.parse.unquote_plus(s3_record.get('object').get('key'), encoding="utf-8")

    return s3_bucket, s3_key

In [13]:
import urllib.parse

# Sample sync object. The key literal is written URL-encoded and decoded here
# so `key` holds the real object name.
bucket = 'ntonthat-apple-health-data'
key = urllib.parse.unquote_plus("syncs/2023-02-01T06%3A45%3A05.339463.json", encoding="utf-8")

# Authenticate through the local "personal" AWS profile, then pull the raw object text.
personal = boto3.Session(profile_name='personal')
s3 = personal.client("s3")
response_body = s3.get_object(Bucket=bucket, Key=key)['Body']
json_data = response_body.read().decode("utf-8")

In [11]:
json_file = 'source/syncs/2023-01-26T06:11:46.119648.json'

# NOTE(review): this overwrites the `json_data` fetched from S3 in the previous
# cell, yet the parquet cell below still derives its file name from that cell's
# `key` — keep the two sources in sync when re-running.
with open(json_file, 'r') as fh:
    json_data = fh.read()

# Each line is parsed as its own JSON record (JSON Lines). Blank lines are
# skipped so an empty line does not raise json.JSONDecodeError.
source_data = [json.loads(line) for line in json_data.splitlines() if line.strip()]

In [18]:
logging.info("Converting to dataframe")
df = pd.DataFrame.from_records(source_data)

# force conversion types: `qty` as text, `date` as a "YYYY-MM-DD" string
df["qty"] = df["qty"].astype(str)
df["date"] = pd.to_datetime(df["date"]).dt.date.astype(str)

logging.info("Converting to parquet")

# Strip only the final extension: `split(".")[0]` would also cut the fractional
# seconds from timestamped keys such as "2023-02-01T06:45:05.339463.json",
# making syncs from the same second collide on one parquet name.
file_name = key.rsplit("/", 1)[-1].rsplit(".", 1)[0]
parquet_file_name = f"{file_name}.parquet"

# Serialise through a temporary *directory*: reopening a NamedTemporaryFile by
# name while it is still open fails on Windows. A fixed local file name also
# keeps the ':' from the timestamp out of the local filesystem path.
with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_path = os.path.join(tmp_dir, "data.parquet")
    df.to_parquet(tmp_path, compression='gzip', engine='fastparquet')
    with open(tmp_path, 'rb') as fh:
        parquet_buffer = io.BytesIO(fh.read())

In [19]:
bucket = 'ntonthat-apple-health-data'

# Push the in-memory parquet bytes to the bucket under the "parquets/" prefix.
response = s3.put_object(
    Body=parquet_buffer.getvalue(),
    Bucket=bucket,
    Key="parquets/" + parquet_file_name,
)


In [3]:
def create_parquets(event):
    """Convert the JSON Lines object named in an S3 trigger event to parquet and upload it.

    Parameters
    ----------
    event : str
        Path to a local JSON file containing an S3 event notification; it is
        parsed by ``read_s3_trigger`` to find the source bucket and key.

    Side effects
    ------------
    Writes ``outputs/parquets/<name>.parquet`` locally (creating the directory
    if needed) and uploads it to the same bucket as ``parquets/<name>.parquet``.
    Credentials come from the local ``personal`` AWS profile.

    Returns
    -------
    None
    """
    personal = boto3.Session(profile_name='personal')
    s3 = personal.resource('s3')
    bucket, key = read_s3_trigger(event)

    # Fetch the object and decode it to a native Python string. This reads it
    # directly rather than relying on a `load_json_from_s3` helper, which is
    # not defined in this notebook and raised NameError on a fresh kernel.
    json_data = s3.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')

    # One JSON record per line; skip blank lines so they don't raise JSONDecodeError.
    source_data = [json.loads(line) for line in json_data.splitlines() if line.strip()]

    logging.info('Converting to dataframe')
    df = pd.DataFrame.from_records(source_data)

    # force conversion types
    # NOTE(review): the exploratory conversion cell also casts `date` to str;
    # here it stays a `datetime.date`, so the two paths write different parquet
    # types for `date` — confirm which one downstream readers expect.
    df['qty'] = df['qty'].astype(str)
    df['date'] = pd.to_datetime(df['date']).dt.date

    logging.info('Converting to parquet')

    # Strip only the final extension; `split('.')[0]` would also drop the
    # fractional seconds of keys like "2023-02-01T06:45:05.339463.json".
    file_name = key.rsplit('/', 1)[-1].rsplit('.', 1)[0]
    parquet_file_name = f'{file_name}.parquet'

    local_path = os.path.join('outputs', 'parquets', parquet_file_name)
    # to_parquet does not create missing parent directories.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    df.to_parquet(local_path)

    # The S3 key is `parquets/<name>.parquet`, matching the exploratory upload
    # cell; the local `outputs/parquets/` directory must not leak into the key.
    s3.meta.client.upload_file(local_path, bucket, f'parquets/{parquet_file_name}')

In [167]:
# Run the end-to-end conversion against a locally saved copy of an S3 trigger event.
create_parquets('source/s3-trigger-event.json')

Unnamed: 0,source,date,qty,name,units,date_updated
0,,2023-01-12,957.4229999999992,active_energy,kcal,2023-01-18 09:39:59.454617
1,,2023-01-13,507.22899999999953,active_energy,kcal,2023-01-18 09:39:59.454625
2,,2023-01-14,667.1689999999936,active_energy,kcal,2023-01-18 09:39:59.454626
3,,2023-01-15,667.2439999999998,active_energy,kcal,2023-01-18 09:39:59.454627
4,,2023-01-16,854.7099999999991,active_energy,kcal,2023-01-18 09:39:59.454628
...,...,...,...,...,...,...
456,,2023-01-14,4.277019999999999,zinc,mg,2023-01-18 09:39:59.479184
457,,2023-01-15,3.40875185,zinc,mg,2023-01-18 09:39:59.479185
458,,2023-01-16,4.005757775,zinc,mg,2023-01-18 09:39:59.479186
459,,2023-01-17,2.8978000000000006,zinc,mg,2023-01-18 09:39:59.479187
