# Soil Moisture Analysis & Irrigation Prediction Pegasus Workflow

Precision irrigation is critical for sustainable agriculture, water conservation, and crop yield optimization. The [Open-Meteo](https://open-meteo.com/) platform provides free access to historical weather and soil data from ERA5-Land reanalysis, covering global soil moisture and temperature at hourly resolution.

This workflow fetches soil moisture and temperature data for user-defined field polygons, analyzes moisture levels with crop-specific and soil-specific thresholds, trains an **LSTM neural network** for 24-hour soil moisture forecasting, generates irrigation recommendations combining ML predictions with rule-based decision logic, and produces multi-panel visualizations.

## Containers

All tools required to execute the jobs are included in a single container available on DockerHub:

[Soil Moisture Container](https://hub.docker.com/r/kthare10/soilmoisture) defined in `Docker/SoilMoisture_Dockerfile` with:
* pandas, numpy, matplotlib, scipy, requests (base analysis)
* torch, scikit-learn, tqdm (ML forecasting)
* pytz (timezone handling)

## Accessing the Input Data

Soil data is fetched from the **Open-Meteo Historical Weather API** (`https://archive-api.open-meteo.com/v1/era5`), which provides ERA5-Land reanalysis data from ECMWF. **No API key is required.** The API returns hourly soil moisture (0-7 cm) and soil temperature (0-7 cm, 7-28 cm) for polygon centroids.

Field locations are defined in a **polygons JSON file** with latitude/longitude coordinates.

## Workflow

The workflow processes each field polygon through a 5-step pipeline:

| Job Label              | Description                                                                          |
|------------------------|--------------------------------------------------------------------------------------|
| fetch_soil_data        | Fetches hourly soil moisture and temperature from Open-Meteo for polygon centroids   |
| analyze_moisture       | Computes moisture statistics, water deficit, trend analysis, crop/soil thresholds     |
| train_model            | Trains 2-layer LSTM (64 hidden units) for 24-hour soil moisture forecasting          |
| predict_irrigation     | Generates irrigation recommendations using ML predictions + rule-based urgency logic |
| visualize_moisture     | Creates multi-panel dashboard: time series, urgency gauge, water status, trend, recs  |

### Dependency Graph

```
fetch_soil_data → analyze_moisture ──→ predict_irrigation → visualize_moisture
       └──────→ train_model ─────────↗
```

### Supported Crops & Soil Types

**Crops:** tomato, corn, wheat, lettuce, potato, grape, alfalfa, cotton (each with specific wilting/stress/optimal thresholds)

**Soils:** sand, sandy_loam, loam, clay_loam, clay (each with specific field capacity and wilting point values)

## 1. Create the Soil Moisture Workflow

First, configure the field polygon(s), date range, crop type, soil type, and ML training parameters.

The polygons JSON file should define field boundaries as coordinate arrays. Example format:
```json
[
  {
    "id": "field1",
    "name": "North Field",
    "coordinates": [[-121.50, 37.50], [-121.48, 37.50], [-121.48, 37.52], [-121.50, 37.52], [-121.50, 37.50]]
  }
]
```

Then the workflow class below will:
1. Build the Pegasus catalogs (sites, transformations, replicas)
2. Construct the DAG with fetch, analyze, train, predict, and visualize jobs for each polygon

In [None]:
# Path to polygons JSON file defining field boundaries
POLYGONS_FILE = 'polygons.json'

# Polygon IDs to analyze (set to None to use all polygons in the file)
POLYGON_IDS = ['field1']

# Date range for soil data (Open-Meteo provides historical ERA5-Land data)
START_DATE = '2024-01-01'
END_DATE = '2024-01-31'

# Crop type for threshold selection
# Choices: tomato, corn, wheat, lettuce, potato, grape, alfalfa, cotton, default
CROP_TYPE = 'tomato'

# Soil type for water capacity calculation
# Choices: sand, sandy_loam, loam, clay_loam, clay, default
SOIL_TYPE = 'loam'

# LSTM training epochs (more epochs = better model but longer training)
ML_EPOCHS = 50

**Note:** At least 31 days of data is recommended for LSTM training (minimum ~58 hourly records needed to create 10 training sequences). Longer periods improve model accuracy.

In [None]:
import os
import sys
import json
import logging
from argparse import Namespace
from pathlib import Path
from datetime import datetime, timedelta

# --- Import Pegasus API ---
from Pegasus.api import *
logging.basicConfig(level=logging.DEBUG)


# --- Main workflow class ---
class SoilMoistureWorkflow():
    wf = None
    sc = None
    tc = None
    rc = None
    props = None

    dagfile = None
    wf_dir = None
    shared_scratch_dir = None
    local_storage_dir = None
    wf_name = "soilmoisture"

    # --- Init ---
    def __init__(self, dagfile="workflow.yml"):
        self.dagfile = dagfile
        self.wf_dir = str(Path(".").resolve())
        self.shared_scratch_dir = os.path.join(self.wf_dir, "scratch")
        self.local_storage_dir = os.path.join(self.wf_dir, "output")

    # --- Write files in directory ---
    def write(self):
        if self.sc is not None:
            self.sc.write()
        self.props.write()
        self.rc.write()
        self.tc.write()

        try:
            self.wf.write(file=self.dagfile)
        except PegasusClientError as e:
            print(e)

    # --- Plan and Submit the workflow ---
    def plan_submit(self):
        try:
            self.wf.plan(submit=True)
        except PegasusClientError as e:
            print(e)

    # --- Get status of the workflow ---
    def status(self):
        try:
            self.wf.status(long=True)
        except PegasusClientError as e:
            print(e)

    # --- Wait for the workflow to finish ---
    def wait(self):
        try:
            self.wf.wait()
        except PegasusClientError as e:
            print(e)

    # --- Get statistics of the workflow ---
    def statistics(self):
        try:
            self.wf.statistics()
        except PegasusClientError as e:
            print(e)

    # --- Configuration (Pegasus Properties) ---
    def create_pegasus_properties(self):
        self.props = Properties()
        self.props["pegasus.transfer.threads"] = "16"
        return

    # --- Site Catalog ---
    def create_sites_catalog(self, exec_site_name="condorpool"):
        self.sc = SiteCatalog()

        local = (Site("local")
                    .add_directories(
                        Directory(Directory.SHARED_SCRATCH, self.shared_scratch_dir)
                            .add_file_servers(FileServer("file://" + self.shared_scratch_dir, Operation.ALL)),
                        Directory(Directory.LOCAL_STORAGE, self.local_storage_dir)
                            .add_file_servers(FileServer("file://" + self.local_storage_dir, Operation.ALL))
                    )
                )

        exec_site = (Site(exec_site_name)
                        .add_condor_profile(universe="vanilla")
                        .add_pegasus_profile(style="condor")
                    )

        self.sc.add_sites(local, exec_site)

    # --- Transformation Catalog (Executables and Containers) ---
    def create_transformation_catalog(self, exec_site_name="condorpool",
                                       container_image="kthare10/soilmoisture:latest"):
        self.tc = TransformationCatalog()

        # Container
        soilmoisture_container = Container("soilmoisture_container",
            container_type=Container.SINGULARITY,
            image=f"docker://{container_image}",
            image_site="docker_hub"
        )

        # Transformations
        fetch_soil_data = Transformation("fetch_soil_data", site=exec_site_name,
            pfn=os.path.join(self.wf_dir, "fetch_soil_data.py"),
            is_stageable=True, container=soilmoisture_container
        ).add_pegasus_profile(memory="1 GB")

        analyze_moisture = Transformation("analyze_moisture", site=exec_site_name,
            pfn=os.path.join(self.wf_dir, "bin/analyze_moisture.py"),
            is_stageable=True, container=soilmoisture_container
        ).add_pegasus_profile(memory="1 GB")

        train_model = Transformation("train_model", site=exec_site_name,
            pfn=os.path.join(self.wf_dir, "bin/train_model.py"),
            is_stageable=True, container=soilmoisture_container
        ).add_pegasus_profile(memory="4 GB")

        predict_irrigation = Transformation("predict_irrigation", site=exec_site_name,
            pfn=os.path.join(self.wf_dir, "bin/predict_irrigation.py"),
            is_stageable=True, container=soilmoisture_container
        ).add_pegasus_profile(memory="2 GB")

        visualize_moisture = Transformation("visualize_moisture", site=exec_site_name,
            pfn=os.path.join(self.wf_dir, "bin/visualize_moisture.py"),
            is_stageable=True, container=soilmoisture_container
        ).add_pegasus_profile(memory="2 GB")

        self.tc.add_containers(soilmoisture_container)
        self.tc.add_transformations(
            fetch_soil_data, analyze_moisture, train_model,
            predict_irrigation, visualize_moisture
        )

    # --- Replica Catalog ---
    def create_replica_catalog(self):
        self.rc = ReplicaCatalog()

    # --- Create Workflow ---
    def create_workflow(self, args):
        self.wf = Workflow(self.wf_name)

        polygons_file = File(os.path.basename(args.polygons_file))
        self.rc.add_replica("local", polygons_file, os.path.abspath(args.polygons_file))

        # Shared ML model files
        model_file = File("soil_moisture_model.pt")
        model_metadata = File("soil_moisture_model_metadata.json")

        # Collect all fetch and analyze jobs
        fetch_jobs = []
        analyze_jobs = []
        soil_data_files = []

        # Add fetch and analyze jobs for each polygon
        for polygon_id in args.polygon_ids:
            fetch_job, analyze_job, soil_data_file = self._add_fetch_analyze_jobs(
                polygon_id, args, polygons_file
            )
            fetch_jobs.append(fetch_job)
            analyze_jobs.append(analyze_job)
            soil_data_files.append(soil_data_file)

        # Add ML training job (uses first polygon's data)
        train_job = self._add_train_job(soil_data_files, model_file, model_metadata, args)

        # Training depends on having fetched data
        self.wf.add_dependency(fetch_jobs[0], children=[train_job])

        # Add predict and visualize jobs for each polygon (uses trained model)
        for i, polygon_id in enumerate(args.polygon_ids):
            self._add_predict_visualize_jobs(
                polygon_id, args,
                soil_data_files[i], analyze_jobs[i],
                model_file, model_metadata, train_job
            )

    def _add_fetch_analyze_jobs(self, polygon_id, args, polygons_file):
        soil_data_file = File(f"{polygon_id}_soil_data.csv")
        analysis_file = File(f"{polygon_id}_analysis.json")

        # Job 1: Fetch soil data
        fetch_job = Job("fetch_soil_data", _id=f"fetch_{polygon_id}", node_label=f"fetch_{polygon_id}")
        fetch_job.add_args("--fetch", "--polygon-id", polygon_id)
        fetch_job.add_args("--start-date", args.start_date, "--end-date", args.end_date)
        fetch_job.add_args("--output", soil_data_file)
        fetch_job.add_args("--polygons-file", polygons_file)
        fetch_job.add_inputs(polygons_file)
        fetch_job.add_outputs(soil_data_file, stage_out=True, register_replica=False)
        fetch_job.add_pegasus_profile(label=polygon_id)

        # Job 2: Analyze moisture
        analyze_job = Job("analyze_moisture", _id=f"analyze_{polygon_id}", node_label=f"analyze_{polygon_id}")
        analyze_job.add_args(
            "--input", soil_data_file,
            "--output", analysis_file,
            "--crop-type", args.crop_type,
            "--soil-type", args.soil_type,
        )
        analyze_job.add_inputs(soil_data_file)
        analyze_job.add_outputs(analysis_file, stage_out=True, register_replica=False)
        analyze_job.add_pegasus_profile(label=polygon_id)

        self.wf.add_jobs(fetch_job, analyze_job)
        self.wf.add_dependency(fetch_job, children=[analyze_job])

        return fetch_job, analyze_job, soil_data_file

    def _add_train_job(self, soil_data_files, model_file, model_metadata, args):
        primary_data_file = soil_data_files[0]

        train_job = Job("train_model", _id="train_ml_model", node_label="train_ml_model")
        train_job.add_args(
            "--input", primary_data_file,
            "--output", model_file,
            "--metadata", model_metadata,
            "--sequence-length", "24",
            "--forecast-horizon", "24",
            "--epochs", str(args.ml_epochs),
        )
        train_job.add_inputs(primary_data_file)
        train_job.add_outputs(model_file, stage_out=True, register_replica=False)
        train_job.add_outputs(model_metadata, stage_out=True, register_replica=False)
        train_job.add_pegasus_profile(label="ml_training")

        self.wf.add_jobs(train_job)
        return train_job

    def _add_predict_visualize_jobs(self, polygon_id, args,
                                     soil_data_file, analyze_job,
                                     model_file, model_metadata, train_job):
        analysis_file = File(f"{polygon_id}_analysis.json")
        prediction_file = File(f"{polygon_id}_prediction.json")
        visualization_file = File(f"{polygon_id}_visualization.png")

        # Job 4: Predict irrigation (with ML model)
        predict_job = Job("predict_irrigation", _id=f"predict_{polygon_id}", node_label=f"predict_{polygon_id}")
        predict_job.add_args(
            "--analysis", analysis_file,
            "--output", prediction_file,
            "--model", model_file,
            "--model-metadata", model_metadata,
            "--soil-data", soil_data_file,
        )
        predict_job.add_inputs(analysis_file, model_file, model_metadata, soil_data_file)
        predict_job.add_outputs(prediction_file, stage_out=True, register_replica=False)
        predict_job.add_pegasus_profile(label=polygon_id)

        # Job 5: Visualize
        visualize_job = Job("visualize_moisture", _id=f"visualize_{polygon_id}", node_label=f"visualize_{polygon_id}")
        visualize_job.add_args(
            "--data", soil_data_file,
            "--analysis", analysis_file,
            "--prediction", prediction_file,
            "--output", visualization_file,
        )
        visualize_job.add_inputs(soil_data_file, analysis_file, prediction_file)
        visualize_job.add_outputs(visualization_file, stage_out=True, register_replica=False)
        visualize_job.add_pegasus_profile(label=polygon_id)

        self.wf.add_jobs(predict_job, visualize_job)

        # Dependencies
        self.wf.add_dependency(analyze_job, children=[predict_job])
        self.wf.add_dependency(train_job, children=[predict_job])
        self.wf.add_dependency(predict_job, children=[visualize_job])


# --- Build and generate the workflow ---
# Resolve polygon IDs from file if not specified
polygon_ids = POLYGON_IDS
if polygon_ids is None:
    with open(POLYGONS_FILE, 'r') as f:
        data = json.load(f)
    polygons = data.get('polygons', data) if isinstance(data, dict) else data
    polygon_ids = [p.get('id') for p in polygons if p.get('id')]
    print(f"Using all polygon IDs from file: {polygon_ids}")

# Create args namespace for workflow creation
args = Namespace(
    polygons_file=POLYGONS_FILE,
    polygon_ids=polygon_ids,
    start_date=START_DATE,
    end_date=END_DATE,
    crop_type=CROP_TYPE,
    soil_type=SOIL_TYPE,
    ml_epochs=ML_EPOCHS
)

dagfile = 'workflow.yml'

workflow = SoilMoistureWorkflow(dagfile=dagfile)

print("Creating execution sites...")
workflow.create_sites_catalog("condorpool")

print("Creating workflow properties...")
workflow.create_pegasus_properties()

print("Creating transformation catalog...")
workflow.create_transformation_catalog("condorpool")

print("Creating replica catalog...")
workflow.create_replica_catalog()

print("Creating soil moisture workflow DAG...")
workflow.create_workflow(args)

workflow.write()
print(f"\nSoil Moisture Workflow has been generated!")
print(f"  Polygons: {len(polygon_ids)} ({', '.join(polygon_ids)})")
print(f"  Crop type: {CROP_TYPE}")
print(f"  Soil type: {SOIL_TYPE}")
print(f"  ML epochs: {ML_EPOCHS}")

## View the Generated Workflow DAG

Before submitting, we can visualize the workflow DAG using `pegasus-graphviz`. The graph shows the 5-step pipeline: fetch data feeds both analyze and train_model, which then converge at predict_irrigation before the final visualization step.

In [None]:
!pegasus-graphviz -f workflow.yml --output workflow.png

In [None]:
from IPython.display import Image
Image(filename='workflow.png')

## 2. Plan and Submit the Workflow

We will now plan and submit the workflow for execution. By default we are running jobs on site **condorpool** i.e. the selected ACCESS resource.

In [None]:
workflow.plan_submit()

After the workflow has been successfully planned and submitted, you can use the Python `Workflow` object to monitor the status of the workflow. It shows in detail the counts of jobs of each status and whether a job is idle or running.

In [None]:
workflow.status()

In [None]:
workflow.wait()

## 3. Statistics

Depending on whether the workflow finished successfully or not, you have options on what to do next. If the workflow failed you can use `workflow.analyze()` to get help finding out what went wrong. If the workflow finished successfully, we can pull out some statistics from the provenance database:

In [None]:
workflow.statistics()

## 4. Examining the Results

Once the workflow has finished, we can look at the output directory for our results. The workflow produces the following outputs for each polygon:

```
output/
├── <polygon>_soil_data.csv                    # Raw hourly soil moisture & temperature data
├── <polygon>_analysis.json                    # Moisture statistics, deficit, trend, classification
├── <polygon>_prediction.json                  # Irrigation recommendation with urgency score
├── <polygon>_visualization.png                # Multi-panel irrigation decision dashboard
├── soil_moisture_model.pt                     # Trained LSTM model weights (shared)
└── soil_moisture_model_metadata.json          # Model config, scaler parameters (shared)
```

In [None]:
!ls -ltR output/

### Irrigation Decision Dashboard

The multi-panel visualization includes: a soil moisture time series with crop-specific threshold zones (critical/stressed/low/optimal/high/saturated), an urgency gauge (0-100), water status bar chart comparing current vs. optimal vs. field capacity, a trend indicator with daily change rate, and a recommendation panel with action, timing, and irrigation amount.

In [None]:
import glob
from IPython.display import Image, display

viz_pngs = sorted(glob.glob("output/*_visualization.png"))
for png in viz_pngs:
    print(f"\n{png}")
    display(Image(filename=png))

### Moisture Analysis Summary

The moisture analysis provides crop-specific threshold evaluation, water deficit calculations, saturation percentage, trend analysis (increasing/stable/decreasing), and classification distribution across the monitoring period.

In [None]:
import json

analysis_files = sorted(glob.glob("output/*_analysis.json"))
for af in analysis_files:
    with open(af, 'r') as f:
        data = json.load(f)

    metadata = data.get('metadata', {})
    thresholds = data.get('crop_thresholds', {})
    polygons = data.get('polygon_analyses', [])

    print(f"\n--- {metadata.get('crop_type', 'unknown')} / {metadata.get('soil_type', 'unknown')} ---")
    print(f"  Thresholds: wilting={thresholds.get('wilting')}, stress={thresholds.get('stress')}, "
          f"optimal={thresholds.get('optimal_low')}-{thresholds.get('optimal_high')}, "
          f"saturated={thresholds.get('saturated')}")

    for poly in polygons:
        stats = poly.get('moisture_stats', {})
        deficit = poly.get('water_deficit', {})
        trend = poly.get('trend', {})

        print(f"\n  Polygon: {poly.get('polygon_id', 'unknown')}")
        print(f"    Classification:    {poly.get('moisture_classification', 'N/A')}")
        print(f"    Current moisture:  {stats.get('current', 'N/A')} m\u00b3/m\u00b3")
        print(f"    Mean moisture:     {stats.get('mean', 'N/A')} m\u00b3/m\u00b3")
        print(f"    Range:             {stats.get('min', 'N/A')} - {stats.get('max', 'N/A')}")
        print(f"    Saturation:        {deficit.get('saturation_percent', 'N/A')}%")
        print(f"    Trend:             {trend.get('trend', 'N/A')} (rate: {trend.get('daily_change_rate', 'N/A')}/day)")

### Irrigation Prediction Summary

The irrigation prediction combines ML-based 24-hour soil moisture forecasting with rule-based urgency scoring. Urgency ranges from 0 (no action needed) to 100 (immediate irrigation required). Actions include: `monitor`, `schedule_irrigation`, `irrigate_soon`, and `irrigate_immediately`.

In [None]:
prediction_files = sorted(glob.glob("output/*_prediction.json"))
for pf in prediction_files:
    with open(pf, 'r') as f:
        pred_data = json.load(f)

    summary = pred_data.get('summary', {})
    predictions = pred_data.get('predictions', [])

    print(f"\nOverall: {summary.get('overall_action', 'N/A')}")
    print(f"ML predictions made: {summary.get('ml_predictions_made', 0)}")

    for pred in predictions:
        ml = pred.get('ml_insights', {})
        print(f"\n  Polygon: {pred.get('polygon_id', 'unknown')}")
        print(f"    Urgency score:     {pred.get('urgency_score', 'N/A')}/100")
        print(f"    Action:            {pred.get('action', 'N/A')}")
        print(f"    Timing:            {pred.get('recommended_timing', 'N/A')}")
        print(f"    Irrigation amount: {pred.get('irrigation_amount_mm', 0)} mm")
        if ml.get('ml_prediction_available'):
            print(f"    ML predicted min:  {ml.get('predicted_min_moisture', 'N/A')}")
            print(f"    ML predicted trend: {ml.get('predicted_trend', 'N/A')}")
            print(f"    Hours to critical: {ml.get('hours_until_critical', 'N/A')}")
            print(f"    Hours to stress:   {ml.get('hours_until_stress', 'N/A')}")

### Trained LSTM Model Info

The LSTM model metadata includes architecture configuration, feature scaling parameters, and training information. The model is shared across all polygons in the workflow.

In [None]:
model_meta_files = sorted(glob.glob("output/*_model_metadata.json"))
for mf in model_meta_files:
    with open(mf, 'r') as f:
        meta = json.load(f)

    config = meta.get('config', {})
    print(f"Model type:        {meta.get('model_type', 'N/A')}")
    print(f"Input features:    {config.get('input_size', 'N/A')} ({', '.join(config.get('feature_cols', []))})")
    print(f"Hidden size:       {config.get('hidden_size', 'N/A')}")
    print(f"LSTM layers:       {config.get('num_layers', 'N/A')}")
    print(f"Sequence length:   {config.get('sequence_length', 'N/A')} hours")
    print(f"Forecast horizon:  {config.get('forecast_horizon', 'N/A')} hours")