In [1]:
import importlib
import os
from pathlib import Path
import sys

from arcgis.features import GeoAccessor, GeoSeriesAccessor, FeatureSet
from arcgis.gis import GIS
from dotenv import load_dotenv, find_dotenv
import pandas as pd

In [2]:
# Paths to common data locations. The project directory is taken to be two levels above the
# current working directory. NOTE: to convert any path to a raw string, simply use str(path_instance)
# NOTE(review): assumes the notebook is launched from a folder two levels below the project root - confirm
dir_prj = Path.cwd().parent.parent

# Make the project package importable by adding <project>/src to the module search path. This is only
# necessary if you are not using a unique environment for this project (i.e. the package is not installed).
# The path must be appended BEFORE the import below.
sys.path.append(str(dir_prj/'src'))
import modeling

# load the "autoreload" extension so that code can change, & always reload modules so that as you change
# code in src, it gets loaded
%load_ext autoreload
%autoreload 2

# load environment variables from the .env file located by find_dotenv(); the ESRI_* values read with
# os.getenv in later cells are expected to come from here
load_dotenv(find_dotenv())

True

In [None]:
# create a GIS object instance; if you did not enter any information here, it defaults to anonymous access to ArcGIS Online
#
# os.getenv returns None when a variable is not set at all, and '' when it is declared without a value
# (e.g. `ESRI_PORTAL_PASSWORD=` in .env). `or None` collapses both cases to None, so a missing or blank
# entry never raises and is never forwarded to GIS as an empty string.
gis_portal = GIS(
    url=os.getenv('ESRI_PORTAL_URL') or None,
    username=os.getenv('ESRI_PORTAL_USERNAME') or None,
    password=os.getenv('ESRI_PORTAL_PASSWORD') or None
)

gis_agol = GIS(
    url=os.getenv('ESRI_GIS_URL') or None,
    username=os.getenv('ESRI_GIS_USERNAME') or None,
    password=os.getenv('ESRI_GIS_PASSWORD') or None
)

In [3]:
# Example search inputs. The names mirror the search_string, code_naics and exclude_headquarters
# parameters of get_businesses_gis defined further down in this notebook.
search_string = 'ace hardware'

# NAICS code; the same value appears in the NAICS column of the Ace Hardware results shown later
code_naics = '44413005'

# when True, get_businesses_gis drops rows whose HDBRCHDESC value starts with "headquarters"
exclude_headquarters = True

In [235]:
from typing import Union
from warnings import warn

from arcgis.geometry import SpatialReference
from arcgis.gis import GIS
from modeling import Country, get_countries

def validate_spatial_reference_gis(spatial_reference:Union[str, int, dict, 'SpatialReference']):
    """
    Normalize the supported ways of specifying a spatial reference into a single object.

    Args:
        spatial_reference: One of
            * a dictionary containing a ``wkid`` key, such as ``{"wkid": 4326}`` - returned unchanged
            * a string made up only of decimal digits, such as ``"4326"`` - converted to ``{"wkid": 4326}``
            * an integer WKID, such as ``4326`` - converted to ``{"wkid": 4326}``
            * a ``SpatialReference`` object - returned unchanged

    Returns:
        Either the dictionary / ``SpatialReference`` instance passed in, or a new ``{"wkid": <int>}`` dictionary.

    Raises:
        AssertionError: If a dictionary does not have a ``wkid`` key, or a string is not purely decimal digits.
        Exception: If the input is none of the supported types.
    """
    if isinstance(spatial_reference, dict):
        # NOTE(review): if SpatialReference is a dict subclass, instances are handled by this branch - confirm
        assert 'wkid' in spatial_reference.keys(), 'If providing spatial reference as a dictionary, it must conform '\
                                                   'look like {"wkid": <WKID>}, such as {"wkid": 4326}.'
        sr = spatial_reference
    elif isinstance(spatial_reference, str):
        assert spatial_reference.isdecimal(), 'If providing a string to identify a spatial reference, it must be a ' \
                                              'WKID for your desired spatial reference.'
        # store the WKID as a number so string and integer inputs produce the same result
        sr = {'wkid': int(spatial_reference)}
    elif isinstance(spatial_reference, int):
        sr = {'wkid': spatial_reference}
    elif isinstance(spatial_reference, SpatialReference):
        # hand back the instance that was passed in, not the SpatialReference class itself
        sr = spatial_reference
    else:
        raise Exception('The spatial reference must be either a string or integer specify the WKID, a dictionary '\
                        'specifying the WKID such as {"wkid": 4326}, or a SpatialReference object.')
    return sr

