## SDAD Broadbandnow.com Scraper Code
### Last Edit: 2/8/22

Requires addresses from CoreLogic (or a similar source) at the block level, with two important columns: `geoid_blk` and `mail_address`. The original approach uses one address per block group: it first generates a cleaned list of addresses and block groups, and then scrapes all package information for the selected addresses. Results are reported in a dataframe at the block group, tract, and county level, which can be exported.

In [3]:
# imports
# generic imports
import numpy as np
import re
import time
import pandas as pd
from bs4 import BeautifulSoup

# selenium imports
import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import ElementClickInterceptedException
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException

In [4]:
# check if xpath exists, if not return false
def check_exists_by_xpath(driver, xpath):
    '''
    Description:
        Check whether an element matching an xpath is currently present on the page
    
    Inputs:
        driver: your webdriver
        xpath: string, xpath of the element we are looking for
        
    Outputs:
        returns True if xpath exists, False if not
    '''
    # find_element(By.XPATH, ...) is the supported locator call; the older
    # find_element_by_xpath helper is deprecated in Selenium 4 and removed in later releases
    try:
        driver.find_element(By.XPATH, xpath)
    
    # element is not on the page
    except NoSuchElementException:
        return False
    return True

In [5]:
# IMPORTANT: Corelogic provides BLOCK level FIPs code, but we use BLOCK GROUP level data here
def read_and_clean_addresses_for_bgs(data, need_subset = True, size_subset = 3):
    '''
    Description:
        Read a csv of addresses, shorten 9-digit zipcodes to 5 digits, and derive the
        block group geoid (first 12 characters of geoid_blk) for every address
    
    Inputs:
        data: string, name of csv you want to use (includes .csv ending); must contain the
              columns "geoid_blk" and "mail_address", and its first column is read as the index
        need_subset: boolean, True if using subset of data (originally used 1st address within each bg from list of 3) - default = True
        size_subset: integer, if subsetting, selects every "nth" row (not necessary to mess with this param if using 1 address per bg) - default = 3
        
    Outputs:
        addresses: array of strings, cleaned mail addresses
        block_geoids: array of strings, block group geoids aligned with addresses
    '''
    # read in csv, drop index, and update block column
    address_sample = pd.read_csv(data, index_col = 0).reset_index(drop = True)
    # NOTE(review): if geoid_blk is parsed as an integer, any leading zero is lost before
    # this conversion - confirm the input never contains such geoids
    address_sample['geoid_blk'] = address_sample.geoid_blk.astype(str)
    
    # drop last 4 digits of mail address to get short zipcode; an address is treated as
    # ending in a 9-digit zipcode when its 9th character from the end is a digit
    # (addresses shorter than 9 characters are left untouched instead of raising IndexError)
    def _short_zip(address):
        if len(address) >= 9 and address[-9].isdigit():
            return address[:-4]
        return address
    
    cleaned = np.array([_short_zip(address) for address in address_sample.mail_address.values])
    
    # get block group geoid
    address_sample['geoid_bg'] = address_sample.geoid_blk.str.slice(start = 0, stop = 12)
    
    # if data needs subsetting, keep every "size_subset"-th row
    if need_subset:
        addresses = cleaned[::size_subset]
        block_geoids = address_sample.geoid_bg[::size_subset]
        
    # otherwise keep every row
    else:
        addresses = cleaned
        block_geoids = address_sample.geoid_bg
    
    return addresses, block_geoids.values

