# UAT for Cycle 06: Resilience and Scalability

This notebook demonstrates the key features implemented in Cycle 6: the **checkpointing/recovery system** and **parallel execution with Dask**.

## Scenario UAT-C6-001: "Pull the Plug" - Catastrophic Failure and Recovery

In [None]:
import multiprocessing
import os
import shutil
import time
import uuid
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock

from mlip_autopipec.config.models import (
    CheckpointState,
    CutoffConfig,
    DFTConfig,
    DFTInputParameters,
    ExplorerConfig,
    FingerprintConfig,
    InferenceConfig,
    Pseudopotentials,
    SystemConfig,
    TrainingConfig,
)
from mlip_autopipec.modules.dft import DFTRunner
from mlip_autopipec.workflow_manager import WorkflowManager

# Scratch directory for this UAT, relative to the current working directory.
WORK_DIR = Path("uat_c6_temp_work_dir")


def setup_module():
    """Ensure WORK_DIR exists and is empty before the UAT starts."""
    stale_dir_present = WORK_DIR.exists()
    if stale_dir_present:
        # Discard leftovers from a previous (possibly crashed) run.
        shutil.rmtree(WORK_DIR)
    WORK_DIR.mkdir()


def teardown_module():
    """Remove the UAT working directory.

    Safe to call more than once (e.g. when a notebook cell is re-run): if the
    directory is already gone it is treated as cleaned up instead of raising
    ``FileNotFoundError``. Other errors (permissions, etc.) still propagate.
    """
    try:
        shutil.rmtree(WORK_DIR)
    except FileNotFoundError:
        # Nothing to clean up; a previous teardown already removed it.
        pass


def mock_dft_run(job, delay: float = 5.0):
    """Simulate a long-running DFT calculation for the crash/recovery scenario.

    Args:
        job: Any object exposing a ``job_id`` attribute.
        delay: Seconds to sleep in place of the real calculation. Defaults to
            5.0, long enough for the scenario to kill the process mid-job.

    Returns:
        ``job.job_id`` unchanged. A real runner would return a DFTResult object.
    """
    print(f"Starting mock DFT job: {job.job_id}")
    time.sleep(delay)
    print(f"Finished mock DFT job: {job.job_id}")
    return job.job_id


def run_workflow_process(config, work_dir, runner):
    """Child-process entry point for the "pull the plug" scenario.

    Builds a WorkflowManager from the given config, working directory and DFT
    runner, then idles so the parent has time to kill the process mid-run.
    No jobs are submitted here; the scenario writes the checkpoint from the
    parent process instead.
    """
    # Held in a local so the manager stays referenced while the process idles.
    _manager = WorkflowManager(dft_runner=runner, work_dir=work_dir, system_config=config)
    keep_alive_seconds = 20
    time.sleep(keep_alive_seconds)


# Prepare a clean scratch directory before running any scenario.
setup_module()
created_dir = WORK_DIR.resolve()
print(f"Created working directory: {created_dir}")

In [None]:
# Part 1: The Initial Run and the "Crash"
print("--- Part 1: Starting workflow and simulating a crash ---")

# 1. Set up a valid SystemConfig
valid_config = SystemConfig(
    project_name="uat_c6_project",
    run_uuid=uuid.uuid4(),
    dft_config=DFTConfig(
        dft_input_params=DFTInputParameters(
            pseudopotentials=Pseudopotentials.model_validate({"Si": "Si.upf"}),
            cutoffs=CutoffConfig(wavefunction=60, density=240),
            k_points=(3, 3, 3),
        )
    ),
    explorer_config=ExplorerConfig(
        surrogate_model_path="path", fingerprint=FingerprintConfig(species=["Si"])
    ),
    training_config=TrainingConfig(data_source_db=WORK_DIR / "test.db"),
    inference_config=InferenceConfig(),
)

# 2. Create a mock DFTRunner whose run() delegates to the slow mock_dft_run.
mock_runner = MagicMock(spec=DFTRunner)
mock_runner.run.side_effect = mock_dft_run

# 3. Launch the WorkflowManager in a separate process.
# NOTE(review): with the "fork" start method the args are inherited directly;
# under "spawn"/"forkserver" they are pickled, and MagicMock instances are not
# picklable, so this step will likely fail there — confirm target platforms.
p = multiprocessing.Process(target=run_workflow_process, args=(valid_config, WORK_DIR, mock_runner))
p.start()
print(f"Workflow process started with PID: {p.pid}")

