In [1]:
import osmnx as ox
import pandas as pd
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import nest_asyncio
import sys
import os

In [2]:
# Make modules living next to this notebook importable. Guarded so that
# re-running the cell does not keep appending the same entry to sys.path.
_notebook_dir = os.path.abspath('.')
if _notebook_dir not in sys.path:
    sys.path.append(_notebook_dir)

In [3]:
# Raw ACS table. The notebook-level name `df` is what main() processes further down.
acs_raw_path = os.path.join('..', 'data', 'acs_raw.csv')
df = pd.read_csv(acs_raw_path)


In [3]:

# ----------------------
# COMMON FUNCTIONS
# ----------------------

def get_intersection_density(place_name):
    """
    Downloads the walking network for the specified place,
    projects the graph to a metric CRS, and returns:
      - the number of intersections (i.e. nodes)
      - the area (in square kilometers) computed as the convex hull area of the nodes.
    """
    # Download the walking network for the specified place (in EPSG:4326 by default)
    G = ox.graph_from_place(place_name, network_type='walk')
    
    # Project the graph to a metric CRS (here we use Web Mercator, EPSG:3857)
    G_proj = ox.project_graph(G)
    
    # Convert the projected nodes to a GeoDataFrame
    nodes_proj, _ = ox.graph_to_gdfs(G_proj)
    
    # Compute the convex hull of all node geometries using union_all() (the recommended approach)
    convex_hull = nodes_proj.geometry.union_all().convex_hull
    
    # Calculate the area in square kilometers (convex_hull.area is in square meters)
    area_sqkm = convex_hull.area / 1e6
    return len(nodes_proj), area_sqkm

def compute_intersection_metrics(location):
    """
    Best-effort wrapper around ``get_intersection_density``.

    Parameters
    ----------
    location : str
        Place query such as "Boston, MA, USA".

    Returns
    -------
    tuple
        ``(number_of_intersections, area_sqkm)`` on success. Returns
        ``(None, None)`` if any ``Exception`` is raised; the error is
        printed rather than re-raised, so one bad location never aborts a batch.
    """
    try:
        n_nodes, area_sqkm = get_intersection_density(location)
    except Exception as exc:
        print(f"Error processing {location}: {exc}")
        return None, None
    return n_nodes, area_sqkm


In [4]:

# ----------------------
# ASYNCHRONOUS + PARALLEL VERSION
# ----------------------
# (NEW: This version explicitly creates a ThreadPoolExecutor and uses loop.run_in_executor)

async def process_locations_async_parallel(locations, executor):
    """
    Run ``compute_intersection_metrics`` for every location on ``executor``.

    Each blocking call is submitted with ``loop.run_in_executor``, and all of
    them are awaited together. Concurrency is bounded by the executor, and the
    event loop itself is never blocked.

    Parameters
    ----------
    locations : iterable of str
        Place queries, e.g. "Boston, MA, USA".
    executor : concurrent.futures.Executor
        Pool the blocking calls are dispatched to.

    Returns
    -------
    list
        One entry per location, in input order. Because
        ``return_exceptions=True`` is used, an entry is the exception object
        (not a tuple) if that call raised instead of returning.
    """
    # Inside a coroutine, get_running_loop() is the documented way to get the loop.
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, compute_intersection_metrics, loc)
        for loc in locations
    ]
    return await asyncio.gather(*futures, return_exceptions=True)

