NetCDF variable extractor (Single File)

In [None]:
"""
NetCDF variable extractor (Single File)
---------------------------------------------------

❶  Point INPUT_FILE at any NetCDF file.
❷  List one or more VARS_TO_EXTRACT.
❸  Set OUTPUT_DIR (created automatically if absent).
❹  Adjust parse_time_from_filename() if your date pattern differs.

If the file name contains no 6-digit YYYYMM block, the script falls
back to  <varPart>_<originalFileStem>.nc.
"""

from pathlib import Path
import xarray as xr

# ========= USER CONFIGURATION ==========================================
# The string values below are placeholders; replace them before running.
INPUT_FILE      = r"Enter Input File Path"                                  # source NetCDF file ("~" is expanded)
VARS_TO_EXTRACT = ["Vaiable Name"]                                                  # a single name or a list, e.g. "V1" or ["V1","V2"]
OUTPUT_DIR      = r"Enter Output Directory"                                 # created (with parents) if it does not exist
OVERWRITE       = False                                                     # True → silently replace existing files; False → FileExistsError
# =======================================================================


def parse_time_from_filename(file_path: Path) -> str | None:
    """
    Return 'YYYY_MM' if a six-digit YYYYMM block is found in *file_path.stem*;
    otherwise return None.
    """
    stem   = file_path.stem
    digits = ''.join(reversed([c for c in stem[::-1] if c.isdigit()][:6]))
    if len(digits) == 6:
        return f"{digits[:4]}_{digits[4:]}"                                 # "2018_12"
    return None                                                             # no date found


def extract_and_save(input_file: str,
                     variables,
                     output_dir: str,
                     overwrite: bool = False) -> None:
    """
    Extract *variables* from *input_file* and save them to *output_dir*.

    Parameters
    ----------
    input_file : path of the source NetCDF file ("~" is expanded).
    variables  : one variable name (str) or an iterable of names.
    output_dir : destination directory; created with parents if missing.
    overwrite  : when False, an existing output file is never replaced.

    The output is named '<vars>_Conc_<YYYY_MM>.nc' when
    parse_time_from_filename() finds a date tag, otherwise
    '<vars>_<input stem>.nc'.

    Raises
    ------
    ValueError      : no variable was requested, or the output path is the
                      input file itself.
    KeyError        : a requested variable is not present in the dataset.
    FileExistsError : the output exists and *overwrite* is False.
    """

    fp      = Path(input_file).expanduser().resolve()
    out_dir = Path(output_dir).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    # Normalise to a list: a tuple would otherwise be used as ONE dataset key.
    variables = [variables] if isinstance(variables, str) else list(variables)
    if not variables:
        raise ValueError("No variables requested for extraction.")

    # The context manager guarantees the file handle is released, even when
    # one of the checks below raises.
    with xr.open_dataset(fp) as ds:
        missing = [v for v in variables if v not in ds]
        if missing:
            raise KeyError(f"Variable(s) not found in {fp.name}: {missing}")

        # ---------- build output filename -------------------------------
        tag      = parse_time_from_filename(fp)                             # None if no date
        var_part = "_".join(variables)

        if tag:                                                             # date found
            out_name = f"{var_part}_Conc_{tag}.nc"
        else:                                                               # no date → use file stem
            out_name = f"{var_part}_{fp.stem}.nc"

        out_path = out_dir / out_name

        if out_path.exists() and not overwrite:
            raise FileExistsError(f"{out_path} exists (set OVERWRITE=True).")

        # Data is read lazily from the source, so the source itself must
        # never be the write target.
        if out_path == fp:
            raise ValueError(f"Output path equals input file: {out_path}")

        # Write while the source is still open (values are loaded on demand).
        ds[variables].to_netcdf(out_path, mode="w", engine="netcdf4")

    print(f"✔  Saved {variables} → {out_path}")


if __name__ == "__main__":
    # Script entry point: run one extraction using the values from the
    # USER CONFIGURATION section at the top of this cell.
    extract_and_save(
        input_file=INPUT_FILE,
        variables=VARS_TO_EXTRACT,
        output_dir=OUTPUT_DIR,
        overwrite=OVERWRITE,
    )

NetCDF variable extractor (Multiple Files)

In [None]:
"""
NetCDF variable extractor (Multiple Files)
==========================================

For every NetCDF file in INPUT_DIR:
    • open the file
    • pull out VARS_TO_EXTRACT
    • write a new file to OUTPUT_DIR

"""

from pathlib import Path
import xarray as xr