time.sleep(2)  # Give it a moment to initialize

# 4. Manually create a checkpoint file to simulate job submission.
job_ids = [uuid.uuid4() for _ in range(3)]
initial_state = CheckpointState(
    run_uuid=valid_config.run_uuid,
    system_config=valid_config,
    pending_job_ids=job_ids,
    # NOTE(review): MagicMock placeholders must be accepted and JSON-serialisable
    # by CheckpointState's job_submission_args schema — confirm against the model.
    job_submission_args={job_id: [MagicMock()] for job_id in job_ids},
)
(WORK_DIR / "checkpoint.json").write_text(initial_state.model_dump_json(indent=4), encoding="utf-8")
print("Manually created checkpoint file with 3 pending jobs.")

# 5. Let it run for a bit, then terminate forcefully (SIGKILL on POSIX).
print("Allowing workflow to run for 7 seconds...")
time.sleep(7)
print("🔴 Simulating crash: Terminating process forcefully!")
p.kill()
p.join()
print(f"Workflow process exited with code: {p.exitcode}")

## Scenario UAT-C6-002: High-Throughput Parallel Execution with Dask

In [None]:
from dask.distributed import Client, LocalCluster


def short_mock_dft_run(job, delay: float = 2.0):
    """Simulate a short DFT calculation for the Dask scalability demo.

    Args:
        job: Any object exposing a ``job_id`` attribute. It must be picklable
            when submitted to Dask worker processes.
        delay: Seconds to sleep in place of the real calculation (default 2.0).

    Returns:
        ``job.job_id`` unchanged.
    """
    time.sleep(delay)
    return job.job_id


print("--- Scenario UAT-C6-002: High-Throughput Parallel Execution with Dask ---")

N_JOBS = 8
N_WORKERS = 4

# 1. Sequential baseline: run the same mock workload one job at a time.
print(f"\n--- Running {N_JOBS} jobs sequentially ---")
start_time = time.perf_counter()
for i in range(N_JOBS):
    print(f"Starting sequential job {i + 1}")
    short_mock_dft_run(SimpleNamespace(job_id=uuid.uuid4()))
end_time = time.perf_counter()
sequential_time = end_time - start_time
print(f"\nTotal time for {N_JOBS} jobs sequentially: {sequential_time:.2f} seconds")

# 2. Parallel execution with Dask
print(f"\n--- Running {N_JOBS} jobs in parallel with {N_WORKERS} Dask workers ---")
with LocalCluster(n_workers=N_WORKERS) as cluster, Client(cluster) as client:
    print(f"Dask dashboard link: {client.dashboard_link}")

    mock_runner_parallel = MagicMock(spec=DFTRunner)
    mock_runner_parallel.run.side_effect = short_mock_dft_run

    # Point the WorkflowManager at this local cluster. The variable is always
    # removed afterwards (even on error) so it cannot leak into later cells.
    os.environ["DASK_SCHEDULER_ADDRESS"] = client.scheduler.address
    try:
        # Constructed so the manager connects to this cluster; it is not
        # otherwise exercised by this demo.
        manager = WorkflowManager(
            system_config=valid_config, work_dir=WORK_DIR, dft_runner=mock_runner_parallel
        )

        # Job stand-ins are SimpleNamespace objects: Dask pickles task arguments
        # to ship them to worker processes, and MagicMock instances are not
        # picklable.
        jobs = [SimpleNamespace(job_id=uuid.uuid4()) for _ in range(N_JOBS)]
        start_time_parallel = time.perf_counter()
        futures = [client.submit(short_mock_dft_run, job) for job in jobs]
        results = client.gather(futures)
        end_time_parallel = time.perf_counter()
    finally:
        os.environ.pop("DASK_SCHEDULER_ADDRESS", None)

    parallel_time = end_time_parallel - start_time_parallel
    assert results == [job.job_id for job in jobs], "Dask returned unexpected job IDs"
    print(f"\nTotal time for {N_JOBS} jobs in parallel: {parallel_time:.2f} seconds")

    # 3. The Comparison
    print("\n--- Comparison ---")
    speedup = sequential_time / parallel_time
    print(f"✅ Achieved a {speedup:.2f}x speedup with parallel execution.")

teardown_module()
print(f"\nCleaned up working directory: {WORK_DIR.resolve()}")