In [1]:
import datetime as dt
import json
import logging
import os
import sys
import time
from io import StringIO
from json import dumps

import awswrangler as wr
import dateutil.tz
import pandas as pd
import urllib3

from feature_store import feature_store
from feature_store.feature_table import feature_table
from feature_store.value_type import ValueType
import feature_store.config as config

def get_ymd(datetime):
    """Split a date-like object into a ``(year, month, day)`` tuple.

    The year is returned exactly as read from ``datetime.year``. Month and
    day values below 10 are returned as zero-padded strings (e.g. ``'03'``);
    values of 10 or more are returned unchanged as integers.
    """
    def _pad_single_digit(value):
        # Only single-digit values are converted to text; others pass through.
        return '0' + str(value) if value < 10 else value

    return (
        datetime.year,
        _pad_single_digit(datetime.month),
        _pad_single_digit(datetime.day),
    )

def first_day_next_month(date):
    """Return ``date`` moved to the first day of the following month.

    The value is snapped to day 1 of its own month, advanced by 32 days
    (which always lands inside the next month) and snapped to day 1 again.
    Only the calendar date is adjusted; any time-of-day fields are carried
    through as ``replace`` and ``timedelta`` leave them.
    """
    month_start = date.replace(day=1)
    inside_next_month = month_start + dt.timedelta(days=32)
    return inside_next_month.replace(day=1)

def last_second_of_month(date: str) -> str:
    """Return ``"YYYY-MM-DD 23:59:59"`` for the last day of ``date``'s month.

    ``date`` is parsed with ``pd.Timestamp``; ``MonthEnd(0)`` keeps a value
    that is already on a month end and otherwise rolls it forward to the end
    of the same month.
    """
    month_end = pd.Timestamp(date) + pd.offsets.MonthEnd(0)
    return "{} 23:59:59".format(month_end.date())

def first_second_of_month(date: str) -> str:
    """Return ``"YYYY-MM-DD 00:00:00"`` for the first day of ``date``'s month.

    ``date`` is parsed with ``pd.Timestamp``.

    The day is reset with ``replace(day=1)`` rather than by adding
    ``pd.offsets.MonthBegin(0)``: a zero-count offset rolls *forward*, so any
    date that is not already the 1st would be moved to the first day of the
    following month instead of its own month.
    """
    month_start = pd.Timestamp(date).replace(day=1)
    return str(month_start.date()) + " 00:00:00"

# In-memory buffer that receives every formatted log record.
streamer = StringIO()

def setup_logging():
    """Route the root logger exclusively into ``streamer`` at INFO level.

    All handlers currently attached to the root logger are removed, then a
    single ``StreamHandler`` writing to the module-level ``streamer`` buffer
    is installed with a ``"<timestamp> <LEVEL>: <message>"`` format.

    Returns:
        The configured root logger.
    """
    logger = logging.getLogger()
    # Iterate over a snapshot: removing from ``logger.handlers`` while
    # iterating it directly skips every other handler and leaves stale ones.
    for existing in list(logger.handlers):
        logger.removeHandler(existing)

    handler = logging.StreamHandler(stream=streamer)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s",
                                           "%Y-%m-%d %H:%M:%S"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

def query_log(query_id, table, logger):
    """Log the current Athena state of ``query_id`` on behalf of ``table``.

    The state is fetched once via ``wr.athena.get_query_execution`` so the
    severity decision and the logged text always describe the same
    observation. ``FAILED`` and ``CANCELLED`` are logged at CRITICAL; every
    other state is logged at INFO.

    Args:
        query_id: Athena query execution id (string).
        table: Table name used as the message prefix.
        logger: Object exposing ``critical`` and ``info`` methods.

    Returns:
        None.
    """
    status = wr.athena.get_query_execution(query_id)['Status']['State']
    message = table + ': query is in ' + status + ' State. ' + 'QueryID: ' + query_id
    if status in ('FAILED', 'CANCELLED'):
        logger.critical(message)
    else:
        logger.info(message)
    return None

