In [1]:
%%html
<style>.container{width:99%}</style>

In [2]:
%load_ext autoreload
%autoreload 2

In [1]:
import os
import shutil
import re
import json
from pathlib import Path

import requests
import geopandas as gpd
import boto3
import pandas as pd

from serverless import settings
from serverless.pipeline.data import (
    census_businesspatterns, 
    census_population, 
    tiger
)
from serverless.pipeline import Pipeline
from serverless.utils import download_url

In [2]:
# Root of the processed-data tree (from project settings) and the Pipeline
# helper that the cells below use to inspect processed directories and to
# delete/create the Lambda function and API.
processed_path = Path(settings.data.processed_path)
pipeline = Pipeline(processed_path)

# NOTE(review): regex pattern of the form '<something>/<something>'; no visible
# cell uses it — confirm it is still needed.
path: str = '.*/.*'

In [3]:
# Show the processed dataset directories known to the pipeline; the output
# appears to map each directory to a sample file found beneath it.
pipeline.dirs

{'/Users/timdobbins/projects/serverless-prez/data/processed/tiger/mapping': '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/mapping/statefp=20/5f7c0d34c0474cabb397199ebb4d3a32-0.parquet',
 '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/county': '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/county/statefp=20/20.geojson',
 '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/tract': '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/tract/statefp=20/countyfp=025/20025.geojson',
 '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/block': '/Users/timdobbins/projects/serverless-prez/data/processed/tiger/block/statefp20=38/countyfp20=027/tractce20=959200/38027959200.geojson',
 '/Users/timdobbins/projects/serverless-prez/data/processed/census/businesspatterns': '/Users/timdobbins/projects/serverless-prez/data/processed/census/businesspatterns/data.parquet',
 '/Users/timdobbins/projects/serverless-pre

In [4]:
# Delete the deployed Lambda so the next cell can recreate it from a new image
# (the captured output shows AWS answering HTTP 204).
pipeline.delete_lambda('serverlessprez_lambda')

{'ResponseMetadata': {'RequestId': '4d238976-0bd7-41ba-9aad-362e6cf269f1',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'date': 'Mon, 11 Jul 2022 02:54:47 GMT',
   'content-type': 'application/json',
   'connection': 'keep-alive',
   'x-amzn-requestid': '4d238976-0bd7-41ba-9aad-362e6cf269f1'},
  'RetryAttempts': 0}}

In [5]:
# Recreate the Lambda from a container image in ECR (output: PackageType 'Image').
# NOTE(review): account id, region and image tag are hard-coded here — consider
# moving them into `settings`. The first argument is a path relative to the
# notebook's working directory; TODO confirm how create_lambda uses it for an
# image-based function.
pipeline.create_lambda(
    '../serverless/lambda_app/app/lambda.py', 
    'serverlessprez_lambda',
    image_uri="639381120660.dkr.ecr.us-east-1.amazonaws.com/serverlessprez:0.2.4"
)

{'ResponseMetadata': {'RequestId': 'bb0ba2e8-1dd1-4aee-93c7-8c07eb8af060',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Mon, 11 Jul 2022 02:54:55 GMT',
   'content-type': 'application/json',
   'content-length': '991',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'bb0ba2e8-1dd1-4aee-93c7-8c07eb8af060'},
  'RetryAttempts': 0},
 'FunctionName': 'serverlessprez_lambda',
 'FunctionArn': 'arn:aws:lambda:us-east-1:639381120660:function:serverlessprez_lambda',
 'Role': 'arn:aws:iam::639381120660:role/serverlessprez',
 'CodeSize': 0,
 'Description': 'serverlessprez',
 'Timeout': 60,
 'MemorySize': 512,
 'LastModified': '2022-07-11T02:54:55.154+0000',
 'CodeSha256': '22aae9cc2b0794591fb0d32351a171bc32548ec7942d910b0b5d8d903bb7ab0e',
 'Version': '6',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'cdd65574-72b3-4c11-81ad-073906436ab2',
 'State': 'Pending',
 'StateReason': 'The function is being created.',
 'StateReasonCode': 'Creating',
 'PackageType': 'Image',
 'Archi

In [27]:
# Create the API Gateway endpoint; the captured output shows the GET method
# and the newly created resource id.
pipeline.create_api(name='serverlessprez_lambda-0.7')

{'ResponseMetadata': {'RequestId': '3ebc262d-0f65-4a17-8830-526dffb0e805', 'HTTPStatusCode': 201, 'HTTPHeaders': {'date': 'Mon, 11 Jul 2022 02:33:54 GMT', 'content-type': 'application/json', 'content-length': '71', 'connection': 'keep-alive', 'x-amzn-requestid': '3ebc262d-0f65-4a17-8830-526dffb0e805', 'x-amz-apigw-id': 'VFLS9JtNoAMEd5w='}, 'RetryAttempts': 0}, 'httpMethod': 'GET', 'authorizationType': 'NONE', 'apiKeyRequired': False}


