In [None]:
import geopandas as gp
import pandas as pd
import numpy as np
from datetime import datetime

from tobler.area_weighted import area_interpolate

from sklearn import metrics
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split 
from sklearn.ensemble import RandomForestRegressor

from elasticsearch import Elasticsearch, helpers
ES_DEV = Elasticsearch(['YOUR ES HOST'], http_auth=('ES LOGIN', 'ES PASS'), timeout=30)
%load_ext autotime

SF52 = { '01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', 
                      '06': 'CA', '08': 'CO', '09': 'CT', '10': 'DE', 
                      '11': 'DC', '12': 'FL', '13': 'GA', '15': 'HI', 
                      '16': 'ID', '17': 'IL', '18': 'IN', '19': 'IA', 
                      '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME', 
                      '24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', 
                      '28': 'MS', '29': 'MO', '30': 'MT', '31': 'NE', 
                      '32': 'NV', '33': 'NH', '34': 'NJ', '35': 'NM', 
                      '36': 'NY', '37': 'NC', '38': 'ND', '39': 'OH', 
                      '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI', 
                      '45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', 
                      '49': 'UT', '50': 'VT', '51': 'VA', '53': 'WA', 
                      '54': 'WV', '55': 'WI', '56': 'WY', '72': 'PR'}

SF52R = {v: k for (k,v) in SF52.items()}

STATE_LENGTH, COUNTY_LENGTH, CT_LENGTH, CBG_LENGTH, CB_LENGTH = 2, 5, 11, 12, 15
# EPSG:4269 Geodetic coordinate system for North America - onshore and offshore
# EPSG:4326 - World Geodetic System 1984, used in GPS
CRS_TIGER, CRS_OOKLA = 4269, 4326

#### CREATE/update speed_test INDEX

In [9]:
def upload_ES_df(input_df, index_name, idCol = 'GEOID', es_instance = ES_DEV):
    # actiondf: details all actions, contains 5 cols: _id, doc, _index, _op_type, doc_as_upsert
    # use idCol as document _id
    actiondf = input_df[[idCol]].copy().rename(columns={idCol:'_id'})
    # a dict of all the data to be updated; does contain idCol which will be uploaded like a regular column
    actiondf['doc'] = input_df.to_dict('records')  
    del input_df
    # Some constants
    actiondf['_index'] = index_name
    actiondf['_op_type'] = 'update'
    actiondf['doc_as_upsert'] = True
    
    print(f"Start uploading {len(actiondf)} records to {index_name}")
    helpers.bulk(es_instance, actiondf.to_dict('records'))
    print(f"Completed uploading {len(actiondf)} records to {index_name}")

time: 654 µs (started: 2022-03-23 19:14:34 -04:00)


# NEW time-series speed_test index

### NOTE: docs count for speed_test index = total number of census block in SF52: 11,155,486

In [24]:
# CREATE A NEW INDEX: TIMESERIES
def create_speed_index(es_instance, index_name):
    create_request_body = {
       "settings": {
        "refresh_interval": "1s",
        "number_of_shards": 1,
        "number_of_replicas": 0
      },
        "mappings":{ 
            "properties":{
                "GEOID"                 : { "type":"text"},
                "statefips"             : { "type": "keyword" },
            }
        }
    } 

    es_instance.indices.create(index = index_name, body = create_request_body)
    print('created index ', index_name)

