In [1]:
# Run this cell to install required packages
%pip install segment-geospatial geopandas folium ipywidgets google-auth-oauthlib google-auth-httplib2 google-api-python-client matplotlib descartes rasterio


Collecting segment-geospatial
  Downloading segment_geospatial-0.12.2-py2.py3-none-any.whl.metadata (11 kB)
Collecting descartes
  Downloading descartes-1.1.0-py3-none-any.whl.metadata (2.4 kB)
Collecting rasterio
  Downloading rasterio-1.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting fiona (from segment-geospatial)
  Downloading fiona-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (56 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.6/56.6 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Collecting ipympl (from segment-geospatial)
  Downloading ipympl-0.9.4-py3-none-any.whl.metadata (8.7 kB)
Collecting leafmap (from segment-geospatial)
  Downloading leafmap-0.38.8-py2.py3-none-any.whl.metadata (16 kB)
Collecting localtileserver (from segment-geospatial)
  Downloading localtileserver-0.10.4-py3-none-any.whl.metadata (5.2 kB)
Collecting patool (from segment-geospatial)
  Downloading patool-3.0.2-

In [None]:
# Mount Google Drive at /content/drive; SAVE_PATH further down points inside this mount.
# force_remount=True is passed so the mount is requested again even when this cell is re-run.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# Create the CSV files (one per ZIP code) containing the patch coordinates

In [None]:
import os
import csv
import geopandas as gpd
import numpy as np
from shapely.geometry import box

# Constants
# Shapefiles read by load_data(): one for county boundaries, one for ZIP-code areas.
# NOTE(review): both are Windows-style local paths, while SAVE_PATH below is a path under
# the Colab Google Drive mount — confirm that both are reachable from the same runtime.
COUNTY_FILEPATH = r"C:\Users\GradStudent\OneDrive\Documents\Madhu RA\Coun\tl_2022_us_county.shp"
ZIPCODE_FILEPATH = r"C:\Users\GradStudent\OneDrive\Documents\Madhu RA\Zip\tl_2022_us_zcta520.shp"
# Directory into which save_patch_info() writes one CSV per ZIP code.
SAVE_PATH = r'/content/drive/MyDrive/Madhu RA Work Folder/CSV/cook'
# Zoom value copied into the 'zoom' column of every patch row.
zoom = 22

# State FIPS codes
# Keys are full state names; values are two-digit, zero-padded FIPS codes stored as strings.
# get_county_boundary() compares the looked-up value against the 'STATEFP' column.
# Besides the 50 states, the table lists the District of Columbia and Puerto Rico.
state_name_to_fips = {
    'Alabama': '01', 'Alaska': '02', 'Arizona': '04', 'Arkansas': '05', 'California': '06',
    'Colorado': '08', 'Connecticut': '09', 'Delaware': '10', 'District of Columbia': '11',
    'Florida': '12', 'Georgia': '13', 'Hawaii': '15', 'Idaho': '16', 'Illinois': '17',
    'Indiana': '18', 'Iowa': '19', 'Kansas': '20', 'Kentucky': '21', 'Louisiana': '22',
    'Maine': '23', 'Maryland': '24', 'Massachusetts': '25', 'Michigan': '26', 'Minnesota': '27',
    'Mississippi': '28', 'Missouri': '29', 'Montana': '30', 'Nebraska': '31', 'Nevada': '32',
    'New Hampshire': '33', 'New Jersey': '34', 'New Mexico': '35', 'New York': '36',
    'North Carolina': '37', 'North Dakota': '38', 'Ohio': '39', 'Oklahoma': '40', 'Oregon': '41',
    'Pennsylvania': '42', 'Rhode Island': '44', 'South Carolina': '45', 'South Dakota': '46',
    'Tennessee': '47', 'Texas': '48', 'Utah': '49', 'Vermont': '50', 'Virginia': '51',
    'Washington': '53', 'West Virginia': '54', 'Wisconsin': '55', 'Wyoming': '56',
    'Puerto Rico': '72'
}

def load_data():
    """Read the county and ZIP-code shapefiles configured at the top of the notebook.

    Each layer is read with ``gpd.read_file``; a layer that comes back without a
    CRS is tagged as EPSG:4326. Progress messages are printed around each read.

    Returns:
        tuple: ``(counties_gdf, zipcodes_gdf)`` in that order.
    """
    def _read_layer(path, start_message, done_message):
        # Shared read-and-tag routine used for both layers.
        print(start_message)
        layer = gpd.read_file(path)
        if layer.crs is None:
            layer.set_crs(epsg=4326, inplace=True)
        print(done_message)
        return layer

    counties_gdf = _read_layer(
        COUNTY_FILEPATH,
        "Reading county data from local file...",
        "County data read and processed.",
    )
    zipcodes_gdf = _read_layer(
        ZIPCODE_FILEPATH,
        "Reading ZIP code data from local file...",
        "ZIP code data read and processed.",
    )
    return counties_gdf, zipcodes_gdf

def get_county_boundary(county_name, state_name, counties_gdf):
    """Select the row(s) of ``counties_gdf`` for a county in a given state.

    The state is resolved through ``state_name_to_fips`` (exact key first, then
    a case-insensitive match). Within that state the county is matched on the
    'NAME' column: an exact, case-insensitive match wins; only when there is no
    exact match does the function fall back to a literal (non-regex) substring
    match. This keeps "Clark" from also selecting "Clarke", and keeps
    characters such as "(" or "." in the input from being read as a regex.

    Args:
        county_name: County name as typed by the user, without the word "County".
        state_name: Full state name, e.g. "Illinois".
        counties_gdf: Table with at least 'NAME' and 'STATEFP' columns.

    Returns:
        A copy of the matching rows (safe for the caller to modify), or ``None``
        when the state is unknown, the county name is blank, nothing matches,
        or the lookup raises.
    """
    state_fips_code = state_name_to_fips.get(state_name)
    if not state_fips_code:
        # Tolerate differences in capitalisation / surrounding whitespace.
        wanted_state = str(state_name).strip().casefold()
        state_fips_code = next(
            (fips for name, fips in state_name_to_fips.items()
             if name.casefold() == wanted_state),
            None,
        )
    if not state_fips_code:
        print(f"No FIPS code found for state {state_name}")
        return None

    wanted_county = str(county_name).strip()
    if not wanted_county:
        # A blank name would otherwise substring-match every county in the state.
        print(f"No boundary found for {county_name} County, {state_name}")
        return None

    try:
        in_state = counties_gdf[counties_gdf['STATEFP'] == state_fips_code]
        names = in_state['NAME']

        exact = (names.str.casefold() == wanted_county.casefold()).fillna(False)
        if exact.any():
            county_gdf = in_state[exact]
        else:
            # regex=False: treat the input literally; na=False: missing names never match.
            partial = names.str.contains(wanted_county, case=False, regex=False, na=False)
            county_gdf = in_state[partial]

        if county_gdf.empty:
            print(f"No boundary found for {county_name} County, {state_name}")
            return None
        # Return a copy so callers can assign columns without touching counties_gdf.
        return county_gdf.copy()
    except Exception as e:
        print(f"Error fetching boundary for {county_name} County, {state_name}: {str(e)}")
        return None

def get_zip_codes(county_geometry, county_gdf, zipcodes_gdf, threshold=0.3):
    """Return the ZIP-code rows that lie sufficiently inside a county.

    A ZIP code is kept when its overlap with ``county_geometry`` has non-zero
    area and covers at least ``threshold`` of the ZIP code's own area.

    Args:
        county_geometry: Geometry of the county, in the CRS of ``county_gdf``.
        county_gdf: County table; only its ``crs`` is consulted here.
        zipcodes_gdf: GeoDataFrame of ZIP-code areas.
        threshold: Minimum fraction (0-1) of a ZIP code's area that must fall
            inside the county.

    Returns:
        A copy of the qualifying rows of ``zipcodes_gdf``, with its columns,
        index labels and CRS. The result is an empty frame with the same
        columns when nothing qualifies.
    """
    # Bring the county geometry itself into the ZIP-code CRS. Reprojecting only
    # county_gdf would leave the geometry used below in the wrong CRS.
    if (county_gdf.crs is not None and zipcodes_gdf.crs is not None
            and county_gdf.crs != zipcodes_gdf.crs):
        county_geometry = (
            gpd.GeoSeries([county_geometry], crs=county_gdf.crs)
            .to_crs(zipcodes_gdf.crs)
            .iloc[0]
        )

    # Cheap vectorised pre-filter: only ZIP codes touching the county need the
    # expensive intersection below.
    candidates = zipcodes_gdf[zipcodes_gdf.intersects(county_geometry)]

    keep = np.zeros(len(candidates), dtype=bool)
    for position, zip_geometry in enumerate(candidates.geometry):
        overlap_area = zip_geometry.intersection(county_geometry).area
        # overlap_area > 0 also guarantees zip_geometry.area > 0, so the ratio is safe.
        keep[position] = overlap_area > 0 and overlap_area / zip_geometry.area >= threshold

    # .loc with a boolean array keeps the columns and CRS even when nothing is selected.
    return candidates.loc[keep].copy()

def adjust_num_patches(zipcode_geometry, target_patch_size_meters, crs='EPSG:4326'):
    """Compute how many patches are needed to tile a geometry's bounding box.

    The geometry is projected to an estimated UTM CRS so that the width and
    height of its bounding box can be compared with the target patch size.

    Args:
        zipcode_geometry: Geometry whose bounding box is to be tiled.
        target_patch_size_meters: Desired patch edge length; must be positive.
        crs: CRS the coordinates of ``zipcode_geometry`` are expressed in.
            Defaults to 'EPSG:4326', the value this function previously
            assumed unconditionally.

    Returns:
        tuple[int, int]: ``(num_patches_x, num_patches_y)``, each at least 1.

    Raises:
        ValueError: If ``target_patch_size_meters`` is not positive.
    """
    if target_patch_size_meters <= 0:
        raise ValueError("target_patch_size_meters must be positive")

    # Wrap the single geometry so it can be reprojected.
    geom_series = gpd.GeoSeries([zipcode_geometry], crs=crs)

    # Estimate an appropriate UTM CRS and project into it.
    utm_crs = geom_series.estimate_utm_crs()
    projected_geom = geom_series.to_crs(utm_crs).iloc[0]

    minx, miny, maxx, maxy = projected_geom.bounds
    width = maxx - minx
    height = maxy - miny

    # Round up so the grid covers the whole box; never fewer than one patch per axis.
    num_patches_x = max(1, int(np.ceil(width / target_patch_size_meters)))
    num_patches_y = max(1, int(np.ceil(height / target_patch_size_meters)))

    return num_patches_x, num_patches_y

def generate_patch_coordinates(zipcode_geometry, num_patches_x, num_patches_y, patch_size, zoom, county_name, state_name, zip_code):
    """Lay a regular grid over a geometry's bounding box and list the cells that touch it.

    The bounding box is divided into ``num_patches_x`` columns and
    ``num_patches_y`` rows. Cells are visited column by column and, within a
    column, from bottom to top. Only cells that intersect ``zipcode_geometry``
    are kept, and kept cells are numbered consecutively starting at 1.

    Returns:
        list[list]: One row per kept cell, in the form
        ``[filename, left, bottom, right, top, zoom, patch_size, patch_size]``.
    """
    west, south, east, north = zipcode_geometry.bounds
    cell_width = (east - west) / num_patches_x
    cell_height = (north - south) / num_patches_y

    rows = []
    for col in range(num_patches_x):
        x_min = west + col * cell_width
        x_max = x_min + cell_width
        for row in range(num_patches_y):
            y_min = south + row * cell_height
            y_max = y_min + cell_height

            # Skip cells that fall entirely outside the ZIP code boundary.
            if not zipcode_geometry.intersects(box(x_min, y_min, x_max, y_max)):
                continue

            # Numbering follows the count of cells kept so far.
            patch_count = len(rows) + 1
            rows.append([
                f"{county_name}_{state_name}_zipcode_{zip_code}_patch_{patch_count}.tif",
                x_min, y_min, x_max, y_max,
                zoom, patch_size, patch_size,
            ])

    return rows

def save_patch_info(patches, county_name, state_name, zip_code, save_path):
    """Write the patch rows for one ZIP code to a CSV file.

    The file is named
    ``{county_name}_{state_name}_zipcode_{zip_code}_patches_info.csv`` and is
    placed in ``save_path``, which is created first if it does not exist. The
    first row is a header; each following row is one entry of ``patches``.

    Args:
        patches: Iterable of rows shaped
            ``[filename, left, bottom, right, top, zoom, pixel_width, pixel_height]``.
        county_name: County name used in the file name.
        state_name: State name used in the file name.
        zip_code: ZIP code used in the file name.
        save_path: Target directory.
    """
    # Create the output directory up front so a missing folder does not abort the run.
    if save_path:
        os.makedirs(save_path, exist_ok=True)

    csv_filename = f"{county_name}_{state_name}_zipcode_{zip_code}_patches_info.csv"
    csv_path = os.path.join(save_path, csv_filename)
    # newline='' lets the csv module control line endings; explicit encoding keeps
    # the output independent of the platform default.
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['filename', 'left', 'bottom', 'right', 'top', 'zoom', 'pixel_width', 'pixel_height'])
        writer.writerows(patches)
    print(f"Patch information saved to {csv_path}")

