In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import dask

from dask.distributed import Client
from datetime import datetime

# Reduce the number of workers, because we need lots of RAM for loading the CBG shapefile
# client = Client(n_workers=1)

# CADES configuration
from dask_jobqueue import SLURMCluster
# Be careful with using too many process, or you'll run out of file descriptors
cluster = SLURMCluster(project='birthright', queue='high_mem_cd', cores=32, memory='300 GB', processes=2, walltime="4:00:00", job_extra=["-N 1"], interface="ib0")
cluster.scale(jobs=10)
client = Client(cluster)

In [2]:
def to_date(d):
    if ":" == d[-3:-2]:
        d = d[:-3]+d[-2:]
    return d

In [3]:
#For playing with one week at a time
# patterns = dd.read_csv("data/safegraph/weekly-patterns/2020-04-12-weekly-patterns.csv", dtype={'naics_code': 'float64'}).set_index('safegraph_place_id')
# Load the entire dataset
patterns = dd.read_csv("data/safegraph/weekly-patterns/2020-04*.csv", dtype={'naics_code': 'float64'}).set_index('safegraph_place_id')
patterns['date'] = patterns['date_range_start'].apply(lambda x: datetime.strptime(to_date(x), '%Y-%m-%dT%H:%M:%S%z').date(), meta=('date_range_start', 'object'))
patterns['naics_4'] = patterns['naics_code'].apply(lambda x: str(x)[0:4], meta=('naics_code', 'str'))


In [4]:
# df = test
# df = df[['date','visitor_home_cbgs', 'visits_by_day','naics_4']]
# df = df.visitor_home_cbgs\
#     .str.translate(str\
#     .maketrans({'{':'', '}':'','"':''}))\
#     .str.split(',')\
#     .apply(pd.Series, 1)\
#     .stack() \
#     .reset_index(level=1, drop=True) \
#     .to_frame('cbgs') \
#     .merge(df, left_index=True, right_index=True)\
#     .drop(['visitor_home_cbgs'], axis=1) \
#     .rename(columns={0:'cbgs',1:'party'}) \
#     .dropna()

# df = df[df.cbgs != ""]
# df['party'] = df.cbgs.str.split(':').apply(lambda x: x[-1]).astype(np.int32)
# df['cbgs'] = df.cbgs.str.split(':').apply(lambda x: x[0]).astype('str')

# df = df.visits_by_day \
#     .str.translate(str\
#     .maketrans({'[':'', ']':'','"':''}))\
#     .str.split(',')\
#     .apply(pd.Series, 1) \
#     .stack() \
#     .to_frame('day_visit') \
#     .astype(np.int32) \
#     .reset_index(level=1) \
#     .rename(columns={'level_1': 'day_offset'}) \
#     .merge(df, on='safegraph_place_id')

# df.date = (df.date.values.astype('M8[D]')+ df.day_offset.values * np.timedelta64(1, 'D')).astype('M8[D]')
# df['total_visits'] = df.party * df.day_visit
# df = df.drop(['day_offset', 'vis'], axis=1)
# df.head()


In [5]:
poi = dd.read_csv("data/safegraph/core/core_poi*.csv").set_index('safegraph_place_id')
poi = poi[['latitude', 'longitude']]

In [6]:
def compute_location_cbg(df):
    import geopandas as gpd
    from shapely.geometry import Point
    
    localdf = df[['latitude', 'longitude']].copy()
    
    cbgs = gpd.read_file("data/reference/census/block_groups.shp")
    cbgs.crs = 'epsg:4269'
    cbgs = cbgs.to_crs('epsg:4326')
    local_gdf = gpd.GeoDataFrame(localdf, crs="epsg:4326",\
                                       geometry=[Point(xy) for xy in \
                                                zip(localdf['longitude'], localdf['latitude'])])
    local_gdf = gpd.sjoin(local_gdf, cbgs, how='left', op='within')
    
    return local_gdf.GEOID.rename('location_cbg')
poi['location_cbg'] = poi.map_partitions(compute_location_cbg, meta=('location_cbg', 'str'))

In [7]:
df = patterns
# df = df[(df.region == 'GA') | (df.region == 'CA')]
# df = df.reset_index()

In [8]:
def strings_to_int(lists):
    ints = []
    for x in lists:
        try:
            ints.append(int(x))
        except:
            ints.append(0)
    return np.array(ints).astype(np.int32)

