# Parsing and Processing Lookup Responses

In [1]:
# Reload imported local modules (e.g. `parsers`, `config`) before each cell
# runs, so edits to them are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import os
import glob
import gzip
import json
from datetime import datetime
from multiprocessing import Pool

import numpy as np
from tqdm import tqdm
import pandas as pd

from config import (
    inc_city_att, 
    inc_city_cl, 
    inc_city_verizon, 
    inc_city_el
)
from parsers import (
    cl_workflow, 
    att_workflow, 
    verizon_workflow, 
    el_workflow, 
    get_incorporated_places, 
    check_redlining, 
    get_holc_grade, 
    get_closest_fiber
)

In [3]:
# ---- inputs -------------------------------------------------------------
# Census features produced by the previous notebook.
fn_acs = '../data/intermediary/census/aggregated_tables_plus_features.csv.gz'

# Glob patterns matching every gzipped response file saved from each ISP's
# lookup tool (one subdirectory level below the ISP folder).
pattern_att = '../data/intermediary/isp/att/*/*.geojson.gz'
pattern_cl = '../data/intermediary/isp/centurylink/*/*.geojson.gz'
pattern_verizon = '../data/intermediary/isp/verizon/*/*.geojson.gz'
pattern_el = '../data/intermediary/isp/earthlink/*/*.geojson.gz'

# ---- outputs ------------------------------------------------------------
# One processed speed/price table per ISP (also used as an on-disk cache).
fn_att = '../data/output/speed_price_att.csv.gz'
fn_cl = '../data/output/speed_price_centurylink.csv.gz'
fn_verizon = '../data/output/speed_price_verizon.csv.gz'
fn_el = '../data/output/speed_price_earthlink.csv.gz'

# ---- params -------------------------------------------------------------
n_jobs = 20            # worker processes for parallel parsing
recalculate = False    # True = rebuild outputs even if cached CSVs exist

In [4]:
# Census table built in the previous notebook. The ID columns are read as
# strings so pandas does not parse them as numbers (which would drop any
# leading zeros).
acs = pd.read_csv(
    fn_acs,
    dtype={'geoid': str, 'block_group': str},
)

# Subset of Census columns merged onto every ISP's lookup responses below.
acs_cols = [
    'geoid',
    'race_perc_non_white',
    'income_lmi',
    'ppl_per_sq_mile',
    'n_providers',
    'income_dollars_below_median',
    'internet_perc_broadband',
    'median_household_income',
]

## Total data collected

In [5]:
def count_addresses(fn):
    """
    How many addresses did we successfully collect in one response file?

    Parameters
    ----------
    fn : str
        Path to a gzipped file holding one JSON record per line.

    Returns
    -------
    int
        Number of records whose ``collection_status`` is not 0 (status 0 is
        treated as "not collected").
    """
    count = 0
    with gzip.open(fn, 'rb') as f:
        # Iterate the handle directly so lines are streamed one at a time
        # instead of decompressing the whole file into memory via readlines().
        for line in f:
            # Skip blank lines (e.g. a trailing newline) instead of crashing
            # json.loads on empty input.
            if not line.strip():
                continue
            record = json.loads(line)
            if record['collection_status'] != 0:
                count += 1
    return count

def count_successful_addresses(pattern, n_jobs=20):
    """
    Total the successfully collected addresses across all files matching `pattern`.

    Each matching file is counted by `count_addresses` in a pool of `n_jobs`
    worker processes; the per-file counts are summed as they arrive, so
    completion order does not matter.
    """
    matched_files = glob.glob(pattern)
    with Pool(n_jobs) as workers:
        per_file_counts = workers.imap_unordered(count_addresses, matched_files)
        total = sum(tqdm(per_file_counts, total=len(matched_files)))
    return total

In [6]:
# Count successfully collected addresses for each ISP (one progress bar each,
# in the order AT&T, Verizon, CenturyLink, EarthLink).
att_count, verizon_count, cl_count, el_count = (
    count_successful_addresses(isp_pattern, n_jobs=n_jobs)
    for isp_pattern in (pattern_att, pattern_verizon, pattern_cl, pattern_el)
)
all_records = sum([att_count, verizon_count, cl_count, el_count])

