In [1]:
%reload_ext sql
%config SqlMagic.autocommit=False
%config SqlMagic.autolimit=0
%config SqlMagic.autopandas=True
%config SqlMagic.displaylimit=200
%sql trino://localhost:9090/cuebiq/

import pandas as pd
import yaml
import numpy as np
import os
from pyhive import trino
import pydeck as pdk
from typing import List
import json
import copy
import itertools
# import geohash
import geopandas as geopd
from pyquadkey2 import quadkey
from pyquadkey2.quadkey import TileAnchor, QuadKey
from h3 import h3
import seaborn as sns
from datetime import datetime, timedelta
import math
import pickle

os.environ['MAPBOX_API_KEY'] = "INSERT YOUR MAPBOX TOKEN HERE"
pd.set_option('display.max_colwidth', 0)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
import warnings
warnings.filterwarnings('ignore')

In [2]:
# SQL engine
from trino.dbapi import connect 
from sqlalchemy import create_engine

class TrinoEngine():
    def __init__(self):
        conn = connect(
            host="localhost",
            port=9090,
            catalog="cuebiq"
        )
        self.cur = conn.cursor()
        self.engine = create_engine("trino://localhost:9090/cuebiq/")
    
    def execute_statement(self, query:str) -> list:
        """
        Create and drop statements.
        """
        self.cur.execute(query)
        return self.cur.fetchall()
    
    def read_sql(self, query:str) -> pd.DataFrame: 
        """
        Select and insert into operations.
        """
        return pd.read_sql(query, self.engine)

sql_engine = TrinoEngine()

In [3]:
# Census Block Groups typically have a population between 600 to 3000 people, this makes the data privacy safe.
geography_table = "cuebiq.paas_cda_pe_v3.geography_registry"
visit_table = "cuebiq.paas_cda_pe_v3.visit"

# Descriptive statistics of the data set used in the CEUS paper

In [5]:
%%time
## this code snippet is only for descriptive statistics and no need to run in the future
df_indi_home_loc_race = pd.read_pickle("../output/CA_Individual_race_composition_20190301.pkl")
print(df_indi_home_loc_race.shape[0]) # all cubiq individuals who live in California: 1600238

# date = ['20220301_20220308','20220309_20220316','20220317_20220323','20220324_20220331']
date = ['20190301_20190308','20190309_20190316','20190317_20190323','20190324_20190331']

sum_df_race = pd.DataFrame()
for d in date:
    print (d)
    df_visit_LA = pd.read_pickle("../output/LA_visit_with_poiCategory"+str(d)+".pkl")
    df_visit_LA.drop_duplicates(keep='last', inplace=True) 
    df_visit_LA_race = pd.merge(df_visit_LA, df_indi_home_loc_race,on='cuebiq_id')  
    sum_df_race = sum_df_race.append(df_visit_LA_race)
print('total visit records:',sum_df_race.shape[0])
print('total unique persons:',sum_df_race.cuebiq_id.unique().shape[0])
print('total unique POIs:',sum_df_race.place_id.unique().shape[0])    
allpid_inthepaper = sum_df_race.cuebiq_id.unique().tolist()

1086135
20190301_20190308
20190309_20190316
20190317_20190323
20190324_20190331
total visit records: 4580969
total unique persons: 283556
total unique POIs: 34657
CPU times: user 20.9 s, sys: 4.06 s, total: 25 s
Wall time: 28.4 s


In [19]:
%sql show tables from cuebiq.paas_cda_pe_v3

 * trino://localhost:9090/cuebiq/
Done.


Unnamed: 0,Table
0,brand
1,cbsa
2,custom_poi
3,custom_poi_history
4,device_location_uplevelled
5,device_metrics_uplevelled
6,device_recurring_area
7,device_user_labeling
8,geography_registry
9,poi


In [10]:
# sql_engine.read_sql(f"desc cuebiq.paas_cda_pe_v3.visit")