def _as_code_list(codes):
    """Flatten a single industry code, or a collection of codes, into a flat list of strings (None -> [])."""
    if codes is None:
        return []
    if isinstance(codes, (str, int)):
        return [str(codes)]
    return [str(code) for code in codes]


def get_businesses_gis(area_of_interest:pd.DataFrame, gis:GIS, search_string:str=None, code_naics:Union[str, list]=None,
                       code_sic:Union[str, list]=None, exclude_headquarters:bool=True, country:Country=None, 
                       output_spatial_reference:Union[str, int, dict, SpatialReference]={'wkid': 4326}) -> pd.DataFrame:
    """
    Search for businesses inside an area of interest using the SelectBusinesses operation of the
    geoenrichment helper service registered with the GIS.

    Args:
        area_of_interest: Spatially Enabled DataFrame delineating where to search. If its ``attrs`` contain
            ``parent_geo`` (with ``resource`` and ``id`` keys), those are sent as a standard geography
            filter; otherwise the dataframe's features are sent as the boundary.
        gis: GIS object instance whose geoenrichment helper service is queried.
        search_string: Optional text to search for.
        code_naics: Optional NAICS code, or list of codes, to filter by.
        code_sic: Optional SIC code, or list of codes, to filter by.
        exclude_headquarters: When True (default), rows whose ``HDBRCHDESC`` value starts with
            "headquarters" (case-insensitive) are removed.
        country: Country object providing ``iso2``. Only consulted when ``area_of_interest.attrs`` does
            not contain ``_cntry``.
        output_spatial_reference: Spatial reference for the returned geometry, in any form accepted by
            ``validate_spatial_reference_gis``. Defaults to ``{'wkid': 4326}``.

    Returns:
        DataFrame of businesses with the requested attribute fields that were returned plus ``SHAPE``,
        without the ``HDBRCHDESC`` column. An empty dataframe with those columns is returned (after a
        warning) when nothing is found.

    Raises:
        Exception: If no country can be determined, or the service response reports an error.
        AssertionError: If no search string, NAICS code or SIC code is provided.
    """
    # list of fields to lighten the output payload
    out_flds = ['LOCNUM', 'CONAME', 'NAICSDESC', 'NAICS', 'SIC', 'SOURCE', 'PUBPRV','FRNCOD', 'ISCODE', 'CITY', 'ZIP', 
                'STATE', 'HDBRCHDESC']

    # begin to build up the request parameter payload
    params = {
        'f': 'json',
        'returngeometry': True,
        'outsr': validate_spatial_reference_gis(output_spatial_reference),
        'fields': out_flds
    }

    # make sure a country is somehow explicitly specified - first choice is the one tagged on the dataframe
    if '_cntry' in area_of_interest.attrs:
        params['sourcecountry'] = area_of_interest.attrs['_cntry'].iso2

    # if a country object was explicitly passed in - easy peasy lemon squeezy
    elif isinstance(country, Country):
        params['sourcecountry'] = country.iso2

    # if there is not a country to work with, bingo out
    else:
        raise Exception('Either the input dataframe must have been created using the modeling ' \
                        'module to retrieve standard geographies, or a country ojbect must be ' \
                        'explicitly specified in the input parameters.')

    # normalize the industry codes so a single code and a list of codes are handled the same way
    naics_lst = _as_code_list(code_naics)
    sic_lst = _as_code_list(code_sic)

    # make sure some sort of filter is being applied
    assert (search_string is not None) or naics_lst or sic_lst, 'You must provide either a search string or ' \
                                                                'NAICS or SIC code or list of codes to search ' \
                                                                'for businesses.'

    # populate the rest of the search parameters, only sending the filters actually provided
    if search_string is not None:
        params['searchstring'] = search_string

    type_filters = []
    if naics_lst:
        type_filters.append({'Classification': 'NAICS', 'Codes': naics_lst})
    if sic_lst:
        type_filters.append({'Classification': 'SIC', 'Codes': sic_lst})
    if type_filters:
        params['businesstypefilters'] = type_filters

    # if the input Spatially Enabled DataFrame was created using the modeling module to get standard geographies
    if 'parent_geo' in area_of_interest.attrs.keys():
        params['spatialfilter'] = {
            "Boundaries": {
                "StdLayer": {
                    "ID":area_of_interest.attrs['parent_geo']['resource'],
                    "GeographyIDs":area_of_interest.attrs['parent_geo']['id']
                }
            }
        }

    # if just a normal spatially enabled dataframe, we can still use the geometry
    else:
        params['spatialfilter'] = {
            "Boundaries": {
                "recordSet": area_of_interest.spatial.to_featureset().to_dict()
            }
        }

    # retrieve the businesses from the REST endpoint
    url = f'{gis.properties.helperServices.geoenrichment.url}/SelectBusinesses'
    r_json = gis._con.post(url, params=params)

    # ensure a valid result is received
    if 'error' in r_json.keys():
        err = r_json['error']
        raise Exception(f'Error in searching using Business Analyst SelectBusinesses REST endpoint. Error Code '
                        f'{err["code"]}: {err["message"]}')

    # plow through all the messages, if there are any, to see if any of them describe an error
    err_msg_lst = [msg for msg in (r_json.get('messages') or [])
                   if 'description' in msg.keys() and 'error' in msg['description'].lower()]

    # if an error message is found, report the first one
    if len(err_msg_lst):
        err = err_msg_lst[0]
        raise Exception(f'Server error encoutered in searching using Business Analyst SelectBusinesses REST endpoint. '
                        f'Error ID: {err["id"]}, Type: {err["type"]}, Description: {err["description"]}')

    # extract the feature list out of the json response
    feature_lst = r_json['results'][0]['value']['features']

    # make sure something was found - if not, hand back an empty frame with the expected output schema
    if len(feature_lst) == 0:
        warn('Although the request was valid and no errors were encountered, no businesses were found.')
        return pd.DataFrame(columns=[c for c in out_flds if c != 'HDBRCHDESC'] + ['SHAPE'])

    # convert the features to a Spatially Enabled Pandas DataFrame
    res_df = FeatureSet(feature_lst).sdf

    # reorganize the schema a little, tolerating any requested column the service did not return
    cols = [c for c in out_flds + ['SHAPE'] if c in res_df.columns]
    res_df = res_df[cols]

    # if not wanting to keep headquarters, normally the case for forecasting modeling, filter them out
    if exclude_headquarters and 'HDBRCHDESC' in res_df.columns:
        # na=False keeps rows with no headquarters / branch description rather than failing on missing values
        is_hq = res_df['HDBRCHDESC'].str.lower().str.match('headquarters', na=False)
        res_df = res_df[~is_hq].reset_index(drop=True)

    # drop the headquarters or branch column since only used to filter if necessary; returning a new
    # frame avoids modifying a slice of the original in place
    res_df = res_df.drop(columns='HDBRCHDESC', errors='ignore')

    return res_df

