In [1]:
import subprocess as sp
import os

import re
import tarfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd

import shutil

In [2]:
# CyVerse collections holding the MeteorologicalSensor data for the target season.
# Update these paths when pointing the notebook at a different season.
#   cyverse_dir_l0 - source collection downloaded with `iget` in this notebook
#   cyverse_dir_l1 - destination for the contents of ./outputs
#   cyverse_dir_l2 - destination for the contents of ./outputs_merged
cyverse_dir_l0 = "/iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_0/MeteorologicalSensor"
cyverse_dir_l1 = "/iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_1/MeteorologicalSensor"
cyverse_dir_l2 = "/iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_2/MeteorologicalSensor"

In [3]:
# Download the level_0 collection into <cwd>/inputs through the `filexfer` SSH host.
cwd = os.getcwd()

cmd1 = f'iget -rkPVT {cyverse_dir_l0}'
# `mkdir -p` so an already-existing inputs directory does not abort the `&&` chain
# before iget gets a chance to run.
download = sp.run(f"ssh filexfer 'mkdir -p {cwd}/inputs && cd {cwd}/inputs && {cmd1} && exit'", shell=True)
# The command's output streams straight into the cell (it is not captured), so
# `download.stdout` is always None. Report the exit status instead of printing it,
# and stop here on failure so later cells never process a partial download.
if download.returncode != 0:
    raise RuntimeError(f"download failed (exit code {download.returncode})")
print("download complete")

Authorized uses only. All activity may be monitored and reported.


D- ./MeteorologicalSensor :
0/61 -  0.00% of files done   0.000/48.393 MB -  0.00% of file sizes done
Processing MeteorologicalSensor-2025-05-30__17-14-43-000_cotton.tar.gz - 0.749 MB   2026-01-05.12:17:59
   MeteorologicalSensor-2025       0.749 MB | 0.924 sec | 0 thr |  0.810 MB/s
1/61 -  1.64% of files done   0.749/48.393 MB -  1.55% of file sizes done
Processing MeteorologicalSensor-2025-05-30__23-10-02-000_cotton.tar.gz - 1.209 MB   2026-01-05.12:18:00
   MeteorologicalSensor-2025       1.209 MB | 0.903 sec | 0 thr |  1.338 MB/s
2/61 -  3.28% of files done   1.958/48.393 MB -  4.05% of file sizes done
Processing MeteorologicalSensor-2025-06-02__17-37-04-000_cotton.tar.gz - 0.751 MB   2026-01-05.12:18:01
   MeteorologicalSensor-2025       0.751 MB | 0.385 sec | 0 thr |  1.950 MB/s
3/61 -  4.92% of files done   2.709/48.393 MB -  5.60% of file sizes done
Processing MeteorologicalSensor-2025-06-05__17-27-49-000_cotton.tar.gz - 0.723 MB   2026-01-05.12:18:01
   MeteorologicalSensor-20

The download above places the tarballs in `<cwd>/inputs/MeteorologicalSensor`.

For each tarball, we want to extract the contents, and then merge the CSVs for all plots.

Each tarball has the following tree:
```
MeteorologicalSensor-2025-05-30__17-14-43-000_cotton.tar.gz
.
└── MeteorologicalSensor-2025-05-30__17-14-43-000_cotton
    ├── hdf5_extraction.log
    └── meteorological_sensor
        ├── 1
        │   ├── epar_1_0_data.csv
        │   ├── par_2_1_data.csv
        │   └── weather_station_3_0_data.csv
        ├── 2
        │   ├── epar_1_0_data.csv
        │   ├── par_2_1_data.csv
        │   └── weather_station_3_0_data.csv
        ├── ...        
        └── N
            ├── epar_1_0_data.csv
            ├── par_2_1_data.csv
            └── weather_station_3_0_data.csv
```



In [4]:
# Logical sensor name -> CSV filename expected inside every plot folder.
SENSOR_FILES = dict(
    epar="epar_1_0_data.csv",
    par="par_2_1_data.csv",
    weather_station="weather_station_3_0_data.csv",
)

# Verbose-mode pattern for tarball filenames; captures the 'date', 'time' and
# 'crop' parts of e.g. MeteorologicalSensor-2025-05-30__17-14-43-000_cotton.tar.gz
TARBALL_NAME_RE = re.compile(
    r"""
    ^MeteorologicalSensor-        # prefix
    (?P<date>\d{4}-\d{2}-\d{2})   # YYYY-MM-DD
    __
    (?P<time>\d{2}-\d{2}-\d{2}-\d{3})  # HH-MM-SS-sss
    _
    (?P<crop>[A-Za-z0-9_-]+)      # crop name
    \.tar\.gz$                    # suffix
    """,
    flags=re.VERBOSE,
)


In [5]:

