# Demo

In [1]:
import ray

print(ray.__version__)
ray.shutdown()
# ray.init connects to the cluster
ray.init("ray://127.0.0.1:10001")

1.12.1


ClientContext(dashboard_url='10.0.2.103:8265', python_version='3.8.5', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7fd840de4c70>)

In [2]:
import time
from contextlib import contextmanager

@contextmanager
def timer():
    """Context manager to measure running time of code."""
    start = time.time()
    yield
    time_elapsed = round(time.time() - start)
    print(f"timer: took {time_elapsed} seconds")

In [3]:
def get_instance_type():
    """Returns what instance type this function is running on."""
    import requests
    token = requests.put(
        "http://169.254.169.254/latest/api/token",
        headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}
    ).text
    instance_type = requests.get(
        "http://169.254.169.254/latest/meta-data/instance-type",
        headers={"X-aws-ec2-metadata-token": token},
    ).text
    return instance_type

In [4]:
def print_cluster_resources():
    """Prints the CPUs, memory and GPUs of the current Ray cluster."""
    cluster_resources = ray.cluster_resources()
    CPUs = int(cluster_resources["CPU"])
    memory = round(cluster_resources["memory"] / (1000 ** 3))
    GPUs = round(cluster_resources.get("GPU", 0))
    print(f"CPUs = {CPUs}, memory = {memory}G, GPUs = {GPUs}")

---

In [None]:
@ray.remote(num_cpus=1, memory=1000 ** 3)
def preprocess(data):
    time.sleep(1)
    return get_instance_type()

with timer():
    print(ray.get(preprocess.remote("data")))

In [None]:
print_cluster_resources()

from collections import Counter
with timer():
    print(Counter(
        ray.get([preprocess.remote(x) for x in range(60)])
    ))

In [None]:
from collections import Counter
with timer():
    print(Counter(
        ray.get([preprocess.remote(x) for x in range(6000)])
    ))

print_cluster_resources()

---

In [5]:
@ray.remote(memory=100 * (1000 ** 3))
def preprocess_big_data():
    return get_instance_type()

print(ray.get(preprocess_big_data.remote()))
print_cluster_resources()

i4i.8xlarge
CPUs = 34, memory = 197G, GPUs = 0


In [6]:
@ray.remote(num_gpus=4, accelerator_type="p2")
def train():
    return get_instance_type()

with timer():
    print(ray.get(train.remote()))
print_cluster_resources()
# does not work due to this bug:
# https://github.com/aws/karpenter/pull/1837

p2.xlarge
timer: took 178 seconds
CPUs = 37, memory = 239G, GPUs = 1


---

---

---

In [None]:
from ray.autoscaler.sdk import request_resources

In [None]:
!kubectl get no -L node.kubernetes.io/instance-type,karpenter.sh/capacity-type