In [None]:
import pandas as pd
import os
import glob
import numpy as np
import xarray as xr
from datetime import datetime
import zarr

Build a Parquet file of z-scaled flow duration curves (FDCs) from all the individual simulated CSV files, for use in clustering.

In [None]:
def calculate_z_scaled_fdc(data):
    """
    Calculate the z-scaled flow duration curve (FDC) for a given dataset.

    The streamflow series is standardised to zero mean and unit (population)
    standard deviation, sorted in descending order, and sampled at 101 evenly
    spaced rank positions with linear interpolation between neighbouring ranks.
    Missing values (NaN) are ignored.

    Args:
        data (pandas.DataFrame): DataFrame with a 'streamflow_m^3/s' column

    Returns:
        dict: Maps 'Q0' ... 'Q100' to z-scores, in that key order. 'Q0' is the
            highest flow and 'Q100' the lowest. A constant series yields 0.0
            for every percentile.

    Raises:
        KeyError: If the 'streamflow_m^3/s' column is absent.
        ValueError: If the column contains no non-missing values.
    """
    # Extract streamflow values and drop gaps; a single NaN would otherwise
    # poison the mean/std and turn the whole curve into NaN.
    streamflow = np.asarray(data['streamflow_m^3/s'], dtype=float)
    streamflow = streamflow[~np.isnan(streamflow)]

    if streamflow.size == 0:
        raise ValueError("No valid streamflow values to build a flow duration curve")

    # Z-scale the values. A constant series has zero spread, so every value
    # sits exactly on the mean; report 0.0 instead of dividing by zero.
    # Comparing min/max avoids being fooled by rounding noise in the std.
    if streamflow.max() == streamflow.min():
        z_scaled = np.zeros_like(streamflow)
    else:
        z_scaled = (streamflow - streamflow.mean()) / streamflow.std()

    # Sort in descending order (for FDC)
    z_desc = np.sort(z_scaled)[::-1]

    # Fractional rank of each percentile 0..100 in the sorted series, then
    # linear interpolation between the two neighbouring ranks. When a position
    # lands exactly on a rank, lower == upper and the weight is 0.
    positions = (np.arange(101) / 100) * (z_desc.size - 1)
    lower = np.floor(positions).astype(int)
    upper = np.ceil(positions).astype(int)
    weight = positions - lower
    values = z_desc[lower] * (1 - weight) + z_desc[upper] * weight

    return {f'Q{i}': value for i, value in enumerate(values)}

def process_csv_files(directory_path):
    """
    Process all CSV files in a directory and calculate z-scaled FDCs.

    Each CSV is read with its 'datetime' column parsed as dates and passed to
    calculate_z_scaled_fdc. A file that fails to read or process is reported
    on stdout and skipped, so one bad file does not abort the run.

    Args:
        directory_path (str): Path to directory containing CSV files

    Returns:
        pandas.DataFrame: One row per successfully processed file, indexed by
            the file name without extension (index name 'index'), with one
            column per percentile returned by calculate_z_scaled_fdc.

    Raises:
        ValueError: If the directory has no '*.csv' files, or if none of them
            could be processed.
    """
    # glob returns matches in arbitrary order; sort so the row order of the
    # result is reproducible between runs and machines.
    csv_files = sorted(glob.glob(os.path.join(directory_path, '*.csv')))

    if not csv_files:
        raise ValueError(f"No CSV files found in {directory_path}")

    results = []
    for file_path in csv_files:
        try:
            df = pd.read_csv(file_path, parse_dates=['datetime'])
            fdc = calculate_z_scaled_fdc(df)
        except Exception as e:
            # Best-effort batch job: report the failure and move on.
            print(f"Error processing {file_path}: {e}")
            continue

        # File name without extension identifies the row
        file_index = os.path.splitext(os.path.basename(file_path))[0]
        results.append({'index': file_index, **fdc})
        print(f"Processed: {file_path}")

    if not results:
        raise ValueError("No valid results were obtained from the CSV files")

    return pd.DataFrame(results).set_index('index')