In [6]:
def search_address2(address, driver, driver_wait = 20, suggestion_delay = None):
    '''
    Description:
        Type an address into the plan search bar and click the top suggested address (if any)
    
    Inputs:
        address: string, single home address we are scraping for
        driver: your webdriver
        driver_wait: integer, wait time for driver - default = 20
        suggestion_delay: number, seconds to sleep after typing before looking for suggestions;
                          if None, uses the notebook-level "sleep_time" variable when it is
                          defined and 2 seconds otherwise - default = None
        
    Outputs:
        returns True if a suggestion list was found (and its top entry clicked), False if not
    
    Raises:
        TimeoutException if the search bar or the top suggestion is not clickable within driver_wait seconds
    '''
    # earlier versions read the global "sleep_time" directly; keep honoring it when the
    # caller does not pass a delay so existing notebooks behave the same
    if suggestion_delay is None:
        suggestion_delay = globals().get("sleep_time", 2)

    suggestions_xpath = '//*[@id="plans-search"]/div/div/div[1]/div/div/div/ul'

    # wait until search bar is clickable and enter address
    wait = WebDriverWait(driver, driver_wait)
    search = wait.until(EC.element_to_be_clickable((By.ID, 'plan-search')))
    search.clear()
    search.send_keys(str(address))

    # sleep to give the suggestion list time to load, then check whether it appeared
    time.sleep(suggestion_delay)
    go_top = check_exists_by_xpath(driver, suggestions_xpath)

    # click top address
    if go_top:
        go_top_address = wait.until(EC.element_to_be_clickable((By.XPATH, suggestions_xpath + '/li')))
        go_top_address.click()
        
    return go_top


In [7]:
def _pair_speeds(labels, speeds):
    '''
    Description:
        Arrange a flat list of speeds into one (download, upload) row per package; helper function
    
    Inputs:
        labels: list of strings, one label per speed value ("Download" starts a new package row,
                any other label is stored as that package's upload speed)
        speeds: list of floats, speed values aligned with labels
        
    Outputs:
        speed_array: 2D array with one row per "Download" label; column 0 = download, column 1 = upload,
                     NaN where no value was listed
    '''
    # pair each label with its speed; zip stops at the shorter list so a mismatch cannot raise
    pairs = list(zip(labels, speeds))
    num_rows = sum(1 for label, _ in pairs if label == "Download")
    speed_array = np.full((num_rows, 2), np.nan)

    # row currently being filled in; -1 until the first download speed is seen
    row = -1
    for label, value in pairs:
        if label == "Download":
            row += 1
            speed_array[row, 0] = value

        # an upload speed listed before any download speed has no row to go in, so it is ignored
        elif row >= 0:
            speed_array[row, 1] = value

    return speed_array


def _parse_plans(html):
    '''
    Description:
        Extract prices, provider names, service types, and speeds from the plans page html; helper function
    
    Inputs:
        html: string, page source of the plans page
        
    Outputs:
        price: array of floats, text after the last "$" of each plan value
        name: array of strings, text after the first ". " of each provider name
        type_list: array of strings, stripped text of each provider card label
        speed_array: 2D array of (download, upload) speeds, see _pair_speeds
    '''
    # name the parser explicitly so results do not depend on which optional parsers are installed
    soup = BeautifulSoup(html, "html.parser")

    def texts(css_class):
        return [tag.getText() for tag in soup.find_all(attrs = {"class": css_class})]

    price = np.array([float(text.split("$")[-1]) for text in texts("c-provider-card__plan-value")])
    name = np.array([text.split(". ")[1] for text in texts("c-provider-card__provider-name")])
    type_list = np.array([text.strip() for text in texts("c-provider-card__label")])

    # speeds come as one flat list with a "Download"/"Upload" label for each value
    speed = [float(text.split(" ")[0]) for text in texts("c-provider-card__speeds-value")]
    down_up = texts("c-provider-card__speeds-label")

    return price, name, type_list, _pair_speeds(down_up, speed)