# Shared urllib3 connection pool (presumably used to post to the webhook below).
http = urllib3.PoolManager()
# Google Chat webhook URL. The URL embeds an API key and token, so it is read
# from the environment rather than committed with the notebook; it is None
# when GOOGLE_CHAT_WEBHOOK_URL is not set.
# NOTE(review): the key/token that used to be hard-coded here were exposed in
# source control and should be rotated.
url = os.environ.get('GOOGLE_CHAT_WEBHOOK_URL')
fs = feature_store.feature_store()

# All wall-clock values below are expressed in this time zone.
zone = dateutil.tz.gettz('Asia/Calcutta')

logger = setup_logging()

now = dt.datetime.now(zone)
# Start of the current hour, as an aware datetime and as a formatted string.
current_hour = now.replace(minute=0, second=0, microsecond=0)
current_hour_s = current_hour.strftime("%Y-%m-%d %H:%M:%S")
# Epoch seconds one hour after the start of the current hour.
# ``current_hour`` already carries ``zone``, so no tzinfo replacement is needed.
current_hour_epoch = int(current_hour.timestamp()) + 3600

def query_progress(query_id, run_async, table_name, poll_interval=2):
    """Poll an Athena query until it leaves the states this mode waits on.

    * ``run_async`` falsy: wait while the query is ``QUEUED`` or ``RUNNING``,
      i.e. until it reaches a final state.
    * ``run_async`` truthy: wait only while the query is ``QUEUED``; return as
      soon as it is ``RUNNING`` or already finished.

    Any state outside the waited-on set ends the loop, so an unrecognised
    state is returned to the caller instead of spinning forever. The final
    observation is printed for ``FAILED``/``CANCELLED`` (and for
    ``SUCCEEDED`` in async mode), then recorded through ``query_log``.

    Args:
        query_id: Athena query execution id.
        run_async: Selects the waiting mode described above.
        table_name: Table name passed through to ``query_log``.
        poll_interval: Seconds to sleep between status checks (default 2).

    Returns:
        The last state string read from Athena.
    """
    # Tuples, not bare parenthesised strings: ``x in ('RUNNING')`` would be a
    # substring test against the text 'RUNNING'.
    waiting_states = ('QUEUED',) if run_async else ('QUEUED', 'RUNNING')

    status = wr.athena.get_query_execution(query_id)['Status']['State']
    while status in waiting_states:
        # Pause between polls in both modes to avoid hammering the Athena API.
        time.sleep(poll_interval)
        status = wr.athena.get_query_execution(query_id)['Status']['State']

    if status == 'FAILED':
        print('Query Failed')
    elif status == 'CANCELLED':
        print('Query Cancelled')
    elif status == 'SUCCEEDED' and run_async:
        print('Query Succeeded')

    query_log(query_id, table_name, logger)
    return status

In [2]:
# Declarative definition of the `gps_features_district_aggregate` feature table.
gps_features_district_aggregate = feature_table(
    source={'feature_table': 'gps_features_district_aggregate'},
    # Entity columns: the truck and the entity it is aggregated against.
    entities={
        'truck_number': ValueType.STRING,
        'entity': ValueType.STRING,
    },
    event_timestamp={'event_timestamp': ValueType.TIMESTAMP},
    # Integer-valued feature columns.
    features={
        'total_dwell_time': ValueType.INT,
        'total_speed': ValueType.INT,
        'total_is_ignition_off': ValueType.INT,
        'total_records': ValueType.INT,
    },
    # The table is partitioned by a string `date` column.
    partitions={'date': ValueType.STRING},
)

# Register the table with the feature store; the call reports (rather than
# fails) when the table already exists, as the cell output shows.
fs.create_table(gps_features_district_aggregate)

Table gps_features_district_aggregate already Exists


'Table gps_features_district_aggregate already Exists'

In [3]:
### Find the newest `date` partition already present in the feature table
source = {
    'feature_table': 'gps_features_district_aggregate'
}
ft = fs.load_ft(source)

db_name = ft.get_feature_db()
table_name = ft.get_feature_table()

query = '''
select max(date) from awsdatacatalog.{db_name}.{table_name}
'''.format(db_name = db_name, table_name = table_name)
max_date_df = wr.athena.read_sql_query(query,
                                       database = config.feature_db,
                                       workgroup = config.work_group,
                                       s3_output = config.s3_athena_output,
                                       ctas_approach = False)
