In [1]:
##########==========##########==========##########==========##########==========

## H - Header

#### H1 – libraries

In [2]:
## standard foundational libraries
import numpy  as np
import pandas as pd

## import specific functions and classes
from os              import mkdir, listdir
from os.path         import isfile, isdir
from datetime        import datetime, timedelta
from dbfread         import DBF
from geopy.distance  import geodesic
from ipyparallel     import Cluster
from sklearn.cluster import AgglomerativeClustering, DBSCAN, OPTICS

#### H2 – basic automation

In [3]:
## create the project's standard working directories when they are missing
def make_standard_file_system():
    """Ensure the three standard project folders exist.

    Creates 'A_Input', 'B_Intermediate' and 'C_Output' relative to the
    current working directory, leaving any folder that already exists
    untouched.
    """
    for folder in ('A_Input', 'B_Intermediate', 'C_Output'):
        if isdir(folder):
            continue
        mkdir(folder)

## log time elapsed
time_log = dict()
def log_time(the_id = 'End Log'):
    """Record a wall-clock time stamp, or close out and print the log.

    Parameters
    ----------
    the_id : str, default 'End Log'
        Label used as the key in the module-level `time_log` dict. The
        default value 'End Log' is a sentinel: the stamp is stored under
        'End' and every entry recorded so far is printed.

    Returns
    -------
    None
        The function works by updating `time_log` (and printing, for the
        sentinel value).
    """
    
    ## construct new time stamp from ONE clock reading; reading the clock
    ## separately for hour, minute and second can mix fields from different
    ## instants when a boundary is crossed between the reads
    now_time = datetime.now().strftime('%H:%M:%S')

    ## add to time log
    if the_id == 'End Log':
        time_log['End'] = now_time
        print('Time log:')
        for key, stamp in time_log.items():
            print(str(key).rjust(5) + ':', stamp)
    else:
        time_log[the_id] = now_time
        
## choose between reloading cached results and rebuilding them
def build_or_cache(function, address, permit):
    """Return cached results from disk when allowed, otherwise rebuild them.

    Parameters
    ----------
    function : callable
        Builder invoked with no arguments when the cache is not used.
    address : str
        Path of the cached CSV file.
    permit : bool
        Whether reading from the cache is allowed.

    Returns
    -------
    The table read from `address` (first column used as the index) when
    `permit` is truthy and the file exists; otherwise whatever
    `function()` returns.
    """
    ## cache path: only taken when caching is permitted AND the file is there
    if permit and isfile(address):
        print('Build/Cache Decision: Cache')
        return pd.read_csv(address, index_col = 0)

    ## build path: everything else
    print('Build/Cache Decision: Build')
    return function()
    
## execute functions: create the standard folders if needed, then record a
## time stamp labelled 'H2' in the time log
make_standard_file_system()
log_time('H2')

#### H3 – settings

In [4]:
## server mode (switches off data sampling; full distance data too big for a PC)
## when False, section GD1 keeps only tracts whose STATEFP is 04, 08, 35 or 49
server_mode = False

## GD settings
set_gd = dict()

## RD settings
## '1_cache': permit section RD1 to reuse the cached tract distance file
set_rd = {'1_cache': True}

## MD settings
## '1_cache': permit the agglomeration run to reuse its cached statistics file
## '2_cache': permit the DBSCAN run to reuse its cached statistics file
set_md = {'1_cache': True, '2_cache': True}

## EMR settings
set_emr = dict()

## PVD settings
set_pvd = dict()

## RV settings
set_rv = dict()

## GD - Gather Data

Primary data source:
+ 2020 TIGER shapefiles from the US Census (the .dbf files)
+ 2020 DP series population summary tables from US Census (as needed)

#### GD1 - read in primary data (census tract shapefile .dbfs)

