# 1. Setup

In [1]:
import os

from spark import *
from utils import *

# 2. SafeGraph POIs

In [2]:
# Root folder of the SafeGraph data (POI snapshots and monthly patterns).
# Defaults to the original lab-server location; set the SAFEGRAPH_ROOT
# environment variable to run this notebook against a copy stored elsewhere.
SG_ROOT = Path(os.environ.get(
    'SAFEGRAPH_ROOT', '/home/umni2/a/umnilab/data/safegraph'))

In [3]:
# Read the 'SafeGraph' section of the POI category config and flatten its
# {kind: [top_category, ...]} mapping into one (kind, top_category) row each.
with open(DATA / 'opport/poi_categories.yml', 'r') as f:
    cats = yaml.safe_load(f)['SafeGraph']
cats = Pdf([D(kind=kind, top_category=name)
            for kind, names in cats.items() for name in names]).disp()

19 rows x 2 cols; Memory: 0.0 MiB


Unnamed: 0,kind,top_category
,<object>,<object>
0.0,Education,"Colleges, Universities, and Professional Schools"


## 2.1. POI data

In [4]:
def get_pois(categories=cats, overwrite=False):
    """Build the table of SafeGraph POIs labelled with a kind and a block group.

    Reads the 2020-11-06 US POI snapshot under ``SG_ROOT``, left-joins the
    ``categories`` table on ``top_category`` (so POIs whose category is not
    listed keep a missing ``kind``), converts lon/lat to points in ``CRS_M``
    and spatially joins them to the 'BG'-scale zones of ``zones_2020.parquet``.
    The result is written to ``DATA / 'opport/pois_2020.parquet'``.

    Args:
        categories: Table with columns ``kind`` and ``top_category``.
        overwrite: Forwarded to ``file_check`` (presumably forces a rebuild
            when truthy -- confirm against ``utils``).

    Returns:
        DataFrame with columns ``placekey``, ``kind``, ``geoid``, ``state``
        (``kind`` and ``state`` cast to ``CAT``), or whatever ``file_check``
        returns when that is not ``None``.
    """
    outpath = DATA / 'opport/pois_2020.parquet'
    cached = file_check(outpath, overwrite)
    if cached is not None:
        return cached
    # Load only the needed columns and attach the POI kind.
    raw = pd.read_parquet(
        SG_ROOT / 'pois/us/2020-11-06.parquet',
        columns=['placekey', 'top_category', 'region', 'lon', 'lat'])
    labelled = raw.merge(categories, how='left', on='top_category')
    labelled = labelled.drop(columns='top_category')
    labelled = labelled.rename(columns=D(region='state'))
    # Point geometries, reprojected to match the CRS used for the zones join.
    points = pdf2gdf(labelled, 'lon', 'lat', CRS_DEG).to_crs(CRS_M)
    block_groups = gpd.read_parquet(
        DATA / 'zones/zones_2020.parquet',
        filters=[('scale', '==', 'BG')],
        columns=['geoid', 'geometry'])
    # Inner spatial join: POIs falling within no block group are dropped.
    joined = points.sjoin(block_groups, predicate='within')
    joined = joined.reset_index(drop=True)
    result = joined[['placekey', 'kind', 'geoid', 'state']]
    result = result.astype(D(kind=CAT, state=CAT))
    result.to_parquet(mkfile(outpath))
    return result

poi = get_pois(overwrite=0).disp() # 50s

5,510,037 rows x 4 cols; Memory: 772.5 MiB


Unnamed: 0,placekey,kind,geoid,state
,<object>,<category>,<object>,<category>
0.0,224-222@8t2-d74-syv,,480219507002,TX


### 2.1.1. Count POIs by kind & BG

In [5]:
# POI counts per (kind, state, block group), stacked with an all-kinds 'Total'
# count per (state, block group), then saved to disk.
# POIs with a missing `kind` are skipped by the per-kind groupby (NaN keys are
# dropped) but are still included in 'Total'.
# `kind` and `state` are categorical, so observed=True limits the result to the
# combinations that actually occur instead of materializing every
# category x geoid combination with a zero count and filtering it out again.
totPois = (pd.concat([
    poi.groupby(['kind', 'state', 'geoid'], observed=True).size().reset_index(),
    (poi.groupby(['state', 'geoid'], observed=True).size().reset_index()
     .assign(kind='Total'))])
           .rename(columns={0: 'nPois'})  # size() yields an unnamed column 0
           .query('nPois > 0')  # cheap guard; a no-op with observed=True
           .reset_index(drop=True)
           .astype(D(state=CAT, kind=CAT, nPois=I32))).disp()
