In [1]:
import pandas as pd
import numpy as np
import requests
import re
import os
from os import getenv

In [2]:
# Output directory for the 2023 raw downloads. Built with os.path.join so the
# notebook resolves to the same folder on Windows and on POSIX systems
# (a literal backslash would be part of the directory name on POSIX).
outDIR = os.path.join("rawData", "2023")
os.makedirs(outDIR, exist_ok=True)

# Census API key read from the environment; None when CENSUS_API_KEY is unset.
censusAPI = getenv("CENSUS_API_KEY")

[Census ACS 5-year API variables table for 2023](https://api.census.gov/data/2023/acs/acs5/variables.html)

In [3]:
def Population_csv(year, api_key, out_dir):
    """
    Download ACS 5-year county-level population data (variable B01003_001E)
    for a given year and save it as a CSV.

    Parameters
    ----------
    year : int
        ACS 5-year end year (e.g., 2019, 2020, 2021, 2022)
    api_key : str
        Census API key
    out_dir : str
        Output directory for CSV files; created if it does not exist

    Output
    ------
    Writes a CSV named:
    county_population_<year>.csv

    Raises
    ------
    requests.RequestException
        If the request times out or the API answers with an error status.
    """
    url = f"https://api.census.gov/data/{year}/acs/acs5"
    params = {
        "get": "NAME,B01003_001E",
        "for": "county:*",
        "key": api_key
    }

    # Without a timeout a stalled connection would hang the cell indefinitely.
    response = requests.get(url, params=params, timeout=120)
    response.raise_for_status()

    # The payload is a list of rows; the first row holds the column names.
    data = response.json()
    df = pd.DataFrame(data[1:], columns=data[0])
    df = df.rename(columns={"B01003_001E": "population"})

    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"county_population_{year}.csv")
    df.to_csv(output_file, index=False)

In [4]:
# Fetch the 2023 ACS 5-year county population table into outDIR.
Population_csv(2023, censusAPI, outDIR)

In [5]:
def MedianIncome_csv(year, api_key, out_dir):
    """
    Download ACS 5-year county-level median household income (variable
    B19013_001E) for a given year and save it as a CSV. The median household
    income is inflation-adjusted to the given year.

    Parameters
    ----------
    year : int
        ACS 5-year estimate year (must be between 2009 and 2023)
    api_key : str
        Census API key
    out_dir : str
        Directory where the CSV will be written; created if it does not exist

    Output
    ------
    Writes a CSV named:
    county_median_household_income_<year>.csv

    Raises
    ------
    requests.RequestException
        If the request times out or the API answers with an error status.
    """
    url = f"https://api.census.gov/data/{year}/acs/acs5"
    params = {
        "get": "NAME,B19013_001E",
        "for": "county:*",
        "in": "state:*",
        "key": api_key
    }

    # Without a timeout a stalled connection would hang the cell indefinitely.
    response = requests.get(url, params=params, timeout=120)
    response.raise_for_status()

    # The payload is a list of rows; the first row holds the column names.
    data = response.json()
    df = pd.DataFrame(data[1:], columns=data[0])
    df = df.rename(columns={"B19013_001E": "MedianIncome"})

    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(
        out_dir, f"county_median_household_income_{year}.csv"
    )
    df.to_csv(output_file, index=False)


In [6]:
# Fetch the 2023 ACS 5-year county median household income table into outDIR.
MedianIncome_csv(2023, censusAPI, outDIR)