# https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-search-speed.html#map-ids-as-keyword
# LATER: map speedCat* as keyword, would result in x2 faster in query performance
def update_speed_index(es_instance, index_name, QUARTER='2021Q3'):
    # null_value i.e. set 0 as default value for the field -> allow indexing and searching https://www.elastic.co/guide/en/elasticsearch/reference/current/null-value.html
    update_request_body = {
        "properties": {
                f'{QUARTER}speedRankReadyRaw'             : { "type": "keyword" },
                  # 13 cols
                f'{QUARTER}numTestOokla'                  : { "type": "integer"},
                f'{QUARTER}numDeviceOokla'                : { "type": "integer"},
                f'{QUARTER}meanUploadMbpsOokla'           : { "type": "float" },
                f'{QUARTER}meanDownloadMbpsOokla'         : { "type": "float" },
                f'{QUARTER}medUploadMbpsOokla'            : { "type": "float" },
                f'{QUARTER}medDownloadMbpsOokla'          : { "type": "float" },
                f'{QUARTER}minUploadMbpsOokla'            : { "type": "float" },
                f'{QUARTER}minDownloadMbpsOokla'          : { "type": "float" },
                f'{QUARTER}maxUploadMbpsOokla'            : { "type": "float" },
                f'{QUARTER}maxDownloadMbpsOokla'          : { "type": "float" },
                f'{QUARTER}speedCatOokla'                 : { "type": "keyword" },
                f'{QUARTER}speedSourceOokla'              : { "type": "keyword" },
                f'{QUARTER}latencyOokla'                  : { "type": "integer"},
                  # 16 cols
                f'{QUARTER}numTestDownloadMlab'           : { "type": "integer"},
                f'{QUARTER}numDeviceDownloadMlab'         : { "type": "integer"},
                f'{QUARTER}numTestUploadMlab'             : { "type": "integer"},
                f'{QUARTER}numDeviceUploadMlab'           : { "type": "integer"},
                f'{QUARTER}meanUploadMbpsMlab'            : { "type": "float" },
                f'{QUARTER}meanDownloadMbpsMlab'          : { "type": "float" },
                f'{QUARTER}medUploadMbpsMlab'             : { "type": "float" },
                f'{QUARTER}medDownloadMbpsMlab'           : { "type": "float" },
                f'{QUARTER}minUploadMbpsMlab'             : { "type": "float" },
                f'{QUARTER}minDownloadMbpsMlab'           : { "type": "float" },
                f'{QUARTER}maxUploadMbpsMlab'             : { "type": "float" },
                f'{QUARTER}maxDownloadMbpsMlab'           : { "type": "float" },
                f'{QUARTER}speedCatMlab'                  : { "type": "keyword" },
                f'{QUARTER}speedSourceMlab'               : { "type": "keyword" },
                f'{QUARTER}latencyMlab'                   : { "type": "integer"},
                f'{QUARTER}lossrateMlab'                  : { "type": "float"},
                  # 7 cols
                f'{QUARTER}numISPwireless'                : { "type": "integer" },
                f'{QUARTER}numISPother'                   : { "type": "integer" },
                f'{QUARTER}numISPfiber'                   : { "type": "integer" },
                f'{QUARTER}MaxConsumerUp98'               : { "type": "float" },
                f'{QUARTER}MaxConsumerDown98'             : { "type": "float" },
                f'{QUARTER}speedCatNtia'                  : { "type": "keyword" },
                f'{QUARTER}speedSourceNtia'                : { "type": "keyword" },
        }
    }
    es_instance.indices.put_mapping(index = index_name, body = update_request_body)
    print('updated index ', index_name)


time: 2.11 ms (started: 2022-03-23 19:27:21 -04:00)


In [25]:
index_name = 'speed_test'
# # RUN ONCE
create_speed_index(ES_DEV, index_name)

# # RUN ONCE: add new cols for current (2021Q3) quarter
update_speed_index(ES_DEV, index_name, '2021Q1')
update_speed_index(ES_DEV, index_name, '2021Q2')
update_speed_index(ES_DEV, index_name, '2021Q3')
update_speed_index(ES_DEV, index_name, '2021Q4')

  es_instance.indices.create(index = index_name, body = create_request_body)


created index  speed_test
updated index  speed_test
updated index  speed_test
updated index  speed_test
updated index  speed_test
time: 619 ms (started: 2022-03-23 19:27:33 -04:00)


### ESdown.py
- using $ bash cmds.bash; comment out other commands as needed 
- download demographic and NTIA values from bossdata* (used for speedCatNtia and Mlab prediction)
- NOTE: at a later time, [historical] speed values will be downloaded from speed_test index
- save bossdata* dfs to Elasticsearch folder
- check the log file cmds.log

### ookla.ipynb

- generate ookla tiles at state level
- parallel run: ookla_gen_tiles.py: generate ookla speeds at CBG and CB levels
    - read ookla_state_tiles, and remove invalid latency
    - flooring and capping ookla values
    - joined valid ookla tests to CBG/CB boundaries
    - aggregate ookla tests (calculate weighted speeds, and sum up numtests and devices)   
    - ['speedSourceOokla']


### ookla_ntia.py
- NTIA cols: 100% imputed; compute speedCatNtia
- Ookla cols: 100% imputed; compute speedCatOokla
- ['speedSourceOokla'] = 'medianImputedAtCB'
- ['speedSourceNtia'] = 'notAvailableMaxConsumer98' or 'maxConsumer98'
- CB level data (ready for Mlab training) are saved to: f"ookla_ntia/{QUARTER}_CB_{sf}.csv"

In [2]:
# ookla_ntia folder: CB level: 4 QUARTERS * SF52 = 208 files
! ls ookla_ntia/ | wc -l

208


# Last piece: MLAB
- mlab_rolling.ipynb : handle initial datapoint: 2021Q1, as well as subsequent quarters: 2021Q2, 2021Q3, 2021Q4, etc.


