In [44]:
from gzip import GzipFile
from io import BytesIO
import importlib
import os
from pathlib import Path
import shutil
import sys
from typing import Tuple, List

from arcgis.features import GeoAccessor, GeoSeriesAccessor

import boto3
from dotenv import load_dotenv, find_dotenv
import pandas as pd

In [2]:
# paths to common data locations - NOTE: to convert any path to a raw string, simply use str(path_instance)
dir_prj = Path('./').absolute().parent

dir_data = dir_prj/'data'

dir_raw = dir_data/'raw'
dir_ext = dir_data/'external'
dir_int = dir_data/'interim'
dir_out = dir_data/'processed'

gdb_raw = dir_raw/'raw.gdb'
gdb_int = dir_int/'interim.gdb'
gdb_out = dir_out/'processed.gdb'

# import the project package from the project package path
sys.path.append(str(dir_prj/'src'))
import uw_parks

# load the "autoreload" extension so that code can change, & always reload modules so that as you change code in src, it gets loaded
%load_ext autoreload
%autoreload 2

# load environment variables from .env
load_dotenv(find_dotenv())

True

# Get Patterns from Safegraph

Safegraph Patterns datasets are all available for download by month as complete datasets split into sequential files based on size. Yes, this sounds a little confusing at first, but it is a pretty common practice for big data. The data is organized into directories by year, then by month, and 

In [62]:
def get_year_month(pth: Path) -> Tuple[int, int]:
    """Get the year and month from the file path convention- useful for filtering."""
    
    pth = Path(pth) if isinstance(pth, str) else pth
    
    # get the year and month from the path depending on whether or not it is from the backports
    if 'backfill' in str(pth):
        yr_mth = pth.parts[6:8]
    else:
        yr_mth = pth.parts[2:4]
        
    # convert the year and month to integers for sorting
    yr_mth = tuple(int(val) for val in yr_mth)
    
    return yr_mth


def get_standardized_path(pth: Path) -> Path:
    """Get a standardized path for saving."""
    
    # get the year and the month from the file path
    year, month = get_year_month(pth)
    
    # get all the path parts
    pth_prts = Path(pth).parts
    
    # now, standardize the path
    pth_root = Path(pth_prts[0])
    typ = pth_prts[1].replace('_backfill', '')
    fl_nm = pth_prts[-1]
    
    # put everything back together in a standard schema
    out_pth = pth_root/typ/f'{year:04d}'/f'{month:02d}'/fl_nm
    
    return out_pth

def get_resource_type(pth: Path) -> str:
    """Get the resource type from the path - useful for finding the right resources."""
    typ = Path(pth).parts[1].replace('_backfill', '')
    return typ

def get_content_dataframe(s3, bucket='sg-c19-response', prefix='monthly-patterns'):
    """Provide ability to introspectively retrieve a dataframe of monthly patterns data."""

    # get the contents of the bucket with the specified prefix
    bkt_ls = s3.list_objects(Bucket=bucket, Prefix=prefix)
    cntnts = bkt_ls['Contents']

    # get a dataframe of all the available resources, starting with the S3 bucket path
    cntnt_df = pd.Series([itm['Key'] for itm in cntnts if not itm['Key'].endswith('_SUCCESS')], name='source_path').to_frame()

    # calculate the year and month for the resource
    cntnt_df[['year', 'month']] = cntnt_df.source_path.apply(lambda pth: pd.Series(get_year_month(pth)))

    # get the resource category
    cntnt_df['resource_type'] = cntnt_df.source_path.apply(lambda pth: get_resource_type(pth))

    # get a standardized path - useful for saving outputs
    cntnt_df['standardized_path'] = cntnt_df.source_path.apply(lambda pth: get_standardized_path(pth))

    return cntnt_df

def check_list(in_lst, dtype=str):
    """Helper function to ensure input is a list of correct data type."""
    
    assert isinstance(in_lst, (list, dtype))
            
    if isinstance(in_lst, list):
        for itm in in_lst: 
            assert isinstance(itm, dtype)
    else:
        in_lst = [in_lst]
            
    return in_lst


