In [None]:
%load_ext cuebiqmagic.magics

In [None]:
# Session setup: run the Cuebiq init magic, then pick up the query engine.
# NOTE(review): presumably %init_cuebiq_data is what places `instance` in the
# IPython user namespace read below - confirm against the cuebiqmagic docs.
%init_cuebiq_data 
import pandas as pd
# Object stored under the 'instance' key of the IPython user namespace; the rest
# of the notebook uses it via .read_sql() and .execute_statement().
snow_engine = get_ipython().user_ns['instance']
# Display up to 25 columns when rendering DataFrames.
pd.set_option("display.max_columns", 25)

In [None]:
import geopandas as gpd
from datetime import datetime, date, timedelta
from tqdm import tqdm
import gc
from pathlib import Path

In [None]:
def numerical_date_range(start_date: int, end_date: int):
    """List all calendar days between two YYYYMMDD integers, both ends included.

    Args:
        start_date: First day of the range, encoded as an int YYYYMMDD.
        end_date: Last day of the range, encoded as an int YYYYMMDD.

    Returns:
        A list of ints (YYYYMMDD), one per calendar day, in ascending order.

    Raises:
        ValueError: If start_date falls after end_date, or if either value does
            not describe a valid calendar date.
    """
    def _as_date(yyyymmdd: int) -> date:
        # Split the decimal digits into year / month / day.
        digits = str(yyyymmdd)
        return date(int(digits[:4]), int(digits[4:6]), int(digits[6:]))

    first_day = _as_date(start_date)
    last_day = _as_date(end_date)
    if first_day > last_day:
        raise ValueError("start_date must not be after end_date")

    days = []
    for offset in range((last_day - first_day).days + 1):
        current = first_day + timedelta(days=offset)
        days.append(int(current.strftime("%Y%m%d")))
    return days

## Polygon objects

In [None]:
# Fully qualified table names used by the polygon queries.
# geo_table is the source registry queried for ZIP-code geometries;
# zip_codes_ca_table is the ZIP polygon table joined against pings later on.
geo_table = 'CUEBIQ_DATA.PAAS_CDA_PE_V3.GEOGRAPHY_REGISTRY'
zip_codes_ca_table = 'DEDICATED.PACOB.ZIP_CODES_CA'

In [None]:
# Pull the California ZIP-code rows from the geography registry.
# geography_id has the form 'US.<5-digit ZIP>' (e.g. 'US.91001').
# California ZIPs occupy the 3-digit prefixes 900-961. Matching on the 2-digit
# prefix 'US.96' would also pull in 962-969, which are not California
# (APO/FPO Pacific, Hawaii, Guam), so the 96x range is restricted to 960 and 961.
zip_codes_ca_q = f'''SELECT * 
                    FROM {geo_table}
                    WHERE country_code = 'US' AND
                    geography_type_code = 'zipcode' AND
                    (
                        substr(geography_id,1,5) IN ('US.90', 'US.91', 'US.92', 'US.93', 'US.94', 'US.95')
                        OR substr(geography_id,1,6) IN ('US.960', 'US.961')
                    )
            '''
zip_codes_ca = snow_engine.read_sql(zip_codes_ca_q)
# Parse the WKT text column into geometry objects.
zip_codes_ca['geometry'] = gpd.GeoSeries.from_wkt(zip_codes_ca.geometry_wkt)

In [None]:
# WKT strings for the two ZIP codes of interest (US.91001 and US.90272).
wkts_zips = gpd.GeoSeries(
    zip_codes_ca.loc[
        zip_codes_ca['geography_id'].isin(['US.91001', 'US.90272']),
        'geometry',
    ]
).to_wkt()
# Prints the second matching row by position; which ZIP that is depends on the
# row order returned by the query (there is no ORDER BY).
print(wkts_zips.iloc[1])

