In [1]:
import numpy as np
from numba import njit, prange
import matplotlib.pyplot as plt
import xarray as xr
from typing import Callable, List, Union, Tuple, Dict
from pathlib import Path
import time
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd

## Reading in Catchment Timeseries

In [2]:
def load_and_preprocess_catchment_ts(runoff_path: str, catchment_path: str, catchment_name: str) -> xr.Dataset:
    """
    Optimized loading and preprocessing of time series data.
    
    Args:
        runoff_path: Path to GRDC runoff data
        catchment_path: Directory containing catchment climate data
        catchment_name: Name of the catchment
        
    Returns:
        Preprocessed xarray Dataset with aligned time series
    """
    # Convert paths to Path objects for better handling
    catchment_path = catchment_path + "/" + catchment_name.upper()
    catchment_path = Path(catchment_path)
    
    # 1. Load runoff data with optimized parameters
    runoff = xr.open_dataset(
        runoff_path,
        chunks={'time': 'auto'},  # Enable chunking for better memory management
        cache=True              # Cache the data for faster repeated access
    ).load()
    
    # 2. Immediately select time period and station
    runoff = runoff.sel(
        time=slice('2000-03-01', '2022-12-19')
    )

    catchment_id = runoff["station_name"].values == catchment_name.upper()
    catchment_index = np.where(catchment_id)[0][0]

    # Select only the runoff timeseries
    runoff = runoff["runoff_mean"].isel(id=catchment_index)
    
    # 3. Use dictionary comprehension for cleaner climate data loading
    climate_vars = {
        'temperature': 't2m',
        'precipitation': 'precipitation',
        'radiation': 'nr',
        'ndvi': 'ndvi'
    }
    
    # 4. Parallel loading of climate data
    climate_ds = xr.open_mfdataset(
        [catchment_path/f'{var}.nc' for var in climate_vars],
        parallel=True,  # Enable parallel loading
        combine='by_coords',
        chunks={'time': 'auto'},
        cache=True
    ).load()
    
    # 5. Create output dataset with aligned data
    data = xr.Dataset({
        'temperature': climate_ds[climate_vars['temperature']],
        'precipitation': climate_ds[climate_vars['precipitation']],
        'radiation': climate_ds[climate_vars['radiation']],
        'ndvi': climate_ds[climate_vars['ndvi']],
        'observed': runoff.broadcast_like(climate_ds[climate_vars['radiation']])
    })
    
    # 6. Ensure consistent time alignment
    data = data.sel(time=slice('2001-01-01', '2021-12-31'))
    
    # 7. Convert to float32 to save memory (if precision is sufficient)
    #for var in data.data_vars:
    #    data[var] = data[var].astype(np.float32)
    
    return data

In [3]:
data = load_and_preprocess_catchment_ts(
    runoff_path="data/catchments/GRDC-Daily.nc",
    catchment_path="data/catchment_timeseries",
    catchment_name="Bentfeld"
)

## Simple Water Balance Model

In [4]:
@njit(fastmath=True)
def time_evolution_numba(temp, rad, prec, ndvi, c_s, alpha, beta, gamma, c_m, iota, temp_w, ndvi_w):
    #Initialize 
    length = len(temp)
    runoff_out = np.full(length, np.nan)
    evapo_out = np.full(length, np.nan)
    soil_mois_out = np.full(length, np.nan)
    snow_out = np.full(length, np.nan)

    # Transformations / Calculations for Setup
    conv = 1 / 2260000  # from J/day/m**2 to mm/day
    rad = rad * conv  # convert radiation to mm/day
    prec = prec * 10 **3 # from m/day to mm/day
    w = 0.9 * c_s
    snow = 0

    # --- calc_et_weight function ---
    ndvi = np.nan_to_num(ndvi, nan=0.0)
    normalized_temp = (temp - temp.min()) / (temp.max() - temp.min())
    normalized_ndvi = (ndvi - ndvi.min()) / (ndvi.max() - ndvi.min())
    et_weight = temp_w * normalized_temp + ndvi_w * normalized_ndvi
    beta_weighted = beta * et_weight

    for t in range(1, length):
        prec_t = prec[t-1]
        temp_t = temp[t-1]
        rad_t = rad[t-1]
        beta_t = beta_weighted[t-1]

        # ---- snow_function ----
        is_melting = temp_t > 273.15
        has_snow = snow >= 0.001

        if not is_melting:
            snow += prec_t
            water = 0.0
        elif is_melting and has_snow:
            melt = c_m * (temp_t - 273.15)
            melt = min(melt, snow)
            snow -= melt
            water = melt + prec_t
        else:
            water = prec_t

        runoff = (water + iota) * (w / c_s) ** alpha
        evap = beta_t * (w / c_s) ** gamma * rad_t

        w += (water - runoff - evap)
        w = np.maximum(w, 0.0)

        # Store results
        runoff_out[t] = runoff
        evapo_out[t] = evap
        soil_mois_out[t] = w
        snow_out[t] = snow

    return runoff_out, evapo_out, soil_mois_out, snow_out