class SafegraphClient:
    """Client streamlining process of retrieving data from AWS S3 and preparing a proejct for analysis."""
    
    def __init__(self, bucket='sg-c19-response', prefix='monthly-patterns', access_key=None, secret_key=None):
        
        self.bucket = bucket
        self.prefix = prefix
        
        if access_key or secret_key:
            assert (access_key and secret_key), 'If explicitly providing an access_key and secret_key for accessing AWS S3, you must provide both.'

        # retrieve credentials from environment variables if not explicitly provided
        access_key = os.getenv('AWS_KEY') if not access_key else access_key
        secret_key = os.getenv('AWS_SECRET') if not secret_key else secret_key

        assert access_key, 'If "AWS_KEY" is not set in the environment variables, it must be explicitly provided in the "access_key" parameter.'
        assert secret_key, 'If "AWS_SECRET" is not set in the environment variables, it must be explicitly provided in the "secret_key" parameter.'

        # start a session connecting to AWS with credentials
        aws_session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1')

        # using the authenticated session, create an S3 client accessing the safegraph data
        self.s3 = aws_session.client('s3', endpoint_url='https://s3.wasabisys.com')
        
        self._content_dataframe = None
        
    @property
    def content_dataframe(self):
        
        if self._content_dataframe is None:
            self._content_dataframe = get_content_dataframe(self.s3, self.bucket, self.prefix)
            
        return self._content_dataframe

    def get_dataframe_from_remote_path(self, source_s3_path:str):
        """Get a dataframe for a resource using the remove path, the, "Key," referncing the path within the bucket in AWS S3."""
        
        src_pth = str(source_s3_path) if isinstance(source_s3_path, Path) else source_s3_path
        
        assert self.content_dataframe.source_path.str.contains(src_pth).any()
        
        # get the headers describing the file
        resp = self.s3.get_object(Bucket=self.bucket, Key=src_pth)
        
        # handle compressed data differently - only applies to patterns data
        if resp['ContentType'] == 'application/gzip':
            
            # create a gzip object mapped to the stream
            gz = GzipFile(fileobj=resp.get('Body'))
            
            # stream into a pandas dataframe
            df = pd.read_csv(gz, dtype=str)
            
        # the rest are just in flat csv's, so use BytesIO for them
        else:
            byt = BytesIO(resp.get('Body').read())    
            df = pd.read_csv(byt)
            
        return df

    
    def get_patterns_dataframe(self, year:[int, List[int]], month:[int, List[int]]=None, safegraph_pois:[str, List[str]]=None, 
                               placekeys:[str, List[str]]=None) -> pd.DataFrame:
        """Get a patterns dataframe for a specific month and year with the option (recommended) to filter to a specific point of interest using the Safegraph POI ID."""
        
        year = check_list(year, int)
        
        if month is not None:
            month = check_list(month, int)
            
            # filter the content for just this month's patterns data
            ym_cntnt_df = self.content_dataframe[(self.content_dataframe.year.isin(year)) & 
                                                 (self.content_dataframe.month.isin(month)) & 
                                                 (self.content_dataframe.resource_type == 'patterns')]
            
        else:
            # filter the content for just this month's patterns data
            ym_cntnt_df = self.content_dataframe[(self.content_dataframe.year.isin(year)) &
                                                 (self.content_dataframe.resource_type == 'patterns')]

        # empty list to populate
        ym_df_lst = []

        # for every one of the remote files
        for pth in ym_cntnt_df.source_path:
            
            # get a dataframe for the remote file
            tmp_df = self.get_dataframe_from_remote_path(pth)
            
            # filter the dataframe based on Safegraph POI ID's
            if safegraph_pois is not None:
                safegraph_pois = check_list(safegraph_pois, str)
                tmp_df = tmp_df[(tmp_df.safegraph_place_id.isin(safegraph_pois))]
                
            # filter the dataframe based on Placekeys
            if placekeys is not None:
                placekeys = check_list(placekeys, str)
                tmp_df = tmp_df[(tmp_df.placekey.isin(placekeys))]
            
            # add the dataframe to the list
            ym_df_lst.append(tmp_df)

        # combine all the output dataframes
        ym_df = pd.concat(ym_df_lst)
        
        return ym_df

In [63]:
sg = SafegraphClient()

sg

<__main__.SafegraphClient at 0x2afcfcaf688>

In [64]:
sg_poi = 'sg:af471021a929414cbf69854e6f8f1b0c'  # white pass

In [66]:
wp_y2019_y2020_df = sg.get_patterns_dataframe([2018, 2019, 2020], safegraph_pois=sg_poi)

wp_y2019_y2020_df

