In [None]:
import ee
import os
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
# Run the Earth Engine authentication flow for this environment; the
# ee.Initialize(...) call in the next cell is issued after this one.
ee.Authenticate()

In [None]:
import ee, os, pandas as pd, duckdb
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import geopandas as gpd
import glob

# Initialize the Earth Engine client against the project used by this notebook.
ee.Initialize(project='ee-mkmitchellducks')

# -------------------------
# CONFIG
# -------------------------
# Folder searched by load_species_data() for "<species>*.csv" files.
csv_folder = "/mnt/c/gbif"
# Root of the Parquet dataset; load_species_data() reads
# "<parquet_folder>/scientific_name=<species>/*.parquet" from it.
parquet_folder = "/mnt/c/ebirdpolars"

# Prefix that export_subset() prepends to each export name.
asset_folder = "projects/ee-mkmitchellducks/assets/gbif"
# AOI boundary file; read here with geopandas and again by DuckDB in the main cell.
aoifile = '/mnt/e/gis/BaseData/MAV_Boundary_4326_wkb.parquet'

# NOTE(review): `scale` is not referenced by the functions visible in this
# file (export_subset() passes a literal scale=100) — confirm whether it
# should be wired through.
scale = 100  # Adjust based on your imagery resolution
aoi_gdf = gpd.read_parquet(aoifile)
# Only the exterior ring of the unioned AOI is sent to Earth Engine, so any
# interior holes are not carried over.
# NOTE(review): `.exterior` assumes the union is a single Polygon rather than
# a MultiPolygon — confirm against the boundary file.
aoi_geom = ee.Geometry.Polygon(list(aoi_gdf.geometry.union_all().exterior.coords))
# Buffer by 5000 m for focal stats
# NOTE(review): neither aoi_geom nor aoi_buffered is referenced by the
# functions visible in this file — confirm intended use.
aoi_buffered = aoi_geom.buffer(5000)  # 5 km buffer

# Scientific names of the species to process. Entries that are commented
# out are skipped by the main loop.
species_list = [
    "Protonotaria citrea",
    "Limnothlypis swainsonii",
    "Setophaga americana",
    "Empidonax virescens",
    "Coccyzus americanus",
    "Vireo griseus",
    "Setophaga cerulea",
    "Hylocichla mustelina",
    "Parkesia motacilla",
    "Geothlypis formosa",
    "Archilochus colubris",
    "Elanoides forficatus",
    "Vireo flavifrons",
    "Buteo lineatus",
    "Setophaga dominica",
    "Setophaga citrina",
    "Dryocopus pileatus",
    "Meleagris gallopavo",
    "Sphyrapicus varius",
    #"Odocoileus virginianus",     # white tailed deer
    #"Ursus americanus",           # Black bear
    #"Anaxyrus americanus",        # American Toad
    #"Anaxyrus fowleri",           # Fowler's Toad
    #"Gastrophryne carolinensis",  # Eastern Narrow-mouthed Toad
    #"Hyla avivoca",               # Bird-voiced Treefrog
    #"Hyla chrysoscelis",          # Cope's Gray Treefrog
    #"Hyla cinerea",               # Green Treefrog
    #"Hyla squirella",             # Squirrel Treefrog
    #"Hyla versicolor",            # Gray Treefrog
    #"Lithobates catesbeianus",    # American Bullfrog
    #"Lithobates clamitans",       # Bronze Frog
    #"Lithobates palustris",       # Pickerel Frog
    #"Lithobates sphenocephalus",  # Southern Leopard Frog
    #"Pseudacris crucifer",        # Spring Peeper
    #"Pseudacris fouquettei",      # Cajun Chorus Frog
    #"Kinosternon subrubrum",      # Eastern Mud Turtle
    #"Apalone spinifera",          # Spiny Softshell Turtle
    #"Macrochelys temmincki"       # Alligator Snapping Turtle
    ]
# Normalise to lower-case, underscore-separated keys (e.g. "vireo_griseus").
# load_species_data() uses this form in the CSV glob pattern, the Parquet
# partition path and the SQL comparison; export_subset() uses it in task names.
species_list = [item.strip().lower().replace(' ', '_') for item in species_list]
# File that log() appends to; a relative path, so it resolves against the
# current working directory.
log_file = "process_log.txt"

