In [156]:
### Helper functions

def set_increment(total_count, increment):
    """
    Calculates an increment count by dividing the total count by the increment percentage.

    :param total_count: The total count to calculate an increment for.
    :param increment: The increment percentage to calculate.
    :return: Returns an integer representing the increment count.
    """
    return round(total_count / (100 / increment))

def update_progress(progress_update, current_count, total_count, increment):
    """
    Prints a progress update for an iterating loop if the increment threshold has been passed.

    :param progress_update: The string describing the progress update.
    :param current_count: The number of features currently iterated through.
    :param total_count: The total number of features to iterate through.
    :param increment: The number of iterated features at which to print a progress update.
    """
    if current_count % increment == 0:
        print(f"{progress_update}: {current_count / total_count:.0%} complete. {current_count} records processed.")

def runtime(progress_update, start_time, end_time):
    """
    Prints the runtime for a given progress update.

    :param progress_update: The string describing the progress update.
    :param start_time: The start time of the progress update.
    :param end_time: The end time of the progress update.
    """
    print(f"{progress_update} runtime: {end_time - start_time}")

In [2]:
### Pull website with suffix data

from selenium import webdriver
from selenium.webdriver.common.by import By

# Suffix data website information
suffix_url = "https://pe.usps.com/text/pub28/28apc_002.htm"
table_id = "ep533076" # suffix element's id

# Pull website using Firefox WebDriver
print(f"Pulling website: {suffix_url}")
options = webdriver.FirefoxOptions()
options.add_argument("--headless")
driver = webdriver.Firefox(options=options)
driver.get(suffix_url)

# Find suffix elements
suffix_rows = driver.find_elements(By.XPATH, f"//table[@id='{table_id}']/tbody/tr")

Pulling website: https://pe.usps.com/text/pub28/28apc_002.htm


In [3]:
### Parse suffix elements from website

from datetime import datetime
import pandas as pd

# Store table headers
header_cols = suffix_rows[0].find_elements(By.TAG_NAME, "td") # find table cells
headers = []
for header in header_cols[:2][::-1]: # flip header order
    header = header.text.replace("\n", " ")
    headers.append(header)

# Identify primary street suffix names and corresponding abbreviations
suffixes = {} # suffix data container - {abbreviation: suffix}
full_suffix = "" # primary street suffix name

suffix_count = len(suffix_rows) - 1 # excluding table header
increment = set_increment(suffix_count, 10)
progress_update = "Parsing suffixes"
start_time = datetime.now()

for count, row in enumerate(suffix_rows[1:]): # excluding table header
    cols = row.find_elements(By.TAG_NAME, "td") # find table cells
    if len(cols) > 1: # if multi-col row, col[0] is suffix, and col[1] is abbreviation
        full_suffix = cols[0].text
        suffixes[cols[1].text] = full_suffix
    else: # if single-col row, col[0] is abbreviation, and suffix is implied
        suffixes[cols[0].text] = full_suffix
    
    update_progress(progress_update, count, suffix_count, increment)

update_progress(progress_update, suffix_count, suffix_count, 1)
end_time = datetime.now()
runtime(progress_update, start_time, end_time)

# Write processed data to file
suffixes_filepath = "data sources/suffixes.csv"
print(f"Writing suffixes to file: {suffixes_filepath}")
suffixes_pd = pd.DataFrame(list(suffixes.items()), columns=headers)
suffixes_pd.to_csv(suffixes_filepath, index=False)

# Close WebDriver
driver.quit()

Parsing suffixes: 0% complete. 0 records processed.
Parsing suffixes: 10% complete. 50 records processed.
Parsing suffixes: 20% complete. 100 records processed.
Parsing suffixes: 30% complete. 150 records processed.
Parsing suffixes: 40% complete. 200 records processed.
Parsing suffixes: 50% complete. 250 records processed.
Parsing suffixes: 60% complete. 300 records processed.
Parsing suffixes: 70% complete. 350 records processed.
Parsing suffixes: 80% complete. 400 records processed.
Parsing suffixes: 90% complete. 450 records processed.
Parsing suffixes: 100% complete. 500 records processed.
Parsing suffixes: 100% complete. 502 records processed.
Parsing suffixes runtime: 0:00:07.110649
Writing suffixes to file: data sources/suffixes.csv