def parse_tarball_metadata(tar_path: Path) -> Dict[str, Optional[str]]:
    """
    Derive metadata from a tarball's filename.

    Returns a dict with 'tarball_name' (the filename), 'base_name' (the filename
    without '.tar.gz') and 'date', 'time', 'crop' as captured by TARBALL_NAME_RE.
    The last three are None when the filename does not match the pattern.
    """
    filename = tar_path.name

    # Path.stem only removes the final suffix ('.gz'); strip a trailing '.tar' too.
    base_name = tar_path.stem
    if base_name.endswith(".tar"):
        base_name = base_name[:-4]

    match = TARBALL_NAME_RE.match(filename)
    captured = match.groupdict() if match else {}

    return {
        "tarball_name": filename,
        "base_name": base_name,
        "date": captured.get("date"),
        "time": captured.get("time"),
        "crop": captured.get("crop"),
    }


def safe_extract(tar: tarfile.TarFile, path: Path) -> None:
    """
    Extract every member of 'tar' into 'path', refusing archives that would
    write outside of it.

    Each member name is resolved against the destination and must stay inside it.
    Containment is checked on path components, not on string prefixes, so a
    member such as '../outside/x' is rejected even when the destination is
    named 'out' (where a plain startswith() test would let it through).

    Raises:
        RuntimeError: if any member would land outside 'path'. All members are
            checked before extraction starts, so nothing is written in that case.
    """
    dest = path.resolve()
    for member in tar.getmembers():
        member_path = (dest / member.name).resolve()
        try:
            member_path.relative_to(dest)
        except ValueError:
            raise RuntimeError(f"Blocked path traversal attempt: {member.name}") from None

    # Where the interpreter provides extraction filters, also apply the stdlib
    # 'data' filter: it covers cases a name check cannot see (e.g. links pointing
    # outside the destination). Older interpreters fall back to plain extraction.
    if hasattr(tarfile, "data_filter"):
        tar.extractall(dest, filter="data")
    else:
        tar.extractall(dest)


def extract_tarball(tar_path: Path, work_dir: Path) -> Path:
    """
    Unpack 'tar_path' into work_dir/<base_name> and return where the content lives.

    When the extraction directory ends up containing exactly one sub-directory
    (files next to it are not counted), that sub-directory is returned so callers
    can skip the wrapper level; otherwise the extraction directory itself is returned.
    """
    base_name = parse_tarball_metadata(tar_path)["base_name"]
    target_dir = work_dir / base_name
    target_dir.mkdir(parents=True, exist_ok=True)

    with tarfile.open(tar_path, "r:gz") as archive:
        safe_extract(archive, target_dir)

    # Best effort: a failure while listing simply falls back to the outer directory.
    try:
        subdirs = [entry for entry in target_dir.iterdir() if entry.is_dir()]
    except Exception:
        subdirs = []

    return subdirs[0] if len(subdirs) == 1 else target_dir


def find_sensor_root(extract_root: Path) -> Optional[Path]:
    """
    Locate the 'meteorological_sensor' directory under the extracted content.

    The directory directly below 'extract_root' wins; otherwise the tree is
    searched recursively and the shallowest match is returned. Only directories
    qualify: a regular file that happens to carry the same name is ignored,
    because callers iterate over the result.

    Returns None when no such directory exists.
    """
    direct = extract_root / "meteorological_sensor"
    if direct.is_dir():
        return direct

    candidates = [p for p in extract_root.rglob("meteorological_sensor") if p.is_dir()]
    if not candidates:
        return None

    # Shallowest match first; min() keeps the first one among equally deep paths.
    return min(candidates, key=lambda p: len(p.parts))


def find_plot_dirs(extract_root: Path) -> List[Path]:
    """
    Return the plot directories below .../meteorological_sensor/, sorted by plot number.

    A child counts as a plot directory when it is a directory whose name consists
    of digits only ('1', '2', ... 'N'). An empty list is returned (after printing a
    warning) when the sensor directory is missing, cannot be listed, or holds no
    such children.
    """
    sensor_root = find_sensor_root(extract_root)
    if sensor_root is None:
        print(f"[WARN] 'meteorological_sensor' directory not found under {extract_root}")
        return []

    try:
        numbered = [
            entry for entry in sensor_root.iterdir()
            if entry.is_dir() and entry.name.isdigit()
        ]
    except Exception as e:
        print(f"[WARN] Failed to iterate {sensor_root}: {e}")
        return []

    # Numeric sort so that '10' follows '9' instead of '1'.
    numbered.sort(key=lambda p: int(p.name))

    if numbered:
        print(f"[INFO] Found {len(numbered)} plot directories in {sensor_root}")
        return numbered

    # Nothing usable: show a sample of what is there to help diagnose the layout.
    try:
        child_names = [c.name for c in sensor_root.iterdir()]
    except Exception:
        child_names = []
    more_marker = ' ...' if len(child_names) > 10 else ''
    print(
        "[WARN] Found 'meteorological_sensor' at "
        f"{sensor_root}, but no numeric plot dirs inside. "
        f"Children: {child_names[:10]}{more_marker}"
    )
    return numbered


def find_sensor_csv_path(plot_dir: Path, expected_filename: str, sensor_name: str) -> Optional[Path]:
    """
    Resolve the CSV of one sensor inside a plot directory.

    The exact 'expected_filename' is preferred. If it is absent, any file matching
    '{sensor_name}_*_data.csv' is accepted and the smallest path in sort order is
    chosen so repeated runs pick the same file. Prints a warning and returns None
    when nothing matches.
    """
    preferred = plot_dir / expected_filename
    if preferred.exists():
        return preferred

    fallback = min(plot_dir.glob(f"{sensor_name}_*_data.csv"), default=None)
    if fallback is None:
        print(f"[WARN] Could not find CSV for sensor '{sensor_name}' in {plot_dir}")
    return fallback


