# variables

In [None]:
import os

# NOTE(review): credentials should not live in a committed notebook. Each
# connection value can now be supplied through the environment; the literals
# are kept only as fallbacks so the notebook keeps working unchanged.
# Consider rotating the key/secret below and dropping the fallbacks.
API_URL = os.environ.get("OSPARC_API_HOST", "https://api.osparc-master-zmt.click/")
API_KEY = os.environ.get("OSPARC_API_KEY", "testi_4311114729")
API_SECRET = os.environ.get("OSPARC_API_SECRET", "9fe5517b639280a8a12e5dd076abee67de68a026")
# Solver release that every job in this notebook is created for.
SOLVER_NAME = "simcore/services/comp/itis/sleeper"
SOLVER_VERSION = "2.2.1"

In [None]:
import os
# Set the OSPARC_DEV_FEATURES_ENABLED flag for this process
# (presumably it must be set before `osparc` is imported — TODO confirm).
os.environ.update({"OSPARC_DEV_FEATURES_ENABLED": "1"})

# initialize connection

In [None]:
from multiprocessing.pool import AsyncResult
from tenacity import AsyncRetrying, TryAgain, retry_if_exception_type
from tenacity.wait import wait_fixed
from tqdm.auto import tqdm
from rich import print
import asyncio
import functools
import osparc
import typing


# Client configuration: the API key/secret pair is passed as username/password.
cfg = osparc.Configuration(host=f"{API_URL}", username=API_KEY, password=API_SECRET)
# Debugging aid, disabled by default:
# cfg.debug = True

In [None]:
# Print the profile behind the credentials, then build the solvers API handle
# used by the rest of the notebook.
# NOTE(review): `solvers_api` is used by later cells after this `with` block has
# exited — confirm the client stays usable once its context is closed.
with osparc.ApiClient(cfg) as api_client:
    users_api = osparc.UsersApi(api_client)
    profile = users_api.get_my_profile()
    print(profile)
    solvers_api = osparc.SolversApi(api_client)


# get solver

In [None]:
# Look up the solver release selected by SOLVER_NAME / SOLVER_VERSION.
solver = typing.cast(
    osparc.Solver,
    solvers_api.get_solver_release(SOLVER_NAME, SOLVER_VERSION),
)
print(solver)

# create job

In [None]:
async def create_job(solver: osparc.Solver) -> osparc.Job:
    """Create a job for ``solver`` with a fixed set of inputs and return it.

    ``solvers_api.create_job`` is called with ``async_req=True`` from the
    loop's default executor; the ``AsyncResult`` it returns is then resolved
    in the executor as well, so neither step runs on the event-loop thread.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.create_job,
        solver.id,
        solver.version,
        osparc.JobInputs({"input_2": 300, "input_4": 0, "input_5": 0}),
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    created = await loop.run_in_executor(None, pending.get)
    return typing.cast(osparc.Job, created)

# list jobs

In [None]:

def _list_jobs(solver: osparc.Solver) -> list[osparc.Job]:
    """Blocking helper: collect the ``osparc.Job`` items of ``solvers_api.jobs``.

    Items yielded by the iterator that are not ``osparc.Job`` instances are
    skipped.
    """
    assert solver.id
    assert solver.version
    jobs: list[osparc.Job] = []
    for item in solvers_api.jobs(solver.id, solver.version):
        if isinstance(item, osparc.Job):
            jobs.append(item)
    return jobs


async def list_jobs(solver: osparc.Solver) -> list[osparc.Job]:
    """Run the blocking ``_list_jobs`` in the loop's default executor."""
    loop = asyncio.get_running_loop()
    jobs = await loop.run_in_executor(None, _list_jobs, solver)
    return jobs

# inspect job