In [None]:
### Obtain road data

from ftplib import FTP
import os

# Set up FTP connection to data portal
ftp_url = "ftp2.census.gov"
roads_directory_ftp = "/geo/tiger/TIGER2024/ROADS"
user = "anonymous"
passwd = "anonymous"
print(f"Connecting to server: {ftp_url + roads_directory_ftp}")
ftp = FTP("ftp2.census.gov")
print(ftp.login(user=user, passwd=user))
ftp.cwd(roads_directory_ftp)

# Download files
roads_directory = "data sources/roads/"
files = ftp.nlst()

# Skip any files already downloaded
downloaded_files = os.listdir(roads_directory)
downloaded_files = set(downloaded_files)
files = [item for item in files if item not in downloaded_files]

file_count = len(files)
increment = set_increment(file_count, 10)
progress_update = "Downloading files"
print(f"{progress_update} to directory: {roads_directory}")
start_time = datetime.now()

# Iterate through and download each file in roads directory
for count, file_name in enumerate(files):
    with open(roads_directory + file_name, 'wb') as file:
        ftp.retrbinary(f"RETR {file_name}", file.write)
    
    update_progress(progress_update, count, file_count, increment)

update_progress(progress_update, count, count, 1)
end_time = datetime.now()
runtime(progress_update, start_time, end_time)

print("Closing server connection.")
print(ftp.quit())

Connecting to server: ftp2.census.gov/geo/tiger/TIGER2024/ROADS
230-Server: ftp2.census.gov
230-
230-Personal Identifiable Information (PII) shall not be placed on the FTP
230-server without prior special arrangement and in conjunction with ITSO.
230-
230-NOTE: The data available for anonymous FTP download on this FTP server are
230-also available over the Web:
230-http://www2.census.gov
230 Login successful.
Downloading files to directory: data sources/roads/
Downloading files: 0% complete. 0 records processed.
Downloading files: 10% complete. 75 records processed.
Downloading files: 20% complete. 150 records processed.
Downloading files: 30% complete. 225 records processed.
Downloading files: 40% complete. 300 records processed.
Downloading files: 50% complete. 375 records processed.
Downloading files: 60% complete. 450 records processed.
Downloading files: 70% complete. 525 records processed.
Downloading files: 80% complete. 600 records processed.
Downloading files: 90% complete. 67

In [15]:
### Unzip road files

import zipfile

zip_files = [f for f in os.listdir(roads_directory) if f.endswith(".zip")] # select only zips

total_zips = len(zip_files)
increment = set_increment(total_zips, 10)
progress_update = "Unzipping files"
print(f"{progress_update} to directory: {roads_directory}")

start_time = datetime.now()
for count, zip_file in enumerate(zip_files):
    zip_path = os.path.join(roads_directory, zip_file) # find filepath of zip
    file_folder = os.path.join(roads_directory, os.path.splitext(zip_file)[0]) # create file folder
    os.makedirs(file_folder, exist_ok=True) # create directory for extracted files from zip
    with zipfile.ZipFile(zip_path, 'r') as zip_ref: # extract zip
        zip_ref.extractall(file_folder)
    
    update_progress(progress_update, count, total_zips, increment)

update_progress(progress_update, count, count, 1)
end_time = datetime.now()
runtime(progress_update, start_time, end_time)

Unzipping files to directory: data sources/roads/
Unzipping files: 0% complete. 0 records processed.
Unzipping files: 10% complete. 323 records processed.
Unzipping files: 20% complete. 646 records processed.
Unzipping files: 30% complete. 969 records processed.
Unzipping files: 40% complete. 1292 records processed.
Unzipping files: 50% complete. 1615 records processed.
Unzipping files: 60% complete. 1938 records processed.
Unzipping files: 70% complete. 2261 records processed.
Unzipping files: 80% complete. 2584 records processed.
Unzipping files: 90% complete. 2907 records processed.
Unzipping files: 100% complete. 3230 records processed.
Unzipping files: 100% complete. 3232 records processed.
Unzipping files runtime: 0:04:12.133783