In [23]:
# connect using the portal settings loaded from .env; `or None` turns a variable that is unset or
# declared with an empty value into None instead of passing an empty string to GIS
gis_portal = GIS(
    url=os.getenv('ESRI_PORTAL_URL') or None,
    username=os.getenv('ESRI_PORTAL_USERNAME') or None,
    password=os.getenv('ESRI_PORTAL_PASSWORD') or None
)

# country object from the project's modeling package, bound to the portal connection
usa = modeling.Country('USA', gis_portal)

# display the country object
usa

<modeling.Country - USA (GIS at https://geoai-ent.bd.esri.com/portal/ logged in as jmccune)>

In [50]:
# retrieve the area of interest through usa.cbsas using the search term 'seattle', asking for the
# geometry to be returned along with the attributes
aoi_df = usa.cbsas.get('seattle', return_geometry=True)

# display the result
aoi_df

Unnamed: 0,ID,NAME,SHAPE
0,42660,"Seattle-Tacoma-Bellevue, WA Metropolitan Stati...","{""rings"": [[[-121.68612849977673, 48.298988999..."


In [51]:
# retrieve the businesses matching the configured search string (search_string, set in the
# configuration cell near the top of the notebook) inside the area of interest
brnd_df = usa.business.get_by_name(search_string, aoi_df)

# preview the first few rows rather than the whole result
brnd_df.head()

Unnamed: 0,LOCNUM,CONAME,NAICSDESC,NAICS,SIC,SOURCE,PUBPRV,FRNCOD,ISCODE,CITY,ZIP,STATE,SHAPE,id,brand_name,brand_name_category
0,174746248,MAGNOLIA ACE HARDWARE,HARDWARE-RETAIL,44413005,525104,INFOGROUP,,1,,SEATTLE,98199,WA,"{""x"": -122.39802, ""y"": 47.6402850000001, ""spat...",174746248,MAGNOLIA ACE HARDWARE,MAGNOLIA ACE HARDWARE
1,174841932,ACE HARDWARE,HARDWARE-RETAIL,44413005,525104,INFOGROUP,,1,,LAKE STEVENS,98258,WA,"{""x"": -122.1072885, ""y"": 47.998143, ""spatialRe...",174841932,ACE HARDWARE,ACE HARDWARE
2,174852467,CARRS ACE HARDWARE,HARDWARE-RETAIL,44413005,525104,INFOGROUP,,1,,MARYSVILLE,98270,WA,"{""x"": -122.1761745, ""y"": 48.05064, ""spatialRef...",174852467,CARRS ACE HARDWARE,CARRS ACE HARDWARE
3,216082099,SOUTH END ACE HARDWARE,HARDWARE-RETAIL,44413005,525104,INFOGROUP,,1,,SPANAWAY,98387,WA,"{""x"": -122.4344385, ""y"": 47.087136, ""spatialRe...",216082099,SOUTH END ACE HARDWARE,SOUTH END ACE HARDWARE
4,251373601,CEDAR PLAZA ACE HARDWARE,HARDWARE-RETAIL,44413005,525104,INFOGROUP,,1,,MOUNTLAKE TER,98043,WA,"{""x"": -122.2900785, ""y"": 47.7909450000001, ""sp...",251373601,CEDAR PLAZA ACE HARDWARE,CEDAR PLAZA ACE HARDWARE


In [53]:
# Bind notebook-level names to the objects created above.
# NOTE(review): this looks like scaffolding for stepping through the body of a method on usa.business
# interactively (hence the name `self`) - confirm. None of these names are read by the cells visible here.
self = usa.business
brand_businesses = brnd_df
area_of_interest = aoi_df

# column names present in the brnd_df output shown above
name_column = 'CONAME'
code_column = 'NAICS'
id_column = 'LOCNUM'

# NOTE(review): meaning of this threshold is not visible in this notebook - confirm against the method it feeds
local_threshold = 0

In [54]:
from modeling.businesses import get_top_codes

['44413005']