Unnamed: 0,safegraph_place_id,location_name,street_address,city,region,postal_code,safegraph_brand_ids,brands,date_range_start,date_range_end,...,median_dwell,bucketed_dwell_times,related_same_day_brand,related_same_month_brand,popularity_by_hour,popularity_by_day,device_type,placekey,parent_placekey,parent_safegraph_place_id
137646,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-05-01T00:00:00-07:00,2020-06-01T00:00:00-07:00,...,21.0,"{""<5"":3,""5-20"":56,""21-60"":26,""61-240"":27,"">240...","{""Fred Meyer Jewelers"":33,""Chevron"":32}","{""Chevron"":37,""Walmart"":24,""76"":23,""Safeway"":2...","[5,4,3,2,4,4,4,12,21,28,27,26,23,22,26,22,9,11...","{""Monday"":11,""Tuesday"":20,""Wednesday"":15,""Thur...","{""android"":44,""ios"":32}",,,
199898,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-06-01T00:00:00-07:00,2020-07-01T00:00:00-07:00,...,35.0,"{""<5"":1,""5-20"":17,""21-60"":7,""61-240"":18,"">240"":2}",{},"{""Chevron"":34,""Shell Oil"":34,""Safeway Fuel Sta...","[0,1,2,2,2,1,1,10,10,11,11,11,5,12,12,10,2,1,0...","{""Monday"":11,""Tuesday"":7,""Wednesday"":5,""Thursd...","{""android"":13,""ios"":7}",,,
198080,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-07-01T00:00:00-07:00,2020-08-01T00:00:00-07:00,...,24.0,"{""<5"":3,""5-20"":33,""21-60"":22,""61-240"":12,"">240...",{},"{""Chevron"":43,""Walmart"":29,""76"":26,""Shell Oil""...","[2,2,2,2,4,3,1,6,9,8,13,15,25,23,22,15,9,3,3,6...","{""Monday"":10,""Tuesday"":10,""Wednesday"":11,""Thur...","{""android"":38,""ios"":21}",,,
137469,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-08-01T00:00:00-07:00,2020-09-01T00:00:00-07:00,...,22.0,"{""<5"":3,""5-20"":37,""21-60"":24,""61-240"":13,"">240...",{},"{""Chevron"":42,""Shell Oil"":32,""Safeway"":28,""76""...","[2,2,2,2,3,2,4,20,17,20,16,21,22,21,17,16,9,4,...","{""Monday"":19,""Tuesday"":15,""Wednesday"":10,""Thur...","{""android"":39,""ios"":17}",,,
86499,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-09-01T00:00:00-07:00,2020-10-01T00:00:00-07:00,...,52.5,"{""<5"":1,""5-20"":34,""21-60"":32,""61-240"":23,"">240...",{},"{""Costco Wholesale Corp."":31,""76"":29,""Shell Oi...","[13,14,11,12,12,13,21,31,28,28,31,27,28,32,29,...","{""Monday"":21,""Tuesday"":10,""Wednesday"":31,""Thur...","{""android"":31,""ios"":14}",zzy-222@5xd-7jh-f75,,
169339,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-10-01T00:00:00-07:00,2020-11-01T00:00:00-07:00,...,72.0,"{""<5"":1,""5-20"":30,""21-60"":12,""61-240"":25,"">240...",{},"{""Chevron"":27,""76"":24,""Shell Oil"":20,""Starbuck...","[10,9,10,9,8,9,8,22,24,25,26,27,30,28,23,21,15...","{""Monday"":13,""Tuesday"":18,""Wednesday"":19,""Thur...","{""android"":17,""ios"":19}",zzy-222@5xd-7jh-f75,,
233622,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2020-11-01T00:00:00-07:00,2020-12-01T00:00:00-08:00,...,35.0,"{""<5"":23,""5-10"":111,""11-20"":73,""21-60"":166,""61...",{},"{""Starbucks"":29,""Safeway"":26,""Chevron"":25,""Cos...","[11,11,10,11,7,7,15,50,114,145,154,174,165,143...","{""Monday"":46,""Tuesday"":9,""Wednesday"":18,""Thurs...","{""android"":107,""ios"":176}",zzw-222@5xd-7jh-ffz,,
94681,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2018-01-01T00:00:00-08:00,2018-02-01T00:00:00-08:00,...,51.0,"{""<5"":26,""5-20"":218,""21-60"":284,""61-240"":286,""...","{""McDonald's"":14,""Fred Meyer"":8,""Fitness 19"":7...","{""Starbucks"":31,""Chevron"":28,""Costco Wholesale...","[80,79,77,76,73,72,82,133,246,264,238,278,297,...","{""Monday"":136,""Tuesday"":52,""Wednesday"":71,""Thu...","{""android"":120,""ios"":248}",,,
128545,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2018-02-01T00:00:00-08:00,2018-03-01T00:00:00-08:00,...,63.0,"{""<5"":11,""5-20"":213,""21-60"":217,""61-240"":277,""...","{""Comfort Inn"":17,""KeyBank"":11,""Volkswagen"":8,...","{""Chevron"":28,""Safeway"":27,""Starbucks"":26,""Cos...","[115,117,118,116,117,117,115,147,236,278,289,2...","{""Monday"":116,""Tuesday"":75,""Wednesday"":62,""Thu...","{""android"":108,""ios"":261}",,,
119562,sg:af471021a929414cbf69854e6f8f1b0c,White Pass Ski Area,48935 U.s. 12,Naches,WA,98937,,,2018-03-01T00:00:00-08:00,2018-04-01T00:00:00-07:00,...,55.0,"{""<5"":5,""5-20"":200,""21-60"":243,""61-240"":251,"">...",{},"{""Starbucks"":29,""Costco Wholesale Corp."":29,""C...","[66,65,62,62,61,57,61,102,213,232,240,251,269,...","{""Monday"":28,""Tuesday"":48,""Wednesday"":68,""Thur...","{""android"":118,""ios"":207}",,,


