In [1]:
import numpy as np
import pandas as pd
import os
import zipfile
from urllib.request import urlretrieve
from urllib.error import HTTPError
import pathlib
import gzip
import shutil
import requests
import re
from urllib.parse import urljoin

In [2]:
class DataGetter:
    """ 
    Given a year of interest, finds the relevant data files and randomly selects 100 coastal and 100 inland counties.
    Outputs a csv of those sample counties for that year. 
    Each county has it's respective population, average property damage from a storm event, and if it is coastal.

    Attributes:
        year: int, year of interest
    """

    def __init__(self, year, random_seed=None, zone=False, ignore_zero_damage=False):
        self.year = year
        self.coastal_sample = None
        self.inland_sample = None
        self.total_sample = None
        self.random_seed = random_seed
        self.zone = zone
        self.ignore_zero_damage = ignore_zero_damage
        
        if random_seed is not None:
            np.random.seed(random_seed)
        

        self.PARENT_DIRECTORY = pathlib.Path().resolve()

        self.non_MAINLAND_STATES_FIPS = ['02','15'] + ['60','64','66','68','69','70','72','74','78']

        self.COASTAL_COUNTIES_2024 = self.PARENT_DIRECTORY/ "Required_CSV" / "Coastal_Counties_2024.csv"
        self.CENSUS_DIRECTORY_URL = self.get_population_totals_county_url(self.year)
        self.STORM_DIRECTORY_URL = self.get_stormevents_detail_url(self.year)

        self.zone_county_reference = self.PARENT_DIRECTORY / "Required_CSV" / "zone_county_reference.csv"

        self.data_path = self.PARENT_DIRECTORY / f"{self.year}"

        self.download_data()

        # check that data was downloaded successfully
        census_csv_path = self.data_path / "census.csv"
        storm_csv_path = self.data_path / "storm.csv"

        if not census_csv_path.exists():
            raise ValueError(f"Census CSV file not found in the project folder for year {self.year} after download.")
        
        if not storm_csv_path.exists():
            raise ValueError(f"Storm CSV file not found in the project folder for year {self.year} after download.")

        self.data_paths = self.find_data()

        self.clean_all_dfs()

        # get counties with damage and store in self.damage_by_county for future use
        self.damage_by_county = None
        self.counties_with_damage = self.counties_with_damage_getter()
        
        self.make_final_dataset()

        # delete census and storm csv to save space and if re-running the code it will re-download fresh copies.
        try:
            os.remove(census_csv_path)
            os.remove(storm_csv_path)
        except OSError:
            pass
    
    def get_population_totals_county_url(self,year: int) -> str:
        """
        Return direct URL to the 'population totals by county' file (CSV or ZIP)
        for the requested year from the Census Popest datasets.

        Args:
            year (int): year to find (e.g. 2024)

        Returns:
            str: direct URL to the matching file (CSV or ZIP)

        Raises:
            ValueError: if no matching file is found
            requests.HTTPError: for HTTP errors while fetching pages
        """

        BASE_URL = "https://www2.census.gov/programs-surveys/popest/datasets/"
        # 1) Get top-level ranges
        r = requests.get(BASE_URL)
        r.raise_for_status()
        ranges = re.findall(r'href="(\d{4}-\d{4})/', r.text)
        if not ranges:
            raise ValueError("No dataset ranges found on the Census site.")

        # 2) pick the range that contains the year
        target_range = None
        for rng in ranges:
            start, end = map(int, rng.split('-'))
            if start <= year <= end:
                target_range = rng
                break
        if not target_range:
            raise ValueError(f"No dataset range covers year {year}.")

        range_url = urljoin(BASE_URL, f"{target_range}/")

        # 3) find counties/ subfolder
        r = requests.get(range_url)
        r.raise_for_status()
        subfolders = re.findall(r'href="([^"]+/)"', r.text)
        counties_subfolder = next((s for s in subfolders if "counties" in s.lower()), None)
        if not counties_subfolder:
            raise ValueError(f"No 'counties' folder found in range {target_range}.")

        counties_url = urljoin(range_url, counties_subfolder)

        # 4) fetch counties page & collect:
        #   - any files directly listed on counties page
        #   - any first-level subfolders (e.g. totals/) and files inside them
        r = requests.get(counties_url)
        r.raise_for_status()
        counties_html = r.text

        # gather files on the counties page itself
        files = re.findall(r'href="([^"]+\.(?:csv|zip))"', counties_html, re.IGNORECASE)
        file_urls = [urljoin(counties_url, f) for f in files]

        # gather first-level subfolders and search them too
        subfolders_level1 = re.findall(r'href="([^"]+/)"', counties_html)
        # filter out parent/other non-useful folders
        subfolders_level1 = [s for s in subfolders_level1 if not s.startswith('?') and s != 'Parent Directory/']

        for sub in subfolders_level1:
            sub_url = urljoin(counties_url, sub)
            try:
                rr = requests.get(sub_url)
                rr.raise_for_status()
                sub_files = re.findall(r'href="([^"]+\.(?:csv|zip))"', rr.text, re.IGNORECASE)
                file_urls.extend(urljoin(sub_url, f) for f in sub_files)
            except requests.HTTPError:
                # ignore inaccessible subfolders and continue
                continue

        # 5) pick the best matching file for the year
        # prefer filenames that include 'co-est' and the year and 'alldata' or 'pop'
        pattern = re.compile(rf'(co[-_]?est|coest|county).*{year}.*\.(csv|zip)', re.IGNORECASE)
        # fallback: any file with the year in its name
        fallback_pattern = re.compile(rf'.*{year}.*\.(csv|zip)', re.IGNORECASE)

        for url in file_urls:
            if pattern.search(url):
                return url

        for url in file_urls:
            if fallback_pattern.search(url):
                return url

        raise ValueError(f"No county population totals file found for year {year} in {counties_url}.")

    def get_stormevents_detail_url(self,year: int) -> str:
        base_url = "https://www.ncei.noaa.gov/pub/data/swdi/stormevents/csvfiles/"
        r = requests.get(base_url)
        r.raise_for_status()
        
        pattern = re.compile(
            fr'StormEvents_details-ftp_v1\.0_d{year}_c\d+\.csv\.gz'
        )
        match = pattern.search(r.text)
        if match:
            return base_url + match.group(0)
        else:
            raise ValueError(f"No StormEvents details file found for year {year}")
        
    def download_data(self):
        """ Download census and storm data for the given year. Unzip files as needed. """

        census_url = self.CENSUS_DIRECTORY_URL
        storm_url = self.STORM_DIRECTORY_URL

        # if files already exist, delete them to re-download fresh copies
        census_csv_path = self.data_path / "census.csv"
        storm_csv_path = self.data_path / "storm.csv"
        if census_csv_path.exists():
            os.remove(census_csv_path)
        if storm_csv_path.exists():
            os.remove(storm_csv_path)

        # check if year folder exists, if not create it
        if not self.data_path.exists():
            self.data_path.mkdir(parents=True, exist_ok=True)

        # Download and unzip census data
        census_year_folder = f"co-est{self.year}-alldata"
        census_zip_path = self.data_path / f"{census_year_folder}.zip"
        urlretrieve(census_url, census_zip_path)
        try:
            with zipfile.ZipFile(census_zip_path, 'r') as zip_ref:
                zip_ref.extractall(self.data_path / "census")
        except zipfile.BadZipFile:
            # If the file is not a zip file, move it to the census folder directly
            os.rename(census_zip_path, self.data_path / "census.csv")
        else:
            os.remove(census_zip_path)

        #check for census csv in project folder. if not found, raise error.
        census_csv_path = self.data_path / "census.csv"
        if not census_csv_path.exists():
            raise ValueError(f"Census CSV file not found after attempted download in the project folder for year {self.year}.")

        # Download and unzip gz file of storm data
        storm_zip_path = self.data_path / f"storm_{self.year}.csv.gz"
        try:
            urlretrieve(storm_url, storm_zip_path)
        except HTTPError:
            raise ValueError(f"Storm data for year {self.year} has been modified since creation of code.\
                              Contact creator.")
        
        # Unzip gz file
        with gzip.open(storm_zip_path, 'rb') as f_in:
            with open(self.data_path / "storm.csv", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(storm_zip_path)

        # check for storm csv in project folder. if not found, raise error.
        storm_csv_path = self.data_path / "storm.csv"
        if not storm_csv_path.exists():
            raise ValueError(f"Storm CSV file not found after attempted download in the project folder for year {self.year}.")

    def find_data(self):
        """ 
        Search for sub folder titled with self.year in the current folder. 
        Return path to each csv file in that folder as (pop, coastal, storm).
        """

        census_path = self.data_path / "census.csv"
        storm_path = self.data_path / "storm.csv"

        return (census_path, self.COASTAL_COUNTIES_2024, storm_path)

    def convert_damage(self, damage_str):
        """ Convert damage string with K, M, B suffixes to numeric value."""
        if isinstance(damage_str, str):
            damage_str = damage_str.upper()
            if 'K' in damage_str:
                return float(damage_str.replace('K', '')) * 1000
            if 'M' in damage_str:
                return float(damage_str.replace('M', '')) * 1000000
            if 'B' in damage_str:
                return float(damage_str.replace('B', '')) * 1000000000
        return pd.to_numeric(damage_str, errors='coerce')

    def clean_storm_df(self):
        """ clean storm dataframe and overwrite csv. """
        storm_df = pd.read_csv(self.data_paths[2], encoding='latin1', dtype=str)

        # clean fips codes to be strings with leading zeros as needed.
        storm_df['STATE_FIPS'] = storm_df['STATE_FIPS'].astype(str).str.zfill(2)
        storm_df['CZ_FIPS'] = storm_df['CZ_FIPS'].astype(str).str.zfill(3)
        storm_df['FIPS'] = storm_df['STATE_FIPS'] + storm_df['CZ_FIPS']

        # write storm df to csv for debugging
        #storm_df.to_csv(self.data_paths[2].with_name(f"storm_preclean_debug_{self.year}_{self.random_seed}.csv"), index=False)

        # remove non-mainland states
        storm_df = storm_df[~storm_df['STATE_FIPS'].isin(self.non_MAINLAND_STATES_FIPS)]

        if not self.zone:
            #remove rows where CZ_TYPE is not 'C' (county)
            storm_df = storm_df[storm_df['CZ_TYPE'] == 'C']

        #remove all columns except STATE_FIPS, CZ_FIPS, FIPS, DAMAGE_PROPERTY
        storm_df = storm_df[['STATE_FIPS', 'CZ_FIPS', 'FIPS', 'DAMAGE_PROPERTY']]

        # remove rows with missing DAMAGE_PROPERTY
        storm_df = storm_df[storm_df['DAMAGE_PROPERTY'].notna()]

        # write storm df to csv pre conversion for debugging
        #storm_df.to_csv(self.data_paths[2].with_name(f"storm_preconversion_debug_{self.year}_{self.random_seed}.csv"), index=False)
        
        # convert DAMAGE_PROPERTY to numeric, handling K, M, B suffixes
        storm_df['DAMAGE_PROPERTY'] = storm_df['DAMAGE_PROPERTY'].apply(self.convert_damage)

        # write storm df to csv for debugging
        #storm_df.to_csv(self.data_paths[2].with_name(f"storm_debug_{self.year}_{self.random_seed}.csv"), index=False)

        # overwrite storm csv
        storm_df.to_csv(self.data_paths[2], index=False)

        #close the dataframe
        del storm_df

    def clean_census_df(self):
        """ clean census dataframe and overwrite csv. """
        census_df = pd.read_csv(self.data_paths[0], encoding='latin1', dtype=str)

        # clean fips codes to be strings with leading zeros as needed.
        census_df['STATE'] = census_df['STATE'].astype(str).str.zfill(2)
        census_df['COUNTY'] = census_df['COUNTY'].astype(str).str.zfill(3)

        # add full fips column
        census_df['FIPS'] = census_df['STATE'] + census_df['COUNTY']

        #remove non-mainland states
        census_df = census_df[~census_df['STATE'].isin(self.non_MAINLAND_STATES_FIPS)]

        # remove all columns except STATE, COUNTY, CTYNAME, STNAME, POPESTIMATE for years of interest
        pop_col = f'POPESTIMATE{self.year}'
        census_df = census_df[['STATE', 'COUNTY', 'CTYNAME', 'STNAME', pop_col, 'FIPS']]

        # overwrite census csv
        census_df.to_csv(self.data_paths[0], index=False)

        #close the dataframe
        del census_df

    def clean_coastal_counties_df(self):
        """ clean coastal counties dataframe and overwrite csv. """
        coastal_counties_df = pd.read_csv(self.data_paths[1], encoding='latin1', dtype=str)

        # clean fips codes to be strings with leading zeros as needed.
        coastal_counties_df['countyfips'] = coastal_counties_df['countyfips'].astype(str).str.zfill(5)
        coastal_counties_df['statefips'] = coastal_counties_df['statefips'].astype(str).str.zfill(2)

        # remove non-mainland states
        coastal_counties_df = coastal_counties_df[~coastal_counties_df['statefips'].isin(self.non_MAINLAND_STATES_FIPS)]

        # remove all columns except countyfips and statefips
        coastal_counties_df = coastal_counties_df[['countyfips', 'statefips']]

        # overwrite coastal counties csv
        coastal_counties_df.to_csv(self.data_paths[1], index=False)

        #close the dataframe
        del coastal_counties_df

    def clean_all_dfs(self):
        """ do all cleaning processes for all dataframes and overwrite csvs. """
        self.clean_storm_df()
        self.clean_census_df()
        self.clean_coastal_counties_df()

    def counties_with_damage_getter(self):
        """ Return list of county fips where the TOTAL property damage from storms in that year is greater than zero.

        Returns:
            counties_with_damage: list of county fips with non-zero property damage
            
        """
        storm_df = pd.read_csv(self.data_paths[2], encoding='latin1', dtype=str)

        # Group by FIPS making damage a list containing all damage values for that county fips
        damage_by_county = storm_df.groupby('FIPS')['DAMAGE_PROPERTY'].apply(list)

        # Sum the damage values for each county fips
        damage_by_county = damage_by_county.apply(lambda x: sum([float(d) for d in x if pd.notnull(d)]))


        #write damage by county to csv for debugging
        #damage_by_county.to_csv(self.data_path / f"damage_by_county_debug_{self.year}.csv", header=['Total_Damage_Property'])


        # while where at it, store damage_by_county in self for future use
        self.damage_by_county = damage_by_county

        # Filter counties with total damage > 0
        counties_with_damage = damage_by_county[damage_by_county != 0.0].index.tolist()


        return counties_with_damage

    def sample_getter(self):
        """ Randomly select 100 coastal and 100 inland MAINLAND counties. Clean county names so that cenusus county names match coastal county names,
        that is make sure they end in COUNTY and are uppercase. Subset population data for those sample counties for the year of interest.
        Write to csv."""

        # open relevant data files.
        storm_df = pd.read_csv(self.data_paths[2], encoding='latin1', dtype=str)
        census_df = pd.read_csv(self.data_paths[0], encoding='latin1', dtype=str)
        coastal_counties_df = pd.read_csv(self.data_paths[1], encoding='latin1', dtype=str)


        if self.ignore_zero_damage:
            # remove counties with zero damage from storm data
            storm_df = storm_df[storm_df['FIPS'].isin(self.counties_with_damage)]

        ########################################################################
        # randomly sample 100 coastal counties from list of coastal counties csv that are in mainland US and have non-zero damage if required.
        ########################################################################
        
        # if ignoring zero damage, filter coastal counties to only those in counties_with_damage
        if self.ignore_zero_damage:
            coastal_counties_df = coastal_counties_df[coastal_counties_df['countyfips'].isin(self.counties_with_damage)]

        # Randomly sample 100 coastal counties
        coastal_counties_df_sample = coastal_counties_df.sample(n=100, random_state=self.random_seed)

        # census: build full 5-digit FIPS code for matching
        census_df['FIPS'] = census_df['STATE'] + census_df['COUNTY']   # 5-digit string

        # Find the coastal counties that are also in census data by matching county fips.
        self.coastal_sample = pd.merge(coastal_counties_df_sample, census_df, left_on=['countyfips'], right_on=['FIPS'], how='inner')
        
        # if less than 100 coastal counties matched, resample until we have 100
        while len(self.coastal_sample) < 100:
            #change seed for resampling
            if self.random_seed is None:
                self.random_seed = np.random.randint(1,1000)
            self.random_seed += np.random.randint(1,1000)
            np.random.seed(self.random_seed)
            coastal_counties_df_sample = coastal_counties_df.sample(n=100, random_state=self.random_seed)
            self.coastal_sample = pd.merge(coastal_counties_df_sample, census_df, left_on=['countyfips'], right_on=['FIPS'], how='inner')
    
        # Subset the census data to only include the columns of POPESTIMATE of the year, the county name, the state name, and the respective fips.
        pop_col = f'POPESTIMATE{self.year}'
        self.coastal_sample = self.coastal_sample[['countyfips','CTYNAME', 'STNAME', pop_col]]
        
        self.coastal_sample = self.coastal_sample.rename(columns={pop_col: 'Population'})
        self.coastal_sample['Coastal'] = True

        ########################################################################
        # randomly select 100 inland counties from population data that are not in coastal counties and are mainland and have non-zero damage if required.
        ########################################################################

        # get list of coastal county fips to filter inland counties.
        coastal_county_fips = coastal_counties_df['countyfips'].tolist()

        # clean census data to not include all coastal counties. Also remove non-mainland states.
        inland_counties_df = census_df[~census_df['FIPS'].isin(coastal_county_fips) & ~census_df['STATE'].isin(self.non_MAINLAND_STATES_FIPS)]

        # if ignoring zero damage, filter inland counties to only those in counties_with_damage
        if self.ignore_zero_damage:
            inland_counties_df = inland_counties_df[inland_counties_df['FIPS'].isin(self.counties_with_damage)]
        
        # randomly sample 100 inland counties
        self.inland_sample = inland_counties_df.sample(n=100, random_state=self.random_seed)

        # subset population data to only those inland counties and for the year. 
        self.inland_sample = self.inland_sample[['FIPS','CTYNAME', 'STNAME', pop_col]]
        #move fips column to match coastal sample
        self.inland_sample = self.inland_sample.rename(columns={'FIPS':'countyfips'})
        self.inland_sample = self.inland_sample.rename(columns={pop_col: 'Population'})
        self.inland_sample['Coastal'] = False
        
        # concatenate coastal and inland samples
        self.total_sample = pd.concat([self.coastal_sample, self.inland_sample], ignore_index=True)
    
    def zone_county_cleaner(self):
        """ Add parent county fips to zone_county_reference dataframe for reference later."""
        zone_reference_df = pd.read_csv(self.zone_county_reference, dtype=str)

        # Ensure all FIPS columns are clean, 5-digit strings, handling potential '.0'
        zone_reference_df['countyfips'] = zone_reference_df['countyfips'].str.split('.').str[0].str.zfill(5)

        #check if zone fips column already exists and is properly formatted
        if 'zone_fips' in zone_reference_df.columns:
            zone_reference_df['zone_fips'] = zone_reference_df['zone_fips'].str.split('.').str[0].str.zfill(5)
            return zone_reference_df

        # first find state fips from county fips by taking first two digits
        zone_reference_df['state_fips'] = zone_reference_df['countyfips'].str[:2]

        #now take last three digits from zone id and concat to state fips to make full zone fips
        zone_reference_df['zone_fips'] = zone_reference_df['state_fips'] + zone_reference_df['Zone_ID'].str[-3:]
        zone_reference_df['zone_fips'] = zone_reference_df['zone_fips'].str.zfill(5)


        # overwrite zone_county_reference csv with new column
        zone_reference_df.to_csv(self.zone_county_reference, index=False)

        return zone_reference_df
    
    def map_county_to_zone(self):
        """ Create a dictionary where the keys are zone fips and the values are all county fips in that zone."""
        # first run zone_county_cleaner to ensure zone fips are present
        zone_reference_df = self.zone_county_cleaner()

        # Create a mapping from county_fips to a list of zone_fips in that county.
        county_to_zones = {}
        for index, row in zone_reference_df.iterrows():
            county_fips = row['countyfips']
            zone_fips = row['zone_fips']
            if zone_fips not in county_to_zones:
                county_to_zones[zone_fips] = []
            county_to_zones[zone_fips].append(county_fips)

        return county_to_zones

    def property_damage_getter(self):
        """ For each county in self.coastal_sample and self.inland_sample, find average property damage per storm event for that year. """

        damage_dict = {}
        storm_df = pd.read_csv(self.data_paths[2], encoding='latin1', dtype=str)


        if self.zone:
            # map county fips to zone fips
            zone_ref_df = self.zone_county_cleaner()
            county_to_zones_map = zone_ref_df.groupby('countyfips')['zone_fips'].apply(list).to_dict()
        # else proceed as normal
        else:
            county_to_zones_map = {}

        # if self.zero_damage is True, storm_df only has counties with non-zero damage already filtered in sample_getter

        for index, row in self.total_sample.iterrows():
            county_fips = row['countyfips']
            
            # Get all relevant FIPS codes: the county itself plus all its zones if needed.
            fips_to_check = [county_fips]
            if county_fips in county_to_zones_map:
                fips_to_check.extend(county_to_zones_map[county_fips])
            
            # find the fips to check in storm data.
            county_storms = storm_df[storm_df['FIPS'].isin(fips_to_check)]
            
            
            # find the county in self.damage_by_county and get their total damage
            if county_fips in self.damage_by_county.index:
                total_property_damage = self.damage_by_county[county_fips]
                total_storms = len(county_storms)
                average_property_damage = total_property_damage / total_storms if total_storms > 0 else 0.0
                damage_dict[county_fips] = average_property_damage
            else:
                damage_dict[county_fips] = 0.0

        # add average property damage to self.total_sample
        average_damages = []
        for index, row in self.total_sample.iterrows():
            county_fips = row['countyfips']
            average_damages.append(damage_dict[county_fips])
        self.total_sample["Property_Damage_per_Storm"] = average_damages

    def make_final_dataset(self):
        """ Make data. Write final dataset for that year to the subfolder for that year. columns: CLeaned County Name, Cleaned State Name, Population, Average Property Damage, Coastal (T/F) """
        self.sample_getter()
        self.property_damage_getter()

        # determine final path based on zone and ignore_zero_damage
        if self.zone and self.ignore_zero_damage:
            final_path = self.data_path / f"data_{self.year}_zone_{self.zone}_no_zero_damage"
        elif self.zone:
            final_path = self.data_path / f"data_{self.year}_with_zone"
        elif self.ignore_zero_damage:
            final_path = self.data_path / f"data_{self.year}_no_zero_damage"
        else:
            final_path = self.data_path / f"data_{self.year}"
        final_path = str(final_path) + f"_random_seed_{self.random_seed}.csv"

        #write final dataset to csv
        self.total_sample.to_csv(final_path, index=False)

In [None]:
data = DataGetter(year=2024, ignore_zero_damage=True)