# Scheduled Execution: Trips at the 30 Largest Airports in the US
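* Trips from `paas_mi.trip` intersected with the airport geometries (POIs) in `dedicated.airports.airport_wkt`: a trip is an INFLOW when it ends inside an airport polygon after starting outside it, and an OUTFLOW when it starts inside an airport polygon and ends outside it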

In [1]:
import pandas as pd
from keplergl import KeplerGl
import getpass
from pyhive import presto
import matplotlib.pyplot as plt
import plotly.express as px
import numpy as np
import os
import sys
import time
from datetime import datetime, timedelta

# Load (or reload) the ipython-sql extension that provides %sql / %%sql.
%reload_ext sql
# Do not issue a COMMIT after each statement.
%config SqlMagic.autocommit=False
# 0 = no cap on the number of rows fetched.
%config SqlMagic.autolimit=0
# Render at most 200 rows when a result is displayed.
%config SqlMagic.displaylimit=200
# Return results as pandas DataFrames; the execution loop below indexes res['rows'].
%config SqlMagic.autopandas=True

In [2]:
# SQL engine: a DB-API connection to the Presto server, shared by read_sql below.
connection = presto.connect(**{
    "host": "localhost",
    "port": 9090,
    "catalog": "cuebiq",
})

def read_sql(query: str, con=None) -> pd.DataFrame:
    """Execute ``query`` and return its result set as a DataFrame.

    Args:
        query: SQL text to execute.
        con: DB-API connection to run the query on. Defaults to the notebook's
            global Presto ``connection``; pass another connection (e.g. for a
            different server or for testing) to override it.

    Returns:
        A DataFrame with one column per selected expression.
    """
    return pd.read_sql(query, connection if con is None else con)

In [3]:
# Point the %sql magic at the same Presto host, port and catalog used by `connection`.
%sql presto://localhost:9090/cuebiq/

'Connected: @cuebiq/'

In [4]:
# Show every column, but keep long frames truncated to 10 rows.
pd.set_option("display.max_columns", None, "display.max_rows", 10)

In [5]:
# Date helpers
def get_dates_sequence(date_start, date_end, date_format):
    """Return every calendar day from ``date_start`` to ``date_end``, inclusive.

    Both bounds and every returned element are strings in ``date_format``.
    An empty list is returned when ``date_end`` is before ``date_start``.
    """
    end = datetime.strptime(date_end, date_format)
    start = datetime.strptime(date_start, date_format)
    # +1 day makes the end bound inclusive.
    n_days = (end - start + timedelta(days=1)).days
    return [
        (start + timedelta(days=offset)).strftime(date_format)
        for offset in range(n_days)
    ]

In [6]:
date_format: str = '%Y-%m-%d'  # format of every date string in this notebook, e.g. 2021-04-01

In [7]:
first_date: str = '2021-04-01'  # start date used when the output table has no computed dates yet

In [8]:
# Process up to and including yesterday, according to the kernel's local clock.
last_date_to_compute = datetime.strftime(datetime.now() - timedelta(days=1), date_format)

In [9]:
# Display the last date that will be processed in this run.
last_date_to_compute

'2021-04-08'

# Manage execution dates (TODO: revisit this logic)

In [10]:
# Resume from the day after the last reference_date already stored in the output
# table. If the table is missing or empty, start from `first_date`. If everything up
# to `last_date_to_compute` is already stored, stop the run.

# The execution loop creates the table with CREATE TABLE IF NOT EXISTS, so it does not
# exist before the first run and SELECT max(...) on it would fail.
output_table_exists = not read_sql(
    "SHOW TABLES FROM dedicated.airports LIKE 'inf_and_outf_trips'"
).empty

if output_table_exists:
    # .iloc[0, 0] selects the single max(...) cell positionally; the chained
    # .iloc[0][0] relies on deprecated integer-key Series lookup.
    last_computed_date = read_sql(
        '''
        SELECT max(reference_date)
        FROM dedicated.airports.inf_and_outf_trips
        '''
    ).iloc[0, 0]
else:
    last_computed_date = None

# max() over an empty table is NULL; treat None and NaN/NaT alike.
if pd.isna(last_computed_date):
    first_date_to_compute = first_date
    message = f"""
        No computed dates yet
        Computation will start from {first_date_to_compute}
        Computation will end at {last_date_to_compute}
        """
else:
    first_dt = datetime.strptime(str(last_computed_date), date_format) + timedelta(days=1)
    first_date_to_compute = first_dt.strftime(date_format)
    # Compare parsed dates: string order matches date order only for
    # zero-padded, year-first formats.
    if first_dt > datetime.strptime(last_date_to_compute, date_format):
        print("No dates to execute")
        # NOTE(review): under IPython, sys.exit raises SystemExit, which is reported
        # rather than exiting quietly; confirm the scheduler treats this as success.
        sys.exit(0)
    else:
        message = f"""
            Last computed date: {last_computed_date}
            Computation will start from {first_date_to_compute}
            Computation will end at {last_date_to_compute}
            """
print(message)


            Last computed date: 2021-04-04
            Computation will start from 2021-04-05
            Computation will end at 2021-04-08
            


In [11]:
# Every calendar day still to process, both ends inclusive; preview the first two.
dates_to_compute = get_dates_sequence(
    date_start=first_date_to_compute,
    date_end=last_date_to_compute,
    date_format=date_format,
)
dates_to_compute[:2]

['2021-04-05', '2021-04-06']

# Collect inflow and outflow trips in a single query

In [None]:
%%time
%%sql inf_outf <<

with trips as (select * from paas_mi.trip where reference_date = '2017-07-01'),

-- NOTE(review) reference_date is hard-coded above, so this cell is a one-day exploratory run
-- the scheduled runs use query_inf_outf_trips, which parameterizes the date

airport_wkt as (select * from dedicated.airports.airport_wkt),

-- tmp_1 = trips whose destination point (next_lng, next_lat) lies inside an airport polygon
tmp_1 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.next_lng, t.next_lat))),

-- tmp_2 = trips whose origin point (lng, lat) lies inside an airport polygon
tmp_2 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.lng, t.lat))),

-- INFLOW = tmp_1 rows whose origin lies outside that same airport polygon
inf as (select cuebiq_id,
        trip_start_zoned_datetime,
        next_zoned_datetime,
        country_code,
        next_country_code,
        lat,
        lng,
        next_lat,
        next_lng,
        distance_meters,
        time_distance_seconds,
        home_known_flag,
        work_known_flag,
        starts_in_home_area_flag,
        ends_in_home_area_flag,
        reference_date,
        city,
        airport_name,
        'INFLOW' as trip_type from tmp_1 where (not st_contains(st_geometryfromtext(tmp_1.geometry), st_point(tmp_1.lng, tmp_1.lat)))),

-- OUTFLOW = tmp_2 rows whose destination lies outside that same airport polygon
outf as (select cuebiq_id,
        trip_start_zoned_datetime,
        next_zoned_datetime,
        country_code,
        next_country_code,
        lat,
        lng,
        next_lat,
        next_lng,
        distance_meters,
        time_distance_seconds,
        home_known_flag,
        work_known_flag,
        starts_in_home_area_flag,
        ends_in_home_area_flag,
        reference_date,
        city,
        airport_name,
        'OUTFLOW' as trip_type from tmp_2 where (not st_contains(st_geometryfromtext(tmp_2.geometry), st_point(tmp_2.next_lng, tmp_2.next_lat))))

-- UNION (not UNION ALL) also removes rows that are exact duplicates
select * from inf
union
select * from outf

-- success!

# The query above as a parameterized string, run once per `reference_date`

In [13]:
# Parameterized daily version of the exploratory inflow/outflow query.
# Fill in the date with `query_inf_outf_trips % {'i': "YYYY-MM-DD"}`.
#
#   tmp_1   = trips whose destination (next_lng, next_lat) is inside an airport polygon
#   tmp_2   = trips whose origin (lng, lat) is inside an airport polygon
#   INFLOW  = tmp_1 rows whose origin is outside that same polygon
#   OUTFLOW = tmp_2 rows whose destination is outside that same polygon
#
# The string is interpolated into a one-line `%sql` magic in the execution loop.
# NOTE(review): keep it free of `--` comments, `:name` tokens, `{}`/`$` and stray `%`.
# ipython-sql / SQLAlchemy appear to give those special meaning; confirm before adding any.

# Both SELECT lists must match column-for-column for the UNION to line up, so they
# are generated from this single list.
TRIP_COLUMNS = [
    "cuebiq_id",
    "trip_start_zoned_datetime",
    "next_zoned_datetime",
    "country_code",
    "next_country_code",
    "lat",
    "lng",
    "next_lat",
    "next_lng",
    "distance_meters",
    "time_distance_seconds",
    "home_known_flag",
    "work_known_flag",
    "starts_in_home_area_flag",
    "ends_in_home_area_flag",
    "reference_date",
    "city",
    "airport_name",
]
_trip_select_list = ",\n        ".join(TRIP_COLUMNS)

query_inf_outf_trips = f"""with trips as (select * from paas_mi.trip where reference_date = '%(i)s'),
airport_wkt as (select * from dedicated.airports.airport_wkt),
tmp_1 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.next_lng, t.next_lat))),
tmp_2 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.lng, t.lat))),
inf as (select {_trip_select_list},
        'INFLOW' as trip_type from tmp_1 where (not st_contains(st_geometryfromtext(tmp_1.geometry), st_point(tmp_1.lng, tmp_1.lat)))),
outf as (select {_trip_select_list},
        'OUTFLOW' as trip_type from tmp_2 where (not st_contains(st_geometryfromtext(tmp_2.geometry), st_point(tmp_2.next_lng, tmp_2.next_lat))))
select * from inf
union
select * from outf"""

In [14]:
for i in dates_to_compute:
    print(f"Executing for {i}...")
    query = query_inf_outf_trips % {'i':i}
    print(query)
    res = %sql create table if not exists dedicated.airports.inf_and_outf_trips as {query}
    print(f"created {res['rows'][0]} rows")
    succedeed=int(res['rows'][0])
    if succedeed==0:
        #first time you create if not insert
        res= %sql insert into dedicated.airports.inf_and_outf_trips {query}
        print(f"inserted {res['rows'][0]} rows")

Executing for 2021-04-05...
with trips as (select * from paas_mi.trip where reference_date = '2021-04-05'), airport_wkt as (select * from dedicated.airports.airport_wkt), tmp_1 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.next_lng, t.next_lat))), tmp_2 as (select * from trips t inner join airport_wkt a on st_contains(st_geometryfromtext(a.geometry), st_point(t.lng, t.lat))), inf as (select cuebiq_id,         trip_start_zoned_datetime,         next_zoned_datetime,         country_code,         next_country_code,         lat,         lng,         next_lat,         next_lng,         distance_meters,         time_distance_seconds,         home_known_flag,         work_known_flag,         starts_in_home_area_flag,         ends_in_home_area_flag,         reference_date,         city,         airport_name,         'INFLOW' as trip_type from tmp_1 where (not st_contains(st_geometryfromtext(tmp_1.geometry), st_point(tmp_1.lng, tmp