In [282]:
### Pull INCITS codes for U.S. counties

# Set up FTP connection to data portal
INCITS_directory_ftp = "/geo/docs/reference/codes2020/"
INCITS_file = "national_county2020.txt"
print(f"Connecting to server: {ftp_url + INCITS_directory_ftp + INCITS_file}")
ftp = FTP("ftp2.census.gov")
print(ftp.login(user=user, passwd=user))

# Download file
INCITS_directory = 'data sources/'
print(f"Downloading file to: {INCITS_directory + INCITS_file}")
with open(INCITS_directory + INCITS_file, 'wb') as file:
    ftp.retrbinary(f"RETR {INCITS_directory_ftp + INCITS_file}", file.write)
print("Downloaded file.")

print("Closing server connection.")
print(ftp.quit())

Connecting to server: ftp2.census.gov/geo/docs/reference/codes2020/national_county2020.txt
230-Server: ftp2.census.gov
230-
230-Personal Identifiable Information (PII) shall not be placed on the FTP
230-server without prior special arrangement and in conjunction with ITSO.
230-
230-NOTE: The data available for anonymous FTP download on this FTP server are
230-also available over the Web:
230-http://www2.census.gov
230 Login successful.
Downloading file to: data sources/national_county2020.txt
Downloaded file.
Closing server connection.
221 Goodbye.


In [302]:
### Set up street type statistics data structure
# Note: road names are title-cased in input data

# Get all primary street suffix names, title-cased
suffixes_titlecased = {key.title(): value.title() for key, value in suffixes.items()}
full_suffixes = set(suffixes_titlecased.values()) # filter unique names
full_suffixes = list(full_suffixes)
full_suffixes = sorted(full_suffixes)

# Add additional street types (prefixes) not present on suffixes website, title-cased
prefixes = {
    "Cr "             : "County Road",
    "Cr-"             : "County Road",
    "C R "            : "County Road",
    "Co Rd "          : "County Road",
    "County Road "    : "County Road",
    "Co Hwy "         : "County Highway",
    "County Highway " : "County Highway",
    "Twp Rd "         : "Township Road",
    "Township Road "  : "Township Road",
    "FM "             : "Farm to Market Road",
    "State Rte "      : "State Route",
    "State Route "    : "State Route",
    "State Rd "       : "State Road",
    "State Road "     : "State Road",
    "State Hwy "      : "State Highway",
    "State Highway "  : "State Highway",
    "Old Hwy "        : "Old Highway",
    "Old US Hwy "     : "Old U.S. Highway",
    "US Hwy "         : "U.S. Highway",
    "I-"              : "Interstate",
    "NFS Rd "         : "National Forest System Road",
    "FS Rd "          : "Forest Service Road",
    "BLM Rd "         : "Bureau of Land Management Road"
}
full_prefixes = set(prefixes.values())
full_prefixes = list(full_prefixes)
full_prefixes = sorted(full_prefixes)

street_types = full_suffixes + full_prefixes + ["Other", "Unnamed"] # combine street types & additional unsorted types
st_stats_columns = ["INCITS", "State", "County"] + street_types # add identifier columns

# Set up data structure for suffix statistics

st_stats = pd.DataFrame(columns=st_stats_columns)
# Load state abbreviations and county names from INCITS file
INCITS_df = pd.read_csv(INCITS_directory + INCITS_file, dtype=str, delimiter="|")
st_stats["INCITS"] = INCITS_df["STATEFP"] + INCITS_df["COUNTYFP"]
st_stats[["State", "County"]] = INCITS_df[["STATE", "COUNTYNAME"]]
# Title-case city-county features (independent cities), formatted as "XYZ city"
st_stats["County"] = st_stats["County"].str.replace(" city", " City", regex=False)