#### COMPARE: new speedCatO/M (with latency factor) with the old
- per Yuan request: compare new 2021Q4 speedCat with old bossdata* speedCats 
    - download old bossdata* speedCats to f'Elasticsearch/old_speedCat_{index_name}.csv'


In [102]:
for sf in SF52:
# for sf in ['02', '10']:
    index_name = f"bossdata{sf}"
    df_old = pd.read_csv(f'Elasticsearch/old_speedCat_{index_name}.csv')
    df_new = pd.read_csv(f'speed_ready_upload/2021Q4_{sf}.csv')[set(df_old)]
    
    for col_to_join in { 
         'speedCatMlab', # most sf: changed about 5%
         'speedCatNtia', # remain unchanged: confirmed!
         'speedCatOokla', # most sf: changed about 5%
         'speedRankReadyRaw',
         'speedSourceMlab',
         'speedSourceNtia', # remain unchanged: confirmed!
         'speedSourceOokla', 
                       }:
        
        # # should remain unchanged: confirmed!
        # if col_to_join in {'speedCatNtia', 'speedSourceNtia'}:
        #     joined = df_new.merge(df_old[['GEOID', col_to_join]] , on='GEOID', how='inner', suffixes=('_new', '_old'))
        #     print(sf, SF52[sf], col_to_join, df_old.shape, df_new.shape, joined.shape)
        #     joined[f"{col_to_join}_equal"] = np.where(joined[f"{col_to_join}_old"] == joined[f"{col_to_join}_new"], 1, 0)
        #     joined[['GEOID', f"{col_to_join}_old", f"{col_to_join}_new", f"{col_to_join}_equal"]]
        #     uniques = set(joined[f"{col_to_join}_equal"])
        #     if uniques != {1}:
        #         print("ALERT: ", sf, SF52[sf], col_to_join, "changed")            
        
        if col_to_join in {'speedCatMlab', 'speedCatOokla'}:
            joined = df_new.merge(df_old[['GEOID', col_to_join]] , on='GEOID', how='inner', suffixes=('_new', '_old'))
            # print(sf, SF52[sf], col_to_join, df_old.shape, df_new.shape, joined.shape)
            # find number of rows that go from Served to Un[der]served (1,0)
            joined[f"{col_to_join}_served_to_unserved"] = np.where(
                (joined[f"{col_to_join}_old"] == 2) & (joined[f"{col_to_join}_new"] != 2), 1, 0)
            
            joined[['GEOID', f"{col_to_join}_old", f"{col_to_join}_new", f"{col_to_join}_served_to_unserved"]]
            
            num_changed = joined[f"{col_to_join}_served_to_unserved"].sum()
            
            if num_changed != 0:
                change_per = int(num_changed/len(joined) *100)
                print("ALERT: ", sf, SF52[sf], joined.shape, col_to_join, "# changed=", num_changed, "%=", change_per)                          


ALERT:  01 AL (252266, 10) speedCatOokla # changed= 19 %= 0
ALERT:  01 AL (252266, 10) speedCatMlab # changed= 11677 %= 4
ALERT:  02 AK (45292, 10) speedCatOokla # changed= 4 %= 0
ALERT:  02 AK (45292, 10) speedCatMlab # changed= 595 %= 1
ALERT:  04 AZ (241666, 10) speedCatOokla # changed= 12 %= 0
ALERT:  04 AZ (241666, 10) speedCatMlab # changed= 3093 %= 1
ALERT:  05 AR (186211, 10) speedCatOokla # changed= 223 %= 0
ALERT:  05 AR (186211, 10) speedCatMlab # changed= 10035 %= 5
ALERT:  06 CA (710145, 10) speedCatOokla # changed= 72 %= 0
ALERT:  06 CA (710145, 10) speedCatMlab # changed= 14499 %= 2
ALERT:  08 CO (201062, 10) speedCatOokla # changed= 36 %= 0
ALERT:  08 CO (201062, 10) speedCatMlab # changed= 4745 %= 2
ALERT:  09 CT (67578, 10) speedCatMlab # changed= 4454 %= 6
ALERT:  10 DE (24115, 10) speedCatMlab # changed= 985 %= 4
ALERT:  11 DC (6507, 10) speedCatMlab # changed= 439 %= 6
ALERT:  12 FL (484481, 10) speedCatOokla # changed= 49 %= 0
ALERT:  12 FL (484481, 10) speedCatMl

### Lastly, upload latest speeds to bossdata* (2021Q4)
- run upload_bossdata.py inside cmds.bash