In [None]:
async def inspect_job(solver: osparc.Solver, job: osparc.Job) -> osparc.JobStatus:
    """Return the status reported by ``solvers_api.inspect_job`` for ``job``.

    Both the ``async_req=True`` submission and the wait on its ``AsyncResult``
    are executed in the loop's default executor.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.inspect_job,
        solver.id,
        solver.version,
        job.id,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    status = await loop.run_in_executor(None, pending.get)
    return typing.cast(osparc.JobStatus, status)

## get job result

In [None]:
async def get_job_result(solver: osparc.Solver, job: osparc.Job) -> osparc.JobOutputs:
    """Return the outputs reported by ``solvers_api.get_job_outputs`` for ``job``.

    Both the ``async_req=True`` submission and the wait on its ``AsyncResult``
    are executed in the loop's default executor.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.get_job_outputs,
        solver.id,
        solver.version,
        job.id,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    outputs = await loop.run_in_executor(None, pending.get)
    return typing.cast(osparc.JobOutputs, outputs)

# start job

In [None]:
async def start_job(solver: osparc.Solver, job: osparc.Job) -> osparc.JobStatus:
    """Call ``solvers_api.start_job`` for ``job`` and return the resulting status.

    Both the ``async_req=True`` submission and the wait on its ``AsyncResult``
    are executed in the loop's default executor.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.start_job,
        solver.id,
        solver.version,
        job.id,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    status = await loop.run_in_executor(None, pending.get)
    return typing.cast(osparc.JobStatus, status)

# stop job

In [None]:
async def stop_job(solver: osparc.Solver, job: osparc.Job) -> osparc.Job:
    """Call ``solvers_api.stop_job`` for ``job`` and return the API's answer.

    Both the ``async_req=True`` submission and the wait on its ``AsyncResult``
    are executed in the loop's default executor.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.stop_job,
        solver.id,
        solver.version,
        job.id,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    stopped = await loop.run_in_executor(None, pending.get)
    return typing.cast(osparc.Job, stopped)

# set job metadata

In [None]:
from typing import Any

