In [None]:
import yt.wrapper as yt

import time
import sys

In [None]:
# 1. Running a simple operation.

# Tables used by the map operation below.
source_table = "//tmp/in"
destination_table = "//tmp/out"

# Prepare input: create the table (force=True is passed, so an already
# existing node at that path is not an obstacle) and write a single row.
yt.create("table", source_table, force=True)
yt.write_table(source_table, [{"a": 1}])

# Run a map operation whose command sleeps for 60 seconds and then copies
# its input to its output with `cat`.
yt.run_map("sleep 60; cat", source_table, destination_table, format="yson")

# Read output.
for output_row in yt.read_table(destination_table):
    print(output_row)

In [None]:
# 2. Running several operations.

# Start six identical vanilla operations without waiting for them (sync=False)
# and register each one in a tracker so they can be handled together later.
tracker = yt.OperationsTracker(print_progress=False)
for _ in range(6):
    # One task named "task": 3 jobs, cpu_limit 2.0, each job runs `sleep 1000`.
    task_builder = yt.VanillaSpecBuilder().begin_task("task")
    task_builder = task_builder.command("sleep 1000").job_count(3).cpu_limit(2.0)
    spec_builder = task_builder.end_task()

    op = yt.run_operation(spec_builder, sync=False)
    tracker.add(op)

In [None]:
# Abort every operation that was added to the tracker.
tracker.abort_all()

In [None]:
# 3. Running several operations with different weights.

# Three operations that differ only in their weight: 1, 2 and 3.
tracker = yt.OperationsTracker(print_progress=False)
for weight in range(1, 4):
    # One task named "task" with 12 jobs, each running `sleep 1000`.
    task_builder = yt.VanillaSpecBuilder().begin_task("task")
    task_builder = task_builder.command("sleep 1000").job_count(12)
    spec_builder = task_builder.end_task().weight(weight)

    op = yt.run_operation(spec_builder, sync=False)
    tracker.add(op)

In [None]:
# Abort every operation that was added to the tracker.
tracker.abort_all()

In [None]:
# 4. Running several operations in different pools.

# Each of the pools "a" and "b" gets three operations with weights 1, 2 and 3.
# The pairs are ordered pool-first, i.e. all of "a" and then all of "b".
launch_params = [(pool_name, weight) for pool_name in ["a", "b"] for weight in range(1, 4)]

tracker = yt.OperationsTracker(print_progress=False)
for pool_name, weight in launch_params:
    # One task named "task" with 12 jobs, each running `sleep 1000`.
    task_builder = yt.VanillaSpecBuilder().begin_task("task")
    task_builder = task_builder.command("sleep 1000").job_count(12)
    spec_builder = task_builder.end_task().pool(pool_name).weight(weight)

    op = yt.run_operation(spec_builder, sync=False)
    tracker.add(op)

In [None]:
# Abort every operation that was added to the tracker.
tracker.abort_all()

In [None]:
def run_sleeping_operation(job_count=1, cpu_limit=1.0, memory_limit=512 * 1024**2, pool=None, weight=None):
    """Start a vanilla operation whose jobs only run ``sleep 1000``.

    The operation is started with ``sync=False`` and the result of
    ``yt.run_operation`` is handed back to the caller.

    Args:
        job_count: Value passed to the task's ``job_count``.
        cpu_limit: Value passed to the task's ``cpu_limit``.
        memory_limit: Value passed to the task's ``memory_limit``
            (``512 * 1024**2`` by default).
        pool: Pool for the operation; the spec is left without a pool when None.
        weight: Operation weight; the spec is left without a weight when None.

    Returns:
        Whatever ``yt.run_operation(spec_builder, sync=False)`` returns.
    """
    # Describe the single task of the operation.
    task_builder = (
        yt.VanillaSpecBuilder()
        .begin_task("task")
        .command("sleep 1000")
        .job_count(job_count)
        .cpu_limit(cpu_limit)
        .memory_limit(memory_limit)
        .spec({"user_job_memory_digest_default_value": 1.0})
    )
    operation_builder = task_builder.end_task()

    # Operation-level options are applied only when the caller supplied them.
    if pool is not None:
        operation_builder = operation_builder.pool(pool)
    if weight is not None:
        operation_builder = operation_builder.weight(weight)

    return yt.run_operation(operation_builder, sync=False)

