# Real-Time Progress Tracking with Kubeflow Trainer

This notebook demonstrates how to monitor training progress in real time using Kubeflow Trainer v2 on Red Hat OpenShift AI.

**What you will learn:**
- How progress tracking works with RHAI trainers
- How to view real-time training metrics
- How to interpret progress annotations

**Prerequisites:**
- Access to an OpenShift AI cluster with Kubeflow Trainer enabled
- A workbench with Python 3.9+

## 1. Install the Kubeflow SDK

In [None]:
%pip install "kubeflow @ git+https://github.com/opendatahub-io/kubeflow-sdk.git@v0.2.1+rhai0"

In [None]:
# Verify installation
import kubeflow
print(f"Kubeflow SDK version: {kubeflow.__version__}")

from kubeflow.trainer import TrainerClient
from kubeflow.trainer.rhai import TransformersTrainer
from kubeflow.common.types import KubernetesBackendConfig

print("SDK imported successfully")

## 2. Configuration

Progress tracking is **enabled by default** when using `TransformersTrainer`. No additional configuration is needed.

In [None]:
# Configuration
NAMESPACE = None  # None = use current namespace from kubeconfig
NUM_NODES = 2
GPUS_PER_NODE = 1

print("Configuration:")
print(f"  Nodes: {NUM_NODES}")
print(f"  GPUs per node: {GPUS_PER_NODE}")

## 3. Define the training function

We use a simple training job with multiple epochs to demonstrate progress tracking over time.

In [None]:
def train_func():
    """Training function with multiple epochs for progress demonstration."""
    import os
    import torch
    from transformers import (
        AutoModelForSequenceClassification,
        AutoTokenizer,
        Trainer,
        TrainingArguments,
    )
    from datasets import load_dataset
    
    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    
    print(f"Starting training on rank {rank}")
    
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)
    
    # Load a small model for quick demonstration
    model_name = "distilbert-base-uncased"
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # Load a subset of IMDB for demonstration
    dataset = load_dataset("imdb", split="train[:500]")
    
    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)
    
    tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=["text"])
    
    # Train for 2 epochs to show progress over time
    training_args = TrainingArguments(
        output_dir="/tmp/output",
        num_train_epochs=2,  # Multiple epochs to observe progress
        per_device_train_batch_size=8,
        learning_rate=2e-5,
        logging_steps=5,  # Frequent logging for progress updates
        save_strategy="no",
        report_to="none",
        ddp_find_unused_parameters=False,
    )
    
    trainer = Trainer(model=model, args=training_args, train_dataset=tokenized_dataset)
    trainer.train()
    print(f"Training complete on rank {rank}")
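
To sanity-check the `totalSteps` value reported later, the expected number of optimizer steps can be estimated from the settings above. This is a rough sketch, assuming standard data-parallel sharding across `NUM_NODES * GPUS_PER_NODE` processes, no gradient accumulation, and that the last partial batch is kept. `expected_total_steps` is a hypothetical helper, not part of the SDK.

```python
import math


def expected_total_steps(num_samples, per_device_batch_size, world_size,
                         num_epochs, grad_accum_steps=1):
    """Estimate optimizer steps for a data-parallel run (hypothetical helper)."""
    # Each process receives roughly an equal shard of the dataset.
    samples_per_rank = math.ceil(num_samples / world_size)
    # The final partial batch is assumed to be kept, hence the ceiling.
    batches_per_epoch = math.ceil(samples_per_rank / per_device_batch_size)
    steps_per_epoch = max(batches_per_epoch // grad_accum_steps, 1)
    return steps_per_epoch * num_epochs


# Values taken from this notebook: 500 samples, batch size 8, 2 nodes x 1 GPU, 2 epochs.
total = expected_total_steps(
    num_samples=500, per_device_batch_size=8, world_size=2 * 1, num_epochs=2
)
print(f"Expected total steps: {total}")
print(f"Expected log events (logging_steps=5): {total // 5}")
```

If the reported `totalSteps` differs noticeably from this estimate, the job is probably running with a different world size or batch size than intended.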

## 4. Submit the training job

When using `TransformersTrainer`, progress tracking is automatically enabled. The trainer instruments your code to report metrics without any changes to your training function.
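
How the SDK computes these values internally is not shown in this notebook. As an illustration only, the sketch below shows one way a completion percentage and a remaining-time estimate could be derived from step counters. `derive_progress` is a hypothetical helper; only the field names are taken from the progress annotation described later in this notebook.

```python
def derive_progress(current_step, total_steps, elapsed_seconds):
    """Derive a percentage and a naive ETA from step counters (illustrative only)."""
    if total_steps <= 0:
        return {"progressPercentage": 0, "estimatedRemainingSeconds": None}

    percentage = round(100 * current_step / total_steps, 1)

    if current_step == 0:
        # No completed steps yet, so there is no per-step timing to extrapolate.
        remaining = None
    else:
        seconds_per_step = elapsed_seconds / current_step
        remaining = round(seconds_per_step * (total_steps - current_step))

    return {
        "progressPercentage": percentage,
        "estimatedRemainingSeconds": remaining,
    }


# Hypothetical numbers: 16 of 64 steps done after 40 seconds.
print(derive_progress(current_step=16, total_steps=64, elapsed_seconds=40.0))
```

A linear extrapolation like this is only a first approximation; warm-up steps and evaluation passes can skew early estimates.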

In [None]:
# Create TransformersTrainer - progress tracking is ON by default
trainer = TransformersTrainer(
    func=train_func,
    num_nodes=NUM_NODES,
    resources_per_node={"nvidia.com/gpu": GPUS_PER_NODE},
    # enable_progression_tracking=True  # This is the default, no need to specify
)

print("Trainer configured with progress tracking enabled")

In [None]:
# Create client and submit job
if NAMESPACE:
    backend_config = KubernetesBackendConfig(namespace=NAMESPACE)
    client = TrainerClient(backend_config=backend_config)
else:
    client = TrainerClient()

runtime = client.get_runtime(name="torch-distributed")
JOB_NAME = client.train(trainer=trainer, runtime=runtime)
print(f"Job submitted: {JOB_NAME}")

## 5. Monitor progress

Progress is available through:
1. **OpenShift AI Dashboard** - Visual interface with real-time updates
2. **CLI** - TrainJob annotations contain structured progress data
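
The progress data is stored as a JSON string inside a single annotation, so it has to be decoded separately from the TrainJob object itself. The sketch below illustrates that two-level structure on a hand-written sample. The annotation key matches the one used in this notebook, while the sample values and the `extract_progress` helper are hypothetical.

```python
import json

ANNOTATION_KEY = "trainer.opendatahub.io/trainerStatus"

# Hypothetical stand-in for the output of `oc get trainjob <name> -o json`.
sample_trainjob = {
    "metadata": {
        "annotations": {
            # The annotation value is itself a JSON-encoded string.
            ANNOTATION_KEY: json.dumps({
                "progressPercentage": 50,
                "currentStep": 32,
                "totalSteps": 64,
                "trainMetrics": {"loss": 0.61},
            })
        }
    }
}


def extract_progress(trainjob):
    """Return the decoded progress dict, or {} if it is missing or malformed."""
    annotations = trainjob.get("metadata", {}).get("annotations") or {}
    raw = annotations.get(ANNOTATION_KEY)
    if raw is None:
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {}


progress = extract_progress(sample_trainjob)
print(f"Step {progress['currentStep']}/{progress['totalSteps']}, "
      f"loss={progress['trainMetrics']['loss']}")
```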

Run the cell below to poll progress until the job completes.

In [None]:
import subprocess
import json
import time

def get_progress(job_name, namespace=None):
    """Fetch the job state and progress data from TrainJob annotations."""
    # Build the command as a list so no shell is needed to parse it
    cmd = ["oc", "get", "trainjob", job_name, "-o", "json"]
    if namespace:
        cmd += ["-n", namespace]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        return None, None
    
    data = json.loads(result.stdout)
    conditions = data.get("status", {}).get("conditions", [])
    state_str = "Unknown"
    for c in conditions:
        # Complete and Failed are terminal; report whichever is set
        if c.get("status") == "True" and c.get("type") in ("Complete", "Failed"):
            state_str = c["type"]
            break
        elif c.get("type") == "Suspended" and c.get("status") == "False":
            state_str = "Running"
    
    annotations = data.get("metadata", {}).get("annotations", {})
    progress_str = annotations.get("trainer.opendatahub.io/trainerStatus", "{}")
    try:
        progress = json.loads(progress_str)
    except json.JSONDecodeError:
        # Malformed annotation: treat it as "no progress data yet"
        progress = {}
    
    return state_str, progress

print(f"Monitoring job: {JOB_NAME}")
print("-" * 60)

while True:
    state, progress = get_progress(JOB_NAME, NAMESPACE)
    
    if progress:
        pct = progress.get("progressPercentage", 0)
        step = progress.get("currentStep", 0)
        total = progress.get("totalSteps", 0)
        epoch = progress.get("currentEpoch", 0)
        epochs = progress.get("totalEpochs", 0)
        eta = progress.get("estimatedRemainingTimeSummary", "calculating...")
        metrics = progress.get("trainMetrics", {})
        loss = metrics.get("loss", "N/A")
        throughput = metrics.get("throughput_samples_sec", "N/A")
        
        print(f"Progress: {pct}% | Step: {step}/{total} | Epoch: {epoch}/{epochs}")
        print(f"  Loss: {loss} | Throughput: {throughput} samples/sec | ETA: {eta}")
    else:
        print(f"State: {state} (waiting for progress data...)")
    
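    # Stop on either terminal state so a failed job does not poll forever
    if state in ("Complete", "Failed"):
        print("-" * 60)
        print("Training completed!" if state == "Complete" else "Training failed.")
        break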
    
    time.sleep(5)
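
The polling loop above has no upper bound on how long it waits. As a sketch of one way to add a timeout, the helper below polls an arbitrary state function until a condition holds or a deadline passes. `poll_until` and its parameters are hypothetical names, not part of the Kubeflow SDK.

```python
import time


def poll_until(fetch_state, is_done, timeout_s=3600.0, interval_s=5.0):
    """Call fetch_state() until is_done(state) is true or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_state()
        if is_done(state):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"gave up after {timeout_s}s; last state: {state!r}")
        time.sleep(interval_s)


# Demonstration with a canned sequence of states instead of a real cluster.
states = iter(["Unknown", "Running", "Running", "Complete"])
final = poll_until(
    fetch_state=lambda: next(states),
    is_done=lambda s: s in ("Complete", "Failed"),
    timeout_s=10,
    interval_s=0,
)
print(f"Final state: {final}")
```

In this notebook, `fetch_state` could be `lambda: get_progress(JOB_NAME, NAMESPACE)[0]`, which returns only the state string.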

## 6. View final progress data

The progress annotations remain on the TrainJob after completion, providing a record of the training run.

In [None]:
# Display final progress data
_, final_progress = get_progress(JOB_NAME, NAMESPACE)

print("Final Training Progress:")
print(json.dumps(final_progress, indent=2))

## 7. Cleanup

In [None]:
# Delete the training job
client.delete_job(name=JOB_NAME)
print(f"Job {JOB_NAME} deleted")

## Summary

In this notebook, you learned:

1. **Progress tracking is automatic** - When using `TransformersTrainer`, progress tracking is enabled by default
2. **Metrics are stored in annotations** - TrainJob annotations contain structured progress data
3. **Real-time visibility** - Monitor steps, epochs, loss, throughput, and ETA as training progresses
4. **No code changes required** - Your training function works as-is

**Available metrics:**
- `progressPercentage` - Overall completion percentage
- `currentStep` / `totalSteps` - Training step progress
- `currentEpoch` / `totalEpochs` - Epoch progress
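- `estimatedRemainingSeconds` - Estimated time to completion, in seconds (the polling cell in section 5 reads the related `estimatedRemainingTimeSummary` field)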
- `trainMetrics.loss` - Current training loss
- `trainMetrics.throughput_samples_sec` - Training throughput
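
For reporting or dashboards, these fields can be wrapped in a small typed structure. The following is a minimal sketch: `ProgressSnapshot` and `format_duration` are hypothetical helpers, the sample values are invented, and missing fields are assumed to be acceptable (they fall back to `0` or `None`).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressSnapshot:
    """Typed view over the progress fields listed above (hypothetical helper)."""
    percentage: float
    step: int
    total_steps: int
    epoch: float
    total_epochs: int
    remaining_seconds: Optional[float]
    loss: Optional[float]
    throughput: Optional[float]

    @classmethod
    def from_dict(cls, data):
        metrics = data.get("trainMetrics") or {}
        return cls(
            percentage=data.get("progressPercentage", 0),
            step=data.get("currentStep", 0),
            total_steps=data.get("totalSteps", 0),
            epoch=data.get("currentEpoch", 0),
            total_epochs=data.get("totalEpochs", 0),
            remaining_seconds=data.get("estimatedRemainingSeconds"),
            loss=metrics.get("loss"),
            throughput=metrics.get("throughput_samples_sec"),
        )


def format_duration(seconds):
    """Render a number of seconds as hours, minutes, and seconds."""
    if seconds is None:
        return "unknown"
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s"


# Invented values, for illustration only.
sample = {
    "progressPercentage": 50,
    "currentStep": 32,
    "totalSteps": 64,
    "currentEpoch": 1,
    "totalEpochs": 2,
    "estimatedRemainingSeconds": 95,
    "trainMetrics": {"loss": 0.61},
}
snap = ProgressSnapshot.from_dict(sample)
print(f"{snap.percentage}% | step {snap.step}/{snap.total_steps} | "
      f"ETA {format_duration(snap.remaining_seconds)}")
```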

**Next steps:**
- View progress in the OpenShift AI Dashboard
- Try disabling progress tracking with `enable_progression_tracking=False`
- Explore JIT checkpointing for fault tolerance