def read_sensor_csv(csv_path: Path) -> Optional[pd.DataFrame]:
    """
    Read a semicolon-delimited sensor CSV whose first row is the header.

    The fast C parser is tried first. If it raises, the file is re-read with the
    Python parser, skipping malformed lines. Returns None (after printing a
    warning) when the path is missing or both attempts fail.
    """
    if not csv_path or not csv_path.exists():
        print(f"[WARN] Missing expected CSV: {csv_path}")
        return None

    shared_kwargs = {"sep": ";", "encoding": "utf-8"}

    # First attempt: C engine (low_memory is only understood by this engine).
    try:
        return pd.read_csv(csv_path, low_memory=False, **shared_kwargs)
    except Exception as e_c:
        print(f"[WARN] C engine failed for {csv_path}: {e_c}. Falling back to Python engine.")

    # Second attempt: Python engine, which tolerates bad lines.
    python_kwargs = dict(shared_kwargs, engine="python")
    try:
        try:
            # pandas >= 1.3 spelling
            return pd.read_csv(csv_path, on_bad_lines="skip", **python_kwargs)
        except TypeError:
            # pre-1.3 spelling of the same option
            return pd.read_csv(csv_path, error_bad_lines=False, **python_kwargs)
    except Exception as e_py:
        print(f"[WARN] Python engine also failed for {csv_path}: {e_py}")
        return None