# The result has one row and one column. Use fully positional `.iloc[0, 0]`:
# `.iloc[0][0]` depends on integer-key positional fallback on a label-indexed
# Series, which pandas has deprecated.
max_date = max_date_df.iloc[0, 0]

In [4]:
# Show the newest partition date returned by the `select max(date)` query above.
# NOTE(review): `<NA>` presumably means the table has no rows yet — confirm.
max_date

<NA>

In [5]:
##### Delete the given Partition of the current table
def del_s3_files(table, path, base_path='s3://ds-feature-store/feature_tables/'):
    """Delete the S3 objects under ``<base_path><table>/<path>``.

    The resolved S3 path is printed before ``wr.s3.delete_objects`` is called
    on it.

    Args:
        table: Feature table name (one path segment under ``base_path``).
        path: Location inside the table, e.g. ``'date=2021-03-01'``.
        base_path: Root S3 prefix holding the feature tables; defaults to the
            location that was previously hard-coded.
    """
    target = base_path + table + '/' + path
    print(target)
    wr.s3.delete_objects(target)

# Remove the newest existing partition, if there is one. `pd.isna` recognises
# pd.NA, None and NaN alike, whereas comparing against the text '<NA>' only
# recognises pd.NA and would let None/NaN reach the string concatenation.
if not pd.isna(max_date):
    del_s3_files(table_name,'date=' + max_date)

In [6]:
# Start of the backfill window: a fixed 2021-03-01 start when no partition
# date was found, otherwise the newest existing partition date.
if pd.isna(max_date):
    start_time = dt.datetime(2021, 3, 1, 0, 0, 0, tzinfo = zone)
else:
    start_time = dt.datetime.fromisoformat(max_date).replace(tzinfo=zone)

end_time = current_hour + dt.timedelta(days = 1)
series = pd.date_range(start_time.replace(microsecond=0), end_time, freq='1D')

# One row per day: `from_time` is each daily boundary except the last, and
# `to_time` is one second before the next boundary. The frame is built
# directly from the two aligned slices instead of shifting and then filtering,
# so later column assignments never target a filtered copy.
ts_final = pd.DataFrame({'from_time': series[:-1]})
ts_final['to_time'] = series[1:] - pd.Timedelta(seconds = 1)
ts_final['from_time_utc'] = ts_final.from_time.dt.tz_convert('UTC')
ts_final['to_time_utc'] = ts_final.to_time.dt.tz_convert('UTC')
# Partition key: the local calendar date of `from_time` as 'YYYY-MM-DD' text.
ts_final['date'] = ts_final.from_time.dt.strftime('%Y-%m-%d')

In [7]:
# Display the per-day windows (local and UTC bounds plus the `date` key).
ts_final

Unnamed: 0,from_time,to_time,from_time_utc,to_time_utc,date
0,2021-03-01 00:00:00+05:30,2021-03-01 23:59:59+05:30,2021-02-28 18:30:00+00:00,2021-03-01 18:29:59+00:00,2021-03-01
1,2021-03-02 00:00:00+05:30,2021-03-02 23:59:59+05:30,2021-03-01 18:30:00+00:00,2021-03-02 18:29:59+00:00,2021-03-02
2,2021-03-03 00:00:00+05:30,2021-03-03 23:59:59+05:30,2021-03-02 18:30:00+00:00,2021-03-03 18:29:59+00:00,2021-03-03
3,2021-03-04 00:00:00+05:30,2021-03-04 23:59:59+05:30,2021-03-03 18:30:00+00:00,2021-03-04 18:29:59+00:00,2021-03-04
4,2021-03-05 00:00:00+05:30,2021-03-05 23:59:59+05:30,2021-03-04 18:30:00+00:00,2021-03-05 18:29:59+00:00,2021-03-05
...,...,...,...,...,...
312,2022-01-07 00:00:00+05:30,2022-01-07 23:59:59+05:30,2022-01-06 18:30:00+00:00,2022-01-07 18:29:59+00:00,2022-01-07
313,2022-01-08 00:00:00+05:30,2022-01-08 23:59:59+05:30,2022-01-07 18:30:00+00:00,2022-01-08 18:29:59+00:00,2022-01-08
314,2022-01-09 00:00:00+05:30,2022-01-09 23:59:59+05:30,2022-01-08 18:30:00+00:00,2022-01-09 18:29:59+00:00,2022-01-09
315,2022-01-10 00:00:00+05:30,2022-01-10 23:59:59+05:30,2022-01-09 18:30:00+00:00,2022-01-10 18:29:59+00:00,2022-01-10


