In [1]:
import arcpy, pandas as pd, os, multiprocessing as mp, psutil, time
from arcpy import env
from arcpy.sa import *
from otherfunctions import folders_exist

In [None]:
# --- Input / output locations --------------------------------------------------
root_folder = r"Z:\PhD_Datasets&Analysis\Info_Inputs"
tam_out_dir = r"Z:\PhD_Datasets&Analysis\Outputs\T&M_WBM"
tc_ds = f"{root_folder}\\TerraClimate"
out_geotiff = f"{tc_ds}\\GeoTIFF"

# Band names used by the GEE TerraClimate collection, kept for comparison with that
# dataset, and the variable names used by TerraClimate itself (presumably paired by
# position: pr<->ppt, pet<->pet, ro<->q).
bands_gee = ["pr", "pet", "ro"]
tc_vars = ["ppt", "pet", "q"]

# Attribute field holding the station identifier in the drainage-area layer.
serial_id = 'grdcno_int'

# --- arcpy environment -----------------------------------------------------------
env.overwriteOutput = True
arcpy.CheckOutExtension("spatial")
# A TerraClimate raster is the cell-size reference ("MINOF" was avoided because it
# produced huge files).
env.cellSize = f"{out_geotiff}\\ppt_2023_1.tif"
env.workspace = r"Z:\PhD_Datasets&Analysis\_ProcessingCache"
env.outputCoordinateSystem = arcpy.SpatialReference("WGS 1984")  # WGS 1984 (EPSG 4326)

In [None]:
# Report the output coordinate system currently configured in the arcpy environment.
spatial_ref = env.outputCoordinateSystem

if not spatial_ref:
    print("No spatial reference is set in the current environment.")
else:
    print(f"Spatial Reference Name: {spatial_ref.name}")
    print(f"Spatial Reference WKID: {spatial_ref.factoryCode}")

In [None]:
# Shapefile with the processed drainage areas (watersheds of the streamflow stations).
# The tail is a raw string so every backslash is literal: in a normal string "\G" and
# "\C" are invalid escape sequences (SyntaxWarning on Python 3.12+). The path value is
# unchanged.
drain_areas = root_folder + r"\Streamflow_Sts_Drainage_Areas\GRDC_Watersheds\CSS-WATERSHEDS_FINAL_SELECTION.shp"

# Feature layer over the drainage areas
arcpy.MakeFeatureLayer_management(drain_areas, "drain_areas_lyr")

# Station IDs (field `serial_id`), in cursor order
with arcpy.da.SearchCursor("drain_areas_lyr", [serial_id]) as cursor:
    sts_ids = [row[0] for row in cursor]

sts_ids

In [None]:
# ----------------------------------------------------------------------
# Starting values for the T&M water balance model
# ----------------------------------------------------------------------

# Years to process, 1958-1967 inclusive. Narrow this range to run the code for
# specific years across several runs.
years = [*range(1958, 1968)]
# Calendar months 1-12
months = range(1, 13)

In [None]:
# Output folder for the water-yield results of the T&M model; folders_exist is
# expected to create it if missing (project helper — confirm).
wyield_dir = f"{tam_out_dir}\\wyield"
folders_exist([wyield_dir])

# Folder holding the baseflow rasters produced by the model
bflow_dir = f"{tam_out_dir}\\bflow"

In [None]:
# Use the 'spawn' start method (the default, and only, method on Windows). It has to be
# chosen before any process is created; force=True lets this cell be re-run without a
# "context has already been set" RuntimeError.
# NOTE(review): with 'spawn', each worker is a fresh interpreter that looks the task
# function up in its own `__main__`. Functions and globals defined only in a notebook
# (this function, `sts_ids`, `drain_areas`, `wyield_dir`, `months`, `serial_id`) are not
# importable from there, so pool tasks may fail with "Can't get attribute ...". Moving
# the worker and its inputs into an importable .py module avoids this — verify.
mp.set_start_method('spawn', force=True)

# Statistic columns of ZonalStatisticsAsTable(..., "ALL") that are kept, in output order.
_ZONAL_STAT_FIELDS = ["COUNT", "AREA", "MIN", "MAX", "RANGE", "MEAN", "STD", "SUM", "MEDIAN", "PCT90"]


def _zonal_stats_for_month(layer_name, station, year, month, raster, process_id):
    """Compute zonal statistics of one monthly raster over the selected features of `layer_name`.

    Returns a DataFrame with columns [serial_id, "YEAR", "MONTH", *_ZONAL_STAT_FIELDS].
    The temporary in-memory table is deleted even if the statistics step fails.
    """
    # Table name includes station, year, month and process ID.
    out_table = f"in_memory\\zonal_wyield_{station}_{year}_{month}_{process_id}"
    try:
        arcpy.sa.ZonalStatisticsAsTable(layer_name, serial_id, raster, out_table, "DATA", "ALL")
        array = arcpy.da.TableToNumPyArray(out_table, [serial_id] + _ZONAL_STAT_FIELDS)
    finally:
        if arcpy.Exists(out_table):
            arcpy.Delete_management(out_table)

    df_sim = pd.DataFrame(array)
    df_sim["YEAR"] = year
    df_sim["MONTH"] = month
    return df_sim[[serial_id, "YEAR", "MONTH"] + _ZONAL_STAT_FIELDS]