def normalize_time_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a 'timestamp' column to 'df' (in place) and return the same frame.

    'date_int' (microseconds since epoch) is used when present and at least one
    value parses. Otherwise 'date' (YYYY-MM-DD_HH:MM:SS) is used, and if that
    column is absent too the timestamp is NaT throughout. Unparseable values
    become NaT; the timestamps carry no timezone.
    """
    stamps = None

    if "date_int" in df.columns:
        raw = df["date_int"]
        try:
            stamps = pd.to_datetime(raw.astype("int64"), unit="us", errors="coerce")
        except Exception:
            # Column could not be cast to integers; let pandas coerce it instead.
            stamps = pd.to_datetime(raw, unit="us", errors="coerce")

    nothing_parsed = stamps is None or stamps.isna().all()
    if nothing_parsed:
        if "date" in df.columns:
            stamps = pd.to_datetime(df["date"], format="%Y-%m-%d_%H:%M:%S", errors="coerce")
        else:
            stamps = pd.Series(pd.NaT, index=df.index)

    df["timestamp"] = stamps
    return df


def add_metadata_columns(df: pd.DataFrame, meta: Dict[str, Optional[str]], sensor_name: str, plot_id: str) -> pd.DataFrame:
    """
    Attach provenance columns and move them to the front of the frame.

    Columns written: plot_id, sensor_type, tarball_name, meta_date, meta_time,
    meta_crop. The 'meta_' prefix keeps them from clashing with a 'date' column
    already present in the sensor data. The columns are assigned on 'df' itself;
    the returned frame is the reordered selection.
    """
    metadata = {
        "plot_id": plot_id,
        "sensor_type": sensor_name,
        "tarball_name": meta.get("tarball_name"),
        "meta_date": meta.get("date"),
        "meta_time": meta.get("time"),
        "meta_crop": meta.get("crop"),
    }
    for column, value in metadata.items():
        df[column] = value

    # Metadata first, every other column afterwards in its existing order.
    leading = list(metadata)
    trailing = [c for c in df.columns if c not in metadata]
    return df[leading + trailing]


# --- Core merge logic ---------------------------------------------------------

def merge_tarball(
    tarball_path: Path,
    extract_root: Path,
    output_dir: Path,
    dedupe_on_timestamp: bool = True,
    make_long_format: bool = True,
) -> Dict[str, Path]:
    """
    Merge the per-plot sensor CSVs of one extracted tarball.

    Writes '<sensor>_merged.csv' for every sensor that produced data and, when
    'make_long_format' is set, 'all_sensors_long.csv' holding all sensors stacked,
    all under output_dir/<tarball base name>/.

    Returns the written paths keyed by sensor name (plus 'all_sensors' for the
    long file). Returns an empty dict when no plot directories are found.
    """
    meta = parse_tarball_metadata(tarball_path)
    plot_dirs = find_plot_dirs(extract_root)
    if not plot_dirs:
        print(f"[WARN] No plot directories found for {tarball_path}")
        return {}

    frames_by_sensor: Dict[str, List[pd.DataFrame]] = {name: [] for name in SENSOR_FILES}
    every_frame: List[pd.DataFrame] = []

    for plot_dir in plot_dirs:
        for sensor_name, expected_filename in SENSOR_FILES.items():
            frame = read_sensor_csv(find_sensor_csv_path(plot_dir, expected_filename, sensor_name))
            if frame is None:
                continue

            # Unified timestamp first, then provenance columns in front.
            frame = normalize_time_columns(frame)
            frame = add_metadata_columns(frame, meta, sensor_name, plot_dir.name)

            frames_by_sensor[sensor_name].append(frame)
            if make_long_format:
                every_frame.append(frame)

    out_dir = output_dir / meta["base_name"]
    out_dir.mkdir(parents=True, exist_ok=True)
    outputs: Dict[str, Path] = {}

    def drop_repeats(frame, id_cols):
        # Time key preference: timestamp (if any value parsed) -> date_int -> date.
        if "timestamp" in frame.columns and frame["timestamp"].notna().any():
            time_col = "timestamp"
        elif "date_int" in frame.columns:
            time_col = "date_int"
        elif "date" in frame.columns:
            time_col = "date"
        else:
            return frame
        return frame.drop_duplicates(subset=id_cols + [time_col])

    def write_result(frame, key, filename):
        destination = out_dir / filename
        frame.to_csv(destination, index=False)
        outputs[key] = destination
        print(f"[INFO] Wrote {destination} ({len(frame)} rows)")

    # One merged file per sensor that has data.
    for sensor_name, frames in frames_by_sensor.items():
        if not frames:
            continue
        merged = pd.concat(frames, axis=0, ignore_index=True)
        if dedupe_on_timestamp:
            merged = drop_repeats(merged, ["plot_id"])
        write_result(merged, sensor_name, f"{sensor_name}_merged.csv")

    # One long file with every sensor stacked.
    if make_long_format and every_frame:
        long_merged = pd.concat(every_frame, axis=0, ignore_index=True)
        if dedupe_on_timestamp:
            long_merged = drop_repeats(long_merged, ["sensor_type", "plot_id"])
        write_result(long_merged, "all_sensors", "all_sensors_long.csv")

    return outputs


def process_all_tarballs(
    input_dir: Path,
    work_dir: Path,
    output_dir: Path,
    keep_extracted: bool = False,
    make_global_merge: bool = False,
    dedupe_on_timestamp: bool = True,
) -> None:
    """
    Extract and merge every '*.tar.gz' found directly in 'input_dir'.

    For each tarball the content is unpacked below 'work_dir' and merged into
    output_dir/<tarball base name>/ by merge_tarball(). With 'make_global_merge'
    the per-tarball results are additionally stacked across all tarballs and
    written to output_dir/_GLOBAL/.

    Args:
        input_dir: directory holding the tarballs (must be a Path).
        work_dir: scratch directory for extracted content; created if missing.
        output_dir: destination for merged CSVs; created if missing.
        keep_extracted: when False, the per-tarball folder below 'work_dir' is
            deleted after that tarball has been merged.
        make_global_merge: also write the cross-tarball merges.
        dedupe_on_timestamp: drop repeated readings (see merge_tarball()).

    Raises:
        FileNotFoundError: if 'input_dir' does not exist.
    """
    if not input_dir.exists():
        raise FileNotFoundError(f"Input dir not found: {input_dir}")

    work_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    tarballs = sorted(input_dir.glob("*.tar.gz"))
    if not tarballs:
        print(f"[INFO] No .tar.gz files found in {input_dir}")
        return

    def drop_repeats(frame, id_cols):
        # Time key preference: timestamp (if any value present) -> date_int -> date.
        if "timestamp" in frame.columns and frame["timestamp"].notna().any():
            time_col = "timestamp"
        elif "date_int" in frame.columns:
            time_col = "date_int"
        elif "date" in frame.columns:
            time_col = "date"
        else:
            return frame
        return frame.drop_duplicates(subset=id_cols + [time_col])

    global_sensor_frames: Dict[str, List[pd.DataFrame]] = {k: [] for k in SENSOR_FILES.keys()}
    global_long_frames: List[pd.DataFrame] = []

    for tar in tarballs:
        print(f"[INFO] Processing {tar.name} ...")
        extract_root = extract_tarball(tar, work_dir)

        # Optional debug: show where the script is looking
        sensor_root = find_sensor_root(extract_root)
        print(f"[DEBUG] sensor_root: {sensor_root}")

        outputs = merge_tarball(
            tarball_path=tar,
            extract_root=extract_root,
            output_dir=output_dir,
            dedupe_on_timestamp=dedupe_on_timestamp,
            make_long_format=True,
        )

        # Only read the results back when they are needed for the global merge;
        # otherwise every output of every tarball would be held in memory for nothing.
        if make_global_merge:
            for sensor_name in SENSOR_FILES.keys():
                out_path = outputs.get(sensor_name)
                if out_path and out_path.exists():
                    # Our own outputs are comma-separated (written by pandas).
                    global_sensor_frames[sensor_name].append(pd.read_csv(out_path, low_memory=False))

            long_path = outputs.get("all_sensors")
            if long_path and long_path.exists():
                global_long_frames.append(pd.read_csv(long_path, low_memory=False))

        if not keep_extracted:
            # extract_tarball() may hand back the single inner directory, so remove
            # the whole per-tarball folder rather than just 'extract_root'; otherwise
            # an empty wrapper directory is left behind in work_dir.
            tarball_work_dir = work_dir / parse_tarball_metadata(tar)["base_name"]
            try:
                shutil.rmtree(tarball_work_dir)
            except Exception as e:
                print(f"[WARN] Failed to remove {tarball_work_dir}: {e}")

    if not make_global_merge:
        return

    global_dir = output_dir / "_GLOBAL"
    global_dir.mkdir(parents=True, exist_ok=True)

    # Per-sensor global merges
    for sensor_name, frames in global_sensor_frames.items():
        if not frames:
            continue
        merged = pd.concat(frames, axis=0, ignore_index=True)
        if dedupe_on_timestamp:
            merged = drop_repeats(merged, ["tarball_name", "plot_id"])
        out_path = global_dir / f"{sensor_name}_GLOBAL_merged.csv"
        merged.to_csv(out_path, index=False)
        print(f"[INFO] Wrote {out_path} ({len(merged)} rows)")

    # All-sensors long global merge
    if global_long_frames:
        merged = pd.concat(global_long_frames, axis=0, ignore_index=True)
        if dedupe_on_timestamp:
            merged = drop_repeats(merged, ["tarball_name", "sensor_type", "plot_id"])
        out_path = global_dir / "all_sensors_GLOBAL_long.csv"
        merged.to_csv(out_path, index=False)
        print(f"[INFO] Wrote {out_path} ({len(merged)} rows)")


In [6]:
# Scratch folder for extracted tarballs and destination folder for merged CSVs.
for folder in ('./workdir', './outputs'):
    os.makedirs(folder, exist_ok=True)

In [7]:
# Merge every downloaded tarball. Extracted files are kept, no cross-tarball
# merge is written, and repeated timestamps are NOT dropped at this stage.
process_all_tarballs(
    input_dir=Path('./inputs/MeteorologicalSensor'),
    work_dir=Path('./workdir'),
    output_dir=Path('./outputs'),
    keep_extracted=True,
    make_global_merge=False,
    dedupe_on_timestamp=False,
)


[INFO] Processing MeteorologicalSensor-2025-05-30__17-14-43-000_cotton.tar.gz ...
[DEBUG] sensor_root: workdir/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/meteorological_sensor
[INFO] Found 166 plot directories in workdir/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/meteorological_sensor
[INFO] Wrote outputs/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/epar_merged.csv (10985 rows)
[INFO] Wrote outputs/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/par_merged.csv (10986 rows)
[INFO] Wrote outputs/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/weather_station_merged.csv (943 rows)
[INFO] Wrote outputs/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton/all_sensors_long.csv (22914 rows)
[INFO] Processing MeteorologicalSensor-2025-05-30__23-10-02-000_cotton.tar.gz ...
[DEBUG] sensor_root: workdir/MeteorologicalSensor-2025-05-30__23-10-02-000_cotton

In [8]:

def parse_crop_from_dirname(name: str) -> Optional[str]:
    """
    Parse the crop from a tarball-derived directory name of the form
    MeteorologicalSensor-YYYY-MM-DD__HH-MM-SS-sss_<crop>.

    Names that do not follow that layout fall back to the text after the last
    underscore (best effort).

    Returns the lowercased crop, or None when no crop can be determined
    (no underscore in the name, or nothing after the last one).
    """
    m = re.match(
        r"^MeteorologicalSensor-\d{4}-\d{2}-\d{2}__\d{2}-\d{2}-\d{2}-\d{3}_(?P<crop>[A-Za-z0-9_-]+)$",
        name,
    )
    if m:
        return m.group("crop").lower()

    # Fallback: last underscore-separated segment.
    _, underscore, tail = name.rpartition("_")
    if not underscore:
        return None
    # An empty tail (name ends with '_') is "not found", not an empty crop.
    return tail.lower() or None


def infer_crop_from_csv(dir_path: Path) -> Optional[str]:
    """
    Infer the crop from dir_path/'all_sensors_long.csv' when the folder name
    does not carry it.

    The 'meta_crop' column is preferred, then 'crop'. Within a column the most
    frequent non-blank value (trimmed, lowercased) of the first 200 rows is used.
    A column holding only missing or blank values is skipped.

    Returns None when the file is missing, unreadable (a warning is printed),
    or carries no usable crop value.
    """
    csv_path = dir_path / "all_sensors_long.csv"
    if not csv_path.exists():
        return None

    try:
        # A small sample is enough and keeps this fast on large files.
        sample = pd.read_csv(csv_path, nrows=200)
        for col in ("meta_crop", "crop"):
            if col not in sample.columns:
                continue
            vals = sample[col].dropna().astype(str).str.strip().str.lower()
            # Whitespace-only cells survive dropna(); they are not a crop name.
            vals = vals[vals != ""]
            if not vals.empty:
                return vals.mode().iat[0]
    except Exception as e:
        print(f"[WARN] Could not infer crop from {csv_path}: {e}")
    return None


def unique_target_path(dest_dir: Path, original_name: str) -> Path:
    """
    Return a path below 'dest_dir' that does not exist yet.

    'original_name' is used as-is when free; otherwise '__dup1', '__dup2', ...
    is appended until an unused name is found.
    """
    candidate = dest_dir / original_name
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = dest_dir / f"{original_name}__dup{attempt}"
    return candidate


def move_outputs_by_crop(outputs_root: Path = Path("./outputs"), dry_run: bool = False) -> None:
    """
    Sort per-tarball result folders into outputs_root/<crop>/.

    Every immediate sub-directory of 'outputs_root' that contains
    'all_sensors_long.csv' is treated as a tarball result folder. Its crop is
    taken from the folder name, falling back to the CSV content, and the folder
    is moved to outputs_root/<crop>/ under a collision-safe name. '_GLOBAL',
    folders without the CSV, and folders whose crop cannot be determined are
    left in place and reported as skipped.

    Args:
        outputs_root: directory holding the per-tarball result folders.
        dry_run: when True, only print what would be moved.

    Raises:
        FileNotFoundError: if 'outputs_root' does not exist.
    """
    if not outputs_root.exists():
        raise FileNotFoundError(f"Outputs root not found: {outputs_root}")

    moved = []
    skipped = []

    # Only immediate children are visited, so folders already sitting inside a
    # crop directory are never seen here and need no special handling.
    for d in sorted(outputs_root.iterdir()):
        if not d.is_dir():
            continue
        if d.name == "_GLOBAL":
            # keep global outputs in place
            skipped.append((d, "skip _GLOBAL"))
            continue
        if not (d / "all_sensors_long.csv").exists():
            # not a tarball result folder (this also covers existing crop folders)
            skipped.append((d, "no all_sensors_long.csv"))
            continue

        # Folder name first; CSV metadata as fallback.
        crop = parse_crop_from_dirname(d.name)
        if not crop:
            crop = infer_crop_from_csv(d)

        if not crop:
            print(f"[WARN] Could not determine crop for {d}. Skipping.")
            skipped.append((d, "no crop parsed"))
            continue

        dest_dir = outputs_root / crop
        dest_dir.mkdir(parents=True, exist_ok=True)
        target = unique_target_path(dest_dir, d.name)

        print(f"[INFO] {'DRY-RUN:' if dry_run else ''} moving {d} -> {target}")
        if not dry_run:
            shutil.move(str(d), str(target))
        moved.append((d, target))

    print(f"[INFO] Moved {len(moved)} folders. Skipped {len(skipped)}.")
    if skipped:
        for d, reason in skipped[:10]:
            print(f"  - skipped {d.name}: {reason}")
        if len(skipped) > 10:
            print("  ...")


In [9]:
# Sort the per-tarball result folders into ./outputs/<crop>/ (dry_run=False performs the moves).
move_outputs_by_crop(outputs_root=Path('./outputs'), dry_run=False)

[INFO]  moving outputs/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-05-30__17-14-43-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-05-30__23-10-02-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-05-30__23-10-02-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-06-02__17-37-04-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-06-02__17-37-04-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-06-05__17-27-49-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-06-05__17-27-49-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-06-09__17-41-10-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-06-09__17-41-10-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-06-13__18-06-09-000_cotton -> outputs/cotton/MeteorologicalSensor-2025-06-13__18-06-09-000_cotton
[INFO]  moving outputs/MeteorologicalSensor-2025-06-14__00-16-53-000_cotton -> outputs/cotton/MeteorologicalSensor-202

In [10]:
# Upload everything under ./outputs to the level_1 collection via the `filexfer` SSH host.
# Relies on `cwd` set in the download cell.
cmd1 = f'iput -rkPVT ./outputs/* {cyverse_dir_l1}'
upload = sp.run(f"ssh filexfer 'cd {cwd} && {cmd1} && exit'", shell=True)
# Output streams straight into the cell (it is not captured), so `upload.stdout`
# is always None. Check the exit status instead of reporting success unconditionally.
if upload.returncode != 0:
    raise RuntimeError(f"level_1 upload failed (exit code {upload.returncode})")
print("upload complete")

Authorized uses only. All activity may be monitored and reported.


Running recursive pre-scan... pre-scan complete... transferring data...
C- /iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_1/MeteorologicalSensor/cotton:
C- /iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_1/MeteorologicalSensor/cotton/MeteorologicalSensor-2025-07-24__17-25-09-000_cotton:
0/244 -  0.00% of files done   0.000/651.353 MB -  0.00% of file sizes done
Processing all_sensors_long.csv - 4.663 MB   2026-01-05.12:34:08
   all_sensors_long.csv            4.663 MB | 1.506 sec | 0 thr |  3.097 MB/s
1/244 -  0.41% of files done   4.663/651.353 MB -  0.72% of file sizes done
Processing weather_station_merged.csv - 0.228 MB   2026-01-05.12:34:09
   weather_station_merged.cs       0.228 MB | 0.898 sec | 0 thr |  0.254 MB/s
2/244 -  0.82% of files done   4.891/651.353 MB -  0.75% of file sizes done
Processing epar_merged.csv - 2.117 MB   2026-01-05.12:34:10
   epar_merged.csv                 2.117 MB | 0.903 sec | 0 thr |  2.343 MB/s
3/244 -

The client/server socket connection has been renewed


C- /iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_1/MeteorologicalSensor/cotton/MeteorologicalSensor-2025-07-25__05-03-06-000_cotton:
212/244 - 86.89% of files done   580.481/651.353 MB - 89.12% of file sizes done
Processing all_sensors_long.csv - 1.837 MB   2026-01-05.12:43:40
   all_sensors_long.csv            1.837 MB | 0.820 sec | 0 thr |  2.239 MB/s
213/244 - 87.30% of files done   582.318/651.353 MB - 89.40% of file sizes done
Processing weather_station_merged.csv - 0.013 MB   2026-01-05.12:43:41
   weather_station_merged.cs       0.013 MB | 0.838 sec | 0 thr |  0.015 MB/s
214/244 - 87.70% of files done   582.331/651.353 MB - 89.40% of file sizes done
Processing epar_merged.csv - 0.868 MB   2026-01-05.12:43:42
   epar_merged.csv                 0.868 MB | 0.776 sec | 0 thr |  1.118 MB/s
215/244 - 88.11% of files done   583.198/651.353 MB - 89.54% of file sizes done
Processing par_merged.csv - 0.848 MB   2026-01-05.12:43:43
   par_merged.csv                

In [11]:

def _dedupe_df(df: pd.DataFrame) -> pd.DataFrame:
    """Deduplicate with preference: timestamp -> date_int -> date."""
    if "timestamp" in df.columns and df["timestamp"].notna().any():
        key_cols = [c for c in ["tarball_name", "sensor_type", "plot_id", "timestamp"] if c in df.columns]
        return df.drop_duplicates(subset=key_cols)
    if "date_int" in df.columns:
        key_cols = [c for c in ["tarball_name", "sensor_type", "plot_id", "date_int"] if c in df.columns]
        return df.drop_duplicates(subset=key_cols)
    if "date" in df.columns:
        key_cols = [c for c in ["tarball_name", "sensor_type", "plot_id", "date"] if c in df.columns]
        return df.drop_duplicates(subset=key_cols)
    # Fallback: drop exact duplicates
    return df.drop_duplicates()


def _ensure_tarball_name(df: pd.DataFrame, source_dir_name: str) -> pd.DataFrame:
    """Ensure 'tarball_name' column exists; if missing/empty, fill with the parent folder name."""
    if "tarball_name" not in df.columns or df["tarball_name"].isna().all():
        df["tarball_name"] = source_dir_name
    return df


def combine_one_crop(
    crop_dir: Path,
    output_subdir_name: str = "_COMBINED",
    output_filename_template: str = "all_sensors_{crop}.csv",
    dedupe_on_timestamp: bool = True,
) -> Path:
    """
    Combine every 'all_sensors_long.csv' below 'crop_dir' (searched recursively)
    into one CSV written to crop_dir/<output_subdir_name>/.

    Files located inside the output sub-directory itself are never used as
    input, so a previously written combined file cannot be folded back in on a
    re-run.

    Args:
        crop_dir: crop folder; its name is used as the crop label.
        output_subdir_name: sub-directory of 'crop_dir' receiving the result.
        output_filename_template: output filename; '{crop}' is replaced by the crop label.
        dedupe_on_timestamp: drop repeated readings via _dedupe_df().

    Returns:
        Path of the written combined CSV.

    Raises:
        FileNotFoundError: if no input file is found.
        RuntimeError: if none of the input files can be read.
    """
    crop = crop_dir.name
    out_dir = crop_dir / output_subdir_name

    files = sorted(
        f for f in crop_dir.rglob("all_sensors_long.csv")
        if out_dir not in f.parents
    )
    if not files:
        raise FileNotFoundError(f"No 'all_sensors_long.csv' files found under {crop_dir}")

    print(f"[INFO] ({crop}) found {len(files)} files to combine.")

    frames = []
    for f in files:
        try:
            # Our own outputs are comma-separated. low_memory=False avoids
            # chunk-wise dtype guessing, matching the other read-backs in this file.
            df = pd.read_csv(f, low_memory=False)
            df = _ensure_tarball_name(df, source_dir_name=f.parent.name)
            frames.append(df)
        except Exception as e:
            print(f"[WARN] ({crop}) failed to read {f}: {e}")

    if not frames:
        raise RuntimeError(f"({crop}) no readable 'all_sensors_long.csv' files.")

    combined = pd.concat(frames, axis=0, ignore_index=True)

    if dedupe_on_timestamp:
        before = len(combined)
        combined = _dedupe_df(combined)
        after = len(combined)
        print(f"[INFO] ({crop}) deduped rows: {before - after} removed; {after} remain.")

    # Write result
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / output_filename_template.format(crop=crop)
    combined.to_csv(out_path, index=False)
    print(f"[INFO] ({crop}) wrote combined CSV: {out_path} ({len(combined)} rows)")
    return out_path


def combine_all_crops(
    outputs_root: Path = Path("./outputs"),
    output_subdir_name: str = "_COMBINED",
    output_filename_template: str = "all_sensors_{crop}.csv",
    dedupe_on_timestamp: bool = True,
) -> None:
    """
    Run combine_one_crop() for every crop directory below 'outputs_root'.

    Every immediate sub-directory except '_GLOBAL' is treated as a crop
    directory. Directories without any 'all_sensors_long.csv' are skipped with a
    warning, and a failure in one crop is reported without stopping the others.

    Raises:
        FileNotFoundError: if 'outputs_root' does not exist.
    """
    if not outputs_root.exists():
        raise FileNotFoundError(f"Outputs root not found: {outputs_root}")

    crop_dirs = []
    for entry in sorted(outputs_root.iterdir()):
        if entry.is_dir() and entry.name != "_GLOBAL":
            crop_dirs.append(entry)

    if not crop_dirs:
        print(f"[INFO] No crop directories found under {outputs_root}. Did you run the organizer?")
        return

    combined_paths = []
    for crop_dir in crop_dirs:
        # Nothing to combine in this directory?
        first_input = next(crop_dir.rglob("all_sensors_long.csv"), None)
        if first_input is None:
            print(f"[WARN] Skipping {crop_dir} (no all_sensors_long.csv found).")
            continue

        try:
            combined_paths.append(
                combine_one_crop(
                    crop_dir=crop_dir,
                    output_subdir_name=output_subdir_name,
                    output_filename_template=output_filename_template,
                    dedupe_on_timestamp=dedupe_on_timestamp,
                )
            )
        except Exception as e:
            print(f"[WARN] Failed combining for crop '{crop_dir.name}': {e}")

    print(f"[INFO] Completed. Wrote {len(combined_paths)} combined crop files.")



In [12]:
# Build one combined CSV per crop from the per-tarball 'all_sensors_long.csv'
# files under ./outputs, dropping repeated readings along the way.
combine_all_crops(
    outputs_root=Path('./outputs'),
    output_subdir_name="_COMBINED",
    output_filename_template="all_sensors_long_merged.csv",
    dedupe_on_timestamp=True,
)

[INFO] (cotton) found 60 files to combine.
[INFO] (cotton) deduped rows: 0 removed; 1547953 remain.
[INFO] (cotton) wrote combined CSV: outputs/cotton/_COMBINED/all_sensors_long_merged.csv (1547953 rows)
[INFO] (sorghum) found 1 files to combine.
[INFO] (sorghum) deduped rows: 0 removed; 6486 remain.
[INFO] (sorghum) wrote combined CSV: outputs/sorghum/_COMBINED/all_sensors_long_merged.csv (6486 rows)
[INFO] Completed. Wrote 2 combined crop files.


In [13]:
src_root = Path("./outputs")
dst_root = Path("./outputs_merged")

# Collect each crop's combined CSV into ./outputs_merged/<crop>/, keeping its filename.
for crop_dir in src_root.iterdir():
    is_crop_folder = crop_dir.is_dir() and crop_dir.name != "_GLOBAL"
    if not is_crop_folder:
        continue

    src_file = crop_dir / "_COMBINED" / "all_sensors_long_merged.csv"
    if not src_file.exists():
        continue

    dest_dir = dst_root / crop_dir.name
    dest_dir.mkdir(parents=True, exist_ok=True)
    dst_file = dest_dir / src_file.name
    shutil.move(str(src_file), str(dst_file))
    print(f"Moved: {src_file} -> {dst_file}")


Moved: outputs/cotton/_COMBINED/all_sensors_long_merged.csv -> outputs_merged/cotton/all_sensors_long_merged.csv
Moved: outputs/sorghum/_COMBINED/all_sensors_long_merged.csv -> outputs_merged/sorghum/all_sensors_long_merged.csv


In [14]:
# Upload everything under ./outputs_merged to the level_2 collection via the `filexfer` SSH host.
# Relies on `cwd` set in the download cell.
cmd1 = f'iput -rkPVT ./outputs_merged/* {cyverse_dir_l2}'
upload = sp.run(f"ssh filexfer 'cd {cwd} && {cmd1} && exit'", shell=True)
# Output streams straight into the cell (it is not captured), so `upload.stdout`
# is always None. Check the exit status instead of reporting success unconditionally.
if upload.returncode != 0:
    raise RuntimeError(f"level_2 upload failed (exit code {upload.returncode})")
print("upload complete")

Authorized uses only. All activity may be monitored and reported.


Running recursive pre-scan... pre-scan complete... transferring data...
C- /iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_2/MeteorologicalSensor/cotton:
0/2 -  0.00% of files done   0.000/334.904 MB -  0.00% of file sizes done
Processing all_sensors_long_merged.csv - 333.529 MB   2026-01-05.12:45:53
From server: NumThreads=4, addr:r03c05u32-ds26.cyverse.org, port:20391, cookie=1725236653
all_sensors_long_merged.csv - 123.382/333.529 MB - 36.99% done   2026-01-05.12:45:54
all_sensors_long_merged.csv - 286.765/333.529 MB - 85.98% done   2026-01-05.12:45:54
all_sensors_long_merged.csv - 330.147/333.529 MB - 98.99% done   2026-01-05.12:45:54
all_sensors_long_merged.csv - 333.529/333.529 MB - 100.00% done   2026-01-05.12:45:54
   all_sensors_long_merged.c     333.529 MB | 3.532 sec | 4 thr | 94.428 MB/s
C- /iplant/home/shared/phytooracle/season_19_sorghum_cotton_yr_2025/level_2/MeteorologicalSensor/sorghum:
1/2 - 50.00% of files done   333.529/334.904 MB - 99.59% of 