In [9]:
%%time
# get the total number of location points of March 2022 data in LA County used in the CEUS paper
# this takes a super long time - haven't finished running once yet
pe_dl_table = "cuebiq.paas_cda_pe_v3.device_location_uplevelled"
sql_engine.read_sql(
    f"""
    select count(*)
    from {pe_dl_table}
    where 
        country_code = 'US'
        and processing_date between 20190301 and 20190331
        and provider_id = '190199'
        and cuebiq_id in (1069274429, 1591710227)
    """
)

CPU times: user 1.65 s, sys: 18 ms, total: 1.67 s
Wall time: 7min 47s


Unnamed: 0,_col0
0,10278


In [15]:
10278/2*283556

1457194284.0

# Option 1. Extract visit table - one day only

In [8]:
date = 20220310 # Thursday
date_plus = int((datetime.strptime(str(date), "%Y%m%d") + timedelta(days=3)).strftime("%Y%m%d"))# You see that we can limit to the first three processing dates starting from the local date of interest.

In [66]:
# sql_engine.read_sql(f"""
# SELECT from_iso8601_timestamp('2022-03-10T13:54:09-08:00') AT TIME ZONE 'UTC'
# """)

In [65]:
# sql_engine.read_sql(f"""
# SELECT from_iso8601_timestamp('2022-03-10T13:54:09-08:00') AT TIME ZONE '-08:00'
# """)

In [28]:
# test = sql_engine.read_sql(f"""
#         select
#             *, 
#             from_iso8601_timestamp(zoned_datetime) as start_time, 
#             date_add('second', cast(dwell_time_minutes * 60 as bigint), from_iso8601_timestamp(zoned_datetime)) as end_time
#         from {visit_table}
#         where 
#             country_code = 'US'
#             and provider_id = '190199'
#             and processing_date between {date} and {date_plus}
#             and event_date = {date}
#             and admin2_id = 'US.CA.037'
#             and dwell_time_minutes < 1440
#         limit 10
#         """
# )
# test.head()

In [None]:
# from_iso8601_timestamp(zoned_datetime) as start_time, 
# date_add('second', cast(dwell_time_minutes * 60 as bigint), from_iso8601_timestamp(zoned_datetime)) as end_time

In [1]:
%%time
# 264083
# CPU times: user 12.2 s, sys: 387 ms, total: 12.6 s
# Wall time: 1min 7s

df_visit_LA = sql_engine.read_sql(f"""
    with la_visit as (
        select
            *
        from {visit_table}
        where 
            country_code = 'US'
            and provider_id = '190199'
            and processing_date between {date} and {date_plus}
            and event_date = {date}
            and admin2_id = 'US.CA.037'
            and dwell_time_minutes < 1440
        ),
        
        la_cbg as (
        select
            geography_id, geometry_wkt
        from {geography_table}
        where
            country_code = 'US'
            and geography_type_code = 'admin4'
            and geography_id like 'US.CA.037%'     --- <<< filter geometries in LA
        )
        
        select
        *
        from la_visit s
        inner join la_cbg c
        on st_contains(st_geometryfromtext(c.geometry_wkt), st_point(s.lng, s.lat))
        """
)

print(df_visit_LA.shape[0])
df_visit_LA.drop(['zipcode_id','place_version','processing_date','country_code',
                 'geometry_wkt','admin1_id','device_type_code','os_name','provider_id'], inplace=True, axis=1)
df_visit_LA.head()

In [None]:
%%time
#create start_time, end_time, start_hour, and end_hour columns
df_visit_LA['start_time'] =  df_visit_LA['zoned_datetime'].astype(str).str[:19]
df_visit_LA['start_time'] = pd.to_datetime(df_visit_LA['start_time'], errors='coerce')
df_visit_LA['time_added'] = pd.to_timedelta(df_visit_LA['dwell_time_minutes'],'m')
df_visit_LA['end_time'] = df_visit_LA['start_time'] + df_visit_LA['time_added']
df_visit_LA['start_hour'] = df_visit_LA['start_time'].dt.hour
df_visit_LA['end_hour'] = df_visit_LA['end_time'].dt.hour
df_visit_LA.drop(['time_added'], inplace=True, axis=1)
df_visit_LA.head()

In [None]:
# df_visit_LA.to_pickle("./output/LA_visit_"+str(date)+".pkl")  
# df_visit_LA = pd.read_pickle("./output/LA_visit_"+str(date)+".pkl")

