In [1]:
%load_ext autoreload
%autoreload 2

import seaborn as sns
import metapack as mp
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display 
from tqdm.notebook import tqdm 

%matplotlib inline
sns.set_context('notebook')
mp.jupyter.init()

from demosearch import *
from pathlib import Path


central_cache = FileCache('/Volumes/SSD_Extern/radius')
rm = RasterManager(central_cache)

In [2]:
pkg = mp.jupyter.open_source_package()
pkg.set_sys_path()
import  pylib 

def get_cache(pkg):
    return FileCache(Path(pkg.path).parent.joinpath('cache'))

pkg_root = Path(pkg.path).parent

cache = get_cache(pkg)
cache
 

<FileCache: /Volumes/External/data/radius-collection/civicknowledge.com-percentile-demosearch/cache>

In [3]:
bpkg = mp.open_package('http://radius.civicknowledge.com.s3.amazonaws.com/businesslistdatabase.com-us_business-1.1.1.csv')
bpkg


In [4]:
%%time
# Link the businesses to CBSAs, then save the whole file
k_src = 'business_scores/combined_source'
if cache.exists(k_src):
    df = cache.get_df(k_src)
else:
    cbsa = bpkg.reference('cbsa').geoframe()
    bc = bpkg.resource('businesses_cat').geoframe()
    t = gpd.sjoin(bc, cbsa.to_crs(4326))
    df = t[~t.geoid.isnull()]
    cache.put_df(k_src, df)
    
dfs = df.sample(500_000)
   
print(len(dfs))
dfs.head() 

500000
CPU times: user 10.8 s, sys: 800 ms, total: 11.6 s
Wall time: 15.5 s


Unnamed: 0,naics,group,geometry,index_right,csafp,cbsafp,geoid,name,namelsad,lsad,memi,mtfcc,aland,awater,intptlat,intptlon
1809198,722511,ent,POINT (-88.08044 30.69545),575,380,33660,31000US33660,"Mobile, AL","Mobile, AL Metro Area",M1,1,G3110,5981777088,1095671201,30.9715095,-88.20787
511392,441110,auto,POINT (-76.52514 38.96641),13,548,12580,31000US12580,"Baltimore-Columbia-Towson, MD","Baltimore-Columbia-Towson, MD Metro Area",M1,1,G3110,6737880300,1304105561,39.304361,-76.5495009
510546,441110,auto,POINT (-76.84758 39.10006),908,548,47900,31000US47900,"Washington-Arlington-Alexandria, DC-VA-MD-WV","Washington-Arlington-Alexandria, DC-VA-MD-WV M...",M1,1,G3110,17010228389,1128156605,38.7982054,-77.4868816
748735,722511,ent,POINT (-78.37332 33.98161),603,396,34820,31000US34820,"Myrtle Beach-Conway-North Myrtle Beach, SC-NC","Myrtle Beach-Conway-North Myrtle Beach, SC-NC ...",M1,1,G3110,5136952008,831075921,33.9690951,-78.6127235
2218349,722511,ent,POINT (-72.94685 41.28156),614,408,35300,31000US35300,"New Haven-Milford, CT","New Haven-Milford, CT Metro Area",M1,1,G3110,1565143226,667604268,41.3497185,-72.9002033


In [5]:
drop_cols = ['agg_hh_income','female','male',
             'over25_college', # corr to agg_income
             'over25_high_school', # corr to total_pop, seniors
             'over25_high_school', # corr to total_pop, seniors
             'housing_rented_college', # Agg_income
             'households', # corr to  total_population
             'households_unmaried', # agg_income, housing_rented_college
             'geometry'
            ]

demo_cols = ['total_population', 'male', 'female', 'over25_college',
       'over25_high_school', 'seniors', 'households', 'households_unmaried',
       'housing_owned_college', 'housing_rented_college', 'agg_income',
       'agg_hh_income']


osm_cols = ['primary', 'secondary', 'tertiary', 'trunk', 'highway',
       'entertain', 'restaurant', 'casual', 'shop', 'bar', 'cafe', 'active',
       'travel', 'food']

cat_cols = set(df.columns)-set(demo_cols)-set(osm_cols)

# High-performing demographics columns
hp_cols = ['seniors',
 'households',
 'housing_owned_college',
 'households_unmaried',
 'housing_rented_college',
 'over25_college',
 'agg_hh_income',
 'male']




In [6]:
%%time