In [None]:
# 5. Creating a pool.

# Create pool "c" as a child of pool "example" in the "default" pool tree.
yt.create("scheduler_pool", attributes={"name": "c", "parent_name": "example", "pool_tree": "default"})

In [None]:
# Remove pool "c" (child of "example") from the default pool tree.
yt.remove("//sys/pool_trees/default/example/c")

In [None]:
# 6. Setting pool strong guarantees.

# Each entry pairs a pool path (relative to the default pool tree) with the
# CPU amount to store in that pool's strong_guarantee_resources attribute.
pool_params = [
    ("company", 24.0),
    ("company/development", 20.0),
    ("company/development/production", 16.0),
    ("company/development/testing", 4.0),
    ("company/analytics", 4.0),
    ("company/analytics/chyt", 4.0),
]

default_tree_prefix = "//sys/pool_trees/default/"
guarantee_attribute = "/@strong_guarantee_resources"
for pool_path, cpu_guarantee in pool_params:
    guarantee_path = default_tree_prefix + pool_path + guarantee_attribute
    yt.set(guarantee_path, {"cpu": cpu_guarantee})

In [None]:
# 7. Running operations in pools with guarantees.

# Each entry is (job_count, pool) for one sleeping operation.
operation_params = [
    (4, "production"),
    (4, "chyt"),
    (12, "production"),
    (8, "testing"),
    (4, "bi"),
]

# Operations are started in the order listed above and kept in `ops`
# in that same order.
ops = [
    run_sleeping_operation(job_count=job_count, pool=pool_name)
    for job_count, pool_name in operation_params
]

In [None]:
# Abort some operations in guaranteed pools: the first two entries of `ops`.
for op in ops[:2]:
    op.abort()

In [None]:
# Abort the rest: every entry of `ops` from the third one onwards.
for op in ops[2:]:
    op.abort()

In [None]:
# 8. Resource limits.

# Set the resource_limits attribute of the "testing" pool to 5.0 CPU.
yt.set("//sys/pool_trees/default/company/development/testing/@resource_limits", {"cpu": 5.0})

In [None]:
# Start an operation with 8 jobs in the "testing" pool
# (cpu_limit is left at the helper's default of 1.0).
op = run_sleeping_operation(job_count=8, pool="testing")

In [None]:
# Abort the operation and remove the resource_limits attribute from the "testing" pool.
op.abort()

yt.remove("//sys/pool_trees/default/company/development/testing/@resource_limits")

In [None]:
# 9. Running operations with different resource demand profiles.

GIB = 1024**3

# Each entry is (cpu_limit, memory_limit) for one operation:
# the first asks for more CPU, the second for more memory.
operation_params = [
    (2.0, GIB),
    (1.0, 10 * GIB),
]

tracker = yt.OperationsTracker(print_progress=False)
for cpu_limit, memory_limit in operation_params:
    demand = {"cpu_limit": cpu_limit, "memory_limit": memory_limit}
    op = run_sleeping_operation(pool="testing", **demand)
    tracker.add(op)

In [None]:
# Abort every operation that was added to the tracker.
tracker.abort_all()

In [None]:
# 10. Vector guarantees.

GIB = 1024**3

# Each entry pairs a pool path (relative to the default pool tree) with the
# guarantees to store there; unlike the CPU-only guarantees, these also
# carry a "memory" component.
pool_params = [
    ("company", {"cpu": 24.0, "memory": 54 * GIB}),
    ("company/analytics", {"cpu": 4.0, "memory": 16 * GIB}),
    ("company/analytics/chyt", {"cpu": 4.0, "memory": 16 * GIB}),
]