# Option 2. Extract visit table - time range

In [4]:
#March 2022: 
#weekend days: 20220305,20220306,20220312,20220313,20220319,20220320,20220326,20220327
#weekend days: 20190302,20190303,20190309,20190310,20190316,20190317,20190323,20190324,20190330,20190331

#20190301,20190308
#20190309,20190316
#20190317,20190323
#20190324,20190331
start_date,end_date = 20190324,20190331
end_date_plus = int((datetime.strptime(str(end_date), "%Y%m%d") + timedelta(days=3)).strftime("%Y%m%d"))# You see that we can limit to the first three processing dates starting from the local date of interest.

In [36]:
# from_iso8601_timestamp(zoned_datetime) as start_time, 
# date_add('second', cast(dwell_time_minutes * 60 as bigint), from_iso8601_timestamp(zoned_datetime)) as end_time

In [5]:
%%time  
# about 4min
# Select visits in LA county only

print(start_date,end_date)
df_visit_LA_range = sql_engine.read_sql(f"""
    with la_visit as (
        select
            *
        from {visit_table}
        where 
            country_code = 'US'
            and provider_id = '190199'
            and processing_date between {start_date} and {end_date_plus}
            and event_date between {start_date} and {end_date}
            and admin2_id = 'US.CA.037'     --- <<< filter geometries in LA
        ),
        
        la_cbg as (
        select
            geography_id, geometry_wkt
        from {geography_table}
        where
            country_code = 'US'
            and geography_type_code = 'admin4'
            and geography_id like 'US.CA.037%'     --- <<< filter geometries in LA
        )
        
        select
        *
        from la_visit s
        inner join la_cbg c
        on st_contains(st_geometryfromtext(c.geometry_wkt), st_point(s.lng, s.lat))
        """
)

print(df_visit_LA_range.shape[0])
df_visit_LA_range.drop(['zipcode_id','place_version','processing_date','country_code',
                 'geometry_wkt','admin1_id','device_type_code','os_name','provider_id'], inplace=True, axis=1)
df_visit_LA_range.head()

20190324 20190331
1909313
CPU times: user 54.1 s, sys: 2.9 s, total: 57 s
Wall time: 2min 53s


Unnamed: 0,admin2_id,brand_id,cuebiq_id,dwell_time_minutes,event_date,geohash,geoset_id,lat,lng,place_id,zoned_datetime,geography_id
0,US.CA.037,629,1069274429,7.966667,20190327,9q5cfdwu2,11132,34.07211,-118.357421,31119715,2019-03-27T19:05:07-07:00,US.CA.037.214501.2
1,US.CA.037,420,1591710227,108.516667,20190327,9qh08gwy3,7774,33.858219,-118.082707,25605481,2019-03-27T12:58:31-07:00,US.CA.037.554900.5
2,US.CA.037,38,1803455491,6.816667,20190327,9q5fbpnez,7669,34.272501,-118.467296,25402132,2019-03-27T15:19:29-07:00,US.CA.037.109100.2
3,US.CA.037,38,933900377,3.05,20190328,9q5dt5641,7669,34.207664,-118.605598,25648766,2019-03-28T11:06:06-07:00,US.CA.037.134522.1
4,US.CA.037,122,848675930,114.133333,20190327,9q5bdb98q,7824,33.840715,-118.353367,26115943,2019-03-27T14:12:41-07:00,US.CA.037.650604.1


In [6]:
%%time 
#create start_time, end_time, start_hour, and end_hour columns
df_visit_LA_range['start_time'] =  df_visit_LA_range['zoned_datetime'].astype(str).str[:19]
df_visit_LA_range['start_time'] = pd.to_datetime(df_visit_LA_range['start_time'], errors='coerce')
df_visit_LA_range['time_added'] = pd.to_timedelta(df_visit_LA_range['dwell_time_minutes'],'m')
df_visit_LA_range['end_time'] = df_visit_LA_range['start_time'] + df_visit_LA_range['time_added']
df_visit_LA_range['start_hour'] = df_visit_LA_range['start_time'].dt.hour
df_visit_LA_range['end_hour'] = df_visit_LA_range['end_time'].dt.hour
df_visit_LA_range.drop(['time_added'], inplace=True, axis=1)
df_visit_LA_range.head()