In [68]:
wp_y2019_y2020_df.to_csv(dir_raw/'patterns_wp.csv')

# SG BG JSON

In [229]:
geojson_pth = dir_raw/'safegraph_open_census_data'/'geometry'/'cbg.geojson'

geojson_pth

WindowsPath('D:/projects/pdx-parks/data/raw/safegraph_open_census_data/geometry/cbg.geojson')

In [233]:
with open(geojson_pth, 'rb') as geojson_file:
    geojson = json.load(geojson_file)

In [241]:
bg_df = pd.DataFrame([f['properties'] for f in geojson['features']])

In [240]:
from arcgis.geometry import Geometry, SpatialReference
sr = SpatialReference(4326)
sr

{'wkid': 4326}

In [246]:
bg_df['SHAPE'] = [Geometry({"rings" : g['geometry']['coordinates'][0], "spatialReference": sr}) for g in geojson['features']]

In [247]:
bg_df.spatial.set_geometry('SHAPE')

In [248]:
bg_df.spatial.validate()

True

In [249]:
bg_df.head()

Unnamed: 0,StateFIPS,CountyFIPS,TractCode,BlockGroup,CensusBlockGroup,State,County,ClassCode,SHAPE
0,1,81,41600,1,10810416001,AL,Lee County,H1,"{""rings"": [[[-85.37281500011215, 32.6342380002..."
1,1,81,41600,2,10810416002,AL,Lee County,H1,"{""rings"": [[[-85.38346400016343, 32.6483780003..."
2,1,81,41700,4,10810417004,AL,Lee County,H1,"{""rings"": [[[-85.37139200014064, 32.6013869998..."
3,1,73,11107,4,10730111074,AL,Jefferson County,H1,"{""rings"": [[[-86.64796800012306, 33.5920459996..."
4,1,73,11108,4,10730111084,AL,Jefferson County,H1,"{""rings"": [[[-86.65205899992287, 33.5986850000..."


In [250]:
bg_df.spatial.to_featureclass(gdb_raw/'sg_cbg')

'D:\\projects\\pdx-parks\\data\\raw\\raw.gdb\\sg_cbg'

In [311]:
out_df = home_cbgs_df.join(bg_df.set_index('CensusBlockGroup')['SHAPE'])
out_df

Unnamed: 0,visitor_home_count,SHAPE
410510007022,22,"{'rings': [[[-122.59121099978778, 45.490623000..."
410510007021,12,"{'rings': [[[-122.58482500027351, 45.497428000..."
410510006024,10,"{'rings': [[[-122.57903200022663, 45.478235999..."
410510090001,9,"{'rings': [[[-122.53769799976283, 45.490802000..."
410510016022,7,"{'rings': [[[-122.57310700038772, 45.512254000..."
...,...,...
410510091022,4,"{'rings': [[[-122.52356500002469, 45.490155000..."
410050222071,4,"{'rings': [[[-122.53383599973161, 45.427781999..."
410510005024,4,"{'rings': [[[-122.58930099984121, 45.478932999..."
410510072021,4,"{'rings': [[[-122.79113000014934, 45.625901999..."


In [312]:
out_df.spatial.to_featureclass(gdb_int/'test05')

'D:\\projects\\pdx-parks\\data\\interim\\interim.gdb\\test05'