In [None]:
import time
import pandas as pd
import awswrangler as wr
from datetime import datetime
import boto3

In [None]:
# Pipeline configuration: destination bucket and the Athena source to query.
# NOTE(review): "blood_analisys" is reproduced exactly as written; it is
# presumably the real catalog database name despite the spelling - confirm.
s3_bucket = "amazon-forecast-blood-analysis"
athena_database = "blood_analisys"
athena_table = "blood_blood_analysis"

# Current local date rendered as YYYY-MM-DD.
today = f"{datetime.now():%Y-%m-%d}"

# Resource-level S3 handle, used later to copy and delete objects.
s3 = boto3.resource("s3")

# Pipeline outline:
#   1. Query Athena through awswrangler for the data sent by IoT Core
#      (intended window: the last 30 days, D-30).
#   2. Resample the readings, collected every 5 minutes, to hourly data.
#   3. Upload the result to S3, creating the catalog entry with awswrangler.

# TODO: change the query to fetch only D-30 (it currently reads the full table)
df_read_athena = wr.athena.read_sql_query(
    f"SELECT * FROM {athena_table}",
    database=athena_database,
)

# Order the readings chronologically and discard rows with any missing value.
blood_five_minutes = df_read_athena.sort_values(by="timestamp").dropna()

# Distinct user ids; each one is processed separately below.
unique_ids = blood_five_minutes["user_id"].unique()

# For each user: resample that user's readings to hourly, append them as a CSV
# dataset on S3 (registering the table in the catalog through awswrangler), and
# then rename the written object to a predictable per-user key.
for ids in unique_ids:
    print(f"[INFO] realizando para o id: {ids}")

    # Compare against the value itself: `ids` comes from this same column, so
    # the match works whatever the column dtype is (casting to str would match
    # nothing if user_id were numeric).
    spplitted_data = blood_five_minutes.loc[blood_five_minutes['user_id'] == ids]
    # Keep a single row per timestamp so the datetime index below is unique.
    spplitted_data = spplitted_data.drop_duplicates('timestamp')

    # Resample from 5-minute readings to one row per hour; each hourly slot is
    # back-filled with the next available observation.
    data_series = spplitted_data.set_index('timestamp')
    data_series.index = pd.to_datetime(data_series.index)
    data_series = data_series.resample('1H').bfill()

    blood_analysis_hourly = data_series.reset_index()
    # %M is minutes; the previous format used %m (month) in the minutes slot,
    # which wrote the month number where the minutes belong.
    blood_analysis_hourly['timestamp'] = blood_analysis_hourly['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')
    blood_analysis_hourly['timestamp_quicksight'] = blood_analysis_hourly['timestamp']
    blood_analysis_hourly['glicose'] = blood_analysis_hourly['glicose'].astype(float)

    # Remove the columns that are not needed in the output.
    blood_analysis_hourly = blood_analysis_hourly.drop(columns=['insulina', 'carbo'])

    # TODO: change the destination path based on the user id.
    s3_path_glucose = f"s3://{s3_bucket}/stage/blood_analysis/glucose/"
    s3_path_df = {s3_path_glucose: {'df_name': blood_analysis_hourly, 'table_name': 'blood_analysis_stage_hourly'}}

    print(f"[INFO] Path to persist {s3_path_glucose}")

    # Split into different folders based on the user ids; new CSVs are meant
    # to always hold D-30.
    for s3_path, values in s3_path_df.items():
        # NOTE(review): requires a 'partition_0' column in the DataFrame;
        # presumably it carries the user id - confirm against the source table.
        response = wr.s3.to_csv(
            df=values['df_name'],
            path=s3_path,
            index=False,
            dataset=True,
            mode="append",
            database=athena_database,
            table=values['table_name'],
            partition_cols=["partition_0"],
        )
        # NOTE(review): only the first written object is renamed; if the write
        # produced more than one file the others keep their generated names.
        old_name = response['paths'][0].replace("s3://", "")
        new_name = f"stage/blood_analysis/glucose/partition_0={ids}/{ids}_user.csv"

        print(old_name, new_name)
        # CopySource is given in "bucket/key" form.
        s3.Object(s3_bucket, new_name).copy_from(CopySource=old_name)

        # Strip only the leading bucket prefix to obtain the object key, then
        # delete the original so that only the renamed copy remains.
        old_key = old_name.replace(f"{s3_bucket}/", "", 1)
        s3.Object(s3_bucket, old_key).delete()