# -------------------------
# Logging
# -------------------------
def log(message):
    """Append a timestamped line to ``log_file`` and echo the message.

    Args:
        message: Text to record. It is written to the log file as
            ``[YYYY-MM-DD HH:MM:SS] message`` and printed to stdout
            without the timestamp.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # The messages logged by this script contain non-ASCII symbols
    # (e.g. "✔", "❌"). Pin UTF-8 so the write does not depend on the
    # platform's locale encoding, which may be unable to encode them.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {message}\n")
    print(message)

# -------------------------
# Load species data from CSV or DuckDB
# -------------------------
def _load_species_csv(csv_files):
    """Read, combine and filter the CSV files found for one species.

    Args:
        csv_files: Paths of the CSV files to combine.

    Returns:
        DataFrame with columns ``basisofrecord``, ``species``, ``latitude``,
        ``longitude``, ``coordinateuncertaintyinmeters`` and ``date``, holding
        only complete rows whose ``coordinateuncertaintyinmeters`` is <= 100.
        May be empty.
    """
    frames = [pd.read_csv(path) for path in csv_files]
    df = pd.concat(frames, ignore_index=True)
    # Assemble the observation date from the separate year/month/day columns.
    df['date'] = pd.to_datetime(df[['year', 'month', 'day']])
    df = df[['basisofrecord', 'species', 'latitude', 'longitude',
             'coordinateuncertaintyinmeters', 'date']]
    # Any missing value in the kept columns disqualifies the row.
    df = df.dropna(how='any')
    return df[df['coordinateuncertaintyinmeters'] <= 100]


def _load_species_parquet(species):
    """Query the Parquet dataset for one species through DuckDB.

    Uses the module-level ``con`` connection and joins against its ``aoi``
    table, so both must exist before this is called. The SQL keeps points
    that intersect the AOI and whose observation year is 2017-2024; rows are
    then kept only when ``effort_distance_km`` < 0.1 or ``protocol_name`` is
    ``'Stationary'``.

    Args:
        species: Normalised species key (lower-case, underscore-separated).

    Returns:
        DataFrame with the selected columns (minus ``effort_distance_km``)
        plus ``date``, ``year``, ``month`` and ``day``. May be empty.
    """
    query = f"""
            SELECT 
                lower(eb.scientific_name) as scientific_name,
                eb.observation_date,
                eb.protocol_name,
                eb.effort_distance_km,
                eb.longitude,
                eb.latitude
            FROM read_parquet('{parquet_folder}/scientific_name={species}/*.parquet', hive_partitioning = true) AS eb
            JOIN aoi
                ON ST_Intersects(ST_Point(eb.longitude, eb.latitude), aoi.geometry)
            WHERE lower(eb.scientific_name) = '{species}'
              AND CAST(substr(eb.observation_date, 1, 4) AS INTEGER) BETWEEN 2017 AND 2024;  
    """
    df = con.execute(query).fetchdf()
    print(len(df))
    df = df[(df['effort_distance_km'] < .1) | (df['protocol_name'] == 'Stationary')]
    df = df.drop(columns=['effort_distance_km'])
    df['date'] = pd.to_datetime(df['observation_date'])
    # Extract year, month, day
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day'] = df['date'].dt.day
    return df


def load_species_data(species):
    """Load observation records for one species from CSV or, failing that, Parquet.

    CSV files matching ``<csv_folder>/<species>*.csv`` take precedence; the
    DuckDB/Parquet source is used only when no such file exists. Failures are
    logged rather than raised so the caller can move on to the next species.

    Args:
        species: Normalised species key (lower-case, underscore-separated).

    Returns:
        A non-empty DataFrame containing at least ``latitude``, ``longitude``
        and ``date`` columns, or ``None`` when loading failed or no record
        survived the filters.
    """
    csv_files = glob.glob(os.path.join(csv_folder, f"{species}*.csv"))
    if csv_files:
        print('Reading csv')
        try:
            df = _load_species_csv(csv_files)
        except Exception as e:
            log(f"⚠️ Failed to read CSV for {species}: {e}")
            return None
        empty_message = f"❌ No records left after filtering CSV for {species}"
        loaded_message = f"✔ Loaded CSV for {species}"
    else:
        print('Reading parquet')
        try:
            df = _load_species_parquet(species)
        except Exception as e:
            log(f"⚠️ DuckDB query failed for {species}: {e}")
            return None
        empty_message = f"❌ No matching records in Parquet for {species}"
        loaded_message = f"✔ Loaded Parquet data for {species}"

    # Check emptiness once, before reporting success, so the log never shows
    # "Loaded" for a species that ends up with zero usable rows.
    if df.empty:
        log(empty_message)
        return None
    log(loaded_message)
    return df

# -------------------------
# Convert DataFrame to EE FeatureCollection
# -------------------------
def df_to_ee_fc(df, datefield, lon_col='longitude', lat_col='latitude', properties=None):
    """Build an ``ee.FeatureCollection`` of point features from a DataFrame.

    Rows missing either coordinate are skipped. Every feature carries the
    requested property columns plus ``obs_date``, the ``datefield`` value
    formatted as ``YYYY-MM-DD``.

    Args:
        df: Source DataFrame.
        datefield: Column whose values provide ``strftime`` (date-like).
        lon_col: Name of the longitude column.
        lat_col: Name of the latitude column.
        properties: Columns to copy onto each feature; defaults to every
            column except the two coordinate columns.

    Returns:
        ee.FeatureCollection with one point feature per retained row.
    """
    located = df.dropna(subset=[lon_col, lat_col])
    print('converting to fc')
    if properties is None:
        properties = [col for col in located.columns if col not in (lon_col, lat_col)]

    def to_feature(record):
        # Point coordinates are given in [longitude, latitude] order.
        point = ee.Geometry.Point([record[lon_col], record[lat_col]])
        attrs = {name: record[name] for name in properties}
        attrs['obs_date'] = record[datefield].strftime('%Y-%m-%d')
        return ee.Feature(point, attrs)

    return ee.FeatureCollection([to_feature(record) for _, record in located.iterrows()])
    
# -------------------------
# Split FeatureCollection into subsets
# -------------------------
def split_fc(fc, n_subsets=10):
    """Split a FeatureCollection into at most ``n_subsets`` near-equal parts.

    The collection size is fetched from the server (``size().getInfo()``),
    then the features are sliced into consecutive chunks of
    ``ceil(size / n_subsets)`` features, so chunk sizes differ by as little
    as possible and no tiny trailing chunk is produced.

    Args:
        fc: ee.FeatureCollection to split.
        n_subsets: Maximum number of subsets to produce; must be >= 1.

    Returns:
        List of ee.FeatureCollection objects; empty when ``fc`` has no
        features.

    Raises:
        ValueError: If ``n_subsets`` is less than 1.
    """
    if n_subsets < 1:
        raise ValueError(f"n_subsets must be >= 1, got {n_subsets}")
    print('splitting fc')
    n_points = fc.size().getInfo()
    if n_points == 0:
        # Nothing to slice; skip building a zero-length server-side list.
        print('split')
        return []
    points_list = fc.toList(n_points)
    # Ceiling division: the smallest chunk size that still fits everything
    # into n_subsets chunks.
    step = -(-n_points // n_subsets)
    subsets = [
        ee.FeatureCollection(points_list.slice(start, start + step))
        for start in range(0, n_points, step)
    ]
    print('split')
    return subsets

# -------------------------
# Dynamic World mode image
# -------------------------
def get_dw_mode_image(obs_date):
    """Return the per-pixel modal Dynamic World label before an observation.

    The ``label`` band of ``GOOGLE/DYNAMICWORLD/V1`` is filtered to the
    window that starts three months before ``obs_date`` and ends at
    ``obs_date``, then reduced with a mode reducer.

    Args:
        obs_date: Any value accepted by ``ee.Date``.

    Returns:
        ee.Image produced by the mode reduction.
    """
    window_end = ee.Date(obs_date)
    window_start = window_end.advance(-3, 'month')
    labels = (
        ee.ImageCollection("GOOGLE/DYNAMICWORLD/V1")
        .select("label")
        .filterDate(window_start, window_end)
    )
    return labels.reduce(ee.Reducer.mode())

# -------------------------
# Percent cover for DW classes
# -------------------------
def compute_dw_percent_cover(dw_img, radius_m):
    """Compute neighbourhood percent cover for class ids 0 through 8.

    For each class id, a mask of pixels equal to that id is averaged over a
    circular kernel of ``radius_m`` metres and multiplied by 100.

    Args:
        dw_img: ee.Image of class labels.
        radius_m: Kernel radius in metres.

    Returns:
        ee.Image with nine bands named ``dw_class_<id>_pct_<radius_m>m``.
    """
    window = ee.Kernel.circle(radius=radius_m, units='meters', normalize=True)

    def class_share(class_id):
        # The mean of a 0/1 mask is the fraction of matching pixels.
        share = dw_img.eq(class_id).reduceNeighborhood(ee.Reducer.mean(), window)
        return share.multiply(100).rename(f'dw_class_{class_id}_pct_{radius_m}m')

    return ee.Image.cat([class_share(class_id) for class_id in range(9)])

# -------------------------
# Forest edge and core
# -------------------------
def compute_forest_metrics(dw_img, radius_m):
    """Derive forest edge and forest core bands from a label image.

    Pixels equal to 1 are treated as forest. Canny edges of the forest mask
    are summed over a circular kernel of ``radius_m`` metres; forest core is
    forest that lies outside a 100 m ``focal_max`` expansion of non-forest.

    Args:
        dw_img: ee.Image of class labels.
        radius_m: Kernel radius in metres for the edge sum. It does not
            affect the core band, which always uses 100 m.

    Returns:
        ee.Image with bands ``forest_edge_length`` and ``forest_core``.
    """
    # NOTE(review): the band names do not include radius_m, so results for
    # different radii carry identical names — confirm this is intended when
    # several radii are concatenated into one image.
    is_forest = dw_img.eq(1)
    not_forest = dw_img.neq(1)

    edges = ee.Algorithms.CannyEdgeDetector(image=is_forest, threshold=0.5, sigma=1)
    neighbourhood = ee.Kernel.circle(radius=radius_m, units='meters')
    edge_band = edges.reduceNeighborhood(ee.Reducer.sum(), neighbourhood)
    edge_band = edge_band.rename('forest_edge_length')

    near_non_forest = not_forest.focal_max(radius=100, units='meters')
    core_band = is_forest.And(near_non_forest.Not()).rename('forest_core')

    return ee.Image.cat([edge_band, core_band])

# -------------------------
# Export Task Function
# -------------------------
def export_subset(sub_fc, species, subset_index):
    """Sample land-cover metrics at each feature and start a Drive export.

    For every feature, the modal label image for its ``obs_date`` is turned
    into percent-cover and forest-metric bands at 100 m and 10 000 m radii,
    sampled at the feature (scale 100, geometries kept), and the flattened
    result is exported as a table. Any exception is logged, not raised.

    Args:
        sub_fc: ee.FeatureCollection whose features have an ``obs_date``
            property.
        species: Species key used in the export name and log messages.
        subset_index: Index of this subset, used in the export name.
    """
    print('Setting up exports')
    radii_m = (100, 10000)

    def sample_one(feature):
        landcover = get_dw_mode_image(feature.get('obs_date'))
        # Band order: cover bands for each radius, then forest bands for each radius.
        cover_bands = [compute_dw_percent_cover(landcover, radius_m=r) for r in radii_m]
        forest_bands = [compute_forest_metrics(landcover, r) for r in radii_m]
        stacked = ee.Image.cat(cover_bands + forest_bands)
        return stacked.sampleRegions(
            collection=ee.FeatureCollection([feature]),
            scale=100,
            geometries=True,
            tileScale=4,
        )

    try:
        # Each mapped feature yields its own collection; flatten merges them.
        sampled_fc = sub_fc.map(sample_one).flatten()
        export_desc = f"{species}_subset{subset_index}"
        # NOTE(review): this asset-style path (built from asset_folder) is
        # passed as the *Drive* file name prefix — confirm whether a Drive
        # export or an asset export is the intended destination.
        asset_id = f"{asset_folder}/{export_desc}"

        export_task = ee.batch.Export.table.toDrive(
            collection=sampled_fc,
            description=export_desc,
            fileNamePrefix=asset_id,
        )
        export_task.start()
        log(f"✔ Export started for {species} subset {subset_index}")
    except Exception as e:
        log(f"❌ Export failed for {species} subset {subset_index}: {e}")


In [None]:
# -------------------------
# MAIN LOOP
# -------------------------
# Module-level DuckDB connection (no database file given).
# load_species_data() reads this global `con` when it falls back to Parquet.
con = duckdb.connect()
con.sql('INSTALL spatial; LOAD spatial')
# Register the AOI as table `aoi`; the Parquet query in load_species_data()
# joins against aoi.geometry.
# NOTE(review): t_geom is not referenced by any query visible in this file,
# and it treats the source as EPSG:5070 although the file name suggests the
# data is already EPSG:4326 — confirm.
con.execute(f"""
CREATE OR REPLACE TABLE aoi AS
SELECT geometry, ST_Transform(geometry, 'EPSG:5070', 'EPSG:4326', always_xy := true) AS t_geom
FROM read_parquet('{aoifile}');
""")

# Process each species independently; a failure is logged and the loop
# continues with the next species.
for species in species_list:
    try:
        log(f"\n--- Processing: {species} ---")
        df = load_species_data(species)
        if df is None or df.empty:
            log(f"❌ No data found for {species}")
            continue

        # 'date' is the column formatted into each feature's obs_date property.
        training_fc = df_to_ee_fc(df, 'date')
        subsets = split_fc(training_fc, n_subsets=10)

        # Submit one export per subset, at most five running concurrently.
        # Leaving the `with` block waits for all submitted calls to finish.
        # The returned futures are not inspected; export_subset() logs its
        # own failures.
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i, sub_fc in enumerate(subsets):
                executor.submit(export_subset, sub_fc, species, i)

    except Exception as e:
        log(f"❌ Failed to process {species}: {e}")