def scrape_prices(driver, addresses, driver_wait = 20):
    '''
    Description:
        Scrape internet packages from Broadbandnow.com - takes each address and scrapes all packages for top match.
        Quits the driver when finished.
    
    Inputs:
        driver: your webdriver
        addresses: array of strings, home addresses we are scraping for (first output of read_and_clean_addresses_for_bgs)
        driver_wait: integer, wait time (seconds) for elements to become clickable - default = 20
        
    Outputs:
        all_prices: jagged list (list of varying sized arrays), package prices
        all_names: jagged list (same size as all_prices), provider names
        all_type_list: jagged list (same size as all_prices), types of internet service
        all_speeds: jagged list (same size as all_prices), 2D arrays with columns (download, upload)
        idxs: array, array of indices where information was successfully scraped (aligns with addresses)
    '''
    # create empty lists for prices, names, speeds, and types - will become jagged lists (lists of varying sized lists)
    all_prices = []
    all_names = []
    all_speeds = []
    all_type_list = []
    idxs = []

    # build the wait from the driver we were given instead of relying on a notebook-level "wait"
    wait = WebDriverWait(driver, driver_wait)

    # initialize variables and get start time
    start = time.time()
    total = len(addresses)
    i = 0
    problem_counter = 0
    last_decile = 0

    # loop over block group addresses
    while i < total:
        # a TimeoutException anywhere below leaves i unchanged, so the same address is retried
        try:
            # reload page to clear results (noticed that we run into issues if we do not clear)
            driver.get("https://broadbandnow.com/compare/plans")
            go_top = search_address2(addresses[i], driver, driver_wait)

            # select top address
            if go_top:
                time.sleep(1)
                unable_to_confirm = check_exists_by_xpath(driver, "/html/body/div[2]/div/div/div[1]/section/section/div/div/div[1]/div/section")

                # if able to confirm and go to top address
                if not unable_to_confirm:
                    time.sleep(1)
                    load_more = check_exists_by_xpath(driver, '//*[@id="cityPlansListing"]/section/div/div[2]/div/div/section')

                    # if load more is an option, then load all packages
                    if load_more:
                        load_all_plans = wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="cityPlansListing"]/section/div/div[2]/div/div/section')))
                        load_all_plans.click()

                    # bs - scrape page
                    price, name, type_list, speed_array = _parse_plans(driver.page_source)

                    # select edit option to change address; this times out when nothing was actually
                    # searched, in which case the results above are discarded and the address retried
                    edit = wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="plans-search"]/div/div/div/h1/span')))
                    edit.click()

                    # append to lists
                    idxs.append(i)
                    all_prices.append(price)
                    all_names.append(name)
                    all_type_list.append(type_list)
                    all_speeds.append(speed_array)

            # move on to the next address; reset the problem counter every time we move on so a
            # timeout on this address is not counted against the next one
            i += 1
            problem_counter = 0

        # if try times out, increment counter and retry the same address
        except TimeoutException:
            problem_counter += 1
            
            # if 2 problems with address, increment address counter, skip address, and reset problem counter
            if problem_counter == 2:
                i += 1
                print("skip")
                problem_counter = 0

        # report time taken the first time each 10% mark is passed (never repeated during retries)
        decile = 10 * i // total
        if last_decile < decile < 10:
            print("{}0% @ {}".format(decile, time.time() - start))
        last_decile = max(last_decile, decile)

    # close driver
    driver.quit()
    
    # convert indices to array and get time
    idxs = np.array(idxs)
    end = time.time()
    print(end - start)
    return all_prices, all_names, all_type_list, all_speeds, idxs


In [8]:
# flatten list of lists
def flatten(t):
    '''
    Description:
        Helper that collapses a list of lists into one flat array so it can be used as a dataframe column
    
    Inputs:
        t: list of lists (jagged list)
        
    Outputs:
         array holding every item of every sublist, in order
    '''
    # gather the items of each sublist, one sublist after another
    flat_items = []
    for sublist in t:
        flat_items.extend(sublist)

    return np.array(flat_items)

