Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/codeflare_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CodeFlareClusterStatus,
RayCluster,
AppWrapper,
get_cluster,
)

from .job import JobDefinition, Job, DDPJobDefinition, DDPJob, RayJobClient
Expand Down
2 changes: 1 addition & 1 deletion src/codeflare_sdk/cluster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
AppWrapper,
)

from .cluster import Cluster, ClusterConfiguration
from .cluster import Cluster, ClusterConfiguration, get_cluster

from .awload import AWManager
173 changes: 85 additions & 88 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def cluster_dashboard_uri(self) -> str:
elif "route.openshift.io/termination" in annotations:
protocol = "https"
return f"{protocol}://{ingress.spec.rules[0].host}"
return "Dashboard ingress not available yet, have you run cluster.up()?"
return "Dashboard not available yet, have you run cluster.up()?"

def list_jobs(self) -> List:
"""
Expand Down Expand Up @@ -505,30 +505,50 @@ def torchx_config(
def from_k8_cluster_object(
rc, mcad=True, ingress_domain=None, ingress_options={}, write_to_file=False
):
config_check()
openshift_oauth = False
if (
rc["metadata"]["annotations"]["sdk.codeflare.dev/local_interactive"]
== "True"
):
local_interactive = True
else:
local_interactive = False
if "codeflare.dev/oauth" in rc["metadata"]["annotations"]:
openshift_oauth = (
rc["metadata"]["annotations"]["codeflare.dev/oauth"] == "True"
)
else:
for container in rc["spec"]["headGroupSpec"]["template"]["spec"][
"containers"
]:
openshift_oauth = "oauth-proxy" in container["name"]
machine_types = (
rc["metadata"]["labels"]["orderedinstance"].split("_")
if "orderedinstance" in rc["metadata"]["labels"]
else []
)
local_interactive = (
"volumeMounts"
in rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0]
)
if local_interactive:
ingress_domain = get_ingress_domain_from_client(
rc["metadata"]["name"], rc["metadata"]["namespace"]
)

if local_interactive and ingress_domain == None:
ingress_domain = rc["metadata"]["annotations"][
"sdk.codeflare.dev/ingress_domain"
]

cluster_config = ClusterConfiguration(
name=rc["metadata"]["name"],
namespace=rc["metadata"]["namespace"],
machine_types=machine_types,
num_workers=rc["spec"]["workerGroupSpecs"][0]["minReplicas"],
min_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["requests"]["cpu"],
max_cpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["cpu"],
min_cpus=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
]["requests"]["cpu"]
),
max_cpus=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["cpu"]
),
min_memory=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
Expand All @@ -539,9 +559,11 @@ def from_k8_cluster_object(
"resources"
]["limits"]["memory"][:-1]
),
num_gpus=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"][
"containers"
][0]["resources"]["limits"]["nvidia.com/gpu"],
num_gpus=int(
rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][0][
"resources"
]["limits"]["nvidia.com/gpu"]
),
instascale=True if machine_types else False,
image=rc["spec"]["workerGroupSpecs"][0]["template"]["spec"]["containers"][
0
Expand All @@ -551,6 +573,7 @@ def from_k8_cluster_object(
ingress_domain=ingress_domain,
ingress_options=ingress_options,
write_to_file=write_to_file,
openshift_oauth=openshift_oauth,
)
return Cluster(cluster_config)

Expand Down Expand Up @@ -652,56 +675,57 @@ def get_cluster(
for rc in rcs["items"]:
if rc["metadata"]["name"] == cluster_name:
mcad = _check_aw_exists(cluster_name, namespace)

try:
config_check()
api_instance = client.NetworkingV1Api(api_config_handler())
ingresses = api_instance.list_namespaced_ingress(namespace)
ingress_host = None
ingress_options = {}
for ingress in ingresses.items:
# Search for ingress with AppWrapper name as the owner
if (
"ingress-owner" in ingress.metadata.labels
and ingress.metadata.labels["ingress-owner"] == cluster_name
):
ingress_host = ingress.spec.rules[0].host
ingress_host = None
ingress_options = {}
if not is_openshift_cluster():
try:
config_check()
api_instance = client.NetworkingV1Api(api_config_handler())
ingresses = api_instance.list_namespaced_ingress(namespace)
for ingress in ingresses.items:
# Search for ingress with AppWrapper name as the owner
if (
"ingress-options" in ingress.metadata.labels
and ingress.metadata.labels["ingress-options"] == "true"
"ingress-owner" in ingress.metadata.labels
and ingress.metadata.labels["ingress-owner"] == cluster_name
):
ingress_name = ingress.metadata.name
port = (
ingress.spec.rules[0]
.http.paths[0]
.backend.service.port.number
)
annotations = ingress.metadata.annotations
path = ingress.spec.rules[0].http.paths[0].path
ingress_class_name = ingress.spec.ingress_class_name
path_type = ingress.spec.rules[0].http.paths[0].path_type

ingress_options = {
"ingresses": [
{
"ingressName": ingress_name,
"port": port,
"annotations": annotations,
"ingressClassName": ingress_class_name,
"pathType": path_type,
"path": path,
"host": ingress_host,
}
]
}
except Exception as e:
return _kube_api_error_handling(e)
ingress_host = ingress.spec.rules[0].host
if (
"ingress-options" in ingress.metadata.labels
and ingress.metadata.labels["ingress-options"] == "true"
):
ingress_name = ingress.metadata.name
port = (
ingress.spec.rules[0]
.http.paths[0]
.backend.service.port.number
)
annotations = ingress.metadata.annotations
path = ingress.spec.rules[0].http.paths[0].path
ingress_class_name = ingress.spec.ingress_class_name
path_type = (
ingress.spec.rules[0].http.paths[0].path_type
)

ingress_options = {
"ingresses": [
{
"ingressName": ingress_name,
"port": port,
"annotations": annotations,
"ingressClassName": ingress_class_name,
"pathType": path_type,
"path": path,
"host": ingress_host,
}
]
}
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)
# We gather the ingress domain from the host
if ingress_host is not None and ingress_options == {}:
ingress_domain = ingress_host.split(".", 1)[1]
else:
ingress_domain = None

return Cluster.from_k8_cluster_object(
rc,
mcad=mcad,
Expand Down Expand Up @@ -1043,30 +1067,3 @@ def _copy_to_ray(cluster: Cluster) -> RayCluster:
if ray.status == CodeFlareClusterStatus.READY:
ray.status = RayClusterStatus.READY
return ray


def get_ingress_domain_from_client(cluster_name: str, namespace: str = "default"):
if is_openshift_cluster():
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
route = api_instance.get_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
name=f"rayclient-{cluster_name}",
)
return route["spec"]["host"].split(".", 1)[1]
except Exception as e: # pragma no cover
return _kube_api_error_handling(e)
else:
try:
config_check()
api_instance = client.NetworkingV1Api(api_config_handler())
ingress = api_instance.read_namespaced_ingress(
f"rayclient-{cluster_name}", namespace
)
return ingress.spec.rules[0].host.split(".", 1)[1]
except Exception as e: # pragma no cover
return _kube_api_error_handling(e)
2 changes: 2 additions & 0 deletions src/codeflare_sdk/templates/base-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ spec:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/local_interactive: "False"
labels:
workload.codeflare.dev/appwrapper: "aw-kuberay"
controller-tools.k8s.io: "1.0"
Expand Down
16 changes: 16 additions & 0 deletions src/codeflare_sdk/utils/generate_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,25 @@ def enable_local_interactive(resources, cluster_name, namespace, ingress_domain)
namespace,
ingress_domain,
)
item["generictemplate"]["metadata"]["annotations"][
"sdk.codeflare.dev/local_interactive"
] = "True"
item["generictemplate"]["metadata"]["annotations"][
"sdk.codeflare.dev/ingress_domain"
] = ingress_domain

item["generictemplate"]["spec"]["headGroupSpec"]["template"]["spec"][
"initContainers"
][0].get("command")[2] = command


def apply_ingress_domain_annotation(resources, ingress_domain):
item = resources["resources"].get("GenericItems")[0]
item["generictemplate"]["metadata"]["annotations"][
"sdk.codeflare.dev/ingress_domain"
] = ingress_domain


def del_from_list_by_name(l: list, target: typing.List[str]) -> list:
return [x for x in l if x["name"] not in target]

Expand Down Expand Up @@ -734,6 +747,9 @@ def generate_appwrapper(
ingress_options,
ingress_domain,
)
if ingress_domain is not None:
apply_ingress_domain_annotation(resources, ingress_domain)

if local_interactive:
enable_local_interactive(resources, cluster_name, namespace, ingress_domain)
else:
Expand Down
2 changes: 2 additions & 0 deletions tests/test-case-bad.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ spec:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/local_interactive: 'False'
labels:
workload.codeflare.dev/appwrapper: unit-test-cluster
controller-tools.k8s.io: '1.0'
Expand Down
3 changes: 3 additions & 0 deletions tests/test-case-no-mcad.yamls
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/ingress_domain: apps.cluster.awsroute.org
sdk.codeflare.dev/local_interactive: 'False'
labels:
controller-tools.k8s.io: '1.0'
workload.codeflare.dev/appwrapper: unit-test-cluster-ray
Expand Down
3 changes: 3 additions & 0 deletions tests/test-case-prio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ spec:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/ingress_domain: apps.cluster.awsroute.org
sdk.codeflare.dev/local_interactive: 'False'
labels:
controller-tools.k8s.io: '1.0'
workload.codeflare.dev/appwrapper: prio-test-cluster
Expand Down
3 changes: 3 additions & 0 deletions tests/test-case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ spec:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/ingress_domain: apps.cluster.awsroute.org
sdk.codeflare.dev/local_interactive: 'False'
labels:
controller-tools.k8s.io: '1.0'
workload.codeflare.dev/appwrapper: unit-test-cluster
Expand Down
3 changes: 3 additions & 0 deletions tests/test-default-appwrapper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ spec:
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
sdk.codeflare.dev/ingress_domain: apps.cluster.awsroute.org
sdk.codeflare.dev/local_interactive: 'False'
labels:
controller-tools.k8s.io: '1.0'
workload.codeflare.dev/appwrapper: unit-test-default-cluster
Expand Down
Loading