In [5]:
## read in dbf files for census tracts
def read_tract_dbf(directory):
    """Compile the .dbf attribute tables in a directory into one table.

    Reads every '.dbf' file in `directory`, keeps the identifier,
    coordinate and ALAND columns, stacks the files, sorts the rows by
    GEOID, adds a constant 'count' column, and writes the result to
    'B_Intermediate/dbf_data.csv.gz'.

    Parameters
    ----------
    directory : str
        Folder containing the .dbf files.

    Returns
    -------
    pandas.DataFrame
        The compiled table, sorted by GEOID, with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If `directory` contains no .dbf files.
    """
    
    ## find dbf files in target directory; match the full '.dbf' extension in
    ## any letter case (a bare 'dbf' suffix would also accept names such as
    ## 'x_notadbf') and sort so the read order is reproducible
    dbf_addr = sorted(
        i for i in listdir(directory) if i.lower().endswith('.dbf'))
    if not dbf_addr:
        raise ValueError('No .dbf files found in ' + str(directory))
    
    ## read relevant columns from each
    ## ALAND is cast to an explicit 64-bit integer: plain `int` is 32-bit on
    ## some platforms, and large values would presumably overflow it
    desired_columns = {'GEOID': str,
                       'STATEFP': str, 'COUNTYFP': str, 'TRACTCE': str,
                       'INTPTLAT': float, 'INTPTLON': float,
                       'ALAND': 'int64'}
    column_names = list(desired_columns)
    dbf_data = []
    for i in dbf_addr:
        i_dbf = pd.DataFrame(iter(DBF(directory + '/' + i)))
        i_dbf = i_dbf[column_names].astype(desired_columns)
        dbf_data.append(i_dbf)
        
    ## compile data into a single file and export
    dbf_data = pd.concat(dbf_data, axis = 0).sort_values('GEOID')
    dbf_data['count'] = 1
    dbf_data = dbf_data.reset_index(drop = True)
    dbf_data.to_csv('B_Intermediate/dbf_data.csv.gz')
    return dbf_data
    
## execute code: compile the tract table, then (outside server mode) keep only
## the tracts of four states as a sample
dbf_data = read_tract_dbf('A_Input/tracts_dbf')
if not server_mode: # state codes: 04 = AZ, 08 = CO, 35 = NM, 49 = UT
    dbf_data = dbf_data.loc[dbf_data.STATEFP.isin(['49', '08', '04', '35'])]
## NOTE(review): the filtered frame keeps its original row labels (there is no
## reset_index after the filter) — confirm nothing later expects 0..n-1 labels
log_time('GD1')

#### GD2 - Read in secondary data (census DP table columns)

## RD - Refine Data

#### RD1 – calculate and cache geographic distances between tract centroids

In [6]:
## reshape tract coordinate data to input format: a list with one
## (INTPTLAT, INTPTLON) tuple per row of dbf_data, in row order
tract_xy = list(zip(dbf_data.INTPTLAT.values, dbf_data.INTPTLON.values))

