Skip to content

Commit

Permalink
optimize svs operation (#1163)
Browse files Browse the repository at this point in the history
* optimize svs operation

* lint

* optimize ut

* optimize logging

* optimize logging
  • Loading branch information
BalaBalaYi committed Jun 19, 2024
1 parent e0e095f commit c2cc6d9
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
5 changes: 4 additions & 1 deletion dlrover/python/master/scaler/pod_scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,10 @@ def _check_master_service_avaliable(self, host, port, timeout=15):
)
time.sleep(1)

logger.warning(f"Master service check failed after {timeout} retries.")
logger.warning(
f"Master service check {host}:{port} "
f"failed after {timeout} retries."
)
return False

def _patch_tf_config_into_env(self, pod, node: Node, pod_stats, ps_addrs):
Expand Down
18 changes: 15 additions & 3 deletions dlrover/python/scheduler/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ def create_service(
selector: Dict[str, str],
owner_ref: client.V1OwnerReference,
retry_num=5,
patch_if_exists: bool = True,
):
"""
Create a new service if the service does not exist, otherwise
Expand All @@ -517,10 +518,16 @@ def create_service(
owner_ref=owner_ref,
)

if not self._k8s_client.get_service(name):
svc = self.get_service(name)
if not svc:
return self._create_new_service(service, retry_num)
else:
return self._patch_service(name, service, retry_num)
if patch_if_exists:
return self._patch_service(name, service, retry_num)
return True

def get_service(self, name):
    """Return whatever the wrapped k8s client reports for service ``name``.

    The lookup is delegated unchanged to ``self._k8s_client.get_service``.
    """
    service = self._k8s_client.get_service(name)
    return service

def _create_new_service(self, service: client.V1Service, retry_num: int):
for _ in range(retry_num):
Expand All @@ -547,6 +554,7 @@ def _create_service_obj(
target_port: int,
selector: Dict[str, str],
owner_ref: client.V1OwnerReference,
port_name="grpc",
):
labels = {
"app": ElasticJobLabel.APP_NAME,
Expand All @@ -564,7 +572,11 @@ def _create_service_obj(
namespace=self._namespace,
)
spec = client.V1ServiceSpec(
ports=[client.V1ServicePort(port=port, target_port=target_port)],
ports=[
client.V1ServicePort(
name=port_name, port=port, target_port=target_port
)
],
selector=selector,
type=None,
)
Expand Down
22 changes: 22 additions & 0 deletions dlrover/python/tests/test_k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,33 @@ def test_set_container_resource(self):
self.assertEqual(container.resources.limits["cpu"], 4)
self.assertEqual(container.resources.limits["memory"], "1024Mi")

def test_service_exists(self):
    """create_service returns a truthy result when get_service finds one.

    ``get_service`` on the factory is replaced by a mock that returns a
    service object; ``create_service`` is then called once with its
    default arguments and once with ``patch_if_exists=False``.
    """
    factory = k8sServiceFactory("dlrover", "test")

    # Install the stub first, then give it a service object to return.
    lookup_stub = unittest.mock.Mock()
    factory.get_service = lookup_stub
    lookup_stub.return_value = factory._create_service_obj(
        "test-master", 12345, 34567, {}, None
    )

    default_result = factory.create_service(
        "test-master", 12345, 34567, {}, None
    )
    self.assertTrue(default_result)

    no_patch_result = factory.create_service(
        "test-master", 12345, 34567, {}, None, patch_if_exists=False
    )
    self.assertTrue(no_patch_result)

def test_service_factory(self):
    """Exercise create_service, the default port name, and _patch_service."""
    factory = k8sServiceFactory("dlrover", "test")
    created = factory.create_service("test-master", 12345, 34567, {}, None)
    self.assertTrue(created)

    service = factory._create_service_obj(
        "test-master", 12345, 34567, {}, None
    )
    first_port = service.spec.ports[0]
    # Without an explicit port_name the service port is named "grpc".
    self.assertEqual(first_port.name, "grpc")

    # Rename the port locally, then push the modified object as a patch.
    first_port.name = "http"
    patched = factory._patch_service("test-master", service, 5)
    self.assertTrue(patched)
    self.assertEqual(service.spec.ports[0].name, "http")

def test_client(self):
client = k8sClient.singleton_instance("default")
Expand Down

0 comments on commit c2cc6d9

Please sign in to comment.