In [None]:
# Fully qualified names of the per-period ping tables (one per study period).
baseline_pings_by_nights = "DEDICATED.PACOB.BASELINE_PINGS_BY_NIGHTS"
jan_feb_pings_by_nights = "DEDICATED.PACOB.JAN_FEB_PINGS_BY_NIGHTS"
feb_apr_pings_by_nights = "DEDICATED.PACOB.FEB_APR_PINGS_BY_NIGHTS"

# Get pings for different periods

In [None]:
# Inputs of the ping extraction queries.
# users_by_nights: table of users whose pings are extracted (joined on CUEBIQ_ID).
# zip_codes_ca_table: ZIP polygon table used to tag each ping with a ZIP code.
# ping_table: source table of device pings.
users_by_nights = "DEDICATED.PACOB.LA_BASELINE_USERS_BY_NIGHTS"
zip_codes_ca_table = 'DEDICATED.PACOB.ZIP_CODES_CA'
ping_table = "CUEBIQ_DATA.PAAS_CDA_PE_V3.DEVICE_LOCATION_UPLEVELLED"

In [None]:
def pings_from_users_in_zips(ping_table: str,
                users_table: str,
                zip_table: str,
                processing_date: int,
                provider: str,
                date_range,
                lag_days: int = 5) -> str:
    """Build the SQL that extracts one processing date of pings for a set of users.

    The query inner-joins the ping table to the users table on CUEBIQ_ID and
    left-joins the ZIP polygon table with ST_WITHIN, so every ping of a listed
    user is returned; pings that fall inside no polygon get ZIPCODE_ID 'OUTSIDE'.
    Rows are restricted to the given processing date, COUNTRY_CODE 'US', the
    given provider, and an event-date window derived from ``date_range``.

    Args:
        ping_table: Fully qualified name of the ping table.
        users_table: Fully qualified name of the table listing the users to keep.
        zip_table: Fully qualified name of the ZIP polygon table (must expose
            GEOGRAPHY_ID and GEOG).
        processing_date: Processing date to query, as an int YYYYMMDD.
        provider: Value matched against PROVIDER_ID.
        date_range: ``(start_date, end_date)`` pair of YYYYMMDD values.
        lag_days: Number of days trimmed off ``end_date`` to obtain the last
            event date. Defaults to 5, the value previously hard-coded.
            NOTE(review): presumably callers extend the processing-date range by
            this many days to catch pings processed after they occurred - confirm.

    Returns:
        The SQL text as a string.

    Raises:
        ValueError: If ``lag_days`` is negative, or a date cannot be parsed as
            YYYYMMDD.
    """
    if lag_days < 0:
        raise ValueError("lag_days must not be negative")

    start_date, end_date = date_range
    event_start = datetime.strptime(str(start_date), "%Y%m%d").date()
    event_end = datetime.strptime(str(end_date), "%Y%m%d").date() - timedelta(days=lag_days)
    # A range shorter than lag_days collapses to the single start day.
    event_end = max(event_start, event_end)

    event_start, event_end = event_start.strftime("%Y%m%d"), event_end.strftime("%Y%m%d")
    # Values are interpolated directly into the SQL text: pass trusted,
    # notebook-defined values only.
    return f"""
SELECT
    S.CUEBIQ_ID,
    H.EVENT_TIMESTAMP,
    H.EVENT_ZONED_DATETIME,
    H.LNG,
    H.LAT,
    H.ACCURACY_METERS,
    H.CLASSIFICATION_TYPE,
    COALESCE(Z.GEOGRAPHY_ID, 'OUTSIDE') AS ZIPCODE_ID
FROM {ping_table}               H
JOIN {users_table}              S  ON H.CUEBIQ_ID = S.CUEBIQ_ID
LEFT JOIN {zip_table}           Z  ON ST_WITHIN(
                                      ST_MAKEPOINT(H.LNG, H.LAT),
                                      TO_GEOGRAPHY(Z.GEOG))
WHERE H.PROCESSING_DATE = {processing_date}
  AND H.COUNTRY_CODE   = 'US'
  AND H.PROVIDER_ID    = '{provider}'
  AND TO_DATE(TO_TIMESTAMP_TZ(H.EVENT_ZONED_DATETIME))
      BETWEEN TO_DATE('{event_start}','YYYYMMDD') AND TO_DATE('{event_end}','YYYYMMDD')
"""