# ========= USER CONFIGURATION ==========================================
# The string values below are placeholders; replace them before running.
INPUT_DIR        = r"Enter Input File Directory"                             # where the original files live
OUTPUT_DIR       = r"Enter Output File Directory"                            # where the extracted files go (created if missing)
VARS_TO_EXTRACT  = ["Vaiable Name"]                                          # a single name or a list, e.g. "V1" or ["V1","V2"]
OVERWRITE        = True                                                      # True → silently replace existing output; False → that file is skipped with a warning
FILE_SUFFIXES    = (".nc", ".nc4")                                           # file extensions to process (compared exactly, case-sensitive)
RECURSIVE_SEARCH = False                                                     # True → walk sub-folders too
# =======================================================================

def parse_time_from_filename(path: Path) -> str | None:
    """Return 'YYYY_MM' if six trailing digits exist, else None."""
    digits = ''.join(reversed([c for c in path.stem[::-1] if c.isdigit()][:6]))
    if len(digits) == 6:
        return f"{digits[:4]}_{digits[4:]}"
    return None


def gather_files(root: Path, suffixes, recursive=False):
    """
    Yield the regular files under *root* whose extension is in *suffixes*.

    Parameters
    ----------
    root      : directory to scan.
    suffixes  : one extension (str, e.g. ".nc") or an iterable of them;
                compared exactly against ``Path.suffix``.
    recursive : when True, sub-directories are searched as well.

    Paths are yielded in sorted order so repeated runs process files in the
    same sequence.
    """
    # A bare string would turn the membership test into a substring check
    # ("" in ".nc" is True), matching every extension-less path.
    wanted = (suffixes,) if isinstance(suffixes, str) else tuple(suffixes)

    candidates = root.rglob("*") if recursive else root.iterdir()

    # is_file() drops directories that merely carry a matching suffix.
    yield from sorted(p for p in candidates
                      if p.suffix in wanted and p.is_file())


def extract_and_save_one(src_path: Path,
                         variables,
                         dst_dir: Path,
                         overwrite=False):
    """
    Extract *variables* from one NetCDF file and write them into *dst_dir*.

    Parameters
    ----------
    src_path  : source NetCDF file.
    variables : one variable name (str) or an iterable of names.
    dst_dir   : existing destination directory.
    overwrite : when False, an existing output file is never replaced.

    The output is named '<vars>_Conc_<YYYY_MM>.nc' when
    parse_time_from_filename() finds a date tag, otherwise
    '<vars>_<source stem>.nc'.

    Raises
    ------
    ValueError      : no variable was requested, or the output path is the
                      source file itself.
    KeyError        : a requested variable is not present in the dataset.
    FileExistsError : the output exists and *overwrite* is False.
    """
    variables = [variables] if isinstance(variables, str) else list(variables)
    if not variables:
        raise ValueError("No variables requested for extraction.")

    # Close every dataset deterministically: this function runs once per
    # file in a batch, so unclosed handles would accumulate.
    with xr.open_dataset(src_path) as ds:
        missing = [v for v in variables if v not in ds]
        if missing:
            raise KeyError(f"{src_path.name}: variable(s) not found → {missing}")

        time_tag = parse_time_from_filename(src_path)
        var_part = "_".join(variables)

        # --- naming rule ------------------------------------------------
        if time_tag:                                   # date found
            out_name = f"{var_part}_Conc_{time_tag}.nc"
        else:                                          # no date → var + file name
            out_name = f"{var_part}_{src_path.stem}.nc"
        # ----------------------------------------------------------------

        out_path = dst_dir / out_name
        if out_path.exists() and not overwrite:
            raise FileExistsError(f"{out_path} already exists (set OVERWRITE=True)")

        # Data is read lazily from the source, so the source itself must
        # never be the write target.
        if out_path.resolve() == Path(src_path).resolve():
            raise ValueError(f"Output path equals source file: {out_path}")

        # Write while the source is still open (values are loaded on demand).
        ds[variables].to_netcdf(out_path, mode="w", engine="netcdf4")

    print(f"✔  {src_path.name} → {out_path.name}")


if __name__ == "__main__":
    # Resolve both directories up front; the output folder is created
    # (including parents) before any input is inspected.
    in_dir = Path(INPUT_DIR).expanduser().resolve()
    out_dir = Path(OUTPUT_DIR).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    # Materialise the candidate list so an empty result can be reported.
    files = [*gather_files(in_dir, FILE_SUFFIXES, RECURSIVE_SEARCH)]
    if len(files) == 0:
        raise FileNotFoundError(f"No files ending with {FILE_SUFFIXES} in {in_dir}")

    # Best-effort batch: a failure on one file is reported and the loop
    # continues with the next one.
    for f in files:
        try:
            extract_and_save_one(
                f,
                VARS_TO_EXTRACT,
                out_dir,
                overwrite=OVERWRITE,
            )
        except Exception as err:
            print(f"⚠️  Skipping {f.name}: {err}")
