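# In [None]:  (Jupyter notebook cell marker left over from the export; commented out so the file parses as Python)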
# ================================================================================================
# 	PYTHON SCRIPT: HIGH-PERFORMANCE NETCDF TIME-SERIES EXTRACTOR (PARALLEL & RESILIENT)
# ================================================================================================
#
# 	❖ PURPOSE:
# 	 	A robust, **parallel**, and memory-efficient workflow to process multi-year NetCDF data
# 	 	(e.g., daily streamflow) for specific geographical points (subbasins/outlets).
#
# 	 	1. Extracts time-series data from multiple yearly NetCDF files **in parallel** (Phase 1).
# 	 	2. Merges the yearly data for each point into a single, complete time-series file (Phase 2).
# 	 	3. Uses intelligent resume logic with **verification checks** to provide continuous status feedback.
# 	 	4. **NEW:** Provides a real-time, formatted extraction/verification report in the console.
#
# 	❖ REQUIREMENTS:
# 	 	- xarray
# 	 	- pandas
# 	 	- tqdm (for status bars)
# 	 	- numpy
# 	 	- multiprocessing (built-in)
# 	 	- netCDF4 (backend required by xr.open_dataset(..., engine='netcdf4'))
#
# ================================================================================================

# STEP 1: IMPORT MODULES
# ----------------------
import xarray as xr
import pandas as pd
import numpy as np
import pathlib
import time
import shutil
import os
import glob
from typing import Dict, Any, List, Tuple
from tqdm import tqdm # Used for clear status bars
import multiprocessing as mp
import sys 

# Cache of opened NetCDF datasets, keyed by the file path string.
# It is a module-level global, so every process that imports this module (including each
# pool worker) holds its own independent copy. open_nc_dataset_cached() fills it on first
# use; nothing in this script closes or evicts the cached datasets.
worker_datasets: Dict[str, xr.Dataset] = {}

# Console report template constants.
# The header is padded with the same column widths (40 / 7 / 10) that report_task_result()
# uses for each row, so the columns line up; tab padding renders at terminal-dependent widths.
REPORT_HEADER = f"| {'Point ID & File Stem':<40} | {'Status':<7} | {'Action':<10}"
# The rule is exactly as wide as the header/rows.
REPORT_SEPARATOR = "-" * len(REPORT_HEADER)

# ================================================================================================
# 	STEP 2: USER-EDITABLE SETTINGS (MAXIMUM FLEXIBILITY)
# ================================================================================================

# --- A. FILE PATHS & DIRECTORIES ---

# 1. INPUT DIRECTORY (The folder containing all your NetCDF files)
#    A relative path is resolved against the current working directory.
NC_INPUT_DIRECTORY: str = "Observed_daily_dscharge/Raw_netcdf"

# 2. POINT FILE PATH (The CSV containing the longitudes and latitudes of the outlets/subbasins)
#    Must contain the columns named in section C (POINT_ID_FIELD, LONGITUDE_FIELD, LATITUDE_FIELD).
POINT_CSV_FILE_PATH: str = "Outlet_subbasin.csv"

# --- DERIVED OUTPUT PATHS (DO NOT EDIT THESE) ---
# Both output folders are created inside the NetCDF input directory.
INPUT_ROOT_DIR = pathlib.Path(NC_INPUT_DIRECTORY)
# Intermediate directory for yearly extracted CSVs: one sub-folder per NetCDF file stem,
# holding one "<point id>.csv" per point. Removed at the end only if CLEANUP_YEARLY_DIR is True.
YEARLY_DATA_DIR: str = str(INPUT_ROOT_DIR / "01_Intermediate_Yearly_CSVs")
# Final directory for the single, merged time-series CSV for each point ("<point id>.csv")
FINAL_MERGED_OUTPUT_DIR: str = str(INPUT_ROOT_DIR / "02_Final_Merged_Streamflow")

# --- B. NETCDF FILE SELECTION ---
# Glob pattern to match NetCDF files inside NC_INPUT_DIRECTORY (e.g., "*.nc" for all files, or "ERA5L_*.nc")
NC_FILE_PATTERN: str = "*.nc"
# Process a subset of files: If empty list [], all files matching the pattern will be processed.
# Entries are bare file names (not paths); they are compared with the names of the files
# matched by NC_FILE_PATTERN.
NC_FILES_TO_PROCESS: List[str] = ["ERA5L_9km_BG_daily_streamflow_1952.nc"]

# --- C. INPUT DATA CONFIGURATION ---
# Column names in the point CSV. The ID value is converted to str and used as the output file name.
POINT_ID_FIELD: str = "NAME"
LONGITUDE_FIELD: str = "LONG"
LATITUDE_FIELD: str = "LAT"
# Name of the data variable to extract. The extraction code addresses the dataset through
# coordinates named 'lon', 'lat' and 'time'.
NETCDF_VARIABLE: str = "flw"

# --- D. OUTPUT DATA CONFIGURATION ---
# Maps the internal column names ('time', 'value') used in the yearly CSVs to the headers
# written to the final merged CSV. Only the right-hand values should be changed.
OUTPUT_COLUMN_NAMES: Dict[str, str] = {"time": "DATE", "value": "DISCHARGE"}

# --- E. PARALLELISM, RESILIENCE & CLEANUP ---
# Number of worker processes in the multiprocessing pool used for the extraction phase.
CPU_CORES_TO_USE: int = 6 
# If True, nothing is skipped: yearly CSVs are re-extracted and merged files are rewritten.
# If False, yearly CSVs whose line count matches the NetCDF time axis are skipped, and points
# that already have a merged file larger than 1024 bytes are skipped entirely.
OVERWRITE_EXISTING: bool = False
# If True, YEARLY_DATA_DIR is deleted at the end of main(), provided no exception was raised
# (individual task failures reported by the phases do not prevent the deletion).
CLEANUP_YEARLY_DIR: bool = False

# ================================================================================================
# 	STEP 3: HELPER FUNCTIONS (NETCDF CACHING AND REPORTING)
# ================================================================================================

def open_nc_dataset_cached(nc_file_path: str) -> xr.Dataset:
	"""
	Return the dataset for ``nc_file_path``, opening the file only on first request.

	Opened datasets are kept in the module-level ``worker_datasets`` dictionary, keyed by
	the path string, so repeated tasks for the same file in one process reuse the handle.
	The datasets are never closed here; the handle lives until the process exits.
	"""
	dataset = worker_datasets.get(nc_file_path)
	if dataset is None:
		# cache=False: xarray does not keep values it has read in memory
		dataset = xr.open_dataset(nc_file_path, engine='netcdf4', cache=False)
		worker_datasets[nc_file_path] = dataset
	return dataset

def report_task_result(point_id: str, file_stem: str, status: str, action: str):
	"""Write one formatted report row (name | status | action) through tqdm.write."""
	max_width = 40

	# First column combines point and file; anything longer than 40 chars is cut to 37 + "..."
	label = f"{point_id} | {file_stem}"
	label = label if len(label) <= max_width else label[:37] + "..."

	# tqdm.write keeps the row from colliding with an active progress bar
	row = f"| {label:<40} | {status:<7} | {action:<10}"
	tqdm.write(row)

# ================================================================================================
# 	STEP 4: CORE WORKER FUNCTIONS (PARALLEL & SEQUENTIAL LOGIC)
# ================================================================================================

def extraction_worker_parallel(task_data: Tuple[pd.Series, str]) -> Tuple[bool, str, str, str]:
	"""
	Phase 1 pool worker: extract one point's series from one NetCDF file into a yearly CSV.

	``task_data`` is a ``(point_row, nc_file_path)`` pair. The CSV is written to
	``YEARLY_DATA_DIR/<nc stem>/<point id>.csv`` with the columns ``time`` and ``value``.

	Returns ``(success, point_id, nc_stem, message)``. ``message`` is "PROCESSED",
	"SKIPPED (VERIFIED)" or, when ``success`` is False, "Error: <exception text>".
	Exceptions raised inside the try block are converted to a failed result.
	"""
	point_data, nc_file_path = task_data
	# Computed before the try block so the error path can always report it
	nc_stem = pathlib.Path(nc_file_path).stem

	try:
		# --- Output location for this (file, point) pair ---
		point_id = str(point_data[POINT_ID_FIELD])
		target_dir = pathlib.Path(YEARLY_DATA_DIR) / nc_stem
		target_dir.mkdir(parents=True, exist_ok=True)
		csv_path = target_dir / f"{point_id}.csv"

		# --- Resume: keep an existing CSV only if its length matches the time axis ---
		if not OVERWRITE_EXISTING and csv_path.exists():
			n_timesteps = len(open_nc_dataset_cached(nc_file_path)['time'])

			n_lines = 0
			with open(csv_path, 'rb') as handle:
				for _ in handle:
					n_lines += 1

			# One header line plus one line per time step
			if n_lines == n_timesteps + 1:
				return True, point_id, nc_stem, "SKIPPED (VERIFIED)"

			# Length mismatch: discard the file and extract again below
			os.remove(csv_path)

		# --- Extraction ---
		lon = point_data[LONGITUDE_FIELD]
		lat = point_data[LATITUDE_FIELD]
		dataset = open_nc_dataset_cached(nc_file_path)

		# Nearest grid cell to the requested coordinates
		variable = dataset[NETCDF_VARIABLE]
		series = variable.sel({'lon': lon, 'lat': lat}, method='nearest')

		# --- Save as a two-column CSV ---
		frame = series.to_dataframe().reset_index()
		frame = frame.rename(columns={NETCDF_VARIABLE: 'value'})
		output = frame[['time', 'value']]
		output.to_csv(csv_path, index=False)

		return True, point_id, nc_stem, "PROCESSED"

	except Exception as e:
		# Report the failure together with whatever point ID is available
		failed_id = str(point_data.get(POINT_ID_FIELD, 'UNKNOWN'))
		return False, failed_id, nc_stem, f"Error: {e}"


def merge_worker_sequential(point_id: str, nc_file_stems: List[str]) -> Tuple[bool, str, str]:
	"""
	Phase 2 worker: merge all yearly CSVs of one point into a single final CSV.

	Args:
		point_id: Point identifier; also the file name (without ".csv") of the yearly
			and final CSV files.
		nc_file_stems: Names of the sub-folders of YEARLY_DATA_DIR to read, in merge order.

	Returns:
		``(success, point_id, message)``. On success ``message`` is the final file path, or
		a text starting with "Skipped" when the existing final file was kept. On failure it
		is a text starting with "Error".
	"""
	yearly_root = pathlib.Path(YEARLY_DATA_DIR)
	final_output_path = pathlib.Path(FINAL_MERGED_OUTPUT_DIR) / f"{point_id}.csv"
	yearly_paths = [yearly_root / stem / f"{point_id}.csv" for stem in nc_file_stems]

	# Smart resume: keep the final file if every yearly input exists and it is > 1024 bytes
	if not OVERWRITE_EXISTING and final_output_path.exists():
		try:
			all_yearly_present = all(path.exists() for path in yearly_paths)
			if all_yearly_present and final_output_path.stat().st_size > 1024:
				return True, point_id, f"Skipped (Already fully merged: {final_output_path.name})"
		except OSError:
			# A file vanished or is unreadable while checking: fall through and re-merge
			pass

	# Written first, then renamed over the final path, so an interrupted run never leaves
	# a truncated "<point id>.csv" that a later resume check would accept as complete.
	temp_output_path = final_output_path.with_name(final_output_path.name + ".tmp")

	try:
		yearly_dfs = [
			pd.read_csv(path, parse_dates=['time'], dtype={'value': np.float64})
			for path in yearly_paths
			if path.exists()
		]

		if not yearly_dfs:
			return False, point_id, f"Error: No yearly data found for Point ID {point_id}."

		merged_df = pd.concat(yearly_dfs, ignore_index=True)
		# Stable sort keeps rows with equal timestamps in input order, so keep='last'
		# reliably retains the row from the later yearly file.
		merged_df = merged_df.sort_values(by='time', kind='mergesort')
		merged_df = merged_df.drop_duplicates(subset='time', keep='last')
		merged_df = merged_df.rename(columns=OUTPUT_COLUMN_NAMES)

		final_output_path.parent.mkdir(parents=True, exist_ok=True)
		merged_df.to_csv(temp_output_path, index=False)
		os.replace(temp_output_path, final_output_path)

		return True, point_id, str(final_output_path)

	except Exception as e:
		# Best-effort removal of a partially written temp file; the merge error is what matters
		try:
			temp_output_path.unlink()
		except OSError:
			pass
		return False, point_id, f"Error merging data: {e}"

# ================================================================================================
# 	STEP 5: MAIN WORKFLOW PHASES (PARALLEL EXECUTION)
# ================================================================================================

def prepare_data_and_files(points_df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
	"""
	Select the NetCDF files and the points that still need processing.

	Args:
		points_df: Point table containing the POINT_ID_FIELD column. Note that this column
			is converted to ``str`` in place on the passed frame.

	Returns:
		``(points_to_process, nc_files)``: a copy of the rows that are not skipped by the
		smart-resume check, and the name-sorted list of NetCDF file paths to read.

	Raises:
		FileNotFoundError: nothing in NC_INPUT_DIRECTORY matches NC_FILE_PATTERN.
		ValueError: none of the names in NC_FILES_TO_PROCESS matched a discovered file.
	"""
	# 1. Normalise point IDs to strings (they are compared with file stems below)
	points_df[POINT_ID_FIELD] = points_df[POINT_ID_FIELD].astype(str)
	print(f"Loaded {len(points_df)} points from {pathlib.Path(POINT_CSV_FILE_PATH).name}.")

	# 2. Discover NetCDF files. The sort is lexicographic on the path, which is
	#    chronological only when the year is embedded consistently in the file names.
	nc_input_dir = pathlib.Path(NC_INPUT_DIRECTORY)
	all_nc_files = sorted(glob.glob(str(nc_input_dir / NC_FILE_PATTERN)))

	if not all_nc_files:
		raise FileNotFoundError(f"No NetCDF files found matching pattern '{NC_FILE_PATTERN}' in directory: {NC_INPUT_DIRECTORY}")

	# 3. Apply the optional user selection (matched on bare file names)
	if NC_FILES_TO_PROCESS:
		requested_names = set(NC_FILES_TO_PROCESS)
		nc_files = [f for f in all_nc_files if pathlib.Path(f).name in requested_names]

		# Compare name sets rather than list lengths, so duplicate entries in
		# NC_FILES_TO_PROCESS do not trigger a warning about zero missing files.
		missing = sorted(requested_names - {pathlib.Path(f).name for f in nc_files})
		if missing:
			print(f"⚠️ Warning: {len(missing)} specified NC files were not found: {missing}")
	else:
		nc_files = list(all_nc_files)

	if not nc_files:
		raise ValueError("After filtering, no NetCDF files remain to be processed.")

	print(f"Found {len(nc_files)} NetCDF files for processing (from a total of {len(all_nc_files)}).")

	# 4. Smart resume: skip points that already have a merged file in the final directory
	if OVERWRITE_EXISTING:
		return points_df.copy(), nc_files

	processed_set = set()
	final_dir = pathlib.Path(FINAL_MERGED_OUTPUT_DIR)
	if final_dir.exists():
		# Files of 1024 bytes or less are treated as empty/header-only and ignored
		processed_set = {
			file_path.stem
			for file_path in final_dir.glob("*.csv")
			if file_path.stat().st_size > 1024
		}

	points_to_process = points_df[~points_df[POINT_ID_FIELD].isin(processed_set)].copy()

	skipped_count = len(points_df) - len(points_to_process)
	if skipped_count > 0:
		print(f"Smart Resume Enabled: Skipping {skipped_count}/{len(points_df)} points that are already merged in the final directory.")
	else:
		print("Smart Resume: No existing merged files found for skipping.")

	return points_to_process, nc_files

def run_extraction_phase(points_to_process: pd.DataFrame, nc_files: List[str]) -> bool:
	"""
	Phase 1: extract every (point, NetCDF file) combination with a multiprocessing pool.

	Args:
		points_to_process: Rows of the point table to extract.
		nc_files: Paths of the NetCDF files to read.

	Returns:
		True when no task failed (or there was nothing to do); False when at least one
		task failed or the pool itself raised an exception.
	"""
	print("\n" + "="*80)
	print(f"PHASE 1: PARALLEL DATA EXTRACTION (Using {CPU_CORES_TO_USE} cores)")
	print("="*80)

	# One task per (point row, file) pair
	extraction_tasks = [
		(point_data, nc_file_path)
		for _, point_data in points_to_process.iterrows()
		for nc_file_path in nc_files
	]

	total_tasks = len(extraction_tasks)
	if total_tasks == 0:
		print("Sub-Process: No points or NC files to process. Skipping Phase 1.")
		return True

	print(f"Sub-Process: Initializing {total_tasks} extraction tasks for parallel execution...")
	print("Sub-Process: Waiting for the first worker to return a result (this may take a moment)...")

	failed_tasks = 0
	verified_skip_count = 0
	processed_count = 0

	# --- Execute in Parallel ---
	print("\n✅ REAL-TIME EXTRACTION/VERIFICATION REPORT")
	print(REPORT_SEPARATOR)
	print(REPORT_HEADER)
	print(REPORT_SEPARATOR)

	try:
		with mp.Pool(processes=CPU_CORES_TO_USE) as pool:
			# imap_unordered yields each result as soon as its worker finishes
			results = pool.imap_unordered(extraction_worker_parallel, extraction_tasks)

			# Keep a handle on the bar instance: set_description is an instance method,
			# so it must be called on this object, not on the tqdm class.
			with tqdm(results, total=total_tasks, desc="Extraction Progress (Awaiting first result)") as progress:
				awaiting_first_result = True

				for success, point_id, nc_stem, message in progress:
					if awaiting_first_result:
						awaiting_first_result = False
						tqdm.write("Sub-Process: First result received. Extraction in progress.")
						progress.set_description('Extraction Progress')

					if not success:
						report_task_result(point_id, nc_stem, "FAILED", "error")
						# Show the worker's error text; the summary below points the user here
						tqdm.write(f"|   -> {message}")
						failed_tasks += 1
					elif message.startswith("SKIPPED"):
						report_task_result(point_id, nc_stem, "SKIPPED", "verified")
						verified_skip_count += 1
					else:
						report_task_result(point_id, nc_stem, "SUCCESS", "processed")
						processed_count += 1

		print(REPORT_SEPARATOR) # Footer line

	except Exception as e:
		print(f"\nFATAL: An error occurred during parallel execution: {e}")
		return False

	if failed_tasks > 0:
		print(f"\n⚠️ Phase 1 completed with {failed_tasks} failed tasks. Check the report above for details.")

	print(f"\nSummary: {processed_count} tasks completed, {verified_skip_count} tasks verified and skipped, {failed_tasks} failed.")

	return failed_tasks == 0

def run_merging_phase(points_df: pd.DataFrame) -> bool:
	"""
	Phase 2: build one merged CSV per point ID, one point after another.

	The yearly inputs are taken from every sub-folder currently present in
	YEARLY_DATA_DIR. Returns True when no merge failed (including the cases where there
	is nothing to merge), otherwise False.
	"""
	rule = "="*80
	print("\n" + rule)
	print("PHASE 2: SEQUENTIAL TIME-SERIES MERGING")
	print(rule)

	intermediate_root = pathlib.Path(YEARLY_DATA_DIR)
	if not intermediate_root.exists():
		print("Sub-Process: Intermediate yearly data directory does not exist. Skipping Phase 2.")
		return True

	# Every sub-folder is one extracted NetCDF file; sorted by name
	stems = sorted(entry.name for entry in intermediate_root.iterdir() if entry.is_dir())

	# One merge per distinct point ID of the input table
	point_ids = [str(pid) for pid in points_df[POINT_ID_FIELD].astype(str).unique()]
	total_tasks = len(point_ids)

	if total_tasks == 0:
		print("Sub-Process: No points require merging. Skipping Phase 2.")
		return True

	print(f"Sub-Process: Initializing {total_tasks} merging tasks for sequential execution...")

	merged_count = 0
	failure_count = 0
	for pid in tqdm(point_ids, total=total_tasks, desc="Merging Progress"):
		success, point_id, message = merge_worker_sequential(pid, stems)
		if not success:
			failure_count += 1
			print(f"❌ Failed to merge data for Point ID {point_id}. Details: {message}")
		elif not message.startswith("Skipped"):
			# Skipped points succeeded earlier and are not counted as new merges
			merged_count += 1

	if failure_count > 0:
		print(f"\n⚠️ Phase 2 completed with {failure_count} failed merge tasks.")
	else:
		print(f"\n✅ Phase 2 successfully completed. Merged {merged_count} new files.")

	return failure_count == 0

# ================================================================================================
# 	STEP 6: MAIN EXECUTION FLOW & VERIFICATION
# ================================================================================================

def run_verification_phase(points_df: pd.DataFrame, nc_files: List[str]):
	"""
	Phase 3: print a date-range check and a completeness check for the final output.

	Args:
		points_df: Full point table; its POINT_ID_FIELD column defines the expected files.
		nc_files: Name-sorted NetCDF paths; the first and last file define the expected
			date span.

	Returns:
		None. All results are printed. A failing date-range check is reported as a
		warning and does not stop the completeness check.
	"""
	print("\n" + "="*80)
	print("PHASE 3: FINAL VERIFICATION & REPORTING")
	print("="*80)

	final_output_dir = pathlib.Path(FINAL_MERGED_OUTPUT_DIR)

	if not final_output_dir.exists():
		print("Sub-Process: Final output directory does not exist. Cannot run verification.")
		return

	# Sorted so the sample file used below is the same on every run
	all_final_files = sorted(final_output_dir.glob("*.csv"))

	# 1. Date Range Check (datasets are opened with 'with' so the files are closed again)
	try:
		if not nc_files:
			raise ValueError("No NC files were processed.")

		with xr.open_dataset(nc_files[0], engine='netcdf4') as ds_start:
			expected_start_date = pd.to_datetime(ds_start['time'].values[0])
		with xr.open_dataset(nc_files[-1], engine='netcdf4') as ds_end:
			expected_end_date = pd.to_datetime(ds_end['time'].values[-1])

		print("Sub-Process: Date Range Check...")
		print(f"\t- Input NC files define a span from: {expected_start_date.strftime('%Y-%m-%d')} to {expected_end_date.strftime('%Y-%m-%d')}")
		if all_final_files:
			date_column = OUTPUT_COLUMN_NAMES["time"]
			sample_file = pd.read_csv(all_final_files[0], parse_dates=[date_column])
			actual_start_date = sample_file[date_column].min()
			actual_end_date = sample_file[date_column].max()
			date_ok = (actual_start_date == expected_start_date) and (actual_end_date == expected_end_date)
			status = "✅ OK" if date_ok else "❌ ERROR"
			print(f"\t- Sample file '{all_final_files[0].name}' Date Status: {status}")
			print(f"\t- Sample File Range: {actual_start_date.strftime('%Y-%m-%d')} to {actual_end_date.strftime('%Y-%m-%d')}")
	except Exception as e:
		print(f"⚠️ Warning: Could not perform full Date Range Check: {e}")

	# 2. Completeness Check: match point IDs against file names instead of comparing
	#    counts, so an unrelated extra file cannot hide a missing point and duplicate IDs
	#    in the input are not reported as missing files.
	print("\nSub-Process: File Count Check (Completeness)...")
	expected_ids = set(points_df[POINT_ID_FIELD].astype(str))
	found_ids = {file_path.stem for file_path in all_final_files}
	missing_ids = sorted(expected_ids - found_ids)
	extra_ids = sorted(found_ids - expected_ids)
	total_input_points = len(expected_ids)

	if not missing_ids:
		status = "✅ COMPLETE"
		message = f"All {total_input_points} input points have a final merged file."
	else:
		status = "⚠️ PARTIAL"
		message = f"Found {total_input_points - len(missing_ids)}/{total_input_points} final merged files. {len(missing_ids)} points are missing."
	print(f"\t- Completeness Status: {status}")
	print(f"\t- {message}")

	if missing_ids:
		# Cap the listing so a large gap does not flood the console
		preview = ", ".join(missing_ids[:20])
		suffix = ", ..." if len(missing_ids) > 20 else ""
		print(f"\t- Missing Point IDs: {preview}{suffix}")
	if extra_ids:
		print(f"\t- ⚠️ {len(extra_ids)} file(s) in the output directory do not match any input point.")

	print(f"\t- Final Output Directory: {FINAL_MERGED_OUTPUT_DIR}")


def main():
	"""
	Run the whole workflow: discovery, extraction, merging, verification, optional cleanup.

	Any exception raised by a step is caught here, reported as a fatal error, and ends
	the run without the final completion banner.
	"""
	started_at = time.time()
	banner_rule = "#"*80
	print("\n" + banner_rule)
	print("	 	 	UNIFIED NETCDF TIME-SERIES EXTRACTION AND MERGING WORKFLOW")
	print(banner_rule)

	# Only invoked on Windows platforms
	if sys.platform.startswith('win'):
		mp.freeze_support()

	try:
		# --- Task 1: load the point table and pick the files/points to handle ---
		print("\n" + "--- [ Task 1: Initialization and File Discovery ] ---")
		points_df = pd.read_csv(POINT_CSV_FILE_PATH)
		points_to_process, nc_files = prepare_data_and_files(points_df)

		# --- Task 2: Phase 1, parallel extraction ---
		extraction_ok = run_extraction_phase(points_to_process, nc_files)

		# --- Task 3: Phase 2, merging ---
		# Merging still runs after a failed extraction if any yearly CSV is on disk
		yearly_root = pathlib.Path(YEARLY_DATA_DIR)
		has_intermediate = any(yearly_root.rglob('*.csv'))

		if not (extraction_ok or has_intermediate):
			print("\nSkipping merging phase due to extraction errors and lack of existing intermediate data.")
		else:
			run_merging_phase(points_df)

		# --- Task 4: Phase 3, verification, then optional cleanup ---
		run_verification_phase(points_df, nc_files)

		if CLEANUP_YEARLY_DIR and yearly_root.exists():
			print("\n--- [ Task 5: Cleanup ] ---")
			shutil.rmtree(YEARLY_DATA_DIR)
			print("Sub-Process: Yearly data directory removed.")

	except Exception as e:
		print(f"\n\n{'='*80}\nFATAL ERROR: The workflow stopped unexpectedly.\nError Details: {e}\n{'='*80}")
		return

	# --- Final notification ---
	elapsed_seconds = time.time() - started_at
	print(f"\n\n{'='*80}")
	print("✅ ALL TASKS COMPLETE")
	print(f"Total elapsed time: {elapsed_seconds:.2f} seconds.")
	print(f"Results saved to: {FINAL_MERGED_OUTPUT_DIR}")
	print('='*80)

# Run the workflow only when the file is executed as a script, not when it is imported
# (worker processes started by multiprocessing may import this module).
if __name__ == "__main__":
	main()