def _has_running_event_loop():
    """Return True if an asyncio event loop is running in this thread (as in Jupyter)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def add_intersection_metrics_async_parallel(df, town_col='town', state_col='state_name', max_workers=5):
    """
    Asynchronous + parallel version: add street-network metrics to a copy of ``df``.

    A ``location`` string ("town, state, USA") is built for every row, and
    ``compute_intersection_metrics`` runs for all rows on a ThreadPoolExecutor
    driven by asyncio.

    Parameters
    ----------
    df : pandas.DataFrame
        Input rows. Not modified; a copy is returned.
    town_col, state_col : str
        Names of the columns holding the town and the state name.
    max_workers : int
        Maximum number of locations processed concurrently.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with these added columns:
          - 'location'
          - 'num_intersections'
          - 'area_sqkm'
          - 'intersection_density'
        'intersection_density' is num_intersections / area_sqkm, and is NaN
        when the area is missing or not positive. A row whose lookup failed
        gets missing values; it does not abort the whole run.
    """
    df = df.copy()
    # Vectorised equivalent of f"{town}, {state}, USA" for each row.
    df["location"] = df[town_col].astype(str) + ", " + df[state_col].astype(str) + ", USA"
    locations = df["location"].tolist()

    # The with-block shuts the pool down (waiting for workers) on every exit path.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        coro = process_locations_async_parallel(locations, executor)
        if _has_running_event_loop():
            # Jupyter/IPython already runs a loop, so asyncio.run() is not
            # allowed. nest_asyncio makes that loop re-entrant. Checking up
            # front, rather than catching RuntimeError, avoids re-running the
            # whole batch when the work itself raises RuntimeError.
            nest_asyncio.apply()
            results = asyncio.get_event_loop().run_until_complete(coro)
        else:
            results = asyncio.run(coro)

    # gather(return_exceptions=True) returns exception objects instead of
    # raising. Turn them into missing values so indexing below cannot fail.
    metrics = []
    for location, result in zip(locations, results):
        if isinstance(result, BaseException):
            print(f"Error processing {location}: {result!r}")
            metrics.append((None, None))
        else:
            metrics.append(result)

    df["num_intersections"] = [count for count, _ in metrics]
    df["area_sqkm"] = [area for _, area in metrics]

    # Coerce so that all-None columns (every lookup failed) still compare numerically.
    count = pd.to_numeric(df["num_intersections"], errors="coerce")
    area = pd.to_numeric(df["area_sqkm"], errors="coerce")
    # Division by 0 gives inf. where() masks that, and NaN areas, to NaN.
    df["intersection_density"] = (count / area).where(area > 0)
    return df



In [None]:
def main(input_df=None, output_filepath=None, max_workers=10):
    """
    Main function to execute asynchronous + parallel processing for intersection metrics.

    Steps:
    1. Run ``add_intersection_metrics_async_parallel`` on the input rows.
    2. Measure and print the elapsed time.
    3. Save the processed DataFrame to a CSV file.

    Parameters:
    input_df : pandas.DataFrame, optional
        Rows to process. Defaults to the notebook-level ``df`` (loaded from
        ../data/acs_raw.csv), so a bare ``main()`` call behaves as before.
        Passing it explicitly avoids depending on hidden notebook state.
    output_filepath : str, optional
        Destination CSV. Defaults to ../data/acs_with_area_and_intersection.csv.
    max_workers : int
        Thread-pool size passed through to the metrics function (default 10).

    Returns:
    None
    """
    if input_df is None:
        input_df = df  # notebook-level global, kept as the backward-compatible default
    if output_filepath is None:
        output_filepath = os.path.join('..', 'data', "acs_with_area_and_intersection.csv")

    print("Starting asynchronous + parallel execution...")
    # perf_counter() is monotonic and intended for durations; time.time() can jump.
    start = time.perf_counter()

    df_final = add_intersection_metrics_async_parallel(input_df, max_workers=max_workers)

    elapsed = time.perf_counter() - start
    print("Asynchronous + parallel execution completed.")
    print(f"Asynchronous + parallel execution time: {elapsed:.2f} seconds\n")

    print(f"Saving final DataFrame to {output_filepath}...")
    df_final.to_csv(output_filepath, index=False)
    print("File saved successfully.")


In [None]:
# Run the full pipeline: downloads one walking network per location (network I/O,
# can be slow), then writes the output CSV.
main()