In [7]:
def HouseAge_csv(year, api_key, out_dir):
    """
    Downloads ACS 5-year county-level home age data (Table B25034, plus
    B25035_001E) for a given year and saves it as a CSV.

    Parameters
    ----------
    year : int
        ACS 5-year end year (e.g., 2019, 2020, 2021, 2022)
    api_key : str
        Census API key
    out_dir : str
        Output directory for CSV files; created if it does not exist

    Output
    ------
    Writes a CSV named:
    county_house_age_<year>.csv

    Raises
    ------
    requests.RequestException
        If the request times out or the API answers with an error status.
    """
    # ACS 5-year endpoint
    base_url = f"https://api.census.gov/data/{year}/acs/acs5"

    # Single source of truth: requested variable -> output column name.
    # NOTE(review): the bin labels are fixed here, but the most recent bins
    # vary by ACS vintage -- confirm them against the variables table when
    # using a year other than 2023.
    # NOTE(review): B25034_011E is not requested; presumably that is the
    # oldest "year built" bin, so the bins below may not sum to the total --
    # TODO confirm whether it should be included.
    column_names = {
        "B25034_001E": "total_housing_units",
        "B25034_002E": "built_2020_or_later",
        "B25034_003E": "built_2010_to_2019",
        "B25034_004E": "built_2000_to_2009",
        "B25034_005E": "built_1990_to_1999",
        "B25034_006E": "built_1980_to_1989",
        "B25034_007E": "built_1970_to_1979",
        "B25034_008E": "built_1960_to_1969",
        "B25034_009E": "built_1950_to_1959",
        "B25034_010E": "built_1940_to_1949",
        "B25035_001E": "MEDIAN_YEAR_BUILT",  # median year built, not an age
    }
    variables = ["NAME", *column_names]

    params = {
        "get": ",".join(variables),
        "for": "county:*",
        "in": "state:*",
        "key": api_key
    }

    # Without a timeout a stalled connection would hang the cell indefinitely.
    response = requests.get(base_url, params=params, timeout=120)
    response.raise_for_status()

    # The payload is a list of rows; the first row holds the column names.
    data = response.json()
    df = pd.DataFrame(data[1:], columns=data[0])

    # The API delivers values as strings; anything non-numeric becomes NaN.
    for col in column_names:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # rename columns for clarity
    df = df.rename(columns=column_names)

    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"county_house_age_{year}.csv")
    df.to_csv(output_file, index=False)

In [8]:
# Fetch the 2023 ACS 5-year county home age table (B25034) into outDIR.
HouseAge_csv(2023, censusAPI, outDIR)

In [9]:
def RONI_csv(year,out_dir,rawData):
    """
    Turn a table of overlapping three-month season values into per-month
    values for one year and write them to RONI_<year>.csv.

    Every season column (DJF, JFM, ..., NDJ) is credited to each of the three
    calendar months it spans. A month column accumulates the seasons that
    cover it, and all remaining columns are then divided by 3.

    Parameters
    ----------
    year : int
        Value of the 'Year' column identifying the row to export
    out_dir : str
        Directory the CSV is written to
    rawData : str
        Path of the input CSV; it needs a 'Year' column and the twelve
        season columns
    """
    # season abbreviation -> underscore-joined names of the months it covers
    season_labels = {
        'DJF': 'DECEMBER_JANUARY_FEBRUARY',
        'JFM': 'JANUARY_FEBRUARY_MARCH',
        'FMA': 'FEBRUARY_MARCH_APRIL',
        'MAM': 'MARCH_APRIL_MAY',
        'AMJ': 'APRIL_MAY_JUNE',
        'MJJ': 'MAY_JUNE_JULY',
        'JJA': 'JUNE_JULY_AUGUST',
        'JAS': 'JULY_AUGUST_SEPTEMBER',
        'ASO': 'AUGUST_SEPTEMBER_OCTOBER',
        'SON': 'SEPTEMBER_OCTOBER_NOVEMBER',
        'OND': 'OCTOBER_NOVEMBER_DECEMBER',
        'NDJ': 'NOVEMBER_DECEMBER_JANUARY',
    }

    # load the table, index it by year and expand the season abbreviations
    seasons = (
        pd.read_csv(rawData)
        .set_index('Year')
        .rename(columns=season_labels)
    )

    # spread each season over its months: the first season touching a month
    # creates the month column, later ones are added onto it
    for label in season_labels.values():
        for month_name in label.split('_'):
            if month_name not in seasons.columns:
                seasons[month_name] = seasons[label]
            else:
                seasons[month_name] += seasons[label]

        # the season column has been fully distributed
        seasons = seasons.drop(columns=[label])

    # each month received three season values, so dividing by 3 averages them
    monthly = seasons.div(3)

    # keep only the requested year and write it out
    selected = monthly.loc[year]
    selected.to_csv(f"{out_dir}/RONI_{year}.csv")
    

