# 1. Setup

In [1]:
import sys
sys.path.append('../..')
from mobiquity.names import *
from mobiquity.spark import F, SparkSession

from urllib.error import HTTPError

# 2. Jobs
Using the LEHD LODES dataset: https://lehd.ces.census.gov/data

In [2]:
states = pd.read_csv(DATA / 'us_states.csv').disp()

49 rows x 3 cols; Memory: 0.0 MiB


Unnamed: 0,name,code,fips
,<object>,<object>,<int64>
0.0,Alabama,AL,1


## 2.1. Job counts

In [15]:
def get_tot_jobs(year, fips=states.code.str.lower(), imp_cols=D(
        C000='All', CE01='Low wage', CR01='White', CD01='<HS', CD02='HS'
        ), overwrite=False):
    outpath = DATA / f'access/opport/job_totals_{year}.parquet'
    if (df := U.checkfile(outpath, overwrite)) is not None: return df
    lodes = 'LODES' + ('7' if year <= 2020 else '8')
    root = f'https://lehd.ces.census.gov/data/lodes/{lodes}'
    res = []
    for st in tqdm(fips):
        url = f'{root}/{st}/wac/{st}_wac_S000_JT00_{year}.csv.gz'
        try:
            df = pd.read_csv(url)
        except HTTPError: # for AR & MS, latest data is for 2018
            df = pd.read_csv(url.replace(f'{year}', '2018'))
        except Exception as e:
            print(f'ERROR in state {st}: {e}')
            continue
        df = df.rename(columns={df.columns[0]: 'geoid'}).set_index('geoid')
        df.index = df.index.astype(str).str.zfill(15)
        df = df[list(imp_cols)].rename(columns=imp_cols)
        df['POC'] = df['All'] - df.pop('White')
        df['Low edu'] = df.pop('<HS') + df.pop('HS')
        for scale, nChar in D(County=5, Tract=11, BG=12).items():
            d = (df.set_index(df.index.str[:nChar])
                 .groupby('geoid').sum().reset_index()
                 .melt('geoid', var_name='kind', value_name='n_jobs'))
            res.append(d.assign(scale=scale))
    df = pd.concat(res).query('n_jobs > 0').reset_index(drop=1)
    cols = D(geoid=CAT, scale=CAT, kind=CAT, n_jobs=I32)
    df = df[list(cols)].astype(cols)
    df.to_parquet(U.mkfile(outpath))
    return df

tot_jobs = get_tot_jobs(2021, overwrite=0).disp() # 52s

1,276,237 rows x 4 cols; Memory: 41.4 MiB


Unnamed: 0,geoid,scale,kind,n_jobs
,<category>,<category>,<category>,<int32>
0.0,01001,County,All,12318


## 2.2. Job flows

In [4]:
def get_job_flows(year, fips=states.code.str.lower(),
                  imp_cols=D(S000='All', SE01='Low wage'),
                  overwrite=False):
    outpath = DATA / f'access/opport/job_flows_{year}.parquet'
    if (df := U.checkfile(outpath, overwrite)) is not None: return df
    lodes = 'LODES' + ('7' if year <= 2020 else '8')
    root = f'https://lehd.ces.census.gov/data/lodes/{lodes}'
    res = []
    for st in tqdm(fips):
        df = []
        for table in ['main', 'aux']:
            url = f'{root}/{st}/od/{st}_od_{table}_JT00_{year}.csv.gz'
            try:
                df.append(pd.read_csv(url))
            except HTTPError: # for AR & MS, latest data is for 2018
                df.append(pd.read_csv(url.replace(f'{year}', '2018')))
        df = pd.concat(df)
        df['src'] = df.pop('h_geocode').astype(str).str[:12]
        df['trg'] = df.pop('w_geocode').astype(str).str[:12]
        df = (df.groupby(['src', 'trg'])[list(imp_cols)].sum()
              .astype(I32).reset_index().rename(columns=imp_cols))
        df2 = df.copy()
        df2.insert(0, 'state', st.upper())
        df2.insert(1, 'scale', 'BG')
        for scale, nChar in D(County=5, Tract=11).items():
            d = df.assign(src=df.src.str[:nChar], trg=df.trg.str[:nChar])
            d = d.groupby(['src', 'trg']).sum().astype(I32).reset_index()
            d.insert(0, 'state', st.upper())
            d.insert(1, 'scale', scale)
            df2 = pd.concat([df2, d])
        res.append(df2)
    df = pd.concat(res).reset_index(drop=1)
    df = df.astype(D(src=CAT, trg=CAT, scale=CAT, state=CAT))
    df.to_parquet(U.mkfile(outpath))
    return df

# job_od = get_job_flows(2021, overwrite=0).disp() # 14m48s – 12s

111,852,173 rows x 6 cols; Memory: 1982.5 MiB


Unnamed: 0,state,scale,src,trg,All,Low wage
,<category>,<category>,<category>,<category>,<int32>,<int32>
0.0,AL,BG,100010405011,109503080430,1,1


# 3. POIs

## 3.1. SafeGraph POIs

In [5]:
sg_cats = (U.filt(U.load(DATA / 'poi_categories.csv'), source='SafeGraph')
           .rename(columns=D(category='kind', title='top_category'))).disp()

19 rows x 2 cols; Memory: 0.0 MiB


Unnamed: 0,kind,top_category
,<object>,<object>
0.0,Education,"Colleges, Universities, and Professional Schools"


In [6]:
sg_poi_path = '/home/umni2/a/umnilab/data/safegraph/pois/us/2020-11-06.parquet'