In [9]:
def make_df(all_prices, all_names, all_type_list, all_speeds, idxs, addresses = None, block_geoids = None):
    '''
    Description:
        Make dataframe using prices, names, internet types, speeds, and FIPs code at bg, tr, and ct levels
    
    Inputs:
        all_prices: jagged list (list of varying sized lists), package prices
        all_names: jagged list (same size as all_prices), provider names
        all_type_list: jagged list (same size as all_prices), types of internet service
        all_speeds: jagged list (same size as all_prices), 2D arrays with columns (download, upload)
        idxs: array, array of indices where information was successfully scraped (aligns with addresses)
        addresses: array of strings, addresses that idxs refers to; if None, the notebook-level
                   "addresses" variable is used - default = None
        block_geoids: array of strings, block group geoids aligned with addresses; if None, the
                      notebook-level "block_geoids" variable is used - default = None
        
    Outputs:
        df: dataframe with one row per package and the columns address, price, name, type,
            download, upload, block_group, tract, and county
    '''
    # earlier versions always read "addresses" and "block_geoids" from the notebook namespace;
    # keep that as the fallback so existing calls work unchanged
    notebook_vars = globals()
    if addresses is None:
        if "addresses" not in notebook_vars:
            raise NameError("addresses was not passed and no global 'addresses' is defined")
        addresses = notebook_vars["addresses"]
    if block_geoids is None:
        if "block_geoids" not in notebook_vars:
            raise NameError("block_geoids was not passed and no global 'block_geoids' is defined")
        block_geoids = notebook_vars["block_geoids"]

    # an empty idxs array defaults to float dtype, which cannot be used as an index - force integers
    idxs = np.asarray(idxs, dtype = int)

    # get number of packages scraped for each valid address
    num_packages = np.array([len(prices) for prices in all_prices], dtype = int)
    valid_addresses = np.asarray(addresses)[idxs]
    
    # repeat a valid address "num_packages" times (the addresses column in the dataframe)
    result = np.repeat(valid_addresses, num_packages)
    
    # flatten download and upload arrays
    download = flatten([speeds[:, 0] for speeds in all_speeds])
    upload = flatten([speeds[:, 1] for speeds in all_speeds])

    # get block groups, tracts, and counties (prefixes of the block group geoid) for the valid addresses
    short_blockgroup_geoid = np.asarray(block_geoids)[idxs]
    short_tract_geoid = np.array([x[:11] for x in short_blockgroup_geoid])
    short_county_geoid = np.array([x[:5] for x in short_blockgroup_geoid])

    # repeat block group, tract, county "num_packages" times for the number of packages within that area (these will be columns in df)
    short_blockgroup_geoid2 = np.repeat(short_blockgroup_geoid, num_packages)
    short_tract_geoid2 = np.repeat(short_tract_geoid, num_packages)
    short_county_geoid2 = np.repeat(short_county_geoid, num_packages)

    # final dataframe
    df = pd.DataFrame({"address": result, "price": flatten(all_prices), "name": flatten(all_names),
                       "type": flatten(all_type_list), "download": download, "upload": upload,
                       "block_group": short_blockgroup_geoid2, "tract": short_tract_geoid2,
                       "county": short_county_geoid2})
    return df


### Running the functions above

In [10]:
# start driver
# Selenium 4 takes the chromedriver location through a Service object; passing the path as
# the first positional argument is deprecated and no longer accepted by later releases
from selenium.webdriver.chrome.service import Service

driver = webdriver.Chrome(service = Service(ChromeDriverManager().install()))
driver.get("https://broadbandnow.com/compare/plans")
driver.maximize_window()

# set driver params
# driver_wait: seconds to wait for page elements; sleep_time: seconds to pause after typing an address
driver_wait = 20
sleep_time = 2
wait = WebDriverWait(driver, driver_wait)

[WDM] - Downloading: 100%|█████████████████████████████████████████████████████████████████████████| 7.82M/7.82M [00:00<00:00, 25.6MB/s]
  driver = webdriver.Chrome(ChromeDriverManager().install())


In [11]:
# run read_and_clean_addresses_for_bgs to get the cleaned addresses and their block group geoids
# NEED: csv of addresses with the block GEOID in a "geoid_blk" column and the mail address in a "mail_address" column
# I used Corelogic for housing information and sampled 3 addresses for each block group, although I only use 1
# (the function's default need_subset = True, size_subset = 3 keeps every 3rd row)
addresses, block_geoids = read_and_clean_addresses_for_bgs(data = "three_address_in_block_group.csv")

FileNotFoundError: [Errno 2] No such file or directory: 'three_address_in_block_group.csv'

In [None]:
# given addresses in the correct format and driver: gets prices, names, types, speeds, (and indices where successful)
# only the first 100 addresses are scraped here; idxs indexes into this slice, which lines up with
# the full addresses array because the slice starts at 0
all_prices, all_names, all_type_list, all_speeds, idxs = scrape_prices(driver, addresses[:100])

In [None]:
# produces dataframe with one row per scraped package from the outputs of scrape_prices
df = make_df(all_prices, all_names, all_type_list, all_speeds, idxs)

In [None]:
# show the first rows of the dataframe
df.head()