In [8]:
# SQL template for one day's backfill. It is built once instead of on every
# loop iteration; `{date}` is the only placeholder.
#
# Outline of the query:
#   * district_boundaries: DISTRICT rows of location_service.public.admin_area
#     with a non-empty boundary, decoded into geometries.
#   * gps_delta_data / historical_data: hourly aggregates for `{date}` and for
#     all earlier dates, keeping rows whose entry-to-exit gap is at least
#     3 minutes.
#   * quad_key_to_district_mapping: assigns a quad key to a district when the
#     district boundary contains the centroid of the quad key's tile.
#   * gps_features_temp: running sums per (truck_number, entity) ordered by
#     event_timestamp; only the rows tagged 'delta' are inserted.
# NOTE(review): the INSERT target is hard-coded and does not use the
# `db_name`/`table_name` loaded earlier — confirm they always match.
INSERT_QUERY_TEMPLATE = '''

    INSERT INTO awsdatacatalog.feature_store.gps_features_district_aggregate
    
    with district_boundaries as (
    select place_id as district_id, 
    name as district_name, 
    ST_GeomFromBinary(from_hex(to_utf8(replace(boundary_geog,'20E61000')))) as boundary_geog,
    ST_X(ST_Centroid(ST_GeomFromBinary(from_hex(to_utf8(replace(boundary_geog,'20E61000')))))) as longitude,
    ST_Y(ST_Centroid(ST_GeomFromBinary(from_hex(to_utf8(replace(boundary_geog,'20E61000')))))) as latitude
    from location_service.public.admin_area where deleted = false
    and local_tag = 'DISTRICT'
    and boundary_geog != ''
    ),

    gps_delta_data as 
    (
    select 
    truck_number,
    quad_key,
    event_timestamp,
    entry_device_timestamp,
    exit_device_timestamp,
    hourly_speed,
    hourly_is_ignition_off,
    hourly_records,
    'delta' as event
    from awsdatacatalog.feature_store.gps_features_hourly_aggregate
    where date = '{date}'
    and date_diff('minute', entry_device_timestamp, exit_device_timestamp)>=3
    ),

    historical_data as (
    select 
    truck_number,
    quad_key,
    event_timestamp,
    entry_device_timestamp,
    exit_device_timestamp,
    hourly_speed,
    hourly_is_ignition_off,
    hourly_records,
    'historical' as event
    from awsdatacatalog.feature_store.gps_features_hourly_aggregate
    where date<'{date}'
    and date_diff('minute', entry_device_timestamp, exit_device_timestamp)>=3
    ),

    gps_data_temp as (
    select * from gps_delta_data
    union all
    select * from historical_data
    ),
    
    quad_key_to_district_mapping as (
    select 
    q.quad_key,
    d.district_id
    from (select distinct quad_key from gps_data_temp) q
    cross join district_boundaries d
    where ST_Contains(d.boundary_geog, 
    ST_Point(cast(ST_X(ST_Centroid(bing_tile_polygon(bing_tile(q.quad_key)))) as double), 
    cast(ST_Y(ST_Centroid(bing_tile_polygon(bing_tile(q.quad_key)))) as double)))
    ),



    gps_data as (
    select
    gt.truck_number,
    q.district_id as entity,
    gt.event_timestamp,
    gt.event,
    min(gt.entry_device_timestamp) as entry_device_timestamp,
    max(gt.exit_device_timestamp) as exit_device_timestamp,
    sum(gt.hourly_speed) as hourly_speed,
    sum(gt.hourly_is_ignition_off) as hourly_is_ignition_off,
    sum(gt.hourly_records) as hourly_records
    from gps_data_temp gt
    left join quad_key_to_district_mapping q on q.quad_key = gt.quad_key
    group by 1,2,3,4
    ),


    gps_features_temp as (
    select 
    truck_number,
    entity,
    event_timestamp,
    event,
    SUM(date_diff('minute', entry_device_timestamp  , exit_device_timestamp)) OVER(PARTITION BY truck_number, entity ORDER BY event_timestamp asc ROWS UNBOUNDED PRECEDING) as total_dwell_time,
    SUM(hourly_speed) OVER(PARTITION BY truck_number, entity ORDER BY event_timestamp asc ROWS UNBOUNDED PRECEDING) AS total_speed,
    SUM(hourly_is_ignition_off) OVER(PARTITION BY truck_number, entity ORDER BY event_timestamp asc ROWS UNBOUNDED PRECEDING) AS total_is_ignition_off,
    SUM(hourly_records) OVER(PARTITION BY truck_number, entity ORDER BY event_timestamp asc ROWS UNBOUNDED PRECEDING) AS total_records
    from gps_data
    )

    select 
    truck_number,
    entity,
    event_timestamp,
    try_cast(total_dwell_time as int) as total_dwell_time,
    try_cast(total_speed as int) as total_speed,
    try_cast(total_is_ignition_off as int) as total_is_ignition_off,
    try_cast(total_records as int) as total_records,

    cast(to_unixtime(now()) as bigint) as created_timestamp,
    cast(cast(date_trunc('day', event_timestamp) as date) as varchar) as date

    from gps_features_temp
    where event = 'delta'
    '''