def get_cbgs(df):   
    df = df[['date','visitor_home_cbgs', 'visits_by_day','naics_4']]
    df = df.visitor_home_cbgs\
        .str.translate(str\
        .maketrans({'{':'', '}':'','"':''}))\
        .str.split(',')\
        .apply(pd.Series, 1)\
        .stack() \
        .reset_index(level=1, drop=True) \
        .to_frame('cbgs') \
        .merge(df, left_index=True, right_index=True)\
        .drop(['visitor_home_cbgs'], axis=1) \
        .rename(columns={0:'cbgs',1:'party'}) \
        .dropna()

    df = df[df.cbgs != ""]
    df['party'] = df.cbgs.str.split(':').apply(lambda x: x[-1]).astype(np.int32)
    df['cbgs'] = df.cbgs.str.split(':').apply(lambda x: x[0]).astype('str')

    df = df.visits_by_day \
        .str.translate(str\
        .maketrans({'[':'', ']':'','"':''}))\
        .str.split(',')\
        .apply(pd.Series, 1) \
        .stack() \
        .to_frame('day_visit') \
        .astype(np.int32) \
        .reset_index(level=1) \
        .rename(columns={'level_1': 'day_offset'}) \
        .merge(df, on='safegraph_place_id')

    df.date = (df.date.values.astype('M8[D]')+ df.day_offset.values * np.timedelta64(1, 'D')).astype('M8[D]')
    df['total_visits'] = df.party * df.day_visit
    df = df.drop(['day_offset', 'visits_by_day'], axis=1)

    return(df)

df = df.map_partitions(get_cbgs, meta={'day_visit': 'int32', 'cbgs': 'str', 'date': 'object', 'naics_4': 'int32', 'party': 'int32', 'total_visits': 'int32'})

In [9]:
df = df.join(poi, how='left')

In [10]:
def compute_cbg_distances(df):
    
    def loc_and_distance(x):
        return x.geometry.centroid.distance(Point(x.longitude, x.latitude)) if x.longitude and x.latitude and x.geometry else 0.0
    
    import geopandas as gpd
    from shapely.geometry import Point
    
    cbgs = gpd.read_file("data/reference/census/block_groups.shp")
    cbgs.crs = 'epsg:4269'
    cbgs = cbgs.to_crs('epsg:4326')
    df.cbgs = df.cbgs.fillna("")
    
    df = df.merge(cbgs[['GEOID', 'geometry']], left_on='cbgs', right_on='GEOID', how='left').drop(['GEOID'], axis=1)
    
    df['distance'] = df.apply(loc_and_distance, axis=1)
    
    df = df.drop(['geometry'], axis=1)
    
    print(df)
    return df

df = df.map_partitions(compute_cbg_distances, meta={'day_visit': 'int32', 'cbgs': 'str', 'date': 'object', 'naics_4': 'int32', 'party': 'int32', 'total_visits': 'int32', 'latitude': 'float32', 'longitude': 'float32', 'location_cbg': 'str', 'distance': 'float64'})

In [11]:
%time df.to_parquet("data/output/SG-April-distance-matrix.parquet")

tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='localhost:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/Users/raac/Development/covid/mobility-analysis/venv/lib/python3.7/site-packages/tornado/websocket.py", line 956, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/Users/raac/Development/covid/mobility-analysis/venv/lib/python3.7/site-packages/bokeh/server/views/ws.py", line 135, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.


Unnamed: 0,day_visit,cbgs,date,naics_4,party,total_visits,latitude,longitude,location_cbg,distance
0,1,60830020083,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.058874
1,1,60830020054,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.089068
2,1,60830024022,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.034811
3,1,60830020083,2020-04-13,7225,4,4,34.950098,-120.43797,60830024031,0.058874
4,1,60830020054,2020-04-13,7225,4,4,34.950098,-120.43797,60830024031,0.089068


In [12]:
df['weighted_distance'] = df.total_visits * df.distance

In [13]:
# split out the countyfips for the origin and destination cbg
df['origin_fips'] = df.cbgs.str[0:5]
df['dest_fips'] = df.location_cbg.str[0:5]
df.groupby(['origin_fips', 'dest_fips', 'date']).weighted_distance.sum()
df.head()

Unnamed: 0,day_visit,cbgs,date,naics_4,party,total_visits,latitude,longitude,location_cbg,distance,weighted_distance,origin_fips,dest_fips
0,1,60830020083,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.058874,0.235496,6083,6083
1,1,60830020054,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.089068,0.356272,6083,6083
2,1,60830024022,2020-04-12,7225,4,4,34.950098,-120.43797,60830024031,0.034811,0.139245,6083,6083
3,1,60830020083,2020-04-13,7225,4,4,34.950098,-120.43797,60830024031,0.058874,0.235496,6083,6083
4,1,60830020054,2020-04-13,7225,4,4,34.950098,-120.43797,60830024031,0.089068,0.356272,6083,6083


In [14]:
df = dd.read_csv("data/output/April-distance-matrix.csv/*.part", dtype={'naics_4': 'float64', 'location_cbg': 'str', 'cbgs': 'str'})

In [15]:
df['weighted_distance'] = df.total_visits * df.distance
df['origin_fips'] = df.cbgs.str[0:5]
df['dest_fips'] = df.location_cbg.str[0:5]
df = df.groupby(['origin_fips']).weighted_distance.mean()
df.to_csv("data/output/April-county-distance-matrix-mean.csv")

['/Users/raac/Development/covid/mobility-analysis/data/output/April-county-distance-matrix-mean.csv/0.part']

In [16]:
test = df.head()