# Break the saved, linked file into chunks and produce tasks for
# the mp run. 

k_src = 'business_scores/source/{idx}'
k_score = 'business_scores/scores/{idx}'

tasks = []
chunk_size = 100 #len(s)
rng = list(range(0, len(dfs), chunk_size))
for i in tqdm(rng):
    chunk = dfs.iloc[i:i+chunk_size]
    k = k_src.format(idx=i)
    cache.put_df(k, chunk)
    tasks.append((cache, central_cache, i, k))
    
cache.get_df('business_scores/source/96600.pkl').head()

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 13.6 s, sys: 2.3 s, total: 15.9 s
Wall time: 16.4 s


Unnamed: 0,naics,group,geometry,index_right,csafp,cbsafp,geoid,name,namelsad,lsad,memi,mtfcc,aland,awater,intptlat,intptlon
1404967,448120,shop,POINT (-95.47161 29.66583),400,288.0,26420,31000US26420,"Houston-The Woodlands-Sugar Land, TX","Houston-The Woodlands-Sugar Land, TX Metro Area",M1,1,G3110,21415829912,3043591049,29.7495926,-95.3536422
785680,445230,food,POINT (-78.22348 35.94418),715,450.0,39580,31000US39580,"Raleigh-Cary, NC","Raleigh-Cary, NC Metro Area",M1,1,G3110,5486646049,74650629,35.7567464,-78.4604412
316534,722511,ent,POINT (-89.60120 40.64522),675,,37900,31000US37900,"Peoria, IL","Peoria, IL Metro Area",M1,1,G3110,8633293349,174445163,40.7088872,-89.6974658
1227723,442110,shop,POINT (-80.45070 41.25268),335,566.0,49660,31000US49660,"Youngstown-Warren-Boardman, OH-PA","Youngstown-Warren-Boardman, OH-PA Metro Area",M1,1,G3110,4408441873,109322747,41.23646,-80.5617836
946493,448140,shop,POINT (-76.00365 42.09414),43,,13780,31000US13780,"Binghamton, NY","Binghamton, NY Metro Area",M1,1,G3110,3171194833,36154556,42.1639276,-76.0229431


In [12]:
# Functions for Multi-processor run

layers = ['total_population','male','female','over25_college','over25_high_school','seniors',
          'households','households_unmaried','housing_owned_college','housing_rented_college',
          'agg_income','agg_hh_income',
          'primary', 'secondary','tertiary', 'trunk', 'highway', 
          'entertain', 'restaurant', 'casual', 'shop','bar','cafe','active','travel', 'food']

def _f(local_cache, central_cache, idx, inp):
    from demosearch.exceptions import OutofBoundsError
    from demosearch import FileCache, RasterManager
    
    rm = RasterManager(central_cache)
    
    k = k_score.format(idx=idx)

    if local_cache.exists(k):
        return k
    
    s = local_cache.get_df(inp)

    rows = []
    for idx, row in s.iterrows():

        try:
            p = rm.patches( (row.geometry.y, row.geometry.x), layers=layers)
            e = p.score()[0]
           
            e['cbsa'] = row.geoid
            e['naics'] = row.naics
            e['group'] = row.group
            e['geometry'] = row.geometry
           
            rows.append(e)
        
        except OutofBoundsError as err:
            pass
        except Exception as err:
            print(err)
            raise
            pass # Ingore errors
        
    r = gpd.GeoDataFrame(rows)
    
    local_cache.put_df(k, r)
    
    return k
 
# Check if the function will pickle
import cloudpickle
pl = cloudpickle.dumps([_f, tasks])

# Test run to check that the function works. 
#task_no = 11
#print(tasks[task_no])
#cache.delete(tasks[task_no][3].replace('/source/','/scores/'))
#%time k = _f(*tasks[task_no])
#s = cache.get_df(k)
#s

In [13]:
def run_mp(f, tasks, desc='', n_cpu=None):
    """Run a function in multiple processes and return the results as a list
    displays a progress bar"""
    from joblib import Parallel, delayed
    from multiprocessing import cpu_count

    if n_cpu is None:
        n_cpu = cpu_count() - 2

    return Parallel(n_jobs=n_cpu)(delayed(f)(*t) for t in tqdm(tasks, desc=desc))

import appnope
with appnope.nope_scope():

    r = run_mp(_f, tasks)


  0%|          | 0/5000 [00:00<?, ?it/s]



In [14]:
len(r)

5000