def main():
    """Interactively generate patch-coordinate CSV files, one per ZIP code.

    Loads the county and ZIP-code layers once, then repeatedly prompts for a
    county and state. For each ZIP code that get_zip_codes() returns for the
    county, a patch grid is computed and written under SAVE_PATH. Entering
    'quit' as the county name ends the loop.
    """
    counties_gdf, zipcodes_gdf = load_data()

    # Fix invalid geometries in counties and ZIP codes
    counties_gdf['geometry'] = counties_gdf['geometry'].buffer(0)
    zipcodes_gdf['geometry'] = zipcodes_gdf['geometry'].buffer(0)

    # Set the target patch size in meters
    target_patch_size_meters = 100  # Adjust this value as needed
    # Value written to the pixel_width / pixel_height columns of every patch row.
    patch_size_pixels = 512

    # Make sure the output folder exists before any CSV is written.
    os.makedirs(SAVE_PATH, exist_ok=True)

    while True:
        county_name = input("\nEnter county name (or 'quit' to exit): ").strip()
        if county_name.lower() == 'quit':
            break

        state_name = input("Enter state name (e.g., Illinois): ").strip()

        print(f"Fetching boundary for {county_name} County, {state_name}...")
        county_gdf = get_county_boundary(county_name, state_name, counties_gdf)
        if county_gdf is None:
            continue

        # Only the first match is used below, so report it when the name is ambiguous.
        if len(county_gdf) > 1:
            matches = ", ".join(str(name) for name in county_gdf['NAME'].tolist())
            print(f"Multiple counties matched '{county_name}' ({matches}); using the first one.")

        # Work on a copy: assigning into a filtered slice of counties_gdf would
        # trigger pandas' SettingWithCopyWarning.
        county_gdf = county_gdf.copy()

        # Validate county geometry
        county_gdf['geometry'] = county_gdf['geometry'].buffer(0)

        print("Fetching ZIP code boundaries...")
        county_geometry = county_gdf.geometry.iloc[0]

        zipcodes = get_zip_codes(county_geometry, county_gdf, zipcodes_gdf)
        if zipcodes.empty:
            print(f"No ZIP codes found for {county_name} County, {state_name}")
            continue

        for _, zipcode in zipcodes.iterrows():
            zip_code = zipcode['ZCTA5CE20']
            print(f"Processing ZIP code {zip_code}...")

            # Validate ZIP code geometry
            zipcode_geometry = zipcode.geometry.buffer(0)

            # Adjust num_patches_x and num_patches_y based on ZIP code geometry
            num_patches_x, num_patches_y = adjust_num_patches(zipcode_geometry, target_patch_size_meters)

            patches = generate_patch_coordinates(
                zipcode_geometry,
                num_patches_x,
                num_patches_y,
                patch_size_pixels,
                zoom,
                county_name,
                state_name,
                zip_code
            )
            save_patch_info(patches, county_name, state_name, zip_code, SAVE_PATH)

if __name__ == "__main__":
    main()