## Finding best params using GA

### Evaluator using Pearsons R

In [5]:
@njit
def compute_correlation_numba(observed: np.ndarray, simulated: np.ndarray) -> np.float32:
    """Numba-optimized Pearson correlation calculation."""
    # Create mask for valid observations
    mask = (~np.isnan(observed)) & (~np.isnan(simulated))
    valid_count = np.sum(mask)
    
    if valid_count < 2:
        return -np.inf  # Return negative infinity for invalid cases
    
    # Extract valid values
    x = observed[mask]
    y = simulated[mask]
    
    # Compute means
    x_mean = np.mean(x)
    y_mean = np.mean(y)
    
    # Center the data
    x_centered = x - x_mean
    y_centered = y - y_mean
    
    # Compute correlation components
    numerator = np.dot(x_centered, y_centered)
    denominator = np.sqrt(np.dot(x_centered, x_centered) * np.dot(y_centered, y_centered))
    
    return numerator / denominator if denominator > 0 else 0.0

def create_evaluator(temp, rad, prec, ndvi, observed):
    """Factory function with consistent types."""
    # Convert inputs to float32 and ensure contiguous
    temp = np.ascontiguousarray(temp, dtype=np.float32)
    rad = np.ascontiguousarray(rad, dtype=np.float32)
    prec = np.ascontiguousarray(prec, dtype=np.float32)
    ndvi = np.ascontiguousarray(ndvi, dtype=np.float32)
    observed = np.ascontiguousarray(observed, dtype=np.float32)

    def evaluate_individual(params):
        """Type-consistent evaluation."""
        c_s, alpha, beta, gamma, c_m, iota, temp_w, ndvi_w = params
        
        simulated = time_evolution_numba(
            temp=temp,
            rad=rad,
            prec=prec,
            ndvi=ndvi,
            c_s=c_s,
            alpha=alpha,
            beta=beta,
            gamma=gamma,
            c_m=c_m,
            iota=iota,
            temp_w=temp_w,
            ndvi_w=ndvi_w
        )[0]

        return compute_correlation_numba(observed, simulated)    
    return evaluate_individual

## Genetic Algorithm