default_tree_prefix = "//sys/pool_trees/default/"
guarantee_attribute = "/@strong_guarantee_resources"
for pool_path, guarantees in pool_params:
    guarantee_path = default_tree_prefix + pool_path + guarantee_attribute
    yt.set(guarantee_path, guarantees)

In [None]:
# 11. Job preemption.

# Turn regular preemption off for the "testing" pool.
preemption_attribute_path = "//sys/pool_trees/default/company/development/testing/@allow_regular_preemption"
yt.set(preemption_attribute_path, False)

# Start a 20-job operation in that pool and track it.
tracker = yt.OperationsTracker(print_progress=False)
first_operation = run_sleeping_operation(job_count=20, pool="testing")
tracker.add(first_operation)

In [None]:
# Add a second, 12-job operation to the same pool and tracker.
# Only 4 jobs will run, because preemption is forbidden.
# NOTE(review): the figure of 4 presumably reflects the capacity left over by
# the 20-job operation started earlier — confirm against the cluster size.
tracker.add(run_sleeping_operation(job_count=12, pool="testing"))

In [None]:
# Enable preemption: set allow_regular_preemption to True on the "testing" pool.
yt.set("//sys/pool_trees/default/company/development/testing/@allow_regular_preemption", True)

In [None]:
# Abort the tracked operations, then remove the allow_regular_preemption
# attribute from the "testing" pool.
tracker.abort_all()

yt.remove("//sys/pool_trees/default/company/development/testing/@allow_regular_preemption")

In [None]:
# 12. FIFO pools.

# Switch the "testing" pool to FIFO mode.
testing_mode_path = "//sys/pool_trees/default/company/development/testing/@mode"
yt.set(testing_mode_path, "fifo")

# Start six 5-job operations in the pool, pausing briefly after each launch.
tracker = yt.OperationsTracker(print_progress=False)
for _ in range(6):
    op = run_sleeping_operation(job_count=5, pool="testing")
    tracker.add(op)
    # NOTE(review): the pause presumably gives the operations distinct start times — confirm.
    time.sleep(0.1)

In [None]:
# Add a single-job operation with weight 100.0 to the same pool.
# NOTE(review): with weight-based FIFO ordering this operation is presumably
# expected to be served ahead of the earlier ones — verify against the pool's
# FIFO sort settings.
tracker.add(run_sleeping_operation(job_count=1, pool="testing", weight=100.0))

In [None]:
# Abort the tracked operations and set the mode of the "testing" pool to "fair_share".
tracker.abort_all()

yt.set("//sys/pool_trees/default/company/development/testing/@mode", "fair_share")

In [None]:
# 13. Several pool trees.

nodes_root = "//sys/cluster_nodes/"

# Get exec nodes: list all cluster nodes together with their "tags" and
# "flavors" attributes and keep those whose flavors contain "exec".
all_nodes = yt.list("//sys/cluster_nodes", attributes=["tags", "flavors"])
exec_nodes = []
for candidate in all_nodes:
    if "exec" in candidate.attributes["flavors"]:
        exec_nodes.append(candidate)

# Set custom tag for one node: the first exec node is the one that will
# match the nodes filter of the second tree.
other_tree_node = exec_nodes[0]
yt.set(nodes_root + other_tree_node + "/@user_tags", ["custom_tag"])

# Print the tags of every exec node to stderr.
for node in exec_nodes:
    node_tags = yt.get(nodes_root + node + "/@tags")
    print(node_tags, file=sys.stderr)

# Configure another pool tree: the default tree excludes nodes carrying the
# custom tag, and the new tree "other" selects exactly those nodes.
yt.set("//sys/pool_trees/default/@config/nodes_filter", "!custom_tag")
other_tree_attributes = {"name": "other", "config": {"nodes_filter": "custom_tag"}}
yt.create("scheduler_pool_tree", attributes=other_tree_attributes)