# Submit without waiting for completion: `query_progress` returns as soon as
# each query has left the QUEUED state.
run_async = True
# Dates whose query was already FAILED/CANCELLED when `query_progress` returned.
failed_dates = []

# Only the `date` column is needed, so iterate it directly instead of
# materialising every row with `iterrows()`.
for date in ts_final['date']:
    print(date)

    query = INSERT_QUERY_TEMPLATE.format(date = date)

    query_id = wr.athena.start_query_execution(query,
                                       database = config.feature_db,
                                       workgroup = config.work_group,
                                       s3_output = config.s3_athena_output
                                      )
    query_status = query_progress(query_id, run_async, table_name)
    if query_status in ('FAILED', 'CANCELLED'):
        failed_dates.append(date)

# Surface failures instead of discarding the returned status. In async mode a
# query that fails after it started RUNNING is not detected here.
if failed_dates:
    logger.critical(table_name + ': backfill query failed or was cancelled for dates: '
                    + ', '.join(failed_dates))
    



2021-03-01
2021-03-02
2021-03-03
2021-03-04
2021-03-05
2021-03-06
2021-03-07
2021-03-08
2021-03-09
2021-03-10
2021-03-11
2021-03-12
2021-03-13
2021-03-14
2021-03-15
2021-03-16
2021-03-17
2021-03-18
2021-03-19
2021-03-20
2021-03-21
2021-03-22
2021-03-23
2021-03-24
2021-03-25
2021-03-26
2021-03-27
2021-03-28
2021-03-29
2021-03-30
2021-03-31
2021-04-01
2021-04-02
2021-04-03
2021-04-04
2021-04-05
2021-04-06
2021-04-07
2021-04-08
2021-04-09
2021-04-10
2021-04-11
2021-04-12
2021-04-13
2021-04-14
2021-04-15
2021-04-16
2021-04-17
2021-04-18
2021-04-19
2021-04-20
2021-04-21
2021-04-22
2021-04-23
2021-04-24
2021-04-25
2021-04-26
2021-04-27
2021-04-28
2021-04-29
2021-04-30
2021-05-01
2021-05-02
2021-05-03
2021-05-04
2021-05-05
2021-05-06
2021-05-07
2021-05-08
2021-05-09
2021-05-10
2021-05-11
2021-05-12
2021-05-13
2021-05-14
2021-05-15
2021-05-16
2021-05-17
2021-05-18
2021-05-19
2021-05-20
2021-05-21
2021-05-22
2021-05-23
2021-05-24
2021-05-25
2021-05-26
2021-05-27
2021-05-28
2021-05-29
2021-05-30