# Script entry point: build the z-scaled FDC table for every CSV in `csv_dir`
# and write it to `output_path` as Parquet.
if __name__ == "__main__":
    # Directory containing CSV files.
    # The string below is a placeholder and must be replaced with a real path.
    csv_dir = 'path to folder that contain individual csv simulated files'
    
    # Process all CSV files (raises ValueError if none are found or none succeed)
    print(f"Processing CSV files in {csv_dir}...")
    result_df = process_csv_files(csv_dir)
    
    # Save to parquet.
    # The string below is a placeholder and must be replaced with a real path.
    # NOTE(review): this value is passed straight to DataFrame.to_parquet, so it
    # presumably needs to be the output *file* path rather than a folder — confirm.
    output_path = 'path to folder save the parquet files'
    result_df.to_parquet(output_path)
    
    # len(result_df) counts only the files that were processed without error
    print(f"Z-scaled FDC saved to {output_path}")
    print(f"Processed {len(result_df)} files successfully")

Build a Zarr store from the individual simulated CSV files.

In [None]:
def process_csv_files_to_zarr(csv_dir, output_zarr_path,
                              start='1980-01-01', end='2023-01-01'):
    """
    Process all CSV files in a directory and create a Zarr file with a
    (time, rivid) 'Qout' variable.

    Every CSV must be named '<river id>.csv' (the stem is parsed with int())
    and contain 'datetime' and 'streamflow_m^3/s' columns. Each series is
    aligned to a daily time axis; dates missing from a file become NaN. A file
    that fails to read or align is reported on stdout and its column is left
    as NaN.

    Args:
        csv_dir (str): Path to directory containing CSV files
        output_zarr_path (str): Path to save the Zarr file (overwritten)
        start (str): First day of the daily time axis (inclusive)
        end (str): Last day of the daily time axis (inclusive)

    Returns:
        xarray.Dataset: The in-memory dataset that was written to the store.

    Raises:
        ValueError: If no '*.csv' files are found, or a file stem is not an
            integer.
    """
    # Local import: the file-level import only brings in `datetime`.
    from datetime import timezone

    # Sort so the rivid order in the store is reproducible (glob order is arbitrary)
    csv_files = sorted(glob.glob(os.path.join(csv_dir, '*.csv')))

    if not csv_files:
        raise ValueError(f"No CSV files found in {csv_dir}")

    print(f"Found {len(csv_files)} CSV files")

    # River IDs come from the file names, in the same order as csv_files, so
    # column i of the data array always belongs to csv_files[i].
    rivids = [int(os.path.splitext(os.path.basename(file))[0]) for file in csv_files]

    time_index = pd.date_range(start=start, end=end, freq='D')

    print(f"Creating dataset with {len(rivids)} river IDs and {len(time_index)} time steps")

    # Fill a plain NumPy array by column position; columns of files that fail
    # simply keep their NaN initial value.
    qout = np.full((len(time_index), len(rivids)), np.nan, dtype=np.float32)

    print("Processing CSV files and populating dataset...")

    for i, file_path in enumerate(csv_files):
        try:
            df = pd.read_csv(file_path)

            # Let pandas detect the format first, then fall back to MM/DD/YY.
            # If both fail the error reaches the per-file handler below.
            try:
                df['datetime'] = pd.to_datetime(df['datetime'])
            except (ValueError, TypeError):
                df['datetime'] = pd.to_datetime(df['datetime'], format='%m/%d/%y')

            # Align to the full time range (missing dates become NaN)
            aligned = df.set_index('datetime').reindex(time_index)
            qout[:, i] = aligned['streamflow_m^3/s'].values

            # Print progress periodically
            if (i+1) % 100 == 0 or i+1 == len(csv_files):
                print(f"Processed {i+1}/{len(csv_files)} files")

        except Exception as e:
            # Best-effort batch job: report the failure and move on.
            print(f"Error processing {file_path}: {e}")

    # Create xarray dataset with the specified structure
    ds = xr.Dataset(
        coords={
            'rivid': ('rivid', np.array(rivids, dtype=np.int32)),
            'time': time_index,
        },
        attrs={
            'Conventions': 'CF-1.6',
            'comment': '',
            'featureType': 'timeSeries',
            'history': '',
            # The format string hard-codes a +00:00 offset, so the timestamp
            # itself must be taken in UTC rather than local time.
            'date_created': datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00"),
            'institution': '',
            'references': 'https://github.com/c-h-david/rapid/, http://dx.doi.org/10.1175/2011JHM1345.1',
            'source': 'RAPID: unknown, NOT a git repository, water inflow: /mnt/inflows/101/m3_101_20240731_20241016.nc',
            'title': ''
        }
    )
    ds['Qout'] = xr.DataArray(
        data=qout,
        dims=['time', 'rivid'],
        coords={'time': time_index, 'rivid': rivids},
        attrs={
            'long_name': 'River Discharge',
            'units': 'm^3/s'
        }
    )

    # Chunk both dimensions, capped at the dimension length for small datasets
    time_chunk_size = min(500, len(time_index))
    rivid_chunk_size = min(500, len(rivids))

    encoding = {
        'Qout': {
            'chunks': (time_chunk_size, rivid_chunk_size),
            'compressor': zarr.Blosc(cname='zstd', clevel=3),
        }
    }

    print("Saving to Zarr format...")
    ds.to_zarr(output_zarr_path, encoding=encoding, mode='w')

    print(f"Successfully created Zarr dataset at {output_zarr_path}")
    return ds