## Pings for baseline users

In [None]:
# Fully qualified name of the table that holds pings for the baseline period.
baseline_pings_by_nights = "DEDICATED.PACOB.BASELINE_PINGS_BY_NIGHTS"

In [None]:
# (Re)create the empty baseline pings table.
# CREATE OR REPLACE discards any existing table of the same name, so re-running
# this cell wipes previously loaded rows.
snow_engine.execute_statement(
    f'CREATE OR REPLACE TABLE {baseline_pings_by_nights} '
    '(cuebiq_id STRING, event_timestamp INT, EVENT_ZONED_DATETIME STRING, '
    'LAT FLOAT, LNG FLOAT, ACCURACY_METERS FLOAT, CLASSIFICATION_TYPE STRING, '
    'ZIPCODE_ID STRING)'
)

In [None]:
# loop dimensions
providers = ['SIERRA', 'WHISKEY']

# Event window: 20241126 .. 20250107. Processing dates run 5 days past the last
# event date because pings_from_users_in_zips() trims 5 days off the end of the
# range it receives when it builds the event-date filter.
date_start = 20241126
# Add the 5 days with calendar arithmetic: `20250107 + 5` on the YYYYMMDD
# integer only yields a valid date while the result stays inside the same month.
date_end = int((datetime.strptime('20250107', '%Y%m%d') + timedelta(days=5)).strftime('%Y%m%d'))
dates     = numerical_date_range(date_start, date_end)

# Output layout: <base>/processing_date=<YYYYMMDD>/data.parquet
base = Path('BASELINE_PINGS_BY_NIGHTS')
base.mkdir(exist_ok=True)

for d in tqdm(dates, desc='processing dates'):
    day_frames = []
    for provider in providers:
        # One query per (processing date, provider) pair.
        sql = pings_from_users_in_zips(ping_table,
                                       users_by_nights,
                                       zip_codes_ca_table,
                                       d,
                                       provider,
                                       (date_start, date_end))
        chunk = snow_engine.read_sql(sql)
        chunk['provider'] = provider          # keep track of the provider in the output
        day_frames.append(chunk)

    day_df = pd.concat(day_frames, ignore_index=True)

    out_dir = base / f'processing_date={d}'
    out_dir.mkdir(parents=True, exist_ok=True)
    day_df.to_parquet(out_dir / 'data.parquet', index=False)

    # Release the day's frames before the next iteration to limit memory use.
    del day_df, day_frames
    gc.collect()

## Pings for Jan-Feb users

In [None]:
# Fully qualified name of the table that holds pings for the Jan-Feb period.
jan_feb_pings_by_nights = "DEDICATED.PACOB.JAN_FEB_PINGS_BY_NIGHTS"

In [None]:
# (Re)create the empty Jan-Feb pings table.
# CREATE OR REPLACE discards any existing table of the same name, so re-running
# this cell wipes previously loaded rows.
snow_engine.execute_statement(
    f'CREATE OR REPLACE TABLE {jan_feb_pings_by_nights} '
    '(cuebiq_id STRING, event_timestamp INT, EVENT_ZONED_DATETIME STRING, '
    'LAT FLOAT, LNG FLOAT, ACCURACY_METERS FLOAT, CLASSIFICATION_TYPE STRING, '
    'ZIPCODE_ID STRING)'
)

In [None]:
# loop dimensions
providers = ['WHISKEY', 'SIERRA']