In [17]:
# Build the 2023 monthly RONI file from the local raw table. The input path is
# assembled with os.path.join so it also resolves on non-Windows systems.
RONI_csv(2023, outDIR, os.path.join("RONI_data", "rawData.csv"))

In [11]:
def stormDamage_csv(year, out_dir):
    """
    Download the NOAA Storm Events Database "details" file for a given year
    and save it as an uncompressed CSV.

    Parameters
    ----------
    year : int
        Year of the storm events file to download
    out_dir : str
        Output directory for CSV files; created if it does not exist

    Output
    ------
    Writes a CSV named:
    StormData_<year>.csv

    Raises
    ------
    ValueError
        If the directory listing contains no details file for `year`.
    requests.RequestException
        If the listing request times out or returns an error status.
    """
    # 1. Get the URL for the StormEvents details file for the given year
    base_url = "https://www.ncei.noaa.gov/pub/data/swdi/stormevents/csvfiles/"

    # Without a timeout a stalled connection would hang the cell indefinitely.
    listing = requests.get(base_url, timeout=120)
    listing.raise_for_status()

    pattern = re.compile(
        fr'StormEvents_details-ftp_v1\.0_d{year}_c\d+\.csv\.gz'
    )
    candidates = pattern.findall(listing.text)
    if not candidates:
        raise ValueError(f"No StormEvents details file found for year {year}")

    # Names for one year differ only in the digits after "_c" (presumably a
    # creation date -- TODO confirm). If the listing holds several, take the
    # highest; with a single file this is the same as the first match.
    url = base_url + max(candidates)

    # 2. Download and read the CSV file into a DataFrame
    df = pd.read_csv(url, compression='gzip', low_memory=False)

    # 3. save the DataFrame as a CSV file in the specified output directory
    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"StormData_{year}.csv")
    df.to_csv(output_file, index=False)

In [12]:
# Fetch the 2023 NOAA Storm Events details file into outDIR.
stormDamage_csv(2023, outDIR)

In [13]:
def tempAnomaly(year, out_dir):
    """
    Downloads monthly county temperature CSVs for a full year from NOAA
    and merges them into a single master file.

    Months whose download does not return HTTP 200 are skipped and reported
    with a printed message; the remaining months are still merged.

    Parameters
    ----------
    year : int
        Year whose twelve monthly files are requested
    out_dir : str
        Output directory for the merged CSV; created if it does not exist

    Output
    ------
    Writes a CSV named:
    temperature_anomalies_<year>.csv

    Raises
    ------
    ValueError
        If no monthly file could be downloaded for `year`.
    """
    base_url = "https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/county/mapping/110/tavg/"
    all_monthly_data = []
    downloaded_files = []
    skipped_months = []

    # make temporary directory for raw monthly files in output directory
    temp_dir = os.path.join(out_dir, "temporary_monthly")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        for month in range(1, 13):
            # Format month to YYYYMM (e.g., 202301)
            date_code = f"{year}{month:02d}"
            url = f"{base_url}{date_code}.csv"

            # Without a timeout a stalled connection would hang the cell.
            response = requests.get(url, timeout=120)
            if response.status_code != 200:
                skipped_months.append(month)
                continue

            # Save raw file and remember it so cleanup only touches files
            # that were actually written.
            file_path = os.path.join(
                temp_dir, f"temperature_anomalies_{date_code}.csv"
            )
            with open(file_path, "wb") as f:
                f.write(response.content)
            downloaded_files.append(file_path)

            # Skip the three metadata rows that precede the header row
            monthly_df = pd.read_csv(file_path, skiprows=3)
            monthly_df = monthly_df.rename(columns={'Value': 'TEMPERATURE'})
            monthly_df['MONTH'] = month

            # expose the location id column under the name PartialFIPS
            monthly_df['PartialFIPS'] = monthly_df['ID']

            all_monthly_data.append(
                monthly_df[['PartialFIPS', 'TEMPERATURE', 'Anomaly (1901-2000 base period)', 'MONTH']]
            )
    finally:
        # delete the raw monthly files even when a download or parse failed
        for file_path in downloaded_files:
            os.remove(file_path)
        # only remove the directory if nothing else lives in it
        if not os.listdir(temp_dir):
            os.rmdir(temp_dir)

    if skipped_months:
        print(f"No temperature data downloaded for {year}, month(s): {skipped_months}")

    if not all_monthly_data:
        raise ValueError(f"No monthly temperature files could be downloaded for year {year}")

    # Merge all monthly DataFrames into a single master DataFrame
    master_df = pd.concat(all_monthly_data, ignore_index=True)
    master_df.to_csv(
        os.path.join(out_dir, f"temperature_anomalies_{year}.csv"), index=False
    )
            

