Skip to content

Commit

Permalink
Merge branch 'main' into enh/handle_sensitive_version
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Feb 15, 2023
2 parents 304e101 + 0ec1f13 commit 679c4f5
Show file tree
Hide file tree
Showing 16 changed files with 589 additions and 78 deletions.
6 changes: 3 additions & 3 deletions python/xorbits/_mars/_utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,15 @@ cpdef long long ceildiv(long long x, long long y) nogil:


cdef class Timer:
cdef object _start
cdef readonly object start
cdef readonly object duration

def __enter__(self):
self._start = time.time()
self.start = time.time()
return self

def __exit__(self, *_):
self.duration = time.time() - self._start
self.duration = time.time() - self.start


cdef mt19937_64 _rnd_gen
Expand Down
24 changes: 19 additions & 5 deletions python/xorbits/_mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Dict, Optional

from .... import oscar as mo
from ...._utils import Timer
from ....core import ExecutionError
from ....core.graph import DAG
from ....core.operand import Fetch, FetchShuffle
Expand All @@ -36,6 +37,7 @@
from ...meta import MetaAPI
from ...storage import StorageAPI
from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
from ...task.task_info_collector import TaskInfoCollector
from .quota import QuotaActor
from .workerslot import BandSlotManagerActor

Expand Down Expand Up @@ -354,13 +356,25 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
)
try:
logger.debug("Preparing data for subtask %s", subtask.subtask_id)
prepare_data_task = asyncio.create_task(
_retry_run(
subtask, subtask_info, self._prepare_input_data, subtask, band_name
with Timer() as timer:
prepare_data_task = asyncio.create_task(
_retry_run(
subtask,
subtask_info,
self._prepare_input_data,
subtask,
band_name,
)
)
await asyncio.wait_for(
prepare_data_task, timeout=self._data_prepare_timeout
)
collect_task_info = subtask.extra_config and subtask.extra_config.get(
"collect_task_info", False
)
await asyncio.wait_for(
prepare_data_task, timeout=self._data_prepare_timeout
task_info_collector = TaskInfoCollector(self.address, collect_task_info)
await task_info_collector.collect_fetch_time(
subtask, timer.start, timer.start + timer.duration
)

input_sizes = await self._collect_input_sizes(
Expand Down
2 changes: 1 addition & 1 deletion python/xorbits/_mars/services/session/supervisor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def create_services(self):
)
if "task" in self._service_config["services"]:
self._task_api = await TaskAPI.create(
session_id=self._session_id, address=self.address
session_id=self._session_id, supervisor_address=self.address
)

async def get_last_idle_time(self):
Expand Down
1 change: 1 addition & 0 deletions python/xorbits/_mars/services/subtask/worker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def _create_band_runner_actors(self, band_name: str, n_slots: int):
SubtaskRunnerActor,
band,
worker_address=self._worker_address,
slot_id=slot_id,
subtask_processor_cls=self._subtask_processor_cls,
uid=SubtaskRunnerActor.gen_uid(band_name, slot_id),
address=self.address,
Expand Down
129 changes: 85 additions & 44 deletions python/xorbits/_mars/services/subtask/worker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
from ....optimization.physical import optimize
from ....serialization import AioSerializer
from ....typing import BandType, ChunkType
from ....utils import calc_data_size, get_chunk_key_to_data_keys
from ....utils import Timer, calc_data_size, get_chunk_key_to_data_keys
from ...context import ThreadedServiceContext
from ...meta.api import MetaAPI, WorkerMetaAPI
from ...session import SessionAPI
from ...storage import StorageAPI
from ...task import TaskAPI, task_options
from ...task.task_info_collector import TaskInfoCollector
from ..core import Subtask, SubtaskResult, SubtaskStatus
from ..utils import get_mapper_data_keys, iter_input_data_keys, iter_output_data

Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(
meta_api: MetaAPI,
worker_meta_api: WorkerMetaAPI,
band: BandType,
slot_id: int,
supervisor_address: str,
engines: List[str] = None,
):
Expand All @@ -88,6 +90,7 @@ def __init__(
]
)
self._band = band
self._slot_id = slot_id
self._supervisor_address = supervisor_address
self._engines = engines if engines is not None else task_options.runtime_engines

Expand Down Expand Up @@ -117,6 +120,10 @@ def __init__(
self._storage_api = storage_api
self._meta_api = meta_api
self._worker_meta_api = worker_meta_api
collect_task_info = subtask.extra_config and subtask.extra_config.get(
"collect_task_info", False
)
self._task_info_collector = TaskInfoCollector(self._band[0], collect_task_info)

# add metrics
self._subtask_execution_time = Metrics.gauge(
Expand Down Expand Up @@ -231,35 +238,41 @@ def cb(fut):
to_wait.set_result(fut.result())

future.add_done_callback(cb)

try:
await to_wait
logger.debug(
"Finish executing operand: %s, chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
except asyncio.CancelledError:
logger.debug(
"Receive cancel instruction for operand: %s,"
"chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
# wait for this computation to finish
await future
# if cancelled, stop next computation
logger.debug(
"Cancelled operand: %s, chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
self.result.status = SubtaskStatus.cancelled
raise

with Timer() as timer:
try:
await to_wait
logger.debug(
"Finish executing operand: %s, chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
except asyncio.CancelledError: # pragma: no cover
logger.debug(
"Receive cancel instruction for operand: %s,"
"chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
# wait for this computation to finish
await future
# if cancelled, stop next computation
logger.debug(
"Cancelled operand: %s, chunk: %s, subtask id: %s",
chunk.op,
chunk,
self.subtask.subtask_id,
)
self.result.status = SubtaskStatus.cancelled
raise
await self._task_info_collector.collect_runtime_operand_info(
self.subtask,
timer.start,
timer.start + timer.duration,
chunk,
self._processor_context,
)
self.set_op_progress(chunk.op.key, 1.0)

for inp in chunk_graph.iter_predecessors(chunk):
Expand Down Expand Up @@ -557,29 +570,56 @@ async def run(self):
result_chunk_to_optimized[c] for c in raw_update_meta_chunks
}

cost_times = defaultdict(tuple)
# load inputs data
input_keys = await self._load_input_data()
with Timer() as timer:
input_keys = await self._load_input_data()
cost_times["load_data_time"] = (timer.start, timer.start + timer.duration)

try:
# execute chunk graph
await self._execute_graph(chunk_graph)
with Timer() as timer:
await self._execute_graph(chunk_graph)
cost_times["execute_time"] = (timer.start, timer.start + timer.duration)
finally:
# unpin inputs data
unpinned = True
await self._unpin_data(input_keys)
with Timer() as timer:
await self._unpin_data(input_keys)
cost_times["unpin_time"] = (timer.start, timer.start + timer.duration)

# store results data
(
stored_keys,
store_sizes,
memory_sizes,
data_key_to_object_id,
) = await self._store_data(chunk_graph)
with Timer() as timer:
(
stored_keys,
store_sizes,
memory_sizes,
data_key_to_object_id,
) = await self._store_data(chunk_graph)
cost_times["store_result_time"] = (
timer.start,
timer.start + timer.duration,
)

# store meta
await self._store_meta(
chunk_graph,
with Timer() as timer:
await self._store_meta(
chunk_graph,
store_sizes,
memory_sizes,
data_key_to_object_id,
update_meta_chunks,
)
cost_times["store_meta_time"] = (timer.start, timer.start + timer.duration)

await self._task_info_collector.collect_runtime_subtask_info(
self.subtask,
self._band,
self._slot_id,
stored_keys,
store_sizes,
memory_sizes,
data_key_to_object_id,
update_meta_chunks,
cost_times,
)
except asyncio.CancelledError:
self.result.status = SubtaskStatus.cancelled
Expand Down Expand Up @@ -705,7 +745,7 @@ async def _init_context(self, session_id: str) -> ThreadedServiceContext:
await context.init()
return context

async def run(self, subtask: Subtask):
async def run(self, subtask: Subtask, slot_id: int):
logger.info(
"Start to run subtask: %r on %s. chunk graph contains %s",
subtask,
Expand All @@ -725,6 +765,7 @@ async def run(self, subtask: Subtask):
self._meta_api,
self._worker_meta_api,
self._band,
slot_id,
self._supervisor_address,
)
self._processor = self._last_processor = processor
Expand Down
9 changes: 7 additions & 2 deletions python/xorbits/_mars/services/subtask/worker/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ def gen_uid(cls, band_name: str, slot_id: int):
return f"slot_{band_name}_{slot_id}_subtask_runner"

def __init__(
self, band: BandType, worker_address: str, subtask_processor_cls: Type = None
self,
band: BandType,
worker_address: str,
slot_id: int,
subtask_processor_cls: Type = None,
):
self._band = band
self._slot_id = slot_id
self._worker_address = worker_address
self._subtask_processor_cls = self._get_subtask_processor_cls(
subtask_processor_cls
Expand Down Expand Up @@ -123,7 +128,7 @@ async def run_subtask(self, subtask: Subtask):
processor = self._session_id_to_processors[session_id]
try:
self._running_processor = self._last_processor = processor
result = yield self._running_processor.run(subtask)
result = yield self._running_processor.run(subtask, self._slot_id)
finally:
self._running_processor = None
raise mo.Return(result)
Expand Down
7 changes: 6 additions & 1 deletion python/xorbits/_mars/services/task/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import List, Union
from typing import Dict, List, Union

from ....core import Tileable
from ..core import TaskResult, TileableGraph
Expand Down Expand Up @@ -146,3 +146,8 @@ async def get_last_idle_time(self) -> Union[float, None]:
last_idle_time: float
The last idle time if the task manager is idle else None.
"""

async def save_task_info(self, task_info: Dict, path: str):
"""
Save task information using yaml format.
"""
Loading

0 comments on commit 679c4f5

Please sign in to comment.