In [3]:
%load_ext autoreload
%autoreload 2
from covid_constants_and_util import *
import pandas as pd
import os
import matplotlib.pyplot as plt
import statsmodels.api as sm
import json
import datetime
import copy
import geopandas as gpd
import dask
import helper_methods_for_aggregate_data_analysis as helper
import h5py
import re

JUST_TESTING = False

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# Make sure we don't append onto existing files: abort if a chunk file from an
# earlier run is still present in either output directory, and say which one.
for existing_chunk_path in [os.path.join(helper.ANNOTATED_H5_DATA_DIR, helper.CHUNK_FILENAME),
                            os.path.join(helper.H5_DATA_DIR, helper.CHUNK_FILENAME)]:
    assert not os.path.exists(existing_chunk_path), \
        "%s already exists; delete it before re-running this notebook" % existing_chunk_path


In [None]:
# Read in the raw SafeGraph patterns data: one dataframe per week and one per month.

N_DASK_THREADS = 20
dask.config.set(pool=ThreadPool(N_DASK_THREADS))

# Months of monthly patterns data, in chronological order: Jan 2019 through Feb 2020.
MONTHS_TO_LOAD = [(month, 2019) for month in range(1, 13)] + [(1, 2020), (2, 2020)]

all_weekly_dfs = [helper.load_patterns_data(week_string=week_string, just_testing=JUST_TESTING)
                  for week_string in helper.ALL_WEEKLY_STRINGS]

# Load the most recent month first: the monthly-merge step starts from the first
# dataframe in this list, so the most recent place info is used when available.
all_monthly_dfs = []
for month, year in reversed(MONTHS_TO_LOAD):
    all_monthly_dfs.append(helper.load_patterns_data(month=month, year=year, just_testing=JUST_TESTING))
    


In [None]:
# Merge monthly DFs into a single dataframe, `base`. Each row is one POI.
# all_monthly_dfs is ordered most-recent month first, so `base` starts from the most
# recent month and its place info is kept; each older month only contributes POIs not
# yet in `base` (core columns only) plus its own columns that `base` lacks.

base = all_monthly_dfs[0]
# "Core" columns = those the second monthly frame shares with the first. They are
# assumed to exist in every monthly frame; a frame missing one raises KeyError in .loc.
core = all_monthly_dfs[1].columns.intersection(base.columns).to_list()
for i, df in enumerate(all_monthly_dfs[1:]):
    print(i)
    # merge all new places into base so that core info is not nan for new sgids
    new_places = df.loc[df.index.difference(base.index), core]
    base = pd.concat([base, new_places], join='outer', sort=False)
    # can now left merge in the df because all sgids will be in base
    cols_to_use = df.columns.difference(base.columns).to_list()
    # validate (as the weekly merge does) so duplicated SafeGraph IDs fail here instead
    # of silently multiplying rows; the weekly merge would reject such a `base` anyway.
    base = pd.merge(base, df[cols_to_use], left_index=True, right_index=True, how='left',
                    validate='one_to_one')


In [None]:
# Merge in the weekly dataframes, joining on SafeGraph ID (the index) with a left merge.
# A left merge keeps every POI already in `base`, i.e. every POI with monthly data:
# POIs without weekly data get NaN weekly columns, and POIs that appear only in the
# weekly data are dropped.
# At the end of this cell `base` is a single dataframe.

for i, weekly_df in enumerate(all_weekly_dfs):
    print("\n\n********Weekly dataframe %i/%i" % (i + 1, len(all_weekly_dfs)))
    # Weekly columns must be new; overlapping names would get _x/_y suffixes from pd.merge.
    assert len(base.columns.intersection(weekly_df.columns)) == 0

    ids_in_weekly_but_not_monthly = weekly_df.index.difference(base.index)
    # Denominator is the size of *this* weekly dataframe (previously used a `df`
    # variable leaked from the monthly-merge cell).
    print("Warning: %i/%i POIs in weekly but not monthly data; dropping these" % (len(ids_in_weekly_but_not_monthly),
                                                                  len(weekly_df)))
    base = pd.merge(base, weekly_df, how='left', left_index=True, right_index=True, validate='one_to_one')
    print("Missing data for weekly columns")
    print(pd.isnull(base[weekly_df.columns]).mean())

In [None]:
# Annotate with demographic info and save the dataframe. The dataframe is saved in
# h5py format, separated into chunks.

# Fixed seed so the shuffled row order (and therefore the chunk contents on disk and
# any prototype subset taken from them) is reproducible across runs.
SHUFFLE_SEED = 0
# Shuffle so rows are in random order [in case we want to prototype on a subset].
test = base.sample(frac=1, random_state=SHUFFLE_SEED)
helper.annotate_with_demographic_info_and_write_out_in_chunks(test, just_testing=JUST_TESTING)

In [None]:
# Reload because we don't modify the DF in place: the annotation helper call above
# does not change `test`, so read the annotated data back from the written chunks.
# NOTE(review): assumes load_all_chunks reads exactly what
# annotate_with_demographic_info_and_write_out_in_chunks wrote — confirm in helper module.
test = helper.load_all_chunks()

In [None]:
# Stratify by MSA and write out one CSV per MSA into helper.STRATIFIED_BY_AREA_DIR.
just_in_msas = test.loc[test['poi_lat_lon_Metropolitan/Micropolitan Statistical Area'] == 'Metropolitan Statistical Area']
assert pd.isnull(just_in_msas['poi_lat_lon_CBSA Title']).sum() == 0
print("%i/%i POIs are in MSAs (%i MSAs total)" % (len(just_in_msas), 
                                                  len(test), 
                                                  len(set(just_in_msas['poi_lat_lon_CBSA Title']))))
# Fixed seed so each MSA file's row order is reproducible across runs.
MSA_SHUFFLE_SEED = 0
grouped_by_msa = just_in_msas.groupby('poi_lat_lon_CBSA Title')
# Sanitized file stem -> original MSA name; used to detect two MSAs mapping to one file.
msa_name_by_file_stem = {}
total_written_out = 0
for msa_name, small_d in grouped_by_msa:
    small_d = small_d.copy().sample(frac=1, random_state=MSA_SHUFFLE_SEED) # make sure rows in random order. 
    small_d.index = range(len(small_d))
    name_without_spaces = re.sub('[^0-9a-zA-Z]+', '_', msa_name)
    # Sanitizing is lossy: without this check a second MSA with the same sanitized
    # name would silently overwrite the first MSA's CSV.
    assert name_without_spaces not in msa_name_by_file_stem, \
        "MSAs %r and %r both map to %s.csv" % (msa_name_by_file_stem.get(name_without_spaces),
                                               msa_name, name_without_spaces)
    msa_name_by_file_stem[name_without_spaces] = msa_name
    filename = os.path.join(helper.STRATIFIED_BY_AREA_DIR, '%s.csv' % name_without_spaces)
    for k in ['aggregated_cbg_population_adjusted_visitor_home_cbgs', 'aggregated_visitor_home_cbgs']:
        small_d[k] = small_d[k].map(lambda x:json.dumps(dict(x))) # cast to json so properly saved in CSV. 
    small_d.to_csv(filename)
    print("Wrote out dataframe with %i POIs to %s" % (len(small_d), '%s.csv' % name_without_spaces))
    total_written_out += 1
print("Total written out: %i" % total_written_out)