print(f"""AT&T: {att_count}
Verizon: {verizon_count}
CenturyLink: {cl_count}
EarthLink: {el_count}
Total: {all_records}""")

100%|██████████| 12652/12652 [00:16<00:00, 765.89it/s] 
100%|██████████| 10026/10026 [00:04<00:00, 2282.08it/s]
100%|██████████| 5891/5891 [00:05<00:00, 1111.69it/s]
100%|██████████| 17263/17263 [00:20<00:00, 825.78it/s] 

AT&T: 458787
Verizon: 312357
CenturyLink: 245139
EarthLink: 590412
Total: 1606695





## Functions we're going to be using
We calculate the distance to the closest household offered download speeds of at least 200 Mbps (our proxy for fiber) and check HOLC grades using functions defined in `parsers.py`.

In [7]:
# Display the source of `get_closest_fiber` (imported from parsers.py).
??get_closest_fiber

Signature: get_closest_fiber(df: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame
Source:   
def get_closest_fiber(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert coordinates to radians and fit a sklearn ball tree 
    to find closest household with 200 Mbps speeds.
    """
    _df = df[df.speed_down >= 200]
    
    # create a ball tree just on fiber households
[... output truncated ...]

We check HOLC grades with `check_redlining`, which tests whether an address's coordinates (converted to a Shapely `Point`) fall within the `Polygon`s of the redlining maps from Mapping Inequality. The actual point-in-polygon check is done by `get_holc_grade`.

In [8]:
# Display the source of `get_holc_grade` (imported from parsers.py).
??get_holc_grade

Signature: get_holc_grade(row: dict, polygons: list) -> str
Source:   
def get_holc_grade(row: dict, 
                   polygons: list) -> str:
    """
    Converts any lat and lon in a dictionary into a shapely point,
    then iterate through a list of dictionaries containing 
    shapely polygons shapes for each HOLC-graded area.
    """
    point = Point(float(row['lon']), float(row['lat'])
[... output truncated ...]

## AT&T

In [9]:
# Collects the `state` values found in each ISP's output; deduplicated at the end.
states = list()

In [10]:
# Build the AT&T table from raw lookup responses, unless a processed copy is
# already cached on disk (set `recalculate = True` to force a rebuild).
if not os.path.exists(fn_att) or recalculate:
    # find the data we collected for each block group.
    data_att = []
    files = glob.glob(pattern_att)
    with Pool(n_jobs) as pool:
        # create parallel jobs that parse each block group of data using `att_workflow`.
        for record in tqdm(pool.imap_unordered(att_workflow, files),
                           total=len(files)):
            # presumably a list of parsed rows for one file
            data_att.extend(record)
    att = pd.DataFrame(data_att)
    del data_att

    # only keep addresses in the incorporated city.
    # .copy() makes an independent frame so the column assignment below is
    # not written into a filtered slice (SettingWithCopyWarning).
    att = att[att.incorporated_place.isin(inc_city_att)].copy()
    # zero-pad to 12 digits so it matches the string `geoid` column in `acs`
    att['block_group'] = att['block_group'].apply(lambda x: f"{int(x):012d}")

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    att = check_redlining(att)
    att = get_closest_fiber(att)

    # merge census data, and save the file
    att_acs = att.merge(acs[acs_cols], how='left',
                        left_on='block_group', right_on='geoid')
    att_acs.to_csv(fn_att, index=False, compression='gzip')
else:
    # Read the ID columns back as strings; otherwise pandas parses them as
    # integers and drops the leading zeros the fresh path produces.
    att_acs = pd.read_csv(fn_att, dtype={'block_group': str, 'geoid': str})

In [11]:
# start and end collection datetime
# (fromtimestamp without a tz renders epoch seconds in the kernel's local time zone)
[datetime.fromtimestamp(ts)
 for ts in (att_acs['collection_datetime'].min(),
            att_acs['collection_datetime'].max())]

[datetime.datetime(2022, 4, 18, 2, 18, 2),
 datetime.datetime(2022, 4, 27, 20, 47, 51)]

In [12]:
# record the states that appear in the AT&T results
states += list(att_acs['state'].unique())

## CenturyLink

In [13]:
# Build the CenturyLink table from raw lookup responses, unless a processed
# copy is already cached on disk (set `recalculate = True` to force a rebuild).
if not os.path.exists(fn_cl) or recalculate:
    data_cl = []
    files = glob.glob(pattern_cl)
    with Pool(n_jobs) as pool:
        # parse each response file in parallel with `cl_workflow`
        for record in tqdm(pool.imap_unordered(cl_workflow, files),
                           total=len(files)):
            data_cl.extend(record)
    cl = pd.DataFrame(data_cl)
    del data_cl

    # only keep addresses in the incorporated city
    cl = cl[cl['incorporated_place'].isin(inc_city_cl)]
    # NOTE(review): drops every row with exactly 940 Mbps download; the reason
    # is not recorded here -- document why these offers are excluded.
    # .copy() hands the helpers below an independent frame, not a slice.
    cl = cl[cl.speed_down != 940].copy()
    # NOTE(review): unlike the AT&T and EarthLink cells, block_group is not
    # zero-padded here; this assumes `cl_workflow` already returns 12-digit
    # strings -- confirm, or the ACS merge will silently leave rows unmatched.

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    cl = check_redlining(cl)
    cl = get_closest_fiber(cl)

    # merge census data, and save the file
    cl_acs = cl.merge(acs[acs_cols], how='left',
                      left_on='block_group', right_on='geoid')
    cl_acs.to_csv(fn_cl, index=False, compression='gzip')
else:
    # Read the ID columns back as strings so the cached path matches the
    # fresh one (pandas would otherwise parse them as integers and drop
    # leading zeros).
    cl_acs = pd.read_csv(fn_cl, dtype={'block_group': str, 'geoid': str})

In [14]:
# start and end collection datetime
# (fromtimestamp without a tz renders epoch seconds in the kernel's local time zone)
[datetime.fromtimestamp(ts)
 for ts in (cl_acs['collection_datetime'].min(),
            cl_acs['collection_datetime'].max())]

[datetime.datetime(2022, 4, 15, 18, 4, 41),
 datetime.datetime(2022, 4, 17, 17, 34, 25)]

In [15]:
# record the states that appear in the CenturyLink results
states += list(cl_acs['state'].unique())

## Verizon

In [16]:
# Build the Verizon table from raw lookup responses, unless a processed copy
# is already cached on disk (set `recalculate = True` to force a rebuild).
if not os.path.exists(fn_verizon) or recalculate:
    data_verizon = []
    files = glob.glob(pattern_verizon)
    with Pool(n_jobs) as pool:
        # parse each response file in parallel with `verizon_workflow`
        for record in tqdm(pool.imap_unordered(verizon_workflow, files),
                           total=len(files)):
            data_verizon.extend(record)
    verizon = pd.DataFrame(data_verizon)
    del data_verizon

    # Keep only addresses in the incorporated city and cast the coordinates to
    # float for the geometry helpers. `.astype` returns a new frame, so no
    # column is assigned into a filtered slice (SettingWithCopyWarning).
    verizon = (
        verizon[verizon.incorporated_place.isin(inc_city_verizon)]
        .astype({'lon': float, 'lat': float})
    )

    # check HOLC-grades for each address, and the distance to download speeds at or above 200 Mbps
    verizon = check_redlining(verizon)
    verizon = get_closest_fiber(verizon)

    # merge census data, and save the file
    verizon_acs = verizon.merge(acs[acs_cols], how='left',
                                left_on='block_group', right_on='geoid')
    verizon_acs.to_csv(fn_verizon, index=False, compression='gzip')
else:
    # Read the ID columns back as strings so the cached path matches the
    # fresh one (pandas would otherwise parse them as integers and drop
    # leading zeros).
    verizon_acs = pd.read_csv(fn_verizon, dtype={'block_group': str, 'geoid': str})

  exec(code_obj, self.user_global_ns, self.user_ns)


In [17]:
# start and end collection datetime
# (fromtimestamp without a tz renders epoch seconds in the kernel's local time zone)
[datetime.fromtimestamp(ts)
 for ts in (verizon_acs['collection_datetime'].min(),
            verizon_acs['collection_datetime'].max())]

[datetime.datetime(2022, 4, 19, 17, 4, 55),
 datetime.datetime(2022, 4, 27, 23, 47, 27)]

In [18]:
# record the states that appear in the Verizon results
states += list(verizon_acs['state'].unique())

## EarthLink

In [19]:
# Build the EarthLink table from raw lookup responses, unless a processed copy
# is already cached on disk (set `recalculate = True` to force a rebuild).
if not os.path.exists(fn_el) or recalculate:
    data_el = []
    files = glob.glob(pattern_el)

    with Pool(n_jobs) as pool:
        # parse each response file in parallel with `el_workflow`
        for record in tqdm(pool.imap_unordered(el_workflow, files),
                           total=len(files)):
            data_el.extend(record)
    el = pd.DataFrame(data_el)
    del data_el

    # zero-pad to 12 digits so it matches the string `geoid` column in `acs`
    el['block_group'] = el['block_group'].apply(lambda x: f"{int(x):012d}")

    # NOTE(review): the AT&T, CenturyLink and Verizon cells filter to the
    # incorporated city *before* get_closest_fiber; here the filter runs after
    # it, so >=200 Mbps households outside the city can presumably be picked as
    # the nearest neighbour. Confirm this difference is intended.
    el = check_redlining(el)
    el = get_closest_fiber(el)

    # merge census data, keep only addresses in the incorporated city, and save
    el_acs = el.merge(acs[acs_cols], how='left',
                      left_on='block_group', right_on='geoid')
    el_acs = el_acs[el_acs.incorporated_place.isin(inc_city_el)]
    el_acs.to_csv(fn_el, index=False, compression='gzip')
else:
    # Read the ID columns back as strings; otherwise pandas parses them as
    # integers and drops the leading zeros the fresh path produces.
    el_acs = pd.read_csv(fn_el, dtype={'block_group': str, 'geoid': str})

In [20]:
# start and end collection datetime
# (fromtimestamp without a tz renders epoch seconds in the kernel's local time zone)
[datetime.fromtimestamp(ts)
 for ts in (el_acs['collection_datetime'].min(),
            el_acs['collection_datetime'].max())]

[datetime.datetime(2022, 4, 20, 1, 45, 42),
 datetime.datetime(2022, 5, 25, 17, 1, 44)]

In [21]:
# how many states did we collect data from?
# (`states` holds per-ISP unique values, so duplicates across ISPs are removed here)
states += list(el_acs['state'].unique())
len({*states})

46

In [22]:
# Which ISPs is EarthLink leasing from?
# Rows with a download speed of 0 are excluded; missing providers are kept as NaN.
el_acs.loc[el_acs['speed_down'].ne(0), 'contract_provider'].value_counts(dropna=False)

AT&T           278236
CenturyLink    112461
Frontier         8376
Name: contract_provider, dtype: int64

In [23]:
# Share of EarthLink rows in each of the 20 most common incorporated places.
el_acs['incorporated_place'].value_counts(normalize=True).iloc[:20]

Chicago city                   0.104066
Houston city                   0.095431
Los Angeles city               0.081363
Phoenix city                   0.058381
Detroit city                   0.046281
Jacksonville city              0.041479
Charlotte city                 0.036511
Portland city                  0.034805
Seattle city                   0.033257
Oklahoma City city             0.032477
Memphis city                   0.031268
Kansas City city               0.030919
Las Vegas city                 0.030510
Denver city                    0.030053
New Orleans city               0.028419
Milwaukee city                 0.027601
Omaha city                     0.027394
Indianapolis city (balance)    0.026712
Albuquerque city               0.024980
Cleveland city                 0.024481
Name: incorporated_place, dtype: float64

In [24]:
# Number of EarthLink rows per reported technology.
el_acs['technology'].value_counts()

Fiber Based    162562
Fiber          134496
Copper         102015
Name: technology, dtype: int64