In [None]:
# Create a pool named "testing" in the "other" pool tree.
testing_pool_attributes = {"name": "testing", "pool_tree": "other"}
yt.create("scheduler_pool", attributes=testing_pool_attributes)

# Start a 24-job operation in pool "testing" with both pool trees listed in its spec.
task_builder = yt.VanillaSpecBuilder().begin_task("task")
task_builder = task_builder.command("sleep 1000").job_count(24)
spec_builder = task_builder.end_task().pool("testing").pool_trees(["default", "other"])

op = yt.run_operation(spec_builder, sync=False)

In [None]:
# Clean up: abort the operation, remove the "other" pool tree, clear the nodes
# filter of the default tree and clear the user tags of the node that was tagged.
op.abort()

yt.remove("//sys/pool_trees/other", recursive=True)
yt.set("//sys/pool_trees/default/@config/nodes_filter", "")
yt.set("//sys/cluster_nodes/" + other_tree_node + "/@user_tags", [])

In [None]:
# 14. Speculative jobs.

# A single-job task that sets job_speculation_timeout in its task spec.
# NOTE(review): 5000 is presumably in milliseconds — confirm against the spec documentation.
task_builder = yt.VanillaSpecBuilder().begin_task("task")
task_builder = task_builder.command("sleep 1000").job_count(1)
task_builder = task_builder.spec({"job_speculation_timeout": 5000})
spec_builder = task_builder.end_task()

op = yt.run_operation(spec_builder, sync=False)

In [None]:
# Abort the operation held in `op`.
op.abort()

In [None]:
# 15. Integral guarantees.

# (pool_path, cpu_guarantee)
# NOTE(review): the entries are applied in this exact order; presumably children
# are lowered before their parents are changed — keep the order unless verified.
pool_params = [
    ("company/analytics/chyt", 0.0),
    ("company/analytics", 0.0),
    ("company", 22.0),
    ("company/development", 22.0),
]

# Reconfigure strong guarantees.
for pool_path, cpu_guarantee in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@strong_guarantee_resources", {"cpu": cpu_guarantee})

# (pool_path, integral_guarantees)
pool_params = [
    ("example", {"resource_flow": {"cpu": 1.0}}),
    ("example/integral", {
        "guarantee_type": "relaxed",
        "resource_flow": {"cpu": 1.0},
    }),
]

# Configure integral guarantees: create pool "integral" under "example" and
# store the integral_guarantees attribute on both pools.
yt.create("scheduler_pool", attributes={"name": "integral", "parent_name": "example", "pool_tree": "default"})
for pool_path, integral_guarantees in pool_params:
    yt.set("//sys/pool_trees/default/" + pool_path + "/@integral_guarantees", integral_guarantees)

# Track accumulated resource volume.
pools_orchid_path = "//sys/scheduler/orchid/scheduler/pool_trees/default/pools"

# Wait until the scheduler orchid lists the new pool and exposes its
# accumulated CPU volume. The wait is bounded so that a pool which never shows
# up produces a clear error instead of blocking the kernel forever.
orchid_wait_timeout_seconds = 60.0
orchid_wait_deadline = time.monotonic() + orchid_wait_timeout_seconds
while not ("integral" in yt.list(pools_orchid_path) and yt.exists(pools_orchid_path + "/integral/accumulated_resource_volume/cpu")):
    if time.monotonic() >= orchid_wait_deadline:
        raise TimeoutError(
            "Pool 'integral' did not appear under {} within {} seconds".format(
                pools_orchid_path, orchid_wait_timeout_seconds
            )
        )
    time.sleep(0.1)
    
def get_accumulated_cpu_volume():
    """Return the accumulated CPU resource volume of pool "integral".

    The value is read with ``yt.get`` from the node
    ``integral/accumulated_resource_volume/cpu`` under ``pools_orchid_path``.
    """
    volume_path = pools_orchid_path + "/integral/accumulated_resource_volume/cpu"
    return yt.get(volume_path)