{'ResponseMetadata': {'RequestId': 'f3f21c61-3827-4535-ad19-2b64a5c98679',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Mon, 11 Jul 2022 02:33:55 GMT',
   'content-type': 'application/json',
   'content-length': '41',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f3f21c61-3827-4535-ad19-2b64a5c98679',
   'x-amz-apigw-id': 'VFLTAIfrIAMEb2A='},
  'RetryAttempts': 0},
 'id': 'xoc19g',
 'createdDate': datetime.datetime(2022, 7, 10, 22, 33, 55, tzinfo=tzlocal())}

---

In [55]:
# NOTE(review): json/pandas/geopandas/requests are already imported in the
# first code cell; only urllib.parse and folium are new here.
import urllib.parse
import json
import pandas as pd
import geopandas as gpd
import folium
import requests

# Build the query sent to the deployed API: 2020 county population joined to
# TIGER county geometries for state FIPS '47' (Tennessee). `WHERE 1=1` lets the
# commented-out population filter be toggled without editing the AND chain.
# LIMIT 40 caps the rows, so not every county in the state is returned.
# The literal is URL-encoded byte-for-byte (whitespace included).
sql = urllib.parse.quote_plus(
    """
    SELECT 
        county.statefp,
        county.countyfp,
        pop.population_2020,
        pop.rural_urban_continuum_code_2013,
        pop.ctyname,
        ST_GeomFromLegacyBinary(county.geometry) as geometry
    FROM serverlessprez.census_population pop
    JOIN serverlessprez.tiger_county county 
        ON county.statefp = pop.st
        AND county.countyfp = pop.cty
    WHERE 1=1 --pop.population_2020 >= 200000
        AND pop.st = '47'
        LIMIT 40;
    """
)
# NOTE(review): API Gateway id and stage are hard-coded; consider moving the
# base URL into settings.
url = f'https://350zw43791.execute-api.us-east-1.amazonaws.com/dev/serverlessprez?sql={sql}'
# rg = requests.get(url)
# rg = requests.get(url)

In [56]:
# Call the API and load the returned JSON rows; `geometry` arrives as WKT text
# and is parsed in the next cell.
df = pd.read_json(url)

In [57]:
# Parse the WKT strings returned by the API into shapely geometries.
# Guarded on the column dtype so re-running this cell is a no-op instead of
# feeding already-converted geometry objects back into from_wkt (which raises).
if df['geometry'].dtype.name != 'geometry':
    df['geometry'] = gpd.GeoSeries.from_wkt(df['geometry'])

gdf = gpd.GeoDataFrame(df, geometry="geometry")

In [58]:
# Create the map in this cell so it runs on a fresh kernel instead of relying
# on an `m` left over in the kernel; centre and zoom it on the returned counties.
minx, miny, maxx, maxy = gdf.total_bounds
m = folium.Map(location=[(miny + maxy) / 2, (minx + maxx) / 2])
m.fit_bounds([[miny, minx], [maxy, maxx]])

for _, r in gdf.iterrows():
    # Simplify each county outline before serialising it; without this the
    # GeoJSON payload can get large enough that the map may not be displayed.
    sim_geo = gpd.GeoSeries(r['geometry']).simplify(tolerance=0.001)
    geo_j = sim_geo.to_json()
    geo_j = folium.GeoJson(data=geo_j,
                           style_function=lambda x: {'fillColor': 'orange'})
    folium.Popup(r['ctyname']).add_to(geo_j)
    geo_j.add_to(m)
m

In [4]:
def prep_zipcode_to_county_mapping(inpath, outpath, partition):
    """Convert a ZIP-to-county crosswalk spreadsheet into a partitioned parquet dataset.

    Reads ``inpath`` with the ``openpyxl`` engine (expects at least the columns
    ``zip``, ``county``, ``usps_zip_pref_state`` and ``usps_zip_pref_city``),
    derives ``statefp`` / ``countyfp`` from the 5-character county code,
    lower-cases the state and city names into ``state`` / ``city``, and writes
    everything to ``outpath`` with ``DataFrame.to_parquet``.

    Parameters
    ----------
    inpath : path-like
        Spreadsheet to read.
    outpath : path-like
        Destination passed to ``to_parquet``.
    partition : list of str
        Column names passed as ``partition_cols`` to ``to_parquet``.
    """
    df = pd.read_excel(inpath, engine='openpyxl', dtype={'zip': str, 'county': str})
    # Codes stored as numbers in the sheet lose their leading zero when read
    # as str (e.g. '1001' instead of '01001'); pad back to 5 characters.
    # Already-correct codes are unchanged and missing values stay missing.
    df['zip'] = df['zip'].str.zfill(5)
    df['county'] = df['county'].str.zfill(5)
    # Vectorised .str slicing yields NaN for missing codes instead of raising
    # like x[:2] on a float NaN would.
    df['statefp'] = df['county'].str[:2]
    df['countyfp'] = df['county'].str[2:]
    df['state'] = df['usps_zip_pref_state'].str.lower()
    df['city'] = df['usps_zip_pref_city'].str.lower()
    df.to_parquet(outpath, index=False, partition_cols=partition)

