# Low level interface

## Basic usage

First, you need to create a connection to the server.

In [1]:
import stoilo

conn = await stoilo.connect('localhost:57010')

A minimal working example that creates a task and waits for its result:

In [2]:
await conn.create_task(
    kwargs={"a": 2, "b": 3},
    func=lambda kwargs: kwargs["a"] + kwargs["b"],
).result()

5

The same steps in detail:

In [3]:
# synchronous non-blocking creation
task = conn.create_task(
    kwargs={"a": 2, "b": 3},
    func=lambda kwargs: kwargs["a"] + kwargs["b"]
)
# task has not been submitted to the server yet
print(task.task_id is None)  # True

# asynchronous submission, will return fairly quickly
task = await task.submit()
# task has been submitted and the task_id has been assigned
print(task.task_id)  # some uuid

# asynchronous waiting for the result, takes some time to finish
result = await task.result()
print(result)  # 5

True
3dcaabd9f99a48c98037e133849a5732
5


## Data transfer

The value returned by the function must be JSON-serializable, that is, built from Python built-in types: scalars, `str`, `list`, `tuple` and `dict`. If you need to transfer more complex types, you have to write the serialization logic yourself. **PLEASE PAY SPECIAL ATTENTION**: using `pickle`, `dill` or `cloudpickle` for returned values is insecure, because arbitrary code can be executed during deserialization, which exposes the server and users to attacks by volunteers. Use such insecure serialization libraries **ONLY** in a trusted environment.
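
As an illustration of hand-written serialization, the sketch below converts a user-defined type to plain built-in types before it is returned and rebuilds it after it is received. The `Interval` class and the `encode_interval`, `decode_interval` and `task_func` helpers are hypothetical names used only for this example; they are not part of stoilo.

```python
import json
from dataclasses import dataclass


@dataclass
class Interval:
    """Hypothetical user-defined type that json cannot serialize directly."""
    low: float
    high: float


def encode_interval(interval: Interval) -> dict:
    # Convert to plain built-in types before returning from the task function.
    return {"__type__": "Interval", "low": interval.low, "high": interval.high}


def decode_interval(payload) -> Interval:
    # Validate the shape explicitly; never evaluate or unpickle untrusted data.
    if not isinstance(payload, dict) or payload.get("__type__") != "Interval":
        raise ValueError("unexpected payload")
    low, high = payload.get("low"), payload.get("high")
    for value in (low, high):
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError("low and high must be numbers")
    return Interval(float(low), float(high))


def task_func(kwargs):
    # Stand-in for a function passed as `func=`; it returns a JSON-serializable dict.
    return encode_interval(Interval(kwargs["a"], kwargs["b"]))


wire = json.dumps(task_func({"a": 1.0, "b": 2.5}))  # simulates the transfer
restored = decode_interval(json.loads(wire))
print(restored)
```

Because the decoder only accepts a fixed, explicitly checked structure, a malicious result can at worst be rejected; it cannot trigger code execution the way unpickling can.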

## Computational Replication and Validation settings

You can configure validation functions and computational redundancy for a task.

In [4]:
redundancy_options = stoilo.low_level.redundancy.CreateOptions(min_quorum=3, max_total_results=5)

def quadratic_roots_harmonic_mean(kwargs):
    import math
    a, b, c = kwargs["a"], kwargs["b"], kwargs["c"]
    discriminant = b ** 2 - 4 * a * c
    if discriminant < 0:
        return None
    sqrt_discriminant = math.sqrt(discriminant)
    denom = 2 * a
    x1 = (-b + sqrt_discriminant) / denom
    x2 = (-b - sqrt_discriminant) / denom
    harmonic_mean = 2 / (1 / x1 + 1 / x2)
    return harmonic_mean

task = conn.create_task(
    kwargs={"a": 1, "b": -3, "c": 2},
    func=quadratic_roots_harmonic_mean,
    init_valid_func=lambda res: res is None or isinstance(res, float),
    compare_valid_func=lambda x, y: (
        x is None and y is None
        or x is not None and y is not None and abs(x - y) < 1e-6
    ),
    redundancy_options=redundancy_options,
)

task = await task.submit()
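
To make the roles of the two validators and of `min_quorum` more concrete, here is a local sketch of how agreeing replicas could be grouped. It assumes semantics similar to BOINC's (a result is accepted once `min_quorum` valid replicas agree with each other); the actual server-side logic may differ, and `find_quorum` is a hypothetical helper, not a stoilo function.

```python
from typing import Any, Callable, List, Optional


def find_quorum(
    results: List[Any],
    init_valid: Callable[[Any], bool],
    compare_valid: Callable[[Any, Any], bool],
    min_quorum: int,
) -> Optional[List[Any]]:
    """Return the first group of mutually agreeing results of size >= min_quorum."""
    classes: List[List[Any]] = []
    for res in results:
        if not init_valid(res):
            continue  # rejected by the initial validator
        for cls in classes:
            # Compare against the first member of each existing class.
            if compare_valid(cls[0], res):
                cls.append(res)
                break
        else:
            classes.append([res])  # start a new equivalence class
    for cls in classes:
        if len(cls) >= min_quorum:
            return cls
    return None  # no quorum reached


# The same validators as in the cell above.
init_valid = lambda res: res is None or isinstance(res, float)
compare_valid = lambda x, y: (
    x is None and y is None
    or x is not None and y is not None and abs(x - y) < 1e-6
)

# Simulated replicas: three honest results, one malformed, one wrong.
replicas = [1.3333333333, 1.3333333334, "garbage", 7.0, 1.3333333333]
agreed = find_quorum(replicas, init_valid, compare_valid, min_quorum=3)
print(agreed)
```

Running the validators locally on simulated results like this is also a cheap way to catch a faulty `init_valid_func` or `compare_valid_func` before submitting a task.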

As a user, you do not have to stay available all the time: you can save the `task_id` of the submitted task, which is enough to restore the `SubmittedTask` object later, and close the Python notebook.

In [5]:
print(task.task_id)

with open("task_id.txt", "w") as f:
    f.write(task.task_id)

e9a655b80bd048d6b44ff1734db39303


After returning to work and restarting the notebook, restore the task from the saved `task_id`:

In [6]:
with open("task_id.txt", "r") as f:
    task_id = f.read()

task = conn.restore_task(task_id)
print(task.task_id)

await task.result()

e9a655b80bd048d6b44ff1734db39303


1.3333333333333333

## Parallel task execution

Because waiting for results is asynchronous, you can create many subtasks and wait for all of them to complete.

In [7]:
import asyncio
import math
from typing import Callable

async def integrate_trapezoid_distributed(conn, f: Callable[[float], float], a: float, b: float) -> float:
    def worker(kwargs):
        x_0, x_n, n, f = kwargs["a"], kwargs["b"], kwargs["n"], kwargs["f"]
        h = (x_n - x_0) / n
        total = 0.5 * (f(x_0) + f(x_n))
        for j in range(1, n):
            xj = x_0 + j * h
            total += f(xj)
        return total * h
    
    n_tasks = 10
    n_step_in_task = 1000

    subseg_size = (b - a) / n_tasks
    tasks = []
    for i in range(n_tasks):
        sub_a = a + i * subseg_size
        sub_b = sub_a + subseg_size
        task = conn.create_task(
            kwargs={"a": sub_a, "b": sub_b, "n": n_step_in_task, "f": f},
            func=worker,
            redundancy_options=stoilo.low_level.redundancy.TRIVIAL_OPTIONS,
        )
        tasks.append(task)

    partial_integrals = await asyncio.gather(*(t.result() for t in tasks))
    return sum(partial_integrals)


integral = await integrate_trapezoid_distributed(conn, lambda x: x * math.sin(x), 0, math.pi)
print(f"∫₀^π x sin(x) dx ≈ {integral:.4f}  (actual value is π)")

∫₀^π x sin(x) dx ≈ 3.1416  (actual value is π)


## Error handling

If the user's function raises an exception, an object of type `stoilo.low_level.UserError` containing this exception is returned. For any other error, or if the task cannot be completed with the specified replication options, an object of type `stoilo.low_level.SystemError` is returned.

The initial validator always accepts `UserError` and rejects `SystemError`. The comparative validator considers two returned values to belong to the same equivalence class if either both are `UserError`, or neither is `UserError` and `compare_valid_func` returned `True`. The reason is that validators are responsible for verifying the correctness of calculations, and an exception thrown in user code can be a genuine result of execution.

Overall, the type of the returned value is `stoilo.low_level.TaskResult = Union[Any, UserError, SystemError]`.
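
The validator rules described above can be sketched locally as follows. `FakeUserError` and `FakeSystemError` are hypothetical stand-ins for `stoilo.low_level.UserError` and `stoilo.low_level.SystemError`, and the two `effective_*` helpers only mirror the rules as stated in this section; they are not the library's implementation.

```python
class FakeUserError:
    """Hypothetical stand-in for stoilo.low_level.UserError."""
    def __init__(self, message):
        self.message = message


class FakeSystemError:
    """Hypothetical stand-in for stoilo.low_level.SystemError."""
    def __init__(self, message):
        self.message = message


def effective_init_valid(res, init_valid_func):
    # A user error is always accepted, a system error is always rejected.
    if isinstance(res, FakeUserError):
        return True
    if isinstance(res, FakeSystemError):
        return False
    return bool(init_valid_func(res))


def effective_compare_valid(x, y, compare_valid_func):
    # System errors never get here: the initial validator rejects them first.
    x_err = isinstance(x, FakeUserError)
    y_err = isinstance(y, FakeUserError)
    if x_err or y_err:
        # Equivalent only when both replicas ended with a user error.
        return x_err and y_err
    return bool(compare_valid_func(x, y))


close = lambda x, y: abs(x - y) < 1e-6
err_a = FakeUserError("division by zero")
err_b = FakeUserError("division by zero")

print(effective_compare_valid(err_a, err_b, close))       # both are user errors
print(effective_compare_valid(err_a, 0.5, close))         # error vs. value
print(effective_compare_valid(0.5, 0.5000000001, close))  # delegated to `close`
```

On the client side, the same `isinstance` checks against `UserError` and `SystemError` are a natural way to branch on a `TaskResult` before using it as a regular value.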

In [8]:
falling_tasks = []

falling_tasks.append(conn.create_task(
    kwargs={"a": 1, "b": 0},
    func=lambda kwargs: kwargs["a"] / kwargs["b"],  # division by zero
))

falling_tasks.append(conn.create_task(
    kwargs={"a": 1, "b": 2},
    func=lambda kwargs: kwargs["a"] / kwargs["b"],
    init_valid_func=lambda _: 1 / 0,  # failing validator
))

falling_tasks.append(conn.create_task(
    kwargs={"a": 1, "b": 2},
    func=lambda kwargs: kwargs["a"] / kwargs["b"],
    init_valid_func=lambda _: False,  # reject all the results
))

results = await asyncio.gather(*(t.result() for t in falling_tasks))

for i, res in enumerate(results):
    print(f"Task {i} returned {type(res)}: {res}")

Task 0 returned <class 'stoilo.low_level.task_result.UserError'>: Stoilo user error: Exception is thrown in user function: division by zero
Task 1 returned <class 'stoilo.low_level.task_result.SystemError'>: Stoilo system error: BOINC error code: 8, see WU_ERROR_* in html/inc/common_defs.inc
Task 2 returned <class 'stoilo.low_level.task_result.SystemError'>: Stoilo system error: BOINC error code: 8, see WU_ERROR_* in html/inc/common_defs.inc


## Flavors

Volunteer nodes run workers called _raboshkas_ that execute the user's Python tasks. A raboshka bundles a Python interpreter and a set of pip dependencies, so it requires very little from the volunteer's machine. The specific list of pip dependencies and modules installed in a raboshka is called a _flavor_. All raboshka flavors are created by the administrators of the deployed system and are identified by a hash of `dependencies.yaml`.

Below is an example of `dependencies.yaml` with the hash `44814764c91bf9ef426c4aa899df974f`:
```yaml
requirements:
  - cloudpickle==3.1.1
  - numpy==2.2.5
  - torch==2.7.0
  - torchvision==0.22.0
modules:
  - cloudpickle
  - numpy
  - torch
  - torchvision

```
Note that `requirements` and `modules` are listed separately: in general, the name of a pip package and the names of the top-level modules it exports may not match.
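
The mismatch between package names and module names can be illustrated with a small local check. The sketch below uses the standard `importlib.util.find_spec` to test whether top-level module names (as they would appear under `modules`) are importable in the current environment; `missing_modules` is a hypothetical helper and is unrelated to how flavors are built on the server.

```python
import importlib.util
from typing import Iterable, List

# Well-known cases where the pip package name differs from the import name.
PIP_TO_MODULE = {"Pillow": "PIL", "scikit-learn": "sklearn", "PyYAML": "yaml"}


def missing_modules(modules: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found locally."""
    return [name for name in modules if importlib.util.find_spec(name) is None]


# `json` and `math` ship with Python; the last name is deliberately bogus.
print(missing_modules(["json", "math", "surely_not_installed_module_xyz"]))
```

A check like this looks modules up by import name, which is why a flavor needs the import names in addition to the pinned pip requirements.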

Thanks to the flavor mechanism, you can create tasks that use third-party Python libraries.

In [9]:
import numpy as np

A = np.array([[ 3,  1,  2],
              [ 1,  4,  0],
              [ 2,  0,  5]], dtype=float)
b = np.array([1, 2, 3], dtype=float)

def solve_linear_system(kwargs):
    import numpy as np_
    x = np_.linalg.solve(kwargs["A"], kwargs["b"])
    return x.tolist()

task = conn.create_task(
    kwargs={"A": A, "b": b},
    func=solve_linear_system,
    flavor='44814764c91bf9ef426c4aa899df974f',
    init_valid_func=lambda x: isinstance(x, list) and len(x) == 3 and all(isinstance(elem, float) for elem in x),
    compare_valid_func=lambda x, y: all(abs(elem_x - elem_y) < 1e-6 for elem_x, elem_y in zip(x, y)),
)

await task.result()

[-0.3589743589743591, 0.5897435897435899, 0.7435897435897437]