## define function to do distance computations in parallel
def measure_distance_in_parallel(xy = tract_xy):
    """Build the symmetric table of distances between all coordinate pairs.

    Parameters
    ----------
    xy : list of coordinate pairs, default `tract_xy`
        Points to compare. The default is bound when this function is
        defined, so it reflects `tract_xy` as it was at that moment.

    Returns
    -------
    pandas.DataFrame
        Square table of geodesic distances in miles, rounded to whole
        numbers. The table is also written to
        'B_Intermediate/tract_distance.csv.gz'.
    """
    the_iter = list(range(0, len(xy)))
    
    ## define engine function that will run on each parallel process
    ## it returns row n of the matrix with only the diagonal and the entries
    ## to its right computed; entries left of the diagonal are 0 placeholders
    ## the coordinate list arrives through the default argument `xy_col`
    ## NOTE(review): presumably a default argument is used (rather than a
    ## closure over `xy`) so the data travels with the function — confirm
    def measure_distance_parallel_slice(n, xy_col = xy):
        ## imported inside the function, presumably because the engine
        ## processes do not share this notebook's imports — TODO confirm
        from geopy.distance import geodesic
        xy_col = xy_col.copy()
        xy_row = xy_col[n]
        xy_dist = []
        ## placeholders for the pairs already covered by earlier rows
        for i in xy_col[0:n]: xy_dist.append(0)
        ## distance from point n to itself and to every later point
        for i in xy_col[n::]:
            xy_dist.append(int(round(geodesic(xy_row, i).miles)))
        return xy_dist

    ## run engine in parallel for each slice of the data
    ## (one task per row index, handed out through a load-balanced view)
    with Cluster(n = 4) as clust:
        view = clust.load_balanced_view()
        asyncresult = view.map_async(measure_distance_parallel_slice, the_iter)
        asyncresult.wait_interactive()
        result = asyncresult.get()
        
    ## package results and export
    ## adding the transpose mirrors the computed upper triangle into the zero
    ## placeholders below the diagonal
    result = np.array(result)
    result = result + result.T
    result = pd.DataFrame(result)
    result.to_csv('B_Intermediate/tract_distance.csv.gz')
    return result

## build the tract distance table, or reload it from the cache file when
## set_rd['1_cache'] permits and the file exists
## NOTE(review): the cache decision looks only at whether the file exists, so
## a file written for a different set of tracts (e.g. another server_mode
## setting) would be reused as-is — confirm the cache matches dbf_data
tract_distance = build_or_cache(
    function = measure_distance_in_parallel,
    address = 'B_Intermediate/tract_distance.csv.gz',
    permit = set_rd['1_cache']
    )

log_time('RD1')

Build/Cache Decision: Cache


#### RD2

## MD - Model Data

#### MD0 - build common components

In [7]:
## score a clustering against the distance table
def score_model(clusters, d):
    """Share of the total distance that lies between same-cluster pairs.

    Parameters
    ----------
    clusters : array-like
        One cluster label per item; the label -1 is excluded from scoring.
    d : pandas.DataFrame
        Square table of pairwise distances, ordered like `clusters`.

    Returns
    -------
    float
        Sum of the distances between pairs that share a label (neither
        labelled -1), divided by the sum of all distances, rounded to
        three decimals.
    """
    ## tile the labels into a grid so entry [r, c] holds the label of item c;
    ## the transpose then holds the label of item r
    label_grid = clusters * np.ones(d.shape)
    label_grid_t = label_grid.T

    ## pairs in which neither member carries the -1 label
    both_assigned = (label_grid != -1) & (label_grid_t != -1)

    ## keep only the distances of assigned pairs that share a label
    same_label = label_grid == label_grid_t
    within_distance = same_label * both_assigned * np.array(d)

    ## normalise by the total of every distance in the table
    total_distance = np.sum(d.sum().values)
    ratio = np.sum(within_distance) / total_distance
    return np.round(ratio, 3)