totPois.to_parquet(DATA / 'opport/poi_totals_2020.parquet') # 12s before observed=True

631,861 rows x 4 cols; Memory: 45.2 MiB


Unnamed: 0,kind,state,geoid,nPois
,<category>,<category>,<object>,<int32>
0.0,Education,AL,010010202001,1


In [6]:
# One-row summary: total number of POIs for each kind (including 'Total').
totPois.groupby('kind')[['nPois']].sum().T

kind,Education,Groceries,Medical,Social Support,Total
nPois,197888,151893,625132,147797,5510037


## 2.2. Visits OD matrix

In [7]:
# Spark handle used by the visit aggregation below (`SP.read_parquet`).
# `Spark` presumably comes from the project-local `spark` module -- confirm.
SP = Spark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/17 09:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/17 09:06:06 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


### 2.2.1. POI-level

In [10]:
def get_poi_flows(overwrite=False):
    """Aggregate April 2021 visitor counts into a block-group OD table.

    Explodes ``visitor_home_cbgs`` of the 2021-04-01 monthly patterns into
    (``src``, ``n``) entries per POI, joins them to the POI table saved by
    ``get_pois`` (its ``geoid`` becomes the destination ``trg``), and sums
    ``n`` per (state, src, trg) twice: once over all POIs (kind 'All') and
    once per POI kind. The stacked result is written, gzip-compressed, to
    ``DATA / 'opport/poi_visits_2021.parquet'``.

    Args:
        overwrite: Forwarded to ``file_check`` (presumably forces a rebuild
            when truthy -- confirm against ``utils``).

    Returns:
        DataFrame with columns ``state``, ``src``, ``trg``, ``kind`` (all
        cast to ``CAT``) and ``visitors`` (cast to ``I32``), or whatever
        ``file_check`` returns when that is not ``None``.
    """
    outpath = DATA / 'opport/poi_visits_2021.parquet'
    cached = file_check(outpath, overwrite)
    if cached is not None:
        return cached
    keys = ['state', 'src', 'trg']
    # Destination side: each POI with its block group renamed to `trg`.
    targets = (SP.read_parquet(DATA / 'opport/pois_2020.parquet')
               .withColumnRenamed('geoid', 'trg'))
    # One row per (POI, src) entry; the inner join keeps only the POIs
    # present in the POI table.
    flows = (SP.read_parquet(SG_ROOT / 'patterns/monthly/2021-04-01')
             .select('placekey', F.col('visitor_home_cbgs').alias('od'))
             .select('placekey', F.explode('od').alias('src', 'n'))
             .join(targets, on='placekey'))
    n_visitors = F.sum('n').alias('n')
    # Column order must match `per_kind` because union() is positional.
    all_kinds = (flows.groupBy(*keys).agg(n_visitors)
                 .withColumn('kind', F.lit('All'))
                 .select(*keys, 'kind', 'n'))
    # NOTE(review): POIs with a null `kind` appear to form their own group
    # here and would end up as rows with a missing kind -- confirm intended.
    per_kind = flows.groupBy(*keys, 'kind').agg(n_visitors)
    od = all_kinds.union(per_kind).toPandas()
    od = (od.rename(columns=D(n='visitors'))
          .astype(D(state=CAT, src=CAT, trg=CAT, kind=CAT, visitors=I32)))
    od.to_parquet(mkfile(outpath), compression='gzip')
    return od

poiOD = get_poi_flows(overwrite=0).disp() # 4m47s

74,658,050 rows x 5 cols; Memory: 1042.3 MiB


Unnamed: 0,state,src,trg,kind,visitors
,<category>,<category>,<category>,<category>,<int32>
0.0,WA,530630044004,530630102041,All,4


In [11]:
# Total visitors per POI kind; rows with a missing kind (if any) are left out
# because groupby drops NaN keys by default.
poiOD.groupby('kind')['visitors'].agg('sum')

kind
All               408933330
Education          12419148
Groceries          25945766
Medical            15701996
Social Support      3515130
Name: visitors, dtype: int32