In [None]:
def initialize_population(
    pop_size: int,
    genome_length: int,
    lower_bounds: List[float],
    upper_bounds: List[float],
    quantization_steps: int = 20
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Initialize a population uniformly within given bounds.
    
    Args:
        pop_size: Number of individuals in population
        genome_length: Length of each genome
        lower_bounds: List of lower bounds for each gene
        upper_bounds: List of upper bounds for each gene
        quantization_steps: Number of discrete steps for genome space
        
    Returns:
        Tuple of (population array, genome_space array)
    """
    # Convert bounds to numpy arrays (faster than checking types)
    lower = np.asarray(lower_bounds, dtype=np.float64)
    upper = np.asarray(upper_bounds, dtype=np.float64)
    
    # Generate population in one vectorized operation
    population = np.random.uniform(
        low=lower,
        high=upper,
        size=(pop_size, genome_length)
    )
    
    # Create genome space for quantization
    # Transpose to get shape (genome_length, quantization_steps)
    genome_space = np.linspace(lower, upper, quantization_steps).T
    
    return population, genome_space

def evaluate_fitness(
    population: np.ndarray,
    evaluator: Callable[[np.ndarray], float]
) -> np.ndarray:
    """
    Evaluate fitness for each individual in the population.
    Vectorized implementation if fitness_func supports it.
    """
    fitnesses = np.empty(len(population), dtype=np.float32)
    for i in prange(len(population)):
        fitnesses[i] = evaluator(population[i])
    return fitnesses

def select_survivors(
    population: np.ndarray,
    fitnesses: np.ndarray,
    num_parents: int
) -> np.ndarray:
    """
    Roulette wheel (fitness-proportionate) selection.
    Vectorized implementation.
    """
    probs = fitnesses / np.sum(fitnesses)
    indices = np.random.choice(len(population), size=num_parents, replace=True, p=probs)
    return population[indices]

def pop_crossover(
    pop: np.ndarray,
    crossover_rate: float = 0.8
) -> np.ndarray:
    """
    Vectorized single-point crossover for entire population.
    """
    pop_size, genome_length = pop.shape
    if genome_length < 2:
        return pop.copy()
    
    # Create mask for which individuals will crossover
    crossover_mask = np.random.rand(pop_size // 2) < crossover_rate
    num_crossovers = np.sum(crossover_mask)
    
    if num_crossovers == 0:
        return pop
    
    # Select parents and prepare children
    parents1 = pop[:num_crossovers]
    parents2 = pop[num_crossovers:2*num_crossovers]
    
    # Vectorized crossover
    crossover_points = np.random.randint(1, genome_length, size=num_crossovers)
    mask = np.arange(genome_length) < crossover_points[:, None]
    
    children1 = np.where(mask, parents1, parents2)
    children2 = np.where(mask, parents2, parents1)
    
    # Put children back in population
    pop[:num_crossovers] = children1
    pop[num_crossovers:2*num_crossovers] = children2
    
    return pop

def pop_mutate(
    pop: np.ndarray,
    genome_space: np.ndarray,
    mutation_rate: float,
    mutation_scale: float = 0.1
) -> np.ndarray:
    """
    Vectorized mutation with optional quantization.
    """
    pop_size, genome_length = pop.shape
    num_mutations = int(mutation_rate * pop_size * genome_length)
    
    if num_mutations == 0:
        return pop
    
    # Create random mutation indices
    mut_indices = np.random.choice(pop_size * genome_length, size=num_mutations, replace=False)
    rows, cols = np.unravel_index(mut_indices, pop.shape)
    
    # Apply mutations
    if genome_space is not None:
        # Quantized mutation
        which_steps = np.random.randint(0, genome_space.shape[1], size=num_mutations)
        pop[rows, cols] = genome_space[cols, which_steps]
    else:
        # Gaussian mutation
        lower = genome_space[:, 0] if genome_space is not None else np.min(pop, axis=0)
        upper = genome_space[:, -1] if genome_space is not None else np.max(pop, axis=0)
        ranges = upper - lower
        noise = np.random.normal(scale=mutation_scale * ranges[cols], size=num_mutations)
        pop[rows, cols] = np.clip(pop[rows, cols] + noise, lower[cols], upper[cols])
    
    return pop

def select_survivors_elitism(
    combined_pop: np.ndarray,
    combined_fit: np.ndarray,
    pop_size: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    (mu + lambda) survivor selection: keep best individuals.
    Vectorized implementation.
    """
    idx = np.argsort(combined_fit)[-pop_size:]
    return combined_pop[idx], combined_fit[idx]

def run_ga(
    fitness_func: Callable[[np.ndarray], float],
    genome_length: int,
    lower_bounds: Union[float, List[float]],
    upper_bounds: Union[float, List[float]],
    pop_size: int = 50,
    num_generations: int = 100,
    num_parents: int = 30,
    crossover_rate: float = 0.8,
    mutation_rate: float = 0.1,
    mutation_scale: float = 0.1,
    use_quantization: bool = False,
    seed: int = None
) -> Tuple[np.ndarray, List[float], List[float]]:
    """
    Optimized genetic algorithm with vectorized operations.
    Returns (best_individual, best_fitness_history, avg_fitness_history).
    """
    if seed is not None:
        np.random.seed(seed)

    # Initialize population
    quantization_steps = 20 if use_quantization else 0
    population, genome_space = initialize_population(
        pop_size, genome_length, lower_bounds, upper_bounds, quantization_steps
    )
    fitnesses = evaluate_fitness(population, fitness_func)
    best_hist = []
    avg_hist = []

    for gen in range(num_generations):
        # Record stats
        best_hist.append(np.max(fitnesses))
        avg_hist.append(np.mean(fitnesses))

        # Selection
        parents = select_survivors(population, fitnesses, num_parents)
        
        # Create offspring by copying parents
        offspring = parents.copy()
        
        # Crossover
        offspring = pop_crossover(offspring, crossover_rate)
        
        # Mutation
        offspring = pop_mutate(
            offspring, 
            genome_space if use_quantization else None,
            mutation_rate,
            mutation_scale
        )
        
        # Evaluate offspring
        offspring_fit = evaluate_fitness(offspring, fitness_func)
        
        # Survivor selection (elitism)
        combined_pop = np.vstack((population, offspring))
        combined_fit = np.concatenate((fitnesses, offspring_fit))
        population, fitnesses = select_survivors_elitism(combined_pop, combined_fit, pop_size)

    # Final stats
    best_hist.append(np.max(fitnesses))
    avg_hist.append(np.mean(fitnesses))
    best_idx = np.argmax(fitnesses)
    best_individual = population[best_idx]

    return best_individual, best_hist, avg_hist

## Bootstrapping

In [8]:
def process_bootstrap_sample(dataset: xr.Dataset, sampled_years: np.ndarray) -> Dict[str, np.ndarray]:
    """Process a single bootstrap sample (to be parallelized)."""
    sample_data = {}
    for var in dataset.data_vars:
        chunks = []
        for year in sampled_years:
            year_data = dataset[var].sel(time=dataset.time.dt.year == year)
            chunks.append(year_data)
        sample_data[var] = xr.concat(chunks, dim='time').values
    return sample_data

def parallel_year_bootstrap(dataset: xr.Dataset, 
                          n_samples: int = 10, 
                          n_jobs: int = -1) -> List[Dict[str, np.ndarray]]:
    """
    Parallel year-based block bootstrap.
    
    Args:
        dataset: xarray Dataset containing all variables
        n_samples: Number of bootstrap samples
        n_jobs: Number of parallel jobs (-1 for all cores)
        
    Returns:
        List of bootstrap samples
    """
    years = pd.to_datetime(dataset.time.values).year
    unique_years = np.unique(years)
    n_years = len(unique_years)
    
    # Generate all year samples upfront
    year_samples = [np.random.choice(unique_years, size=n_years, replace=True) 
                   for _ in range(n_samples)]
    
    # Process in parallel with progress bar
    results = Parallel(n_jobs=n_jobs)(
        delayed(process_bootstrap_sample)(dataset, years) 
        for years in tqdm(year_samples, desc="Bootstrapping"))
    
    return results

In [9]:
def parallel_ga_wrapper(sample: Dict[str, np.ndarray]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Wrapper function for parallel GA execution."""
    evaluator = create_evaluator(
        temp=sample['temperature'],
        rad=sample['radiation'],
        prec=sample['precipitation'],
        ndvi=sample['ndvi'],
        observed=sample['observed']
    )
    
    return run_ga(
                fitness_func=evaluator,
                genome_length=8,
                lower_bounds=lowerBounds,
                upper_bounds=upperBounds,
                pop_size=100,
                num_generations=20,
                num_parents=40,
                crossover_rate=0.9,
                mutation_rate=0.05,
                mutation_scale=0.2,
                seed=42
            )

def run_parallel_ga_bootstrap(
    dataset: xr.Dataset,
    n_bootstraps: int = 10,
    n_jobs: int = -1
) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
    """
    Fully parallelized GA with bootstrap.
    
    Args:
        dataset: xarray Dataset
        n_bootstraps: Number of bootstrap samples
        ga_kwargs: GA configuration
        n_jobs: Number of parallel jobs (-1 for all cores)
        
    Returns:
        (all_best, all_best_hist, all_avg_hist)
    """
   
    # Generate bootstrap samples
    bootstrap_samples = parallel_year_bootstrap(dataset, n_samples=n_bootstraps, n_jobs=n_jobs)
    
    # Run GA in parallel
    results = Parallel(n_jobs=n_jobs)(
        delayed(parallel_ga_wrapper)(sample) 
        for sample in tqdm(bootstrap_samples, desc="GA Optimization"))
    
    # Unpack results
    all_best = [r[0] for r in results]
    all_best_hist = [r[1] for r in results]
    all_avg_hist = [r[2] for r in results]
    
    return all_best, all_best_hist, all_avg_hist