## run model across different parameters
model_name = 'EMPTY'
model_list = dict()
def run_model(dist = tract_distance, model_dict = None, addr_name = None):
    """Fit every model in a collection and export labels and summary stats.

    Parameters
    ----------
    dist : pandas.DataFrame, default `tract_distance`
        Distance table passed to each model's `fit_predict` and to
        `score_model`. The default is bound when this function is defined.
    model_dict : dict, optional
        Maps a model label to an object with a `fit_predict` method. When
        omitted, the module-level `model_list` is used.
    addr_name : str, optional
        Name inserted into the output file names. When omitted, the
        module-level `model_name` is used.

    Returns
    -------
    pandas.DataFrame
        One row per model with columns 'name', 'score', 'clusters' and
        'outliers'.

    Side effects
    ------------
    Writes 'B_Intermediate/Model_<addr_name>_Clust.csv.gz' (labels) and
    'B_Intermediate/Model_<addr_name>_Stats.csv' (summary statistics).
    """
    
    ##  retrieve objects from external environment unless given explicitly
    if model_dict is None:
        model_dict = model_list
    if addr_name is None:
        addr_name = model_name
    
    ## declare container objects for model results
    model_clusters = dict()
    model_stats    = dict()
    
    ## fit model and populate container objects with model performance data
    for i in model_dict.keys():
        labels = model_dict[i].fit_predict(dist)
        model_clusters[i] = labels
        model_stats[i] = dict()
        model_stats[i]['name'] = i
        model_stats[i]['score'] = score_model(labels, d = dist)
        ## the -1 label marks outliers, not a cluster, so it is left out of
        ## the cluster count (outliers are reported in their own column)
        model_stats[i]['clusters'] = len(set(labels) - {-1})
        model_stats[i]['outliers'] = int(np.sum(labels == -1))
        
    ## package results and export
    model_clusters = pd.DataFrame(model_clusters)
    model_clusters.to_csv('B_Intermediate/Model_' + addr_name + '_Clust.csv.gz')
    model_stats = pd.DataFrame(model_stats).T
    ## the transpose leaves every column as object dtype; restore numeric types
    model_stats = model_stats.astype(
        {'score': float, 'clusters': int, 'outliers': int})
    model_stats.to_csv('B_Intermediate/Model_' + addr_name + '_Stats.csv')
    return model_stats

#### MD1 and MD2 - run models

In [8]:
## -- cluster population by agglomeration

## assemble models: one average-linkage model per distance threshold, for
## thresholds 100, 110, ..., 390 (keyed by the threshold written as a string)
## NOTE(review): newer scikit-learn releases reportedly rename the `affinity`
## argument to `metric` — confirm against the installed version
model_agglomeration = dict()
for i in range(250 - 150, 250 + 150, 10):
    model_iter = AgglomerativeClustering(
        n_clusters = None, compute_full_tree = True,
        affinity = 'precomputed', linkage = 'average',
        distance_threshold = i)
    model_agglomeration[str(int(i))] = model_iter
    
## execute models
## run_model is called without arguments, so it picks up model_name and
## model_list from module scope; both must be set before the call
model_name = 'Agglomeration'
model_list = model_agglomeration
agglomeration_score = build_or_cache(
    function = run_model,
    address = 'B_Intermediate/Model_' + model_name + '_Stats.csv',
    permit = set_md['1_cache']
    )

log_time('MD1')

## -- cluster population by DBSCAN

## assemble models: one DBSCAN model per eps value, for eps 10, 11, ..., 39
## (keyed by the eps value written as a string)
model_dbscan = dict()
for i in range(25 - 15, 25 + 15, 1):
    model_iter = DBSCAN(eps = i, metric = 'precomputed', n_jobs = 8)
    model_dbscan[str(int(i))] = model_iter
    
## execute models
## run_model is called without arguments, so it picks up model_name and
## model_list from module scope; both must be set before the call
model_name = 'DBSCAN'
model_list = model_dbscan
## store the result under its own name; assigning it to agglomeration_score
## would overwrite the statistics produced by the agglomeration run
dbscan_score = build_or_cache(
    function = run_model,
    address = 'B_Intermediate/Model_' + model_name + '_Stats.csv',
    permit = set_md['2_cache']
    )

log_time('MD2')

Build/Cache Decision: Build
Build/Cache Decision: Build


## EMR - Enrich Model Results

#### EMR1

#### EMR2

## PVD - Prepare Visualization Data

#### PVD1

#### PVD2

## RV - Render Visualization

#### RV1

#### RV2

## F - Footer

In [9]:
## close out the time log: store the final stamp under 'End' and print every
## stamp recorded so far
log_time()

Time log:
   H2: 14:14:21
  GD1: 14:14:24
  RD1: 14:14:26
  MD1: 14:15:00
  MD2: 14:15:37
  End: 14:15:37


In [10]:
##########==========##########==========##########==========##########==========