In [None]:
%%capture --no-display
# Install awswrangler; it is used below to query the data lake through AWS Athena.
# NOTE(review): the package version is not pinned — consider pinning it for reproducible runs.
%pip install awswrangler

# Automatically reload updated external modules (autoreload mode 2), so edits to
# helper functions are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [33]:
import os
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import sklearn
# import torch
from IPython.core.interactiveshell import InteractiveShell

import awswrangler as wr
import sagemaker

# Silence all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

# Debugging aid: when enabled, the result of every line in a cell is displayed;
# when disabled, nothing is displayed automatically.
# (IPython's own default is "last": only the final line of a cell is displayed.)
DISPLAY_INFO = True

InteractiveShell.ast_node_interactivity = "all" if DISPLAY_INFO else "none"

In [34]:
# Debug switch: fetch_data appends a row limit to the query and the fetch loop
# stops after a few partitions when this is on.
DEBUG = True

# Time window to fetch. `end` is exclusive: the partition list built further
# down contains start, start + 1 interval, ... up to but not including `end`.
start = datetime(year=2023, month=7, day=5, hour=0)
end = datetime(year=2023, month=7, day=5, hour=5)

In [35]:
# Glue/Athena database name passed to awswrangler by the cells below.
DATABASE = 'tgao_tmp'

# Spacing between consecutive partitions, in seconds.
TIME_INTERVAL = 3600  # one hour
# TIME_INTERVAL = 86400  # one day

# strftime pattern for the value compared against the table's `hour` column.
SQL_FORMAT_STR = "%Y-%m-%d %H:00:00"

# strftime pattern for the timestamp part of the local file names.
# NOTE(review): the single quotes are part of the pattern, so they end up inside
# the file names on disk — confirm this is intended before changing it, because
# existing cached partitions are looked up by this exact name.
FILENAME_FORMAT_STR = "'%Y-%m-%d_%H0000'"

# select enabled fields

In [36]:
# Columns to request from the table; a single "*" selects every column.
enabled_fields = ["*"]

# Comma-separated form that is placed directly after SELECT in the query.
enabled_fields_str = str.join(", ", enabled_fields)

enabled_fields_str

'*'

# fetch data

In [37]:
# Start time of every partition in the half-open window [start, end), spaced
# TIME_INTERVAL seconds apart. The step is derived from TIME_INTERVAL instead of
# a fixed one-hour timedelta, so the spacing stays consistent with the partition
# count when TIME_INTERVAL is switched to a day. Floor-dividing two timedeltas
# yields an int, so no float round-trip is needed for the count.
time_list = [
    start + timedelta(seconds=x * TIME_INTERVAL)
    for x in range((end - start) // timedelta(seconds=TIME_INTERVAL))
]

time_list

[datetime.datetime(2023, 7, 5, 0, 0),
 datetime.datetime(2023, 7, 5, 1, 0),
 datetime.datetime(2023, 7, 5, 2, 0),
 datetime.datetime(2023, 7, 5, 3, 0),
 datetime.datetime(2023, 7, 5, 4, 0)]

In [39]:
def fetch_data(start_hour, enabled_fields_str='*', enable_query_display=False):
    """Query datalake.imhotep.rjptrainingv1 for the rows of one hour via Athena.

    Args:
        start_hour: Object with a ``strftime`` method (a datetime in this
            notebook). It is formatted with SQL_FORMAT_STR and compared for
            equality against the table's ``hour`` column.
        enabled_fields_str: Column list inserted verbatim after ``SELECT``.
        enable_query_display: When true, print the final SQL before running it.

    Returns:
        The result of ``wr.athena.read_sql_query`` for the generated query.

    Note:
        When the module-level DEBUG flag is set, ``limit 5`` is appended to the
        query.
    """
    hour_literal = start_hour.strftime(SQL_FORMAT_STR)

    sql = f"""
    SELECT {enabled_fields_str}
    FROM 
        datalake.imhotep.rjptrainingv1
    WHERE 
        hour = '{hour_literal}'
    """

    # Debug runs only need a handful of rows.
    if DEBUG:
        sql = f'{sql}\nlimit 5'

    if enable_query_display:
        print(sql)

    result = wr.athena.read_sql_query(sql, database=DATABASE, ctas_approach=True)
    return result

In [40]:
fname_lst = []
df = None

# Create the database the queries run against (no-op if it already exists).
# It can be found under Glue in the AWS Console.
wr.catalog.create_database(DATABASE, exist_ok=True)

# exist_ok replaces the separate os.path.exists check-then-create sequence.
os.makedirs('data/partition', exist_ok=True)

# In DEBUG mode only the first four partitions (idx 0..3) are processed. Slicing
# up front applies the cap to cached partitions as well; the previous
# end-of-loop `break` was skipped by the `continue` taken for cached files.
hours_to_process = time_list[:4] if DEBUG else time_list

for idx, start_hour in enumerate(hours_to_process):
    output_fn = f"data/partition/{start_hour.strftime(FILENAME_FORMAT_STR)}.parquet"
    print(f"Processing {output_fn}")

    # Reuse a partition file that is already on disk instead of re-querying.
    if os.path.exists(output_fn):
        fname_lst.append(output_fn)
        continue

    try:
        # Echo the SQL for the first partition only, to keep the output short.
        df = fetch_data(start_hour, enabled_fields_str, enable_query_display=(idx == 0))
        print(f"\tRows: {df.shape[0]:d}")

        df.to_parquet(output_fn)
        fname_lst.append(output_fn)
    except Exception as exc:
        # Best effort: a failed partition is skipped so the remaining ones are
        # still fetched, but the reason is reported instead of being discarded.
        print(f"\tFailed {output_fn}: {exc!r}")

# Save a CSV sample (at most 1000 rows) of the most recently fetched partition.
# `df` is still None when nothing was fetched in this run.
if df is not None:
    df.head(1000).to_csv('data/limit1000.csv')
            

Processing data/partition/'2023-07-05_000000'.parquet

    SELECT *
    FROM 
        imhotep_qa.rjpTraining_temp2
    WHERE 
        hour = '2023-07-05 00:00:00'
        and is_feedId is not null
    limit 5
	Rows: 5
Processing data/partition/'2023-07-05_010000'.parquet
	Rows: 5
Processing data/partition/'2023-07-05_020000'.parquet
	Rows: 5
Processing data/partition/'2023-07-05_030000'.parquet
	Rows: 5


# merge all the data

In [41]:
# Partition files collected by the fetch loop; these are the inputs to the merge below.
fname_lst

["data/partition/'2023-07-05_000000'.parquet",
 "data/partition/'2023-07-05_010000'.parquet",
 "data/partition/'2023-07-05_020000'.parquet",
 "data/partition/'2023-07-05_030000'.parquet"]

In [42]:
# Load every partition file collected by the fetch loop.
df_lst = list(map(pd.read_parquet, fname_lst))

# Stack the partitions into a single frame (original row indices are kept).
df_all = pd.concat(df_lst)

# Persist the merged frame, tagging the file name with the query window.
df_all.to_parquet(
    "data/full-dataset_{}_{}.parquet".format(
        start.strftime(FILENAME_FORMAT_STR),
        end.strftime(FILENAME_FORMAT_STR),
    )
)


In [43]:
# `df` holds only the most recently fetched partition and remains None when
# every partition was served from the local cache, so guard before exporting
# the CSV sample (at most 1000 rows) to avoid an AttributeError on None.
if df is not None:
    df.head(1000).to_csv('data/limit1000.csv')