def _esri_geometry(geometry, geom_type):
    """Translate one GeoJSON geometry mapping into an Esri JSON geometry dict.

    Esri JSON has no separate multi-part types: a polygon is one flat list of
    ``rings`` and a polyline one flat list of ``paths``. GeoJSON ``MultiPolygon``
    parts are therefore flattened into a single ring list, and a single
    ``LineString`` is wrapped as a one-element ``paths`` list. Ring/point order
    is passed through unchanged.

    Raises
    ------
    ValueError
        If the GeoJSON geometry type cannot be represented as ``geom_type``.
    """
    kind = geometry.get('type')
    coords = geometry.get('coordinates')
    if geom_type == 'esriGeometryPolygon':
        if kind == 'Polygon':
            return {'rings': coords}
        if kind == 'MultiPolygon':
            return {'rings': [ring for polygon in coords for ring in polygon]}
    elif geom_type == 'esriGeometryPolyline':
        if kind == 'LineString':
            return {'paths': [coords]}
        if kind == 'MultiLineString':
            return {'paths': coords}
    raise ValueError(f'cannot convert GeoJSON {kind!r} geometry to {geom_type}')


def shp_to_esri_enclosed_json_input(gdf, geom_type='esriGeometryPolygon', wkid=4269):
    """Convert a GeoDataFrame into an Esri "enclosed JSON" FeatureSet dictionary.

    The frame is serialised with ``gdf.to_json()`` (GeoJSON). Every feature
    with a non-null geometry becomes ``{'attributes': <properties>,
    'geometry': {'rings' | 'paths': ...}}``; features with a null geometry are
    skipped.

    Parameters
    ----------
    gdf : GeoDataFrame
        Any object whose ``to_json()`` returns a GeoJSON FeatureCollection.
    geom_type : {'esriGeometryPolygon', 'esriGeometryPolyline'}
        Esri geometry type to emit.
    wkid : int
        Value written to ``spatialReference``. Defaults to 4269 (NAD83), the
        CRS ``partition_geo_data`` reprojects files into. Coordinates are NOT
        reprojected here.

    Returns
    -------
    dict
        Keys ``displayFieldName``, ``fieldAliases``, ``geometryType``,
        ``spatialReference`` and ``features`` (always present, possibly empty).

    Raises
    ------
    ValueError
        If ``geom_type`` is unsupported, or a feature's geometry type does not
        match it.
    """
    if geom_type not in ('esriGeometryPolygon', 'esriGeometryPolyline'):
        raise ValueError(f'unsupported geom_type: {geom_type!r}')

    features = json.loads(gdf.to_json())['features']
    # Field names are taken from the first feature; an empty frame has none.
    properties = features[0]['properties'] if features else {}

    esri_features = [
        {
            'attributes': feature['properties'],
            'geometry': _esri_geometry(feature['geometry'], geom_type),
        }
        for feature in features
        if feature['geometry']
    ]

    return {
        'displayFieldName': '',
        'fieldAliases': {name: name for name in properties},
        'geometryType': geom_type,
        'spatialReference': {'wkid': wkid},
        'features': esri_features,
    }

