In [62]:
import os 
import sys
import ismn
import pandas as pd
import numpy as np
import xarray as xr
from multiprocessing import Pool, cpu_count # 
from pathlib import Path
from datetime import datetime
import warnings
from ismn.interface import ISMN_Interface

In [63]:
insitu_dir = '/home/khanalp/data/ISMNsoilMoisture/'

In [64]:
# Read the data using ISMN_Interface
ds = ISMN_Interface("/home/khanalp/data/ISMNsoilMoisture/Data_separate_files_header_20140101_20251231_13107_18mx_20260208", parallel=True)

Using the existing ismn metadata in /home/khanalp/data/ISMNsoilMoisture/Data_separate_files_header_20140101_20251231_13107_18mx_20260208/python_metadata/Data_separate_files_header_20140101_20251231_13107_18mx_20260208.csv to set up ISMN_Interface. 
If there are issues with the data reader, you can remove the metadata csv file to repeat metadata collection.


In [65]:
ds

ismn.base.IsmnRoot Unzipped at /home/khanalp/data/ISMNsoilMoisture/Data_separate_files_header_20140101_20251231_13107_18mx_20260208
with Networks[Stations]:
------------------------
  AMMA-CATCH: ['Banizoumbou', 'Belefoungou-Mid', 'Belefoungou-Top', 'Nalohou-Mid', 'Nalohou-Top', 'Tondikiboro', 'Wankama'],
  ARM: ['Anthony', 'Ashton', 'Byron', 'Lamont-CF1', 'Lamont-CF2', 'MapleCity', 'Marshall', 'Medford', 'Morrison', 'Newkirk', 'Okmulgee', 'Omega', 'Pawhuska', 'Pawnee', 'Ringwood', 'Tryon', 'Tyro', 'Waukomis'],
  BDF_Saxony: ['Hilbersdorf', 'Koellitsch', 'Lippen', 'Schmorren'],
  BFG_Nw: ['BFG-Niederwerth-1-weighable'],
  BIEBRZA_S-1: ['grassland-soil-1', 'grassland-soil-2', 'grassland-soil-3', 'grassland-soil-4', 'grassland-soil-5', 'grassland-soil-6', 'grassland-soil-7', 'grassland-soil-8', 'grassland-soil-9', 'marshland-soil-11', 'marshland-soil-12', 'marshland-soil-13', 'marshland-soil-14', 'marshland-soil-15', 'marshland-soil-16', 'marshland-soil-17', 'marshland-soil-18', 'marshla

In [66]:
# 1. List networks and stations using modern ISMN interface
networks_stations = []
for network in ds.collection.networks:
    for station in ds.collection[network].stations:
        networks_stations.append({
            'network': network,
            'station': station
        })


In [67]:
df_stations = pd.DataFrame(networks_stations)
print(f"Total stations: {len(df_stations)}")
df_stations

Total stations: 2393


Unnamed: 0,network,station
0,AMMA-CATCH,Banizoumbou
1,AMMA-CATCH,Belefoungou-Mid
2,AMMA-CATCH,Belefoungou-Top
3,AMMA-CATCH,Nalohou-Mid
4,AMMA-CATCH,Nalohou-Top
...,...,...
2388,iRON,NorthstarAspenGrove
2389,iRON,NorthstarTransitionZone
2390,iRON,SkyMountain
2391,iRON,SmugglerMountain


In [72]:
# 2. Select one station to test (change these to your preferred station)
# test_network = df_stations.iloc[0]['network']  # First network
# test_station = df_stations.iloc[0]['station']  # First station
test_network = "CW3E"
test_station = "BeaumontCherryValley"

print(f"\n--- Testing with: {test_network} - {test_station} ---\n")
station_data = ds[test_network][test_station].to_xarray()
station_data  # To check how it looks like. 


--- Testing with: CW3E - BeaumontCherryValley ---



In [12]:
# Create output directory
output_dir = Path('processed_soil_moisture')
output_dir.mkdir(exist_ok=True)

To learn about ISMN data quality flag: check https://ismn.earth/en/data/flag-overview/
In short, G = good, D = Dubious, C= outside plausible range.

Inside process_station, \
1. Create a mask for good-quality measurements: soil_moisture_flag == "G".
2. Apply the mask to soil_moisture so non-G values become NaN.
3. Add a coordinate depth_group (per sensor) using depth_from so sensors can be grouped by depth.
4. Group by depth_group and take the mean across sensors (depth-average), ignoring NaNs.
5. Rename depth_group to depth.
6. Compute how many valid values each depth has over all times; drop depths that are all NaN.
7. If no depths remain, return None.
8. Resample to daily mean soil moisture (per depth).
9. Resample to a daily count of valid observations (per depth).
10.Try to convert the daily count to an integer (fallback to a float if conversion fails).
11. Mask daily soil moisture where the daily count is < 6 (those days become NaN).
12. Return an xarray.Dataset with:
13. soil_moisture (daily, filtered by count)
14. observation_count (daily count)
Add metadata attributes: network, station, latitude, and longitude.

In [None]:
# Function to process each station
def process_station(station_data, network, station):
    """
    Process a single station dataset:
    - Depth average soil_moisture where flag = 'G'
    - Drop depths with no valid data
    - Convert to daily if >= 6 valid observations per day
    """
    # Create mask where soil_moisture_flag == "G" and also include "D" (Dubious) flags as valid data.
    # mask = station_data['soil_moisture_flag'] == "G"
    mask = (station_data["soil_moisture_flag"] == "G") | station_data["soil_moisture_flag"].astype(str).str.startswith("D")
    
    # Apply mask to soil_moisture
    soil_moisture_masked = station_data['soil_moisture'].where(mask)
    
    # Assign depth_from as a coordinate for grouping
    soil_moisture_masked = soil_moisture_masked.assign_coords(
        depth_group=('sensor', station_data['depth_to'].values) # Use depth_to instead of depth_from because some stations have depth_from = 0 for all sensors, but depth_to varies and can be used to group sensors by depth.
    )
    
    # Group by depth and average across sensors
    depth_averaged = soil_moisture_masked.groupby('depth_group').mean(dim='sensor', skipna=True)
    depth_averaged = depth_averaged.rename({'depth_group': 'depth'})
    
    # Drop depths that have ALL NaN values (no valid data)
    valid_count_per_depth = depth_averaged.count(dim='date_time')
    depths_with_data = valid_count_per_depth > 0
    depth_averaged = depth_averaged.where(depths_with_data, drop=True)
    
    # If no valid depths remain, return None
    if len(depth_averaged.depth) == 0:
        return None
    
    # Resample to daily
    daily = depth_averaged.resample(date_time='1D').mean(dim='date_time', skipna=True)
    
    # Count valid observations per day
    count = depth_averaged.resample(date_time='1D').count(dim='date_time')
    
    # Handle the casting more gracefully
    try:
        count = count.fillna(0).astype(int)
    except:
        count = count.astype(float)
    
    # Mask out days with < 6 valid observations
    daily_filtered = daily.where(count >= 6)
    
    # ---- depth-bin averaging AFTER daily filtering ----
    depth_vals = daily_filtered["depth"].values.astype(float)

    # heuristic: if looks like meters, convert to cm
    depth_cm = depth_vals * 100.0 if np.nanmax(depth_vals) <= 3 else depth_vals

    bin_edges = [0.0, 5.0, 50.0, np.inf]
    bin_labels = ["0-5", "5-50", ">50"]

    depth_bin = pd.cut(depth_cm, bins=bin_edges, labels=bin_labels, right=True, include_lowest=True)

    daily_filtered = daily_filtered.assign_coords(depth_bin=("depth", depth_bin.astype(str)))
    count = count.assign_coords(depth_bin=("depth", depth_bin.astype(str)))

    # how many depths contribute to each bin (metadata)
    depths_in_bin = {b: depth_vals[np.array(depth_bin.astype(str)) == b].tolist() for b in bin_labels}
    n_depths_in_bin = {b: len(v) for b, v in depths_in_bin.items()}

    # average across depths within each bin (skip NaNs)
    daily_binned = (
        daily_filtered.groupby("depth_bin")
        .mean(dim="depth", skipna=True)
        .rename({"depth_bin": "depth"})
    )

    # also aggregate counts (sum across depths in bin)
    count_binned = (
        count.groupby("depth_bin")
        .sum(dim="depth")
        .rename({"depth_bin": "depth"})
    )
      
    # Create dataset with both soil moisture and count
    result_ds = xr.Dataset({
        'soil_moisture': daily_binned,
        'observation_count': count_binned
    })
    
    # Add metadata as attributes
    result_ds.attrs['network'] = network
    result_ds.attrs['station'] = station
    result_ds.attrs['latitude'] = float(station_data.attrs.get('lat', np.nan)) #The attrs lat and variables latitude are latitude.
    result_ds.attrs['longitude'] = float(station_data.attrs.get('lon', np.nan)) # same for lon. 
    
    return result_ds

## 'process_single_station' is just a wrapper for parallel processing. 
What really happens is:
1. station data is read into xarray. 
2. then process_station is called, which does QC, filtering, converts hourly to daily, which returns result_ds. 
3. If result_ds is None there we skip to other stations, if not
4. We get metadata like network, stations, longitude, latitude, depths, start_date, end_date, etc.
5. Filename is saved as f"{network}_{station}_{start_date}_{end_date}.nc" in output_dir.
6. Function returns metadata.


In [None]:
def process_single_station(args):
    """
    Wrapper function for parallel processing
    """
    network, station, idx, total = args
    
    try:
        print(f"[{idx+1}/{total}] Processing: {network}/{station}")
        
        # Read station data
        station_data = ds[network][station].to_xarray()
        
        # Process station
        result_ds = process_station(station_data, network, station)
        
        # Skip if no valid data
        if result_ds is None:
            print(f"[{idx+1}/{total}] Skipped (no valid data): {network}/{station}")
            return None
        
        # Get metadata
        lat = result_ds.attrs.get('latitude', np.nan)
        lon = result_ds.attrs.get('longitude', np.nan)
        depths = result_ds.depth.values.tolist()
        
        # Get date range
        valid_dates = result_ds['soil_moisture'].dropna(dim='date_time', how='all').date_time
        if len(valid_dates) == 0:
            print(f"[{idx+1}/{total}] Skipped (no valid dates): {network}/{station}")
            return None
            
        start_date = pd.to_datetime(valid_dates.min().values).strftime('%Y%m%d')
        end_date = pd.to_datetime(valid_dates.max().values).strftime('%Y%m%d')
        
        # Create filename
        filename = f"{network}_{station}_{start_date}_{end_date}.nc"
        filepath = output_dir / filename
        
        # Save to netCDF
        result_ds.to_netcdf(filepath)
        
        print(f"[{idx+1}/{total}] Success: {network}/{station} -> {filename}")
        
        # Return metadata
        metadata = {
            'network': network,
            'station': station,
            'latitude': lat,
            'longitude': lon,
            'depths': str(depths),  # Convert list to string for CSV
            'n_depths': len(depths),
            'start_date': start_date,
            'end_date': end_date,
            'n_days': len(valid_dates),
            'filename': filename
        }
        
        return metadata
        
    except Exception as e:
        print(f"[{idx+1}/{total}] Error processing {network}/{station}: {e}")
        return None


In [None]:
# # Inspect the data structure
# print("Data structure:")
# print(ds_test)
# print("\n" + "="*60 + "\n")

# # Check dimensions
# print("Dimensions:")
# print(ds_test.dims)
# print("\n" + "="*60 + "\n")

# # Check coordinates
# print("Coordinates:")
# print(ds_test.coords)
# print("\n" + "="*60 + "\n")


In [14]:
# Collect all station tasks
all_tasks = []
for network in ds.collection.networks:
    for station in ds.collection[network].stations:
        all_tasks.append((network, station))

In [16]:

# ============================================
# CONFIGURE TEST RUN HERE
# ============================================
TEST_MODE = True  # Set to False to process all stations
N_TEST_STATIONS = 1  # Number of stations to test with

if TEST_MODE:
    tasks = all_tasks[:N_TEST_STATIONS]
    print(f"=== TEST MODE: Processing {N_TEST_STATIONS} stations ===")
else:
    tasks = all_tasks
    print(f"=== FULL MODE: Processing all {len(tasks)} stations ===")

# Add index and total count to tasks
tasks_with_idx = [(net, sta, i, len(tasks)) for i, (net, sta) in enumerate(tasks)]

print(f"Output directory: {output_dir}/")
print(f"Using {cpu_count()} CPU cores available")

=== TEST MODE: Processing 10 stations ===
Output directory: processed_soil_moisture/
Using 128 CPU cores available


In [17]:

# ============================================
# CONFIGURE PARALLELIZATION HERE
# ============================================
USE_PARALLEL = False  # Set to False for sequential (easier debugging)
N_WORKERS = 10  # Number of parallel workers (adjust as needed)

if USE_PARALLEL:
    print(f"Running in parallel with {N_WORKERS} workers")
    with Pool(N_WORKERS) as pool:
        results = pool.map(process_single_station, tasks_with_idx)
else:
    print("Running sequentially (no parallelization)")
    results = [process_single_station(task) for task in tasks_with_idx]

Running in parallel with 10 workers
[2/10] Processing: AMMA-CATCH/Belefoungou-Mid[5/10] Processing: AMMA-CATCH/Nalohou-Top[1/10] Processing: AMMA-CATCH/Banizoumbou[3/10] Processing: AMMA-CATCH/Belefoungou-Top[9/10] Processing: ARM/Ashton[4/10] Processing: AMMA-CATCH/Nalohou-Mid[10/10] Processing: ARM/Byron[6/10] Processing: AMMA-CATCH/Tondikiboro[7/10] Processing: AMMA-CATCH/Wankama[8/10] Processing: ARM/Anthony









[4/10] Success: AMMA-CATCH/Nalohou-Mid -> AMMA-CATCH_Nalohou-Mid_20140101_20171231.nc[1/10] Success: AMMA-CATCH/Banizoumbou -> AMMA-CATCH_Banizoumbou_20140101_20181231.nc

[5/10] Success: AMMA-CATCH/Nalohou-Top -> AMMA-CATCH_Nalohou-Top_20140101_20171231.nc
[3/10] Success: AMMA-CATCH/Belefoungou-Top -> AMMA-CATCH_Belefoungou-Top_20140101_20171231.nc
[2/10] Success: AMMA-CATCH/Belefoungou-Mid -> AMMA-CATCH_Belefoungou-Mid_20140101_20171110.nc
[6/10] Success: AMMA-CATCH/Tondikiboro -> AMMA-CATCH_Tondikiboro_20140101_20181231.nc
[7/10] Success: AMMA-CATCH/Wankama -> AMMA-

In [18]:

# Filter out None results
metadata_list = [r for r in results if r is not None]

# Create metadata DataFrame
if len(metadata_list) > 0:
    df_metadata = pd.DataFrame(metadata_list)
    
    print(f"\n{'='*60}")
    print(f"Processing complete!")
    print(f"Successfully processed: {len(df_metadata)}/{len(tasks)} stations")
    print(f"Files saved to: {output_dir}/")
    print(f"\nMetadata summary:")
    print(df_metadata.head(20))
    
    # Save metadata
    metadata_file = output_dir / 'station_metadata.csv'
    df_metadata.to_csv(metadata_file, index=False)
    print(f"\nMetadata saved to: {metadata_file}")
    
    # Print summary statistics
    print(f"\nSummary:")
    print(f"  Networks: {df_metadata['network'].nunique()}")
    print(f"  Stations: {len(df_metadata)}")
    print(f"  Date range: {df_metadata['start_date'].min()} to {df_metadata['end_date'].max()}")
    print(f"  Depth range: {df_metadata['n_depths'].min()}-{df_metadata['n_depths'].max()} depths per station")
    
else:
    print("\nNo valid data found across all stations")


Processing complete!
Successfully processed: 10/10 stations
Files saved to: processed_soil_moisture/

Metadata summary:
      network          station  latitude  longitude  \
0  AMMA-CATCH      Banizoumbou       NaN        NaN   
1  AMMA-CATCH  Belefoungou-Mid       NaN        NaN   
2  AMMA-CATCH  Belefoungou-Top       NaN        NaN   
3  AMMA-CATCH      Nalohou-Mid       NaN        NaN   
4  AMMA-CATCH      Nalohou-Top       NaN        NaN   
5  AMMA-CATCH      Tondikiboro       NaN        NaN   
6  AMMA-CATCH          Wankama       NaN        NaN   
7         ARM          Anthony       NaN        NaN   
8         ARM           Ashton       NaN        NaN   
9         ARM            Byron       NaN        NaN   

                                              depths  n_depths start_date  \
0                                             [0.05]         1   20140101   
1                    [0.05, 0.1, 0.2, 0.4, 0.6, 1.0]         6   20140101   
2                         [0.05, 0.1, 0.4,

In [40]:
df_metadata = pd.read_csv('/home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/station_metadata.csv')

In [41]:
df_metadata

Unnamed: 0,network,station,latitude,longitude,depths,n_depths,start_date,end_date,n_days,filename
0,GROW,pwvxhaqg,40.88741,25.85531,[0.0],1,20171124,20190521,455,GROW_pwvxhaqg_20171124_20190521.nc
1,COSMOS,GLEES,41.36440,-106.23940,[0.0],1,20140311,20170913,683,COSMOS_GLEES_20140311_20170913.nc
2,SCAN,MonoclineRidge,36.54417,-120.55463,"[0.0508, 0.1016, 0.2032, 0.508, 1.016]",5,20141030,20251217,4047,SCAN_MonoclineRidge_20141030_20251217.nc
3,Ru_CFR,Fyodorovskoyedrysprucestand,56.44760,32.90188,"[0.05, 0.15, 0.5, 1.0]",4,20150525,20241230,1733,Ru_CFR_Fyodorovskoyedrysprucestand_20150525_20...
4,REMEDHUS,LasEritas,41.20548,-5.41558,[0.0],1,20140101,20241231,3921,REMEDHUS_LasEritas_20140101_20241231.nc
...,...,...,...,...,...,...,...,...,...,...
184,GROW,6sbmpr2j,48.31841,15.17387,[0.0],1,20170709,20190904,570,GROW_6sbmpr2j_20170709_20190904.nc
185,SNOTEL,TroutCreek,40.73900,-109.67280,"[0.0508, 0.2032, 0.508]",3,20140419,20251230,2534,SNOTEL_TroutCreek_20140419_20251230.nc
186,SNOTEL,FryCanyon,41.58008,-115.93640,"[0.0508, 0.2032, 0.508]",3,20141009,20251227,2828,SNOTEL_FryCanyon_20141009_20251227.nc
187,SNOTEL,SusitnaValleyHigh,62.13245,-150.04391,"[0.0508, 0.2032, 0.508]",3,20140508,20251203,2120,SNOTEL_SusitnaValleyHigh_20140508_20251203.nc


In [86]:
output_dir = "/home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/"
# Create directory for plot 
plot_dir = "/home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/plots_for_depth_averaged/"
plot_dir = Path(plot_dir)
plot_dir.mkdir(exist_ok=True)

In [87]:
nc_files = list(Path(output_dir).glob("*.nc"))
len(nc_files)

189

In [88]:
# ---------------- Plot style (your template) ----------------
import matplotlib.pyplot as plt
import matplotlib as mpl
import scienceplots  # registers 'science', 'no-latex', etc.


plt.style.use(['science', 'no-latex'])
mpl.rcParams.update({
    'font.family': 'serif',
    'font.serif': ['Times New Roman', 'Times', 'DejaVu Serif'],
    'font.size': 14,
    'axes.titlesize': 16,
    'axes.titleweight': 'bold',
    'axes.labelsize': 14,
    'xtick.labelsize': 12,
    'ytick.labelsize': 12,
    'lines.linewidth': 2,
    'legend.fontsize': 12,
    'figure.dpi': 300,
    'xtick.direction': 'in',
    'ytick.direction': 'in',
    'xtick.major.size': 6,
    'ytick.major.size': 6,
    'xtick.minor.size': 3,
    'ytick.minor.size': 3,
    'axes.spines.top': False,
    'axes.spines.right': False,
    'pdf.fonttype': 42,
    'ps.fonttype': 42,
})


In [93]:
# Add index + total (same pattern as your station processing)
nc_files_with_idx = [(str(f), str(plot_dir), i, len(nc_files)) for i, f in enumerate(nc_files)]

In [94]:
USE_PARALLEL = True   # False = sequential (easier debugging)
N_WORKERS = 50        # adjust as needed

In [95]:
def plot_single_file(task):
    f, plot_dir, i, total = task
    try:
        ds = xr.open_dataset(f)

        if "soil_moisture" not in ds.data_vars:
            ds.close()
            return None

        station = ds.attrs.get("station", Path(f).stem)
        network = ds.attrs.get("network", "")
        title = f"Soil Moisture - {station}" + (f" ({network})" if network else "")

        fig, ax = plt.subplots(figsize=(14, 6))

        if ("depth" in ds.dims) or ("depth" in ds.coords):
            for depth in ds["depth"].values:
                ds["soil_moisture"].sel(depth=depth).plot(ax=ax, label=str(depth))
            ax.legend(title="Depth")
        else:
            ds["soil_moisture"].plot(ax=ax, label="soil_moisture")
            ax.legend()

        ax.set_ylabel("Soil Moisture")
        ax.set_xlabel("Date")
        ax.set_title(title)
        ax.grid(True, alpha=0.3)
        plt.tight_layout()

        out_png = Path(plot_dir) / f"{Path(f).stem}_soil_moisture_by_depth.png"
        plt.savefig(out_png, dpi=300, bbox_inches="tight")
        plt.close(fig)
        ds.close()

        if (i + 1) % 50 == 0:
            print(f"[{i+1}/{total}] done")

        return str(out_png)

    except Exception as e:
        return f"FAILED: {f} -> {e}"

In [96]:
if USE_PARALLEL:
    print(f"Running in parallel with {N_WORKERS} workers")
    with Pool(N_WORKERS) as pool:
        results = pool.map(plot_single_file, nc_files_with_idx)
else:
    print("Running sequentially (no parallelization)")
    results = [plot_single_file(task) for task in nc_files_with_idx]

# Optional: quick summary
n_ok = sum(r is not None and not str(r).startswith("FAILED:") for r in results)
n_fail = sum(isinstance(r, str) and r.startswith("FAILED:") for r in results)
print(f"Saved {n_ok} plots, failed {n_fail}. Output: {plot_dir}")

Running in parallel with 50 workers
[50/189] done
[100/189] done
[150/189] done
Saved 189 plots, failed 0. Output: /home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/plots_for_depth_averaged


In [82]:
df_metadata = pd.read_csv("/home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/station_metadata.csv")
df_metadata

Unnamed: 0,network,station,latitude,longitude,start_date,end_date,n_days,filename
0,GROW,pwvxhaqg,40.88741,25.85531,20171124,20190521,457,GROW_pwvxhaqg_20171124_20190521.nc
1,COSMOS,GLEES,41.36440,-106.23940,20140101,20170913,1169,COSMOS_GLEES_20140101_20170913.nc
2,SCAN,MonoclineRidge,36.54417,-120.55463,20141030,20251217,4047,SCAN_MonoclineRidge_20141030_20251217.nc
3,Ru_CFR,Fyodorovskoyedrysprucestand,56.44760,32.90188,20150525,20241231,2825,Ru_CFR_Fyodorovskoyedrysprucestand_20150525_20...
4,REMEDHUS,LasEritas,41.20548,-5.41558,20140101,20241231,3968,REMEDHUS_LasEritas_20140101_20241231.nc
...,...,...,...,...,...,...,...,...
184,GROW,6sbmpr2j,48.31841,15.17387,20170709,20190904,764,GROW_6sbmpr2j_20170709_20190904.nc
185,SNOTEL,TroutCreek,40.73900,-109.67280,20140101,20251230,4382,SNOTEL_TroutCreek_20140101_20251230.nc
186,SNOTEL,FryCanyon,41.58008,-115.93640,20141009,20251230,4100,SNOTEL_FryCanyon_20141009_20251230.nc
187,SNOTEL,SusitnaValleyHigh,62.13245,-150.04391,20140101,20251230,4363,SNOTEL_SusitnaValleyHigh_20140101_20251230.nc


In [83]:
ds_trial = xr.open_dataset('/home/khanalp/code/PhD/soilMoisture/processed_soil_moisture/CW3E_BeaumontCherryValley_20240119_20251230.nc')

In [91]:
ds_trial.dims

