Skip to content

Commit

Permalink
Record the start running time stamp in PodInfo and sort the live work…
Browse files Browse the repository at this point in the history
…ers according to this property for allreduce. (#2402)

* Record the start running time stamp in PodInfo and sort the live workers according to this property.

* Fix CI failure.
  • Loading branch information
brightcoder01 committed Dec 5, 2020
1 parent 21f5dab commit b9e443e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion elasticdl/python/master/pod_event_callbacks.py
Expand Up @@ -15,7 +15,7 @@
import collections

PodInfo = collections.namedtuple(
"PodInfo", ("type", "id", "name", "ip", "status")
"PodInfo", ("type", "id", "name", "ip", "status", "start_time")
)

ClusterContext = collections.namedtuple("ClusterContext", ("pod_manager"))
Expand Down
21 changes: 16 additions & 5 deletions elasticdl/python/master/pod_manager.py
Expand Up @@ -103,6 +103,17 @@ def _should_relaunch_killed_pod(evt_obj):
)


def _get_start_running_time_stamp(pod_status_obj):
if (
pod_status_obj.container_statuses
and pod_status_obj.container_statuses[0].state
and pod_status_obj.container_statuses[0].state.running
):
return pod_status_obj.container_statuses[0].state.running.started_at

return None


def get_image_cluster_spec(cluster_spec):
if cluster_spec:
filename = os.path.basename(cluster_spec)
Expand Down Expand Up @@ -480,6 +491,7 @@ def _event_cb(self, event):
pod_name = evt_obj.metadata.name
pod_ip = evt_obj.status.pod_ip
phase = evt_obj.status.phase
pod_start_time = _get_start_running_time_stamp(evt_obj.status)
pod_type = evt_obj.metadata.labels[ELASTICDL_REPLICA_TYPE_KEY]

if pod_type == PodType.MASTER:
Expand All @@ -502,13 +514,14 @@ def _event_cb(self, event):
return

# Update the pod status in cache
new_state = matched_pod_state_flow.to_status
new_status = matched_pod_state_flow.to_status
pod_info = PodInfo(
type=pod_type,
id=pod_id,
name=pod_name,
ip=pod_ip,
status=new_state,
status=new_status,
start_time=pod_start_time,
)
self._pod_info_cache[pod_type][pod_name] = pod_info

Expand Down Expand Up @@ -597,9 +610,7 @@ def get_alive_workers(self):

def get_alive_worker_name_addr(self):
alive_workers = self.get_alive_workers()
# TODO: Update from sorting by id to sort by
# the timestamp when the pod changes to the Running state.
alive_workers.sort(key=lambda tup: tup.id)
alive_workers.sort(key=lambda pod_info: pod_info.start_time)

return [(info.name, info.ip) for info in alive_workers]

Expand Down

0 comments on commit b9e443e

Please sign in to comment.