def partition_geo_data(path, partitions, processed_path, input_df=None, geom_type='esriGeometryPolygon'):
    """Split geographic data by column values and write one Esri JSON file per group.

    Each group is written to
    ``<processed_path>/<col1>=<val1>/<col2>=<val2>/.../<val1><val2>....geojson``
    (directory segments lower-cased), using the Esri enclosed-JSON layout
    produced by ``shp_to_esri_enclosed_json_input``.

    Parameters
    ----------
    path : path-like
        Vector file read with ``gpd.read_file`` when ``input_df`` is not given;
        it is reprojected to EPSG:4269 and passed through ``convert_dtypes``.
    partitions : str or list of str
        Column(s) to group by. A falsy value means nothing is written.
    processed_path : str or Path
        Root output directory (created as needed).
    input_df : GeoDataFrame, optional
        Pre-loaded frame used instead of reading ``path``. It is NOT
        reprojected, so it should already be in EPSG:4269 to agree with the
        ``spatialReference`` the converter writes.
    geom_type : str
        Esri geometry type forwarded to ``shp_to_esri_enclosed_json_input``.

    Notes
    -----
    ``groupby`` drops rows whose partition value is missing (pandas default
    ``dropna=True``), so such rows are not written anywhere.
    """
    gdf = input_df if input_df is not None else gpd.read_file(path).to_crs(4269).convert_dtypes()

    if not partitions:
        # NOTE(review): without partition columns nothing is written — confirm intended.
        return None

    # Accept a single column name as well as a list; zipping a bare string
    # would pair the column name's *characters* with the group values.
    if isinstance(partitions, str):
        partitions = [partitions]

    for name, group in gdf.groupby(partitions, as_index=False):
        # Depending on the pandas version and number of keys, group names come
        # back as a scalar or a tuple; normalise to a tuple of key values.
        values = name if isinstance(name, tuple) else (name,)
        filters = list(zip(partitions, values))
        fileid = ''.join(str(value) for _, value in filters)
        loc = '/'.join(f'{column}={value}'.lower() for column, value in filters)

        esri_format = shp_to_esri_enclosed_json_input(group, geom_type=geom_type)

        # Path() makes a plain-string processed_path work with the / operator.
        to_path = Path(processed_path) / loc
        to_path.mkdir(parents=True, exist_ok=True)

        with open(to_path / f'{fileid}.geojson', 'w') as file:
            json.dump(esri_format, file)

In [11]:
def tiger():
    """Download, unpack and partition the TIGER sources listed in ``settings.etl.tiger``.

    For every configured entry the raw, interim and processed directories
    (under ``settings.root_path``) are created if missing. Then, per file:

    * If the raw file does not exist yet, it is downloaded from ``url + file``.
      A ``.zip`` whose name (minus ``.zip``) is not already present in the
      interim directory is unpacked there. For a ``.zip``, the contained
      ``<stem>.shp`` is then partitioned into the processed directory with
      ``partition_geo_data``. Because this only happens right after a fresh
      download, an archive that is already on disk is never re-partitioned.
    * Otherwise, for an Excel file, a "PREPPING" message is printed when the
      interim ``data.parquet`` is missing. The actual
      ``prep_zipcode_to_county_mapping`` call is currently disabled.

    NOTE(review): the Excel branch is only reached when the raw file already
    existed, so a freshly downloaded spreadsheet is not considered until the
    next run — confirm intended.
    NOTE(review): this function rebinds the name ``tiger`` imported from
    ``serverless.pipeline.data`` in the first code cell.

    Returns
    -------
    None
    """
    zip_extension = '.zip'
    shp_extension = '.shp'
    excel_extensions = ['.xls', '.xlsx']

    for _folder, subfolder in settings.etl.tiger.items():

        raw_path = settings.root_path/settings.data.raw_path/subfolder.file_path
        interim_path = settings.root_path/settings.data.interim_path/subfolder.file_path
        processed_path = settings.root_path/settings.data.processed_path/subfolder.file_path

        raw_path.mkdir(parents=True, exist_ok=True)
        processed_path.mkdir(parents=True, exist_ok=True)
        interim_path.mkdir(parents=True, exist_ok=True)

        url = subfolder.url

        for file in subfolder.files:
            raw_file_path = raw_path/file
            interim_file_path = interim_path/file

            if not os.path.isfile(raw_file_path):
                # Names already unpacked into the interim directory, with
                # everything from the first '.' removed. Listed before the
                # download, and only when a download is needed.
                existing_interim_files = [re.sub(r'\..*', '', i.name) for i in interim_path.iterdir()]
                file_name_wo_zip = file.replace(zip_extension, '')

                print(f'DOWNLOADING: from {url+file} to {raw_file_path}')
                download_url(url+file, raw_file_path)
                if (zip_extension in file) and (file_name_wo_zip not in existing_interim_files):
                    print(f'UNZIPPING: from {raw_file_path} to {interim_file_path.parent}')
                    shutil.unpack_archive(raw_file_path, interim_file_path.parent)

                if raw_file_path.suffix == zip_extension:
                    interim_file = interim_file_path.parent/(raw_file_path.stem+shp_extension)
                    print(f'PARTITIONING: from {interim_file} to {processed_path}')
                    partition_geo_data(interim_file, subfolder.partition_by, processed_path)

            elif raw_file_path.suffix in excel_extensions:
                interim_file = interim_file_path.parent/'data.parquet'
                if not os.path.isfile(interim_file):
                    print(f'PREPPING: from {raw_file_path} to {interim_file}')
                    # prep_zipcode_to_county_mapping(raw_file_path, processed_file_path.parent, subfolder.partition_by)
        print('FINISHED POPULATING RAW, INTERIM, AND PROCESSED DIRECTORIES')

    return None

def main():
    """Script entry point: runs the ``tiger`` download/unpack/partition step."""
    tiger()