# Sample the accumulated CPU volume ten times, one second apart, and print
# each reading together with the time elapsed since sampling started.
report_template = "Seconds elapsed: {}, accumulated CPU volume: {}"
start_time = time.time()
for _ in range(10):
    time.sleep(1.0)
    elapsed = time.time() - start_time
    accumulated_cpu_volume = get_accumulated_cpu_volume()
    print(report_template.format(elapsed, accumulated_cpu_volume))

In [None]:
# Run operation in integral pool: two jobs in pool "integral".
op = run_sleeping_operation(job_count=2, pool="integral")

# While the operation is running, sample the accumulated CPU volume ten
# times, one second apart, and print each reading with the elapsed time.
report_template = "Seconds elapsed: {}, accumulated CPU volume: {}"
start_time = time.time()
for _ in range(10):
    time.sleep(1.0)
    elapsed = time.time() - start_time
    accumulated_cpu_volume = get_accumulated_cpu_volume()
    print(report_template.format(elapsed, accumulated_cpu_volume))

In [None]:
# Abort the operation held in `op`.
op.abort()

In [None]:
# The end.

In [None]:
# P.S.
def reset_cluster():
    """Bring the demo cluster back to its initial scheduler configuration.

    Steps, in order:
      1. Abort every operation returned by ``yt.list_operations()``.
      2. Remove every pool tree except "default", then remove everything
         under ``//sys/pool_trees/default``.
      3. Set ``@user_tags`` to an empty list on every cluster node.
      4. Overwrite ``@config`` of the "default" pool tree.
      5. Create the pool hierarchy in the "default" tree and set the weight
         of pool ``example/a`` to 3.0.
    """
    pool_trees_root = "//sys/pool_trees"
    default_tree_path = pool_trees_root + "/default"
    nodes_root = "//sys/cluster_nodes"

    # 1. Operations.
    for operation in yt.list_operations()["operations"]:
        yt.abort_operation(operation["id"])

    # 2. Pool trees and pools.
    extra_trees = [tree_name for tree_name in yt.list(pool_trees_root) if tree_name != "default"]
    for tree_name in extra_trees:
        yt.remove(pool_trees_root + "/" + tree_name, recursive=True)
    yt.remove(default_tree_path + "/*", recursive=True, force=True)

    # 3. Node tags.
    for node_name in yt.list(nodes_root):
        yt.set(nodes_root + "/" + node_name + "/@user_tags", [])

    # 4. Default tree config.
    default_tree_config = {
        "default_parent_pool": "research",
        "nodes_filter": "",
        "fair_share_starvation_timeout": 100,
        "fair_share_starvation_tolerance": 0.99,
        "non_preemptible_resource_usage_threshold": {
            "user_slots": 0,
        },
        "preemption_check_satisfaction": False,
        "preemption_check_starvation": False,
        "preemption_satisfaction_threshold": 0.99,
        "integral_guarantees": {
            "smooth_period": 1000,
        },
    }
    yt.set(default_tree_path + "/@config", default_tree_config)

    # 5. Pool hierarchy as (name, parent name or None for a top-level pool).
    # Every parent is listed before its children.
    pool_hierarchy = [
        ("admin", None),
        ("company", None),
        ("analytics", "company"),
        ("bi", "analytics"),
        ("chyt", "analytics"),
        ("research", "analytics"),
        ("development", "company"),
        ("production", "development"),
        ("testing", "development"),
        ("example", None),
        ("a", "example"),
        ("b", "example"),
    ]
    for pool_name, parent in pool_hierarchy:
        pool_attributes = dict(name=pool_name, pool_tree="default")
        if parent is not None:
            pool_attributes.update(parent_name=parent)
        yt.create("scheduler_pool", attributes=pool_attributes)

    yt.set(default_tree_path + "/example/a/@weight", 3.0)

In [None]:
# Run the cluster reset.
reset_cluster()