<a href="https://colab.research.google.com/github/taymnichols/education_deserts/blob/main/education_deserts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install geopandas
!pip install pyjanitor
!pip install tqdm census

Collecting pyjanitor
  Downloading pyjanitor-0.30.0-py3-none-any.whl.metadata (5.8 kB)
Collecting pandas_flavor (from pyjanitor)
  Downloading pandas_flavor-0.6.0-py3-none-any.whl.metadata (6.3 kB)
Downloading pyjanitor-0.30.0-py3-none-any.whl (207 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.9/207.9 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pandas_flavor-0.6.0-py3-none-any.whl (7.2 kB)
Installing collected packages: pandas_flavor, pyjanitor
Successfully installed pandas_flavor-0.6.0 pyjanitor-0.30.0
Collecting census
  Downloading census-0.8.22-py3-none-any.whl.metadata (8.1 kB)
Downloading census-0.8.22-py3-none-any.whl (11 kB)
Installing collected packages: census
Successfully installed census-0.8.22


In [4]:
import pandas as pd
import geopandas as gpd
import janitor
from tqdm.notebook import tqdm
from census import Census
from google.colab import userdata
from google.colab import files

# Get Census API key from Google Colab's saved data
census_api_key = userdata.get('CENSUS_API_KEY')

if not census_api_key:
    census_api_key = input("Please enter your Census API key: ")
    userdata.set('CENSUS_API_KEY', census_api_key)  # Save it for future use

# Initialize Census client
census_client = Census(census_api_key)

def get_state_fips():
    """Creates a list of state codes that the Census Bureau uses to identify states."""
    return {
        'AL': '01', 'AK': '02', 'AZ': '04', 'AR': '05', 'CA': '06', 'CO': '08', 'CT': '09',
        'DE': '10', 'FL': '12', 'GA': '13', 'HI': '15', 'ID': '16', 'IL': '17', 'IN': '18',
        'IA': '19', 'KS': '20', 'KY': '21', 'LA': '22', 'ME': '23', 'MD': '24', 'MA': '25',
        'MI': '26', 'MN': '27', 'MS': '28', 'MO': '29', 'MT': '30', 'NE': '31', 'NV': '32',
        'NH': '33', 'NJ': '34', 'NM': '35', 'NY': '36', 'NC': '37', 'ND': '38', 'OH': '39',
        'OK': '40', 'OR': '41', 'PA': '42', 'RI': '44', 'SC': '45', 'SD': '46', 'TN': '47',
        'TX': '48', 'UT': '49', 'VT': '50', 'VA': '51', 'WA': '53', 'WV': '54', 'WI': '55',
        'WY': '56'
    }

def load_census_tracts():
    """Downloads a map of all census tracts in the US."""
    print("Loading census tract map data...")
    census_tracts_url = 'https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip'
    census_tracts = gpd.read_file(census_tracts_url)
    census_tracts = census_tracts.set_crs('EPSG:4269', allow_override=True)
    return census_tracts.to_crs('EPSG:3857')

def get_census_population(census_client, state_fips_dict):
    """Gets population data for every census tract in the US."""
    all_census_data = []

    for state_abbr, state_fips in tqdm(state_fips_dict.items(), desc="Getting population data"):
        try:
            state_census_data = census_client.acs5.state_county_tract(
                fields=('NAME', 'B01003_001E'),
                state_fips=state_fips,
                county_fips='*',
                tract='*',
                year=2020
            )
            all_census_data.extend(state_census_data)
        except Exception as e:
            print(f"Couldn't get data for {state_abbr}: {e}")
            continue

    census_df = pd.DataFrame(all_census_data)
    census_df = census_df.rename(columns={'B01003_001E': 'total_population'})
    census_df['GEOID'] = (census_df['state'].astype(str).str.zfill(2) +
                         census_df['county'].astype(str).str.zfill(3) +
                         census_df['tract'].astype(str).str.zfill(6))

    return census_df[['GEOID', 'total_population']]

def process_college_data(addresses_url):
    """Loads and organizes list of colleges."""
    print("Reading college location data...")
    college_addresses = pd.read_csv(addresses_url)
    college_addresses.columns = college_addresses.columns.str.lower()
    college_addresses = college_addresses.clean_names()

    important_columns = [
        'unitid', 'institution_name', 'fips_hd2023_', 'sector_hd2023_',
        'addr_hd2023_', 'city_hd2023_', 'stabbr_hd2023_', 'zip_hd2023_',
        'iclevel_hd2023_', 'control_hd2023_', 'latitude', 'longitude',
        'instcat_hd2023_'
    ]

    return college_addresses[
        college_addresses['sector_hd2023_'].isin([1, 2, 3, 4])
    ][important_columns]

def calculate_buffer_intersections(college, state_tracts):
    """Calculate intersection between college buffer and census tracts."""
    results = []
    intersecting_tracts = state_tracts[state_tracts.intersects(college['buffer'])]

    for _, tract in intersecting_tracts.iterrows():
        overlap = tract.geometry.intersection(college['buffer'])
        tract_total_area = tract.geometry.area
        overlap_area = overlap.area
        percent_in_range = (overlap_area / tract_total_area) * 100

        results.append({
            'unitid': college['unitid'],
            'institution_name': college['institution_name'],
            'state': college['stabbr_hd2023_'],
            'GEOID': tract['GEOID'],
            'sector_hd2023_': college['sector_hd2023_'],
            'total_tract_area': tract_total_area,
            'intersected_area': overlap_area,
            'tract_buffer_percentage': percent_in_range,
            'tract_total_population': tract['total_population'],
            'estimated_population_in_buffer': (percent_in_range / 100) * tract['total_population']
        })

    return results

def calculate_national_tract_buffer_percentage(addresses_url, census_api_key, buffer_distance_km=48.2803):
    """Calculate buffer percentages and identify education deserts."""
    # Initialize
    census_client = Census(census_api_key)
    state_fips_dict = get_state_fips()

    # Load and prepare data
    census_tract_boundaries = load_census_tracts()
    tract_population_data = get_census_population(census_client, state_fips_dict)
    census_tracts = census_tract_boundaries.merge(tract_population_data, on='GEOID', how='left')

    # Process college data
    colleges = process_college_data(addresses_url)

    colleges_gdf = gpd.GeoDataFrame(
        colleges,
        geometry=gpd.points_from_xy(colleges.longitude, colleges.latitude),
        crs='EPSG:4326'
    ).to_crs('EPSG:3857')

    colleges_gdf['buffer'] = colleges_gdf.geometry.buffer(buffer_distance_km * 1000)

    all_detailed_results = []
    all_summary_results = []

    for state_abbr, state_fips in tqdm(state_fips_dict.items(), desc="Analyzing states"):
        print(f"\nLooking at {state_abbr}...")

        state_colleges = colleges_gdf[colleges_gdf['stabbr_hd2023_'] == state_abbr]
        state_tracts = census_tracts[census_tracts['GEOID'].str.startswith(state_fips)]

        if len(state_colleges) == 0:
            print(f"No colleges found in {state_abbr}")
            continue

        state_detailed_results = []
        for _, college in tqdm(state_colleges.iterrows(), desc=f"Analyzing colleges in {state_abbr}"):
            results = calculate_buffer_intersections(college, state_tracts)
            #WHAT IS THIS?
            state_detailed_results.extend(results)

        if state_detailed_results:
            state_results_df = pd.DataFrame(state_detailed_results)
            state_results_df.to_csv(f'{state_abbr.lower()}_tract_buffer_detailed_results.csv', index=False)

            #WHAT IS THIS? what is aggregate? why is it doing mean max min?
            state_summary = state_results_df.groupby(['unitid', 'institution_name', 'state']).agg({
                'GEOID': 'count',
                'tract_buffer_percentage': ['mean', 'max', 'min']
            }).reset_index()

            state_summary.columns = [
                'unitid', 'institution_name', 'state',
                'total_tracts_intersected',
                'avg_tract_buffer_percentage',
                'max_tract_buffer_percentage',
                'min_tract_buffer_percentage'
            ]

            all_detailed_results.extend(state_detailed_results)
            all_summary_results.append(state_summary)

    final_detailed_results = pd.DataFrame(all_detailed_results)
    final_summary_results = pd.concat(all_summary_results, ignore_index=True)

    #WHAT IS THIS
    education_deserts = (
        final_detailed_results.groupby('GEOID')
        .agg({
            'state': 'first',
            'tract_total_population': 'first',
            'estimated_population_in_buffer': 'sum'
        })
        .reset_index()
    )

    education_deserts['percent_population_with_access'] = (
        education_deserts['estimated_population_in_buffer'] /
        education_deserts['tract_total_population'] * 100
    )

    #CAN WE TAKE THIS OUT - ask claude - end goal is to do X, do i still need this or should it happen later?
    education_deserts = education_deserts[
        education_deserts['percent_population_with_access'] < 25
    ]

    tract_level_summary = summarize_tract_access(final_detailed_results)

    final_detailed_results.to_csv('national_tract_buffer_detailed_results.csv', index=False)
    final_summary_results.to_csv('national_tract_buffer_summary_results.csv', index=False)
    tract_level_summary.to_csv('tract_level_college_access_summary.csv', index=False)

    return final_summary_results, final_detailed_results, census_tracts, education_deserts, tract_level_summary

def summarize_tract_access(final_detailed_results):
    """Create summary DataFrame showing college access statistics for each census tract."""
    def get_sector_types(sector_list):
        sector_map = {
            1: "Public 4-year",
            2: "Private nonprofit 4-year",
            3: "Private for-profit 4-year",
            4: "Public 2-year"
        }
        sectors = set(sector_list)
        return ', '.join(sector_map[s] for s in sectors if s in sector_map)

    tract_summary = (
        final_detailed_results.groupby('GEOID')
        .agg({
            'state': 'first',
            'tract_total_population': 'first',
            'tract_buffer_percentage': 'max',
            'estimated_population_in_buffer': 'max',
            'institution_name': lambda x: ', '.join(set(x)),
            'unitid': 'nunique',
            'sector_hd2023_': lambda x: get_sector_types(x)
        })
        .reset_index()
    )

    tract_summary = tract_summary.rename(columns={
        'tract_buffer_percentage': 'total_buffer_coverage',
        'unitid': 'number_of_nearby_colleges',
        'sector_hd2023_': 'college_types'
    })

    tract_summary['total_buffer_coverage'] = tract_summary['total_buffer_coverage'].clip(upper=100)
    tract_summary['estimated_population_in_buffer'] = tract_summary['estimated_population_in_buffer'].round(0).astype(int)
    tract_summary['tract_total_population'] = tract_summary['tract_total_population'].round(0).astype(int)
    tract_summary['estimated_population_in_buffer'] = tract_summary[['estimated_population_in_buffer', 'tract_total_population']].min(axis=1)

    tract_summary['population_outside_buffer'] = (
        tract_summary['tract_total_population'] -
        tract_summary['estimated_population_in_buffer']
    )

    tract_summary['percent_population_with_access'] = (
        (tract_summary['estimated_population_in_buffer'] /
         tract_summary['tract_total_population']) * 100
    ).round(2)

    tract_summary['percent_population_without_access'] = (
        (tract_summary['population_outside_buffer'] /
         tract_summary['tract_total_population']) * 100
    ).round(2)

    tract_summary['percent_population_with_access'] = tract_summary['percent_population_with_access'].clip(upper=100)
    tract_summary['percent_population_without_access'] = tract_summary['percent_population_without_access'].clip(upper=100)

    final_columns = [
        'GEOID',
        'state',
        'tract_total_population',
        'number_of_nearby_colleges',
        'college_types',
        'institution_name',
        'total_buffer_coverage',
        'estimated_population_in_buffer',
        'population_outside_buffer',
        'percent_population_with_access',
        'percent_population_without_access'
    ]

    return tract_summary[final_columns]

def main():
    """Run the analysis and display tract-level results."""
    addresses_url = "https://raw.githubusercontent.com/taymnichols/education_deserts/refs/heads/main/census_data/geocoded_addresses.csv"

    summary_results, detailed_results, census_tracts, education_deserts, tract_summary = calculate_national_tract_buffer_percentage(
        addresses_url,
        census_api_key,
        buffer_distance_km=48.2803
    )

    print("\nAnalysis complete! Summary of college access by census tract:")
    print(f"\nTotal census tracts analyzed: {len(tract_summary)}")
    print(f"\nAverage percent of population with college access: {tract_summary['percent_population_with_access'].mean():.1f}%")
    print(f"\nNumber of tracts with less than 25% access: {len(tract_summary[tract_summary['percent_population_with_access'] < 25])}")

    print("\nFirst few rows of tract-level summary:")
    print(tract_summary.head())
    tract_summary.to_csv('tract_level_college_access_summary.csv', index=False)
    census_tracts.to_csv('census_tracts.csv', index=False)

if __name__ == "__main__":
    main()

files.download('tract_level_college_access_summary.csv')

# Download the census tracts CSV
files.download('census_tracts.csv')

# Download the national detailed results CSV
files.download('national_tract_buffer_detailed_results.csv')

# Download the national summary results CSV
files.download('national_tract_buffer_summary_results.csv')

  and should_run_async(code)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>