CPU times: user 1.53 s, sys: 4 ms, total: 1.53 s
Wall time: 1.54 s


Unnamed: 0,admin2_id,brand_id,cuebiq_id,dwell_time_minutes,event_date,geohash,geoset_id,lat,lng,place_id,zoned_datetime,geography_id,start_time,end_time,start_hour,end_hour
0,US.CA.037,629,1069274429,7.966667,20190327,9q5cfdwu2,11132,34.07211,-118.357421,31119715,2019-03-27T19:05:07-07:00,US.CA.037.214501.2,2019-03-27 19:05:07,2019-03-27 19:13:05.000000020,19,19
1,US.CA.037,420,1591710227,108.516667,20190327,9qh08gwy3,7774,33.858219,-118.082707,25605481,2019-03-27T12:58:31-07:00,US.CA.037.554900.5,2019-03-27 12:58:31,2019-03-27 14:47:02.000000020,12,14
2,US.CA.037,38,1803455491,6.816667,20190327,9q5fbpnez,7669,34.272501,-118.467296,25402132,2019-03-27T15:19:29-07:00,US.CA.037.109100.2,2019-03-27 15:19:29,2019-03-27 15:26:18.000000020,15,15
3,US.CA.037,38,933900377,3.05,20190328,9q5dt5641,7669,34.207664,-118.605598,25648766,2019-03-28T11:06:06-07:00,US.CA.037.134522.1,2019-03-28 11:06:06,2019-03-28 11:09:09.000000000,11,11
4,US.CA.037,122,848675930,114.133333,20190327,9q5bdb98q,7824,33.840715,-118.353367,26115943,2019-03-27T14:12:41-07:00,US.CA.037.650604.1,2019-03-27 14:12:41,2019-03-27 16:06:48.999999980,14,16


In [7]:
df_visit_LA_range.to_pickle("../output/LA_visit_"+str(start_date)+'_'+str(end_date)+".pkl")  
# df_visit_LA_range = pd.read_pickle("../output/LA_visit_"+str(start_date)+'_'+str(end_date)+".pkl")

# Don't run below

In [30]:
## Voided, not useful
# df_visit_LA_range['start_time'] = df_visit_LA_range['zoned_datetime'].dt.tz_convert('America/Los_Angeles')
# df_visit_LA_range['end_time'] = df_visit_LA_range['end_time'].dt.tz_convert('America/Los_Angeles')
# df_visit_LA_range['start_time']  = df_visit_LA_range['start_time'].dt.tz_localize(None)
# df_visit_LA_range['end_time']  = df_visit_LA_range['end_time'].dt.tz_localize(None)
# df_visit_LA_range['start_hour'] = df_visit_LA_range['start_time'].dt.hour
# df_visit_LA_range['end_hour'] = df_visit_LA_range['end_time'].dt.hour

In [9]:
%%time
# read LA CBG data
cbg_geom = sql_engine.read_sql(
    f"""
    select
        geography_id, geometry_wkt
    from {geography_table}
    where
        country_code = 'US'
        and geography_type_code = 'admin4'
        and geography_id like 'US.CA.083%'     --- <<< filter geometries in LA
    """
)

cbg_geom.rename(columns={'geography_id': 'block_group_id'}, inplace=True)
# eliminate Catalina island and another island in the south of Catalina island
# cbg_geom = cbg_geom[~cbg_geom['block_group_id'].isin(['US.CA.037.599100.2','US.CA.037.599000.2','US.CA.037.599000.1','US.CA.037.599000.4','US.CA.037.599000.3','US.CA.037.599100.1'])]

CPU times: user 86.5 ms, sys: 617 µs, total: 87.1 ms
Wall time: 6.29 s


In [10]:
from keplergl import KeplerGl
KeplerGl(data={'geometries': cbg_geom})

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


KeplerGl(data={'geometries':          block_group_id  \
0    US.CA.083.001604.1   
1    US.CA.083.002702.1   
…