# Code to modify INCITS codes to replace CT counties with planning regions
# Issue: CT data doesn't show on Looker Studio because it has CT counties geos instead of planning regions geos
# https://www.federalregister.gov/documents/2022/06/06/2022-12063/change-to-county-equivalents-in-the-state-of-connecticut
CT_INCITS_codes = [
    ["09110", "CT", "Capitol Planning Region"],
    ["09120", "CT", "Greater Bridgeport Planning Region"],
    ["09130", "CT", "Lower Connecticut River Valley Planning Region"],
    ["09140", "CT", "Naugatuck Valley Planning Region"],
    ["09150", "CT", "Northeastern Connecticut Planning Region"],
    ["09160", "CT", "Northwest Hills Planning Region"],
    ["09170", "CT", "South Central Connecticut Planning Region"],
    ["09180", "CT", "Southeastern Connecticut Planning Region"],
    ["09190", "CT", "Western Connecticut Planning Region"]
]
CT_INCITS = pd.DataFrame(CT_INCITS_codes, columns=["INCITS", "State", "County"])
st_stats = st_stats[~st_stats["INCITS"].str.startswith("09")] # remove CT counties
st_stats = pd.concat([st_stats, CT_INCITS]) # add CT planning regions

st_stats[street_types] = 0.0 # set initial road length sums to 0
st_stats = st_stats.set_index("INCITS") # set index to INCITS codes
st_stats = st_stats.sort_index() # re-sort by index to account for modifications

In [303]:
### Process road data

import geopandas as gpd

# Helper function for street type calculations
def find_street_type(street_name, prefixes, suffixes):
    """
    Finds and returns the street type of a street name if the type exists in either the prefix or suffix inputs.

    :param street_name: The street name to find the street type of.
    :param prefixes: A dict of prefixes, where the key is the abbrevation, and the value is the full prefix.
    :param suffixes: A dict of suffixes, where the key is the abbreviation, and the avlue is the full prefix.
    :return: The street type of the street name.
    """
    # Check if street name exists
    if street_name is None:
        return "Unnamed"
    else:
        # Strip ordinal directions from start of string
        ordinals = ["N ", "S ", "E ", "W ", "NE ", "NW ", "SE ", "SW "]
        for ordinal in ordinals:
            if street_name.startswith(ordinal):
                street_name = street_name[len(ordinal):]
                break # Only one ordinal direction per name, so multiple strips are unnecessary

        # Check for prefixes
        for prefix in prefixes.keys():
            if street_name.startswith(prefix):
                return prefixes[prefix]

        # Check for suffixes
        name_tuples = street_name.split(" ") # street name components are separated by spaces
        for name_tuple in reversed(name_tuples): # reverse order, as suffixes are at the end
            for suffix in suffixes.keys():
                if name_tuple == suffix:
                    return suffixes[name_tuple]

        # No listed street types were found
        return "Other"

# Iterate through each road file's folder
subfolders = [folder for folder in os.scandir(roads_directory) if folder.is_dir()]

total_folders = len(subfolders)
increment = set_increment(total_folders, 5)
progress_update = "Processing roads"
start_time = datetime.now()
roads = 0 # tracking how many roads were processed

for count, subfolder in enumerate(subfolders):
    # Obtain road dataset file path
    road_path = os.path.join(subfolder.path, subfolder.name + ".shp")
    road_path = os.path.normpath(road_path)

    if os.path.exists(road_path):
        # Dataset setup                         # 0123456789012345678
        # Obtain INCITS code; each file is named "tl_1234_12345_roads"
        INCITS_code = subfolder.name[8:13]
        
        # Load road dataset
        gdf = gpd.read_file(road_path)
        # Calculate the length (in miles) of each road segment
        gdf = gdf.to_crs(9311) # re-project to PCS
        gdf["LENGTH"] = gdf.geometry.length * 0.0006213711 # convert meters to miles

        # Identify street type
        gdf["STREETTYPE"] = [find_street_type(name, prefixes, suffixes_titlecased) for name in gdf["FULLNAME"]]

        # Sum and store street type lengths
        summed_lengths = gdf.groupby("STREETTYPE")["LENGTH"].sum() # sum lengths
        st_stats.loc[INCITS_code, summed_lengths.index] = summed_lengths # replace values only if street type is present

        roads += len(gdf.index)

    update_progress(progress_update, count, total_folders, increment)

update_progress(progress_update, count, count, 1)
end_time = datetime.now()
runtime(progress_update, start_time, end_time)
print(f"Total roads processed: {roads}")