# For memory-constrained environments, here's a chunked version
def process_csv_files_to_zarr_memory_efficient(csv_dir, output_zarr_path,
                                               start='1980-01-01', end='2023-01-01'):
    """
    Batched variant of the CSV-to-Zarr conversion.

    A template store with a NaN-filled 'Qout' variable is written first, then
    the CSV files are read in batches and each batch is written into the store
    as one contiguous block of columns.

    Every CSV must be named '<river id>.csv' (the stem is parsed with int())
    and contain 'datetime' and 'streamflow_m^3/s' columns. A file that fails to
    read or align is reported on stdout and its column stays NaN.

    Args:
        csv_dir (str): Path to directory containing CSV files
        output_zarr_path (str): Path to save the Zarr file (overwritten)
        start (str): First day of the daily time axis (inclusive)
        end (str): Last day of the daily time axis (inclusive)

    Returns:
        xarray.Dataset: The store re-opened with xarray.open_zarr.

    Raises:
        ValueError: If no '*.csv' files are found, or a file stem is not an
            integer.
    """
    # Local imports: the file-level imports provide neither name.
    import gc
    from datetime import timezone

    # Sort so the rivid order in the store is reproducible (glob order is arbitrary)
    csv_files = sorted(glob.glob(os.path.join(csv_dir, '*.csv')))

    if not csv_files:
        raise ValueError(f"No CSV files found in {csv_dir}")

    print(f"Found {len(csv_files)} CSV files")

    # River IDs come from the file names, in the same order as csv_files, so
    # column i of 'Qout' always belongs to csv_files[i].
    rivids = [int(os.path.splitext(os.path.basename(file))[0]) for file in csv_files]

    time_index = pd.date_range(start=start, end=end, freq='D')

    print(f"Creating dataset with {len(rivids)} river IDs and {len(time_index)} time steps")

    # Set up chunking for efficient I/O
    time_chunk_size = 500
    rivid_chunk_size = 500

    # Create a template dataset with proper metadata
    template_ds = xr.Dataset(
        coords={
            'rivid': ('rivid', np.array(rivids, dtype=np.int32)),
            'time': time_index,
        },
        attrs={
            'Conventions': 'CF-1.6',
            'comment': '',
            'featureType': 'timeSeries',
            'history': '',
            # The format string hard-codes a +00:00 offset, so the timestamp
            # itself must be taken in UTC rather than local time.
            'date_created': datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00"),
            'institution': '',
            'references': 'https://github.com/c-h-david/rapid/, http://dx.doi.org/10.1175/2011JHM1345.1',
            'source': 'RAPID: unknown, NOT a git repository, water inflow: /mnt/inflows/101/m3_101_20240731_20241016.nc',
            'title': ''
        }
    )

    # NOTE(review): the template still allocates one dense (time, rivid)
    # float32 array, so peak memory is bounded by the full grid, not by the
    # batch size. Writing the template lazily would need a lazy array backend
    # that this file does not import — revisit if the full grid does not fit.
    template_ds['Qout'] = xr.DataArray(
        data=np.full((len(time_index), len(rivids)), np.nan, dtype=np.float32),
        dims=['time', 'rivid'],
        coords={'time': time_index, 'rivid': rivids},
        attrs={'long_name': 'River Discharge', 'units': 'm^3/s'}
    )

    # Initialize Zarr store with chunking
    encoding = {
        'Qout': {
            'chunks': (time_chunk_size, rivid_chunk_size),
            'compressor': zarr.Blosc(cname='zstd', clevel=3),
        }
    }
    template_ds.to_zarr(output_zarr_path, encoding=encoding, mode='w')

    # Drop the dense template before the batch loop so it is not held for the
    # rest of the run.
    del template_ds
    gc.collect()

    # Open the store once and reuse the array handle for every batch.
    qout_array = zarr.open(output_zarr_path, mode='a')['Qout']

    # One batch spans one chunk along rivid, so a batch write does not have to
    # revisit chunks already written by an earlier batch.
    batch_size = rivid_chunk_size
    num_batches = (len(csv_files) + batch_size - 1) // batch_size

    for batch_idx in range(num_batches):
        start_idx = batch_idx * batch_size
        end_idx = min((batch_idx + 1) * batch_size, len(csv_files))
        batch_files = csv_files[start_idx:end_idx]

        print(f"Processing batch {batch_idx+1}/{num_batches} ({len(batch_files)} files)")

        # Columns of files that fail keep their NaN initial value.
        batch_block = np.full((len(time_index), len(batch_files)), np.nan, dtype=np.float32)

        for i, file_path in enumerate(batch_files):
            try:
                df = pd.read_csv(file_path)

                # Let pandas detect the format first, then fall back to
                # MM/DD/YY. If both fail the error reaches the handler below.
                try:
                    df['datetime'] = pd.to_datetime(df['datetime'])
                except (ValueError, TypeError):
                    df['datetime'] = pd.to_datetime(df['datetime'], format='%m/%d/%y')

                # Align to the global time index (missing dates become NaN)
                aligned = df.set_index('datetime').reindex(time_index)
                batch_block[:, i] = aligned['streamflow_m^3/s'].values

                if (i+1) % 10 == 0:
                    print(f"  Processed {i+1}/{len(batch_files)} files in current batch")

            except Exception as e:
                # Best-effort batch job: report the failure and move on.
                print(f"Error processing {file_path}: {e}")

        # batch_files is csv_files[start_idx:end_idx] and rivids follows the
        # same order, so the batch maps onto this contiguous column range.
        qout_array[:, start_idx:end_idx] = batch_block

        # Force garbage collection between batches
        del batch_block
        gc.collect()

    print(f"Successfully created Zarr dataset at {output_zarr_path}")
    return xr.open_zarr(output_zarr_path)

# Script entry point: convert every CSV in `csv_dir` into a single Zarr store
# at `output_zarr_path` and print a summary of the resulting dataset.
if __name__ == "__main__":
    # Directory containing CSV files.
    # The string below is a placeholder and must be replaced with a real path.
    csv_dir = 'path to folder that contain individual csv simulated files'
    
    # Path to save the Zarr file.
    # The string below is a placeholder and must be replaced with a real path;
    # the store is written with mode='w'.
    output_zarr_path = 'path to folder to save the zarr file'
    
    # For smaller datasets (country level): builds the whole dataset in memory
    # and writes it in one call.
    ds = process_csv_files_to_zarr(csv_dir, output_zarr_path)
    
    # For very large datasets (whole world): use the batched variant instead.
    # The original author notes the code would need changing to read a whole
    # combined file rather than individual per-river files.
    # ds = process_csv_files_to_zarr_memory_efficient(csv_dir, output_zarr_path)
    
    print("Final dataset info:")
    print(ds)