def zonal_stastics_iteratively(year):
    """
    Calculate zonal statistics of the monthly water-yield rasters for every station, for one year.

    For each ID in `sts_ids`, the matching polygon(s) are selected on a process-specific
    feature layer built from `drain_areas`. ZonalStatisticsAsTable is then run on
    `wyield_<year>_<month>.tif` (in `wyield_dir`) for every month in `months`. Stations are
    processed one at a time, presumably because watersheds can overlap (nested basins) —
    confirm. Months whose raster does not exist are skipped with a warning. Results are
    written to `wyield_zonal_statistics_<year>.csv` in `wyield_dir`.

    The feature layer, temporary tables and Spatial Analyst license are always released,
    even on error. Pool workers are reused for several years, so a leaked layer would
    otherwise collide with the next task's layer of the same name.

    NOTE(review): arcpy environment settings made in the notebook (overwriteOutput,
    cellSize, workspace, outputCoordinateSystem) are process-local state. They are
    presumably not inherited by 'spawn' workers — confirm whether they must be set here.

    Parameters
    ----------
    year : int
        Year whose monthly rasters are processed.

    Returns
    -------
    str
        "SUCCESS: Year <year>" on success, otherwise a message starting with "ERROR:".
        Exceptions are caught and reported in the returned message, not raised.
    """
    process_id = os.getpid()
    print(f"\n[{time.strftime('%H:%M:%S')}] Process {process_id} STARTED - Calculating zonal statistics for year {year}")

    layer_name = f"drain_areas_lyr_{process_id}"
    extension_checked_out = False
    try:
        # Check out the Spatial Analyst extension in this process
        if arcpy.CheckExtension("Spatial") != "Available":
            print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - ERROR: Spatial Analyst extension not available!")
            return f"ERROR: Year {year} - Spatial Analyst not available"
        arcpy.CheckOutExtension("Spatial")
        extension_checked_out = True
        print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - Spatial Analyst checked out successfully")

        # The monthly rasters do not depend on the station, so resolve and check them
        # once instead of once per station. Keep the month index for log throttling.
        monthly_rasters = []
        for m, month in enumerate(months):
            wyield = os.path.join(wyield_dir, f"wyield_{year}_{month}.tif")
            if arcpy.Exists(wyield):
                monthly_rasters.append((m, month, wyield))
            else:
                print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - WARNING: {wyield} does not exist, skipping...")

        print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - Creating feature layer {layer_name}")
        arcpy.MakeFeatureLayer_management(drain_areas, layer_name)

        # Collect one frame per station/month and concatenate once at the end. Repeated
        # pd.concat in the loop is quadratic, and concatenating onto an empty frame
        # triggers a pandas FutureWarning.
        frames = []
        total_stations = len(sts_ids)
        for i, st in enumerate(sts_ids):
            print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - Year {year} - Station {st} ({i+1}/{total_stations})")

            # Restrict the layer to the current station
            arcpy.SelectLayerByAttribute_management(layer_name, "NEW_SELECTION", f"{serial_id} = {st}")

            for m, month, wyield in monthly_rasters:
                if m % 4 == 0:  # Only log every few months to avoid excessive output
                    print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - Year {year} - Station {st} - Processing month {month}")
                frames.append(_zonal_stats_for_month(layer_name, st, year, month, wyield, process_id))

        # With no results, still write a header-only CSV with the expected columns.
        columns = [serial_id, "YEAR", "MONTH"] + _ZONAL_STAT_FIELDS
        sts_flows_sim = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=columns)

        csv_path = os.path.join(wyield_dir, f"wyield_zonal_statistics_{year}.csv")
        sts_flows_sim.to_csv(csv_path, index=False)
        print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - COMPLETED - Year {year} - Saved results to {csv_path}")

        return f"SUCCESS: Year {year}"

    except Exception as e:
        print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - ERROR processing year {year}: {str(e)}")
        return f"ERROR: Year {year} - {str(e)}"

    finally:
        # Release the per-process layer and the license on every exit path. Cleanup
        # problems are reported, not raised, so they never mask the task's own result.
        try:
            if arcpy.Exists(layer_name):
                arcpy.Delete_management(layer_name)
        except Exception as cleanup_error:
            print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - WARNING: could not delete {layer_name}: {cleanup_error}")
        if extension_checked_out:
            try:
                arcpy.CheckInExtension("Spatial")
            except Exception as cleanup_error:
                print(f"[{time.strftime('%H:%M:%S')}] Process {process_id} - WARNING: could not check in Spatial Analyst: {cleanup_error}")