# Export statistics
output_file = "data outputs/street_type_statistics.csv"
print(f"Exporting statistics to directory: {output_file}")
st_stats.to_csv(output_file)

Processing roads: 0% complete. 0 records processed.
Processing roads: 5% complete. 162 records processed.
Processing roads: 10% complete. 324 records processed.
Processing roads: 15% complete. 486 records processed.
Processing roads: 20% complete. 648 records processed.
Processing roads: 25% complete. 810 records processed.
Processing roads: 30% complete. 972 records processed.
Processing roads: 35% complete. 1134 records processed.
Processing roads: 40% complete. 1296 records processed.
Processing roads: 45% complete. 1458 records processed.
Processing roads: 50% complete. 1620 records processed.
Processing roads: 55% complete. 1782 records processed.
Processing roads: 60% complete. 1944 records processed.
Processing roads: 65% complete. 2106 records processed.
Processing roads: 70% complete. 2268 records processed.
Processing roads: 75% complete. 2430 records processed.
Processing roads: 80% complete. 2592 records processed.
Processing roads: 85% complete. 2754 records processed.
Pro

In [354]:
### Reformat data for dashboard use
# Note: Looker Studio has a column limit of 200; the initial street type statistics has 225

# Trim street type statistics to include only the most common street types
max_cols = 100 # maximum number of street types to include
exclusions = ["State", "County", "Other", "Unnamed"] # exclude columns from sumnation analysis
st_lengths = st_stats.drop(columns=exclusions)
st_sums = st_lengths.sum()
top_sts = st_sums.nlargest(max_cols).index # find the most common street types by length
st_stats_trimmed = st_stats[exclusions + list(top_sts)] # trim statistics
st_stats_trimmed = st_stats_trimmed.copy() # make a copy of the original DF, instead of a slice

# Add least common street types to "Other" length
bottom_sts = st_sums.index.difference(top_sts)
st_stats_trimmed.loc[:, "Other"] += st_stats[bottom_sts].sum(axis=1)

# Find and append the top street type for each county
top_sts_county = st_lengths.idxmax(axis=1)
st_stats_trimmed.loc[:, "Top Street Type"] = top_sts_county

# Exclude county data not from states
non_states = ["60", "66", "69", "72", "74", "78"]
st_stats_trimmed = st_stats_trimmed[~st_stats_trimmed.index.str.startswith(tuple(non_states))]

# Export statistics
output_file = "data outputs/street_statistics_trimmed.csv"
print(f"Exporting trimmed statistics to directory: {output_file}")
st_stats_trimmed.to_csv(output_file)

# Reshape statistics to sum up lengths for each street type across the country
st_stats_summed = st_stats_trimmed.drop(columns=["State", "County", "Top Street Type"])
st_stats_summed = st_stats_summed.drop(columns="Unnamed") # drop statistics for unnamed roads
st_stats_summed = st_stats_summed.melt(var_name="Street Type", value_name="Length")
st_stats_summed = st_stats_summed.groupby("Street Type").sum().reset_index()

# Select the top 15 street type lengths (excluding "Other") and combine the rest into a new "Other" row
st_no_other = st_stats_summed[st_stats_summed["Street Type"] != "Other"]
top_15_sts = st_no_other.nlargest(15, "Length")
other_rows = st_no_other[~st_no_other["Street Type"].isin(top_15_sts["Street Type"])]
st_stats_summed_other = st_stats_summed[st_stats_summed["Street Type"] == "Other"].values[0][1]
other_row = pd.DataFrame({
    "Street Type": ["Other"],
    "Length": [other_rows["Length"].sum() + st_stats_summed_other]
})
top_15_sts_plus_other = pd.concat([top_15_sts, other_row], ignore_index=True)

# Export sums file
output_file = "data outputs/street_statistics_summed.csv"
print(f"Exporting summed statistics to directory: {output_file}")
top_15_sts_plus_other.to_csv(output_file, index=True)

Exporting trimmed statistics to directory: data outputs/street_statistics_trimmed.csv
Exporting summed statistics to directory: data outputs/street_statistics_summed.csv