# Event window: 20250112 .. 20250218. Processing dates run 5 days past the last
# event date because pings_from_users_in_zips() trims 5 days off the end of the
# range it receives when it builds the event-date filter.
# NOTE(review): this event window is 37 days, whereas the baseline and Feb-Apr
# windows are 42 days; only the processing-date span (incl. the +5) is 6 weeks
# here - confirm this is intended.
date_start = 20250112
# Add the 5 days with calendar arithmetic: `20250218 + 5` on the YYYYMMDD
# integer only yields a valid date while the result stays inside the same month.
date_end = int((datetime.strptime('20250218', '%Y%m%d') + timedelta(days=5)).strftime('%Y%m%d'))
dates     = numerical_date_range(date_start, date_end)

# Output layout: <base>/processing_date=<YYYYMMDD>/data.parquet
base = Path('JAN_FEB_PINGS_BY_NIGHTS')
base.mkdir(exist_ok=True)

for d in tqdm(dates, desc='processing dates'):
    day_frames = []
    for provider in providers:
        # One query per (processing date, provider) pair.
        # NOTE(review): users_by_nights still points at the LA baseline users
        # table - confirm that the Jan-Feb period is meant to reuse it.
        sql = pings_from_users_in_zips(ping_table,
                                       users_by_nights,
                                       zip_codes_ca_table,
                                       d,
                                       provider,
                                       (date_start, date_end))
        chunk = snow_engine.read_sql(sql)
        chunk['provider'] = provider          # keep track of the provider in the output
        day_frames.append(chunk)

    day_df = pd.concat(day_frames, ignore_index=True)

    out_dir = base / f'processing_date={d}'
    out_dir.mkdir(parents=True, exist_ok=True)
    day_df.to_parquet(out_dir / 'data.parquet', index=False)

    # Release the day's frames before the next iteration to limit memory use.
    del day_df, day_frames
    gc.collect()

## Pings for Feb-Apr users

In [None]:
jan_feb_pings_by_nights = 'DEDICATED.PACOB.FEB_APR_PINGS_BY_NIGHTS'

In [None]:
snow_engine.execute_statement(f'CREATE OR REPLACE TABLE {jan_feb_pings_by_nights} (cuebiq_id STRING, event_timestamp INT, EVENT_ZONED_DATETIME STRING, LAT FLOAT, LNG FLOAT, ACCURACY_METERS FLOAT, CLASSIFICATION_TYPE STRING, ZIPCODE_ID STRING)')

In [None]:
# loop dimensions
providers = ['WHISKEY', 'SIERRA']

# Event window: 20250219 .. 20250402 (6 weeks). Processing dates run 5 days past
# the last event date because pings_from_users_in_zips() trims 5 days off the
# end of the range it receives when it builds the event-date filter.
date_start = 20250219
# Add the 5 days with calendar arithmetic: `20250402 + 5` on the YYYYMMDD
# integer only yields a valid date while the result stays inside the same month.
date_end = int((datetime.strptime('20250402', '%Y%m%d') + timedelta(days=5)).strftime('%Y%m%d'))
dates     = numerical_date_range(date_start, date_end)

# Output layout: <base>/processing_date=<YYYYMMDD>/data.parquet
base = Path('FEB_APR_PINGS_BY_NIGHTS')
base.mkdir(exist_ok=True)

for d in tqdm(dates, desc='processing dates'):
    day_frames = []
    for provider in providers:
        # One query per (processing date, provider) pair.
        # NOTE(review): users_by_nights still points at the LA baseline users
        # table - confirm that the Feb-Apr period is meant to reuse it.
        sql = pings_from_users_in_zips(ping_table,
                                       users_by_nights,
                                       zip_codes_ca_table,
                                       d,
                                       provider,
                                       (date_start, date_end))
        chunk = snow_engine.read_sql(sql)
        chunk['provider'] = provider          # keep track of the provider in the output
        day_frames.append(chunk)

    day_df = pd.concat(day_frames, ignore_index=True)

    out_dir = base / f'processing_date={d}'
    out_dir.mkdir(parents=True, exist_ok=True)
    day_df.to_parquet(out_dir / 'data.parquet', index=False)

    # Release the day's frames before the next iteration to limit memory use.
    del day_df, day_frames
    gc.collect()