In [7]:
def get_pois(inpath=sg_poi_path, categories=sg_cats, overwrite=False):
    outpath = DATA / 'access/opport/pois_2020.parquet'
    if (df := U.checkfile(outpath, overwrite)) is not None: return df
    poi = (U.load(inpath, columns='placekey top_category region lon lat'.split())
           .merge(categories, 'left', on='top_category')
           .rename(columns=D(region='state')))
    poi = U.pdf2gdf(poi, 'lon', 'lat', CRS_DEG).to_crs(CRS_M)
    zones = U.load(DATA / 'zones/zones_2020.parquet', filters=[
        ('scale', '==', 'BG')], columns=['geoid', 'geometry'])
    poi = poi.sjoin(zones, predicate='within').reset_index(drop=1)
    poi = poi[['placekey', 'kind', 'geoid', 'state']]
    poi = poi.astype(D(kind=CAT, geoid=CAT, state=CAT))
    poi.to_parquet(U.mkfile(outpath))
    return poi

poi = get_pois(overwrite=0).disp() # 1m1s

5,510,037 rows x 4 cols; Memory: 454.2 MiB


Unnamed: 0,placekey,kind,geoid,state
,<object>,<category>,<category>,<category>
0.0,224-222@8t2-d74-syv,,480219507002,TX


### 3.1.1. Count POIs by kind & BG

In [8]:
def agg_pois_by_bg(poi, overwrite=False):
    outpath = DATA / 'access/opport/poi_totals_2020.parquet'
    if (df := U.checkfile(outpath, overwrite)) is not None: return df
    df_all = poi.groupby(['geoid']).size().reset_index().assign(kind='Total')
    by_kind = poi.groupby(['kind', 'geoid']).size().reset_index()
    df = (pd.concat([df_all, by_kind]).rename(columns={0: 'n_pois'})
          .query('n_pois > 0').reset_index(drop=1))
    res = [df.assign(scale='BG')]
    for scale, nchar in D(County=5, Tract=11).items():
        d = df.assign(geoid=df.geoid.str[:nchar])
        d = d.groupby(['geoid', 'kind'])['n_pois'].sum().reset_index()
        res.append(d.assign(scale=scale))
    df = (pd.concat(res).reset_index(drop=1)
          .astype(D(scale=CAT, kind=CAT, geoid=CAT, n_pois=I32))
          [['geoid', 'scale', 'kind', 'n_pois']])
    df.to_parquet(U.mkfile(outpath))
    return df

tot_pois = agg_pois_by_bg(poi, overwrite=0).disp() # 2s

976,813 rows x 4 cols; Memory: 38.2 MiB


Unnamed: 0,geoid,scale,kind,n_pois
,<category>,<category>,<category>,<int32>
0.0,010010202001,BG,Education,1


In [9]:
tot_pois.groupby('kind')['n_pois'].sum().to_frame().T

kind,Education,Groceries,Medical,Social Support,Total
n_pois,593664,455679,1875396,443391,16530111


## 3.2. Visits ODM

In [10]:
# spark = SparkSession()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 19:59:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
poi_visits_path = '/home/umni2/a/umnilab/data/safegraph/patterns/monthly/2021-04-01'

In [12]:
def get_poi_flows(spark, path=poi_visits_path, overwrite=False):
    outpath = DATA / 'access/opport/poi_visits_2021.parquet'
    if (df := U.checkfile(outpath, overwrite)) is not None: return df
    poi = spark.read_parquet(DATA / 'access/opport/pois_2020.parquet')
    vis = spark.read_parquet(path).select(
        'placekey', F.col('visitor_home_cbgs').alias('od'))
    od = vis.select('placekey', F.explode('od').alias('src', 'n'))
    od = od.join(poi.withColumnRenamed('geoid', 'trg'), on='placekey')
    total = (od.groupBy('src', 'trg').agg(F.sum('n').alias('n'))
             .withColumn('kind', F.lit('All')))
    byKind = od.groupBy('src', 'trg', 'kind').agg(F.sum('n').alias('n'))
    od = total.select('src', 'trg', 'kind', 'n').union(byKind)
    od = od.toPandas().rename(columns=D(n='visitors'))
    od = od.astype(D(state=CAT, src=CAT, trg=CAT, kind=CAT, visitors=I32))
    od.to_parquet(U.mkfile(outpath), compression='gzip')
    return od

# poiOD = get_poi_flows(spark, overwrite=0).disp() # 4m47s

74,658,050 rows x 5 cols; Memory: 1042.3 MiB


Unnamed: 0,state,src,trg,kind,visitors
,<category>,<category>,<category>,<category>,<int32>
0.0,WA,530630044004,530630102041,All,4


# 4. Combine

In [20]:
opport = pd.concat([
    tot_jobs.rename(columns=D(n_jobs='opport')).assign(purpose='Work'),
    tot_pois.rename(columns=D(n_pois='opport')).assign(purpose='Non-work')
]).reset_index(drop=1)[['geoid', 'scale', 'purpose', 'kind', 'opport']]
opport = opport.astype(D(geoid=CAT, scale=CAT, purpose=CAT, kind=CAT)).disp()

2,253,050 rows x 5 cols; Memory: 52.9 MiB


Unnamed: 0,geoid,scale,purpose,kind,opport
,<category>,<category>,<category>,<category>,<int32>
0.0,01001,County,Work,All,12318


In [21]:
opport.to_parquet(DATA / 'access/opport/opportunities.parquet')