In [None]:
def run_parallel_processing(years_to_process, memory_per_process_gb=4, core_fraction=0.75,
                            timeout_s=7200, submit_delay_s=2):
    """
    Run `zonal_stastics_iteratively` in a process pool, one task per year, with safeguards
    to prevent machine overload.

    The pool size is the smallest of three limits, each at least 1:
    - `core_fraction` of the CPU cores;
    - the number of `memory_per_process_gb` slices that fit in currently available memory;
    - the number of years.

    Parameters
    ----------
    years_to_process : sequence of int
        Years to submit, one task each.
    memory_per_process_gb : float, default 4
        Rough estimate of the memory used by one worker (not measured).
    core_fraction : float, default 0.75
        Fraction of the CPU cores the pool may use.
    timeout_s : float, default 7200
        Seconds to wait for each task's result, counted from when collection reaches it.
    submit_delay_s : float, default 2
        Pause between task submissions to stagger start-up.

    Returns
    -------
    list of str
        One entry per year, in submission order: the worker's return message, or
        "FAILED: <error>" if the task raised or timed out. An empty input returns [].
    """
    if not years_to_process:
        # mp.Pool(processes=0) would raise ValueError; there is nothing to run.
        print(f"[{time.strftime('%H:%M:%S')}] No years to process.")
        return []

    print(f"\n[{time.strftime('%H:%M:%S')}] Starting parallel processing setup...")

    # CPU limit
    total_cores = mp.cpu_count()
    max_cores = max(1, int(total_cores * core_fraction))

    # Memory limit
    available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)
    memory_limited_cores = max(1, int(available_memory_gb / memory_per_process_gb))

    # Use the most conservative limit
    num_processes = min(max_cores, memory_limited_cores, len(years_to_process))

    print(f"[{time.strftime('%H:%M:%S')}] System has {total_cores} cores, using {num_processes} for parallel processing")
    print(f"[{time.strftime('%H:%M:%S')}] Available memory: {available_memory_gb:.2f} GB, estimated usage: {memory_per_process_gb * num_processes:.2f} GB")
    print(f"[{time.strftime('%H:%M:%S')}] Processing {len(years_to_process)} years: {years_to_process}")
    print(f"[{time.strftime('%H:%M:%S')}] Creating process pool with {num_processes} workers...")

    pending = []
    all_results = []
    # Leaving the `with` block calls pool.terminate(), which stops any task still running
    # (e.g. one whose result timed out below).
    with mp.Pool(processes=num_processes) as pool:
        for idx, year in enumerate(years_to_process):
            print(f"[{time.strftime('%H:%M:%S')}] Submitting year {year} to the process pool...")
            # apply_async (rather than map) gives per-year results and feedback
            pending.append((year, pool.apply_async(zonal_stastics_iteratively, (year,))))
            if idx < len(years_to_process) - 1:
                time.sleep(submit_delay_s)  # stagger start-up to avoid resource contention

        print(f"[{time.strftime('%H:%M:%S')}] Waiting for all processes to complete...")
        total = len(pending)
        for i, (year, async_result) in enumerate(pending, start=1):
            try:
                res = async_result.get(timeout=timeout_s)
                all_results.append(res)
                print(f"[{time.strftime('%H:%M:%S')}] Task {i}/{total} (year {year}) completed: {res}")
            except Exception as e:  # includes multiprocessing.TimeoutError
                print(f"[{time.strftime('%H:%M:%S')}] Task {i}/{total} (year {year}) failed: {str(e)}")
                all_results.append(f"FAILED: {str(e)}")

    print(f"\n[{time.strftime('%H:%M:%S')}] All processing completed with results: {all_results}")
    return all_results

In [None]:
print('\n############################################################')
print('\t\tINITIAL VARIABLES')
print(f'\tPeriod to be executed: {years[0]}-{years[-1]}')
print('############################################################')

start_time = time.perf_counter()  # monotonic clock for elapsed time
try:
    # Run the parallel processing; results come back in the same order as `years`.
    results = run_parallel_processing(years)
    elapsed_min = (time.perf_counter() - start_time) / 60
    print(f"\n[{time.strftime('%H:%M:%S')}] Total execution time: {elapsed_min:.2f} minutes")

    # Surface years that did not succeed instead of leaving them buried in the log.
    failed = [(year, res) for year, res in zip(years, results) if not str(res).startswith("SUCCESS")]
    if failed:
        print(f"[{time.strftime('%H:%M:%S')}] {len(failed)} of {len(results)} year(s) did not succeed:")
        for year, res in failed:
            print(f"    {year}: {res}")

except Exception as e:
    print(f"\n[{time.strftime('%H:%M:%S')}] ERROR in main process: {str(e)}")
finally:
    # Check the Spatial Analyst license back in for the main process and reset the
    # workspace. A cleanup failure is reported rather than silently swallowed.
    try:
        arcpy.CheckInExtension("spatial")
        arcpy.ClearEnvironment("workspace")
        print(f"\n[{time.strftime('%H:%M:%S')}] Cleaned up resources in main process")
    except Exception as e:
        print(f"\n[{time.strftime('%H:%M:%S')}] WARNING: cleanup in main process failed: {e}")

print("\nDONE!!")