In [14]:
# Download and merge the twelve 2023 monthly county temperature files into outDIR.
tempAnomaly(2023, outDIR)

In [15]:
def coastalType(out_dir):
    """
    Download the 2010 shoreline and watershed county lists from the NOAA
    ArcGIS service and save the combined table as a CSV.

    This is a static dataset that doesn't change by year, so unlike the other
    download functions it takes no year parameter.

    Parameters
    ----------
    out_dir : str
        Output directory for the CSV; created if it does not exist

    Output
    ------
    Writes a CSV named:
    coastal_counties_2010.csv
    with one row per FIPS code and, for each of the two layers, a type column
    and a "County, State" name column (empty where the county is not in that
    layer).

    Raises
    ------
    RuntimeError
        If the ArcGIS service returns an error payload.
    requests.RequestException
        If a request times out or returns an error status.
    """
    service_url = "https://maps1.coast.noaa.gov/arcgis/rest/services/Landcover/Coastal_County_Update_Review/MapServer"

    def arcgis_query_all(layer_query_url: str, out_fields: str):
        """
        Query an ArcGIS layer and return ALL records as a DataFrame,
        handling pagination.
        """
        rows = []
        offset = 0
        page_size = 2000  # service MaxRecordCount is 2000

        while True:
            params = {
                "where": "1=1",
                "outFields": out_fields,
                "returnGeometry": "false",
                "f": "json",
                "resultOffset": offset,
                "resultRecordCount": page_size,
            }
            r = requests.get(layer_query_url, params=params, timeout=120)
            r.raise_for_status()
            data = r.json()

            # If ArcGIS returns an error, it will look like {"error": {...}}
            if "error" in data:
                raise RuntimeError(f"ArcGIS error from {layer_query_url}: {data['error']}")

            feats = data.get("features", [])
            if not feats:
                break

            rows.extend(feat.get("attributes", {}) for feat in feats)
            offset += len(feats)

            # Some services also include exceededTransferLimit; we're
            # paginating anyway, so a short page marks the end.
            if len(feats) < page_size:
                break

        return pd.DataFrame(rows)

    def coastal_counties(layer_id: int, label: str):
        """
        Fetch one county layer and return a DataFrame with the columns
        FIPS, COASTAL_TYPE_<LABEL> and NAME.
        """
        df = arcgis_query_all(
            f"{service_url}/{layer_id}/query",
            out_fields="fips,cntyname,st_name"
        )
        df = df.rename(columns={
            "fips": "FIPS",
            "cntyname": "COUNTY",
            "st_name": "STATE_NAME",
        })

        # left-pad to five characters so FIPS codes keep their leading zeros
        df["FIPS"] = df["FIPS"].astype(str).str.zfill(5)

        type_column = f"COASTAL_TYPE_{label.upper()}"
        df[type_column] = label

        # combine names county and state into one column with format "County, State"
        df["NAME"] = df["COUNTY"] + ", " + df["STATE_NAME"]

        # only relevant columns
        return df[["FIPS", type_column, "NAME"]]

    shoreline_df = coastal_counties(9, "shoreline")
    watershed_df = coastal_counties(33, "watershed")

    # merge shoreline and watershed dataframes on FIPS code, keeping all rows (outer join)
    df = pd.merge(shoreline_df, watershed_df, on="FIPS", how="outer", suffixes=("_SHORELINE", "_WATERSHED"))

    os.makedirs(out_dir, exist_ok=True)
    df.to_csv(os.path.join(out_dir, "coastal_counties_2010.csv"), index=False)

In [16]:
# Fetch the 2010 shoreline/watershed coastal county table into outDIR.
coastalType(outDIR)