async def set_job_metadata(solver: osparc.Solver, job: osparc.Job, job_metadata: dict[str, Any]) -> dict[str, Any]:
    """Replace the custom metadata of ``job`` with ``job_metadata``.

    ``job_metadata`` is sent wrapped as ``{"metadata": job_metadata}`` through
    ``solvers_api.replace_job_custom_metadata``; the value returned by the API
    is handed back to the caller. Both blocking steps run in the loop's
    default executor.
    """
    loop = asyncio.get_running_loop()
    payload = {"metadata": job_metadata}
    submit = functools.partial(
        solvers_api.replace_job_custom_metadata,
        solver.id,
        solver.version,
        job.id,
        payload,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    stored = await loop.run_in_executor(None, pending.get)
    return typing.cast(dict[str, Any], stored)

# delete job

In [None]:
async def delete_job(solver: osparc.Solver, job: osparc.Job) -> None:
    """Call ``solvers_api.delete_job`` for ``job`` and wait for it to complete.

    Both the ``async_req=True`` submission and the wait on its ``AsyncResult``
    are executed in the loop's default executor; the API's answer is discarded.
    """
    loop = asyncio.get_running_loop()
    submit = functools.partial(
        solvers_api.delete_job,
        solver.id,
        solver.version,
        job.id,
        async_req=True,
    )
    pending = await loop.run_in_executor(None, submit)
    assert isinstance(pending, AsyncResult)  # nosec
    await loop.run_in_executor(None, pending.get)

# run N sleepers

In [None]:
NUM_JOBS = 500

created_jobs = []
with tqdm(total=NUM_JOBS, desc="Creating jobs") as pbar:
    for result in asyncio.as_completed([create_job(solver) for _ in range(NUM_JOBS)]):
        job = await result
        created_jobs.append(job)
        pbar.update()

In [None]:
PARENT_NODE_ID = "7a7d6b38-7e9f-46b2-8cfb-4ee7ae20d0b1"
with tqdm(total=NUM_JOBS, desc="Setting jobs metadata") as pbar:
    for result in asyncio.as_completed([set_job_metadata(solver, job, job_metadata={"job_index": created_jobs.index(job), "node_id": PARENT_NODE_ID}) for job in created_jobs]):
        await result
        pbar.update()

In [None]:
from collections import Counter


async def run_job_wo_progressbar(solver: osparc.Solver, job: osparc.Job, status_dict: dict) -> osparc.JobStatus:
    """Start ``job`` and poll it once per second until it reaches a final state.

    Args:
        solver: solver release the job belongs to.
        job: job to start and watch.
        status_dict: mapping updated in place with ``job.id -> latest state``
            after every inspection.

    Returns:
        The last inspected status, whose state is FAILED, SUCCESS or ABORTED.
    """
    final_states = ("FAILED", "SUCCESS", "ABORTED")

    await start_job(solver, job)
    job_status = await inspect_job(solver, job)
    status_dict[job.id] = job_status.state

    # retry_if_exception_type() without arguments retries on any Exception, so
    # errors raised while inspecting are retried as well, not only TryAgain.
    poller = AsyncRetrying(wait=wait_fixed(1), retry=retry_if_exception_type())
    async for attempt in poller:
        with attempt:
            job_status = await inspect_job(solver, job)
            status_dict[job.id] = job_status.state
            still_running = job_status.state not in final_states
            if still_running:
                raise TryAgain

    status_dict[job.id] = job_status.state
    return job_status

job_statuses = []
status_dict = {}  # job_id -> state
tasks = [asyncio.create_task(run_job_wo_progressbar(solver, job, status_dict)) for job in created_jobs]
summary_interval = 5  # seconds
done_count = 0
with tqdm(total=len(created_jobs), desc="Running jobs") as pbar:
    while done_count < len(tasks):
        done_count = sum(t.done() for t in tasks)
        # Summarize states
        states = Counter(status_dict.values())
        pbar.n = done_count
        pbar.refresh()
        # Print summary every interval
        if states:
            tqdm.write(f"Summary: {dict(states)}", end="\r")
        await asyncio.sleep(summary_interval)
        
    # Gather results
    for t in tasks:
        try:
            job_statuses.append(await t)
        except osparc.ApiException as exc:
            tqdm.write(f"Error while running job: {exc}")

In [None]:
async def run_job(solver: osparc.Solver, job: osparc.Job) -> osparc.JobStatus:
    """Start ``job`` and poll it once per second until it reaches a final state.

    A per-job progress bar (total 100) is advanced by the change in
    ``JobStatus.progress`` and shows the current state in its description.

    Returns:
        The last inspected status, whose state is FAILED, SUCCESS or ABORTED.
    """
    job_prefix = f"{solver.id}:{solver.version}-{job.id}"
    with tqdm(total=100, desc=job_prefix) as pbar:
        await start_job(solver, job)

        job_status = await inspect_job(solver, job)
        assert job_status.progress is not None  # nosec
        pbar.update(job_status.progress)
        pbar.set_description_str(f"{job_prefix}-----{job_status.state}", refresh=True)
        current_progress = job_status.progress
        current_state = job_status.state
        # retry_if_exception_type() without arguments retries on any Exception,
        # so errors raised inside the attempt are retried too, not only TryAgain.
        async for attempt in AsyncRetrying(
            wait=wait_fixed(1), retry=retry_if_exception_type()
        ):
            with attempt:
                job_status = await inspect_job(solver, job)
                assert job_status.progress is not None  # nosec
                if job_status.progress != current_progress:
                    pbar.update(job_status.progress - current_progress)
                    current_progress = job_status.progress

                if current_state != job_status.state:
                    pbar.set_description_str(
                        f"{job_prefix}-----{job_status.state}", refresh=True
                    )
                    # Remember the new state so the description is rewritten
                    # only on an actual change, not on every later poll.
                    current_state = job_status.state

                if job_status.state not in ["FAILED", "SUCCESS", "ABORTED"]:
                    raise TryAgain
        return job_status


job_statuses = []
with tqdm(total=len(created_jobs), desc="Running jobs") as pbar:
    for result in asyncio.as_completed([run_job(solver, job) for job in created_jobs]):
        try:
            job_statuses.append(await result)
        except osparc.ApiException as exc:
            tqdm.write(f"Error while running {job.id}: {exc}")
        finally:
            pbar.update()

## Check results

In [None]:
job_inspection_results = await asyncio.gather( *(inspect_job(solver, job)for job in created_jobs))
assert all(status.state == "SUCCESS" for status in job_inspection_results), job_inspection_results

## Get job result

In [None]:
print(await get_job_result(solver, created_jobs[0]))

# Cleanup jobs

In [None]:
listed_jobs =await list_jobs(solver)
from tqdm.asyncio import tqdm_asyncio
await tqdm_asyncio.gather(*(stop_job(solver, job) for job in listed_jobs), desc="stopping jobs")
await tqdm_asyncio.gather(*(delete_job(solver, job) for job in listed_jobs), desc="deleting jobs")
await asyncio.sleep(3)
listed_jobs =await list_jobs(solver)
assert len(listed_jobs) == 0, f"found {len(listed_jobs)} jobs"

In [None]:
listed_jobs =await list_jobs(solver)

In [None]:
# Reuse the listed jobs as `created_jobs` (same list object) and show how many there are.
created_jobs = listed_jobs
len(created_jobs)


## Reproduce issue with multiple jobs sent at intervals

### send first set of jobs

In [None]:
# 8 jobs first
NUM_JOBS = 8
jobs_1st_wave = []
for i in tqdm(range(NUM_JOBS), desc=f"Create and start {NUM_JOBS} jobs"):
    created_job = await create_job(solver)
    jobs_1st_wave.append(created_job)
    started_job = await start_job(solver, created_job)


In [None]:
# 8 jobs again
NUM_JOBS = 8
jobs_2nd_wave = []
for i in tqdm(range(NUM_JOBS), desc=f"Create and start {NUM_JOBS} jobs"):
    created_job = await create_job(solver)
    jobs_2nd_wave.append(created_job)
    started_job = await start_job(solver, created_job)


In [None]:
from osparc import Job, JobStatus, Solver


async def check_job(solver: Solver, job: Job) -> JobStatus:
    """Poll an already started ``job`` once per second until it reaches a final state.

    A progress bar (total 100) is advanced by the change in
    ``JobStatus.progress`` and carries the current state as its description.

    Returns:
        The last inspected status, whose state is FAILED, SUCCESS or ABORTED.
    """
    job_status = await inspect_job(solver, job)
    assert job_status.progress is not None  # nosec
    last_progress = job_status.progress
    with tqdm(total=100, desc=job_status.state) as pbar:
        # Account for progress made before polling began, as run_job does.
        pbar.update(last_progress)
        # retry_if_exception_type() without arguments retries on any Exception,
        # so errors raised inside the attempt are retried too, not only TryAgain.
        async for attempt in AsyncRetrying(wait=wait_fixed(1), retry=retry_if_exception_type()):
            with attempt:
                job_status = await inspect_job(solver, job)
                assert job_status.progress is not None  # nosec
                if job_status.progress != last_progress:
                    pbar.update(job_status.progress - last_progress)
                    # Track the progress on every change; updating it only when the
                    # state changed made the bar count the same delta repeatedly.
                    last_progress = job_status.progress
                if job_status.state != pbar.desc:
                    pbar.set_description_str(job_status.state, refresh=True)

                # ABORTED is final as well (as in the other pollers of this
                # notebook); without it an aborted job would be polled forever.
                if job_status.state not in ["FAILED", "SUCCESS", "ABORTED"]:
                    raise TryAgain
        return job_status

In [None]:
job_statuses = []
all_jobs = jobs_1st_wave + jobs_2nd_wave
with tqdm(total=len(all_jobs), desc="Running jobs") as pbar:
    for result in asyncio.as_completed([check_job(solver, job) for job in all_jobs]):
        try:
            job_statuses.append(await result)            
        except osparc.ApiException as exc:
            tqdm.write(f"Error while running {job.id}: {exc}")
        finally:
            pbar.update()

In [None]:
# cleanup
all_jobs = await list_jobs(solver)
from tqdm.asyncio import tqdm_asyncio
await tqdm_asyncio.gather(*(stop_job(solver, job) for job in listed_jobs), desc="stopping jobs")
await tqdm_asyncio.gather(*(delete_job(solver, job) for job in all_jobs))
all_jobs = await list_jobs(solver)
assert len(all_jobs) == 0, f"found {len(all_jobs)} jobs"