"""Contains classes that encapsulate streaming executor state.
This is split out from streaming_executor.py to facilitate better unit testing.
"""
import math
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
PhysicalOperator,
RefBundle,
)
from ray.data._internal.execution.interfaces.physical_operator import (
DataOpTask,
MetadataOpTask,
OpTask,
Waitable,
)
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.util import memory_string
from ray.data._internal.progress_bar import ProgressBar
from ray.data.context import DataContext
logger = DatasetLogger(__name__)
# Holds the full execution state of the streaming topology. It's a dict mapping each
# operator to tracked streaming exec state.
Topology = Dict[PhysicalOperator, "OpState"]
# Min number of seconds between two autoscaling requests.
MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20
@dataclass
class AutoscalingState:
"""State of the interaction between an executor and Ray autoscaler."""
# The timestamp of the latest resource request made to Ray autoscaler
# by an executor.
last_request_ts: int = 0
class OpBufferQueue:
"""A FIFO queue to buffer RefBundles between upstream and downstream operators.
This class is thread-safe.
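Example (an illustrative sketch; ``make_bundle`` is a hypothetical helper
that wraps a single block into a RefBundle and is not part of this module):

    queue = OpBufferQueue()
    queue.append(make_bundle("a"))  # make_bundle is hypothetical
    queue.append(make_bundle("b"))
    assert queue.has_next()
    first = queue.pop()           # FIFO order: the bundle for "a"
    assert queue.num_blocks == 1  # one single-block bundle remains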
"""
def __init__(self):
self._memory_usage = 0
self._num_blocks = 0
self._queue = deque()
self._num_per_split = defaultdict(int)
self._lock = threading.Lock()
# Used to buffer output RefBundles indexed by output splits.
self._outputs_by_split = defaultdict(deque)
super().__init__()
@property
def memory_usage(self) -> int:
"""The total memory usage of the queue in bytes."""
with self._lock:
return self._memory_usage
@property
def num_blocks(self) -> int:
"""The total number of blocks in the queue."""
with self._lock:
return self._num_blocks
def __len__(self):
return len(self._queue)
def has_next(self, output_split_idx: Optional[int] = None) -> bool:
"""Whether next RefBundle is available.
Args:
output_split_idx: If specified, only check ref bundles with the
given output split.
"""
if output_split_idx is None:
return len(self._queue) > 0
else:
with self._lock:
return self._num_per_split[output_split_idx] > 0
def append(self, ref: RefBundle):
"""Append a RefBundle to the queue."""
self._queue.append(ref)
with self._lock:
self._memory_usage += ref.size_bytes()
self._num_blocks += len(ref.blocks)
if ref.output_split_idx is not None:
self._num_per_split[ref.output_split_idx] += 1
def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]:
"""Pop a RefBundle from the queue.
Args:
output_split_idx: If specified, only pop a RefBundle
with the given output split.
Returns:
A RefBundle if available, otherwise None.
"""
ret = None
if output_split_idx is None:
try:
ret = self._queue.popleft()
except IndexError:
pass
else:
with self._lock:
split_queue = self._outputs_by_split[output_split_idx]
if len(split_queue) == 0:
# Move all ref bundles to their per-split indexed queues.
# Note: we do the indexing here rather than in `append()` because only the
# last `OpBufferQueue` in the DAG, the one popped with output_split_idx,
# needs it. If the intermediate `OpBufferQueue`s were also indexed, the
# relative order of ref bundles with different output splits could not be
# preserved.
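# For example, when a dataset is consumed via two output splits, only this
# final queue is popped with output_split_idx=0 or 1 (typically from
# separate consumer threads); bundles are drained from the shared FIFO into
# the per-split queues whenever the requested split's queue runs dry.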
with self._lock:
while len(self._queue) > 0:
ref = self._queue.popleft()
self._outputs_by_split[ref.output_split_idx].append(ref)
try:
ret = split_queue.popleft()
except IndexError:
pass
if ret is None:
return None
with self._lock:
self._memory_usage -= ret.size_bytes()
self._num_blocks -= len(ret.blocks)
if ret.output_split_idx is not None:
self._num_per_split[ret.output_split_idx] -= 1
return ret
def clear(self):
with self._lock:
self._queue.clear()
self._memory_usage = 0
self._num_blocks = 0
self._num_per_split.clear()
class OpState:
"""The execution state tracked for each PhysicalOperator.
This tracks state to manage input and output buffering for StreamingExecutor and
progress bars, which is separate from execution state internal to the operators.
Note: we use the `deque` data structure here because it is thread-safe, enabling
operator queues to be shared across threads.
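Example (illustrative; ``up_op`` and ``down_op`` stand for two already
constructed PhysicalOperators, where ``down_op`` consumes ``up_op`` and
``up_op`` has no inputs of its own):

    up_state = OpState(up_op, inqueues=[])
    # The downstream op's inqueue is the same OpBufferQueue object as the
    # upstream op's outqueue; bundles flow through it without copying.
    down_state = OpState(down_op, inqueues=[up_state.outqueue])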
"""
def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
# Each inqueue is connected to another operator's outqueue.
assert len(inqueues) == len(op.input_dependencies), (op, inqueues)
self.inqueues: List[OpBufferQueue] = inqueues
# The outqueue is connected to another operator's inqueue (they physically
# share the same OpBufferQueue object).
#
# Note: this queue is also accessed concurrently from the consumer thread.
# (in addition to the streaming executor thread). Hence, it must be a
# thread-safe type such as `deque`.
self.outqueue: OpBufferQueue = OpBufferQueue()
self.op = op
self.progress_bar = None
self.num_completed_tasks = 0
self.inputs_done_called = False
# Tracks whether `input_done` has been called for each input op.
self.input_done_called = [False] * len(op.input_dependencies)
# Used for StreamingExecutor to signal exception or end of execution
self._finished: bool = False
self._exception: Optional[Exception] = None
def __repr__(self):
return f"OpState({self.op.name})"
def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
"""Create progress bars at the given index (line offset in console).
For an AllToAllOperator, zero or more sub progress bars may be created.
Return the number of progress bars created for this operator.
"""
is_all_to_all = isinstance(self.op, AllToAllOperator)
# Only show 1:1 ops when in verbose progress mode.
enabled = verbose_progress or is_all_to_all
self.progress_bar = ProgressBar(
"- " + self.op.name,
self.op.num_outputs_total(),
index,
enabled=enabled,
)
if enabled:
num_bars = 1
if is_all_to_all:
num_bars += self.op.initialize_sub_progress_bars(index + 1)
else:
num_bars = 0
return num_bars
def close_progress_bars(self):
"""Close all progress bars for this operator."""
if self.progress_bar:
self.progress_bar.close()
if isinstance(self.op, AllToAllOperator):
self.op.close_sub_progress_bars()
def num_queued(self) -> int:
"""Return the number of queued bundles across all inqueues."""
return sum(len(q) for q in self.inqueues)
def num_processing(self):
"""Return the number of bundles currently in processing for this operator."""
return self.op.num_active_tasks() + self.op.internal_queue_size()
def add_output(self, ref: RefBundle) -> None:
"""Move a bundle produced by the operator to its outqueue."""
self.outqueue.append(ref)
self.num_completed_tasks += 1
if self.progress_bar:
self.progress_bar.update(1, self.op._estimated_output_blocks)
def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
"""Update the console with the latest operator progress."""
if self.progress_bar:
self.progress_bar.set_description(self.summary_str(resource_manager))
def summary_str(self, resource_manager: ResourceManager) -> str:
queued = self.num_queued() + self.op.internal_queue_size()
active = self.op.num_active_tasks()
desc = f"- {self.op.name}: {active} active, {queued} queued"
mem = memory_string(resource_manager.get_op_usage(self.op).object_store_memory)
desc += f", {mem} objects"
suffix = self.op.progress_str()
if suffix:
desc += f", {suffix}"
return desc
def dispatch_next_task(self) -> None:
"""Move a bundle from the operator inqueue to the operator itself."""
for i, inqueue in enumerate(self.inqueues):
ref = inqueue.pop()
if ref is not None:
self.op.add_input(ref, input_index=i)
return
assert False, "Nothing to dispatch"
def get_output_blocking(self, output_split_idx: Optional[int]) -> RefBundle:
"""Get an item from this node's output queue, blocking as needed.
Returns:
The next RefBundle from the output queue.
Raises:
StopIteration: If all outputs are already consumed.
Exception: If there was an exception raised during execution.
"""
while True:
# Check if StreamingExecutor has caught an exception or is done execution.
if self._exception is not None:
raise self._exception
elif self._finished and not self.outqueue.has_next(output_split_idx):
raise StopIteration()
ref = self.outqueue.pop(output_split_idx)
if ref is not None:
return ref
time.sleep(0.01)
def inqueue_memory_usage(self) -> int:
"""Return the object store memory of this operator's inqueue."""
total = 0
for op, inq in zip(self.op.input_dependencies, self.inqueues):
# Exclude existing input data items from dynamic memory usage.
if not isinstance(op, InputDataBuffer):
total += inq.memory_usage
return total
def outqueue_memory_usage(self) -> int:
"""Return the object store memory of this operator's outqueue."""
return self.outqueue.memory_usage
def outqueue_num_blocks(self) -> int:
"""Return the number of blocks in this operator's outqueue."""
return self.outqueue.num_blocks
def mark_finished(self, exception: Optional[Exception] = None):
"""Marks this operator as finished. Used for exiting get_output_blocking."""
if exception is None:
self._finished = True
else:
self._exception = exception
def build_streaming_topology(
dag: PhysicalOperator, options: ExecutionOptions
) -> Tuple[Topology, int]:
"""Instantiate the streaming operator state topology for the given DAG.
This involves creating the operator state for each operator in the DAG,
registering it in the topology, and wiring up the inqueues/outqueues of
dependent operator states.
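Example (illustrative; ``source_op`` and ``map_op`` stand for a two-operator
DAG in which ``map_op`` consumes ``source_op``):

    topology, num_bars = build_streaming_topology(map_op, ExecutionOptions())
    assert list(topology) == [source_op, map_op]  # topological order
    assert topology[map_op].inqueues[0] is topology[source_op].outqueue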
Args:
dag: The operator DAG to instantiate.
options: The execution options to use to start operators.
Returns:
The topology dict holding the streaming execution state.
The number of progress bars initialized so far.
"""
topology: Topology = {}
# DFS walk to wire up operator states.
def setup_state(op: PhysicalOperator) -> OpState:
if op in topology:
raise ValueError("An operator can only be present in a topology once.")
# Wire up the input outqueues to this op's inqueues.
inqueues = []
for i, parent in enumerate(op.input_dependencies):
parent_state = setup_state(parent)
inqueues.append(parent_state.outqueue)
# Create state.
op_state = OpState(op, inqueues)
topology[op] = op_state
op.start(options)
return op_state
setup_state(dag)
# Create the progress bars starting from the first operator to run.
# Note that the topology dict is in topological sort order. Index zero is reserved
# for global progress information.
i = 1
for op_state in list(topology.values()):
if not isinstance(op_state.op, InputDataBuffer):
i += op_state.initialize_progress_bars(i, options.verbose_progress)
return (topology, i)
def process_completed_tasks(
topology: Topology,
backpressure_policies: List[BackpressurePolicy],
max_errored_blocks: int,
) -> int:
"""Process any newly completed tasks. To update operator
states, call `update_operator_states()` afterwards.
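A sketch of how the streaming executor is expected to drive these helpers on
each scheduling step (illustrative ordering only, not the executor's exact
loop; the local variables are assumed to be set up by the caller):

    process_completed_tasks(topology, backpressure_policies, max_errored_blocks)
    update_operator_states(topology)
    op = select_operator_to_run(
        topology,
        resource_manager,
        backpressure_policies,
        ensure_at_least_one_running,
        execution_id,
        autoscaling_state,
    )
    if op is not None:
        topology[op].dispatch_next_task()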
Args:
topology: The topology of operators.
backpressure_policies: The backpressure policies to use.
max_errored_blocks: Max number of errored blocks to allow,
unlimited if negative.
Returns:
The number of errored blocks.
"""
# All active tasks, keyed by their waitables.
active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
for op, state in topology.items():
for task in op.get_active_tasks():
active_tasks[task.get_waitable()] = (state, task)
max_blocks_to_read_per_op: Dict[OpState, int] = {}
for policy in backpressure_policies:
res = policy.calculate_max_blocks_to_read_per_op(topology)
if len(res) > 0:
if len(max_blocks_to_read_per_op) > 0:
raise ValueError(
"At most one backpressure policy that implements "
"calculate_max_blocks_to_read_per_op() can be used at a time."
)
else:
max_blocks_to_read_per_op = res
# Process completed Ray tasks and notify operators.
num_errored_blocks = 0
if active_tasks:
ready, _ = ray.wait(
list(active_tasks.keys()),
num_returns=len(active_tasks),
fetch_local=False,
timeout=0.1,
)
# Organize tasks by the operator they belong to, and sort them by task
# index so that we process them in a deterministic order.
# This is because some backpressure policies (e.g.,
# StreamingOutputBackpressurePolicy) may limit the number of blocks to read
# per operator. In this case, we want to have fewer tasks finish quickly and
# yield resources, instead of having all tasks output blocks together.
ready_tasks_by_op = defaultdict(list)
for ref in ready:
state, task = active_tasks[ref]
ready_tasks_by_op[state].append(task)
for state, ready_tasks in ready_tasks_by_op.items():
ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())
for task in ready_tasks:
if isinstance(task, DataOpTask):
try:
num_blocks_read = task.on_data_ready(
max_blocks_to_read_per_op.get(state, None)
)
if state in max_blocks_to_read_per_op:
max_blocks_to_read_per_op[state] -= num_blocks_read
except Exception as e:
num_errored_blocks += 1
should_ignore = (
max_errored_blocks < 0
or max_errored_blocks >= num_errored_blocks
)
error_message = (
"An exception was raised from a task of "
f'operator "{state.op.name}".'
)
if should_ignore:
remaining = (
max_errored_blocks - num_errored_blocks
if max_errored_blocks >= 0
else "unlimited"
)
error_message += (
" Ignoring this exception with remaining"
f" max_errored_blocks={remaining}."
)
logger.get_logger().warning(error_message, exc_info=e)
else:
error_message += (
" Dataset execution will now abort."
" To ignore this exception and continue, set"
" DataContext.max_errored_blocks."
)
logger.get_logger().error(error_message)
raise e from None
else:
assert isinstance(task, MetadataOpTask)
task.on_task_finished()
# Pull any operator outputs into the streaming op state.
for op, op_state in topology.items():
while op.has_next():
op_state.add_output(op.get_next())
return num_errored_blocks
def update_operator_states(topology: Topology) -> None:
"""Update operator states accordingly for newly completed tasks.
Should be called after `process_completed_tasks()`."""
# Call inputs_done() on ops where no more inputs are coming.
for op, op_state in topology.items():
if op_state.inputs_done_called:
continue
all_inputs_done = True
for idx, dep in enumerate(op.input_dependencies):
if dep.completed() and not topology[dep].outqueue:
if not op_state.input_done_called[idx]:
op.input_done(idx)
op_state.input_done_called[idx] = True
else:
all_inputs_done = False
if all_inputs_done:
op.all_inputs_done()
op_state.inputs_done_called = True
# Traverse the topology in reverse topological order.
# For each op, if all of its downstream operators have completed,
# call mark_execution_completed() to also complete this op.
for op, op_state in reversed(list(topology.items())):
if op.completed():
continue
dependents_completed = len(op.output_dependencies) > 0 and all(
dep.completed() for dep in op.output_dependencies
)
if dependents_completed:
op.mark_execution_completed()
def select_operator_to_run(
topology: Topology,
resource_manager: ResourceManager,
backpressure_policies: List[BackpressurePolicy],
ensure_at_least_one_running: bool,
execution_id: str,
autoscaling_state: AutoscalingState,
) -> Optional[PhysicalOperator]:
"""Select an operator to run, if possible.
The objective of this function is to maximize the throughput of the overall
pipeline, subject to defined memory and parallelism limits.
This is currently implemented by applying backpressure on operators that are
producing outputs faster than downstream operators consume them (measured by
`len(outqueue)`), as well as on operators with a large number of running
tasks (`num_processing()`).
Note that memory limits also apply to the outqueue of the output operator. This
provides backpressure if the consumer is slow. However, once a bundle is returned
to the user, it is no longer tracked.
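Backpressure ranking example (an illustrative restatement of the selection
key used below, not additional logic):

    score = len(topology[op].outqueue) + topology[op].num_processing()
    # Among eligible ops, the one with the smallest score (the least buffered
    # and in-flight work) runs next; metadata-only ops always sort first.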
"""
# Filter to ops that are eligible for execution.
ops = []
for op, state in topology.items():
under_resource_limits = _execution_allowed(op, resource_manager)
if (
under_resource_limits
and not op.completed()
and state.num_queued() > 0
and op.should_add_input()
and all(p.can_add_input(op) for p in backpressure_policies)
):
ops.append(op)
# Update the op in all cases to enable internal autoscaling, etc.
op.notify_resource_usage(state.num_queued(), under_resource_limits)
# If no ops are allowed to execute due to resource constraints, try to trigger
# cluster scale-up.
if not ops and any(state.num_queued() > 0 for state in topology.values()):
now = time.time()
if (
now
> autoscaling_state.last_request_ts + MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS
):
autoscaling_state.last_request_ts = now
_try_to_scale_up_cluster(topology, execution_id)
# To ensure liveness, allow at least 1 op to run regardless of limits. This is
# gated on `ensure_at_least_one_running`, which is set if the consumer is blocked.
if (
ensure_at_least_one_running
and not ops
and all(op.num_active_tasks() == 0 for op in topology)
):
# The topology is entirely idle, so choose from all ready ops ignoring limits.
ops = [
op
for op, state in topology.items()
if state.num_queued() > 0 and not op.completed()
]
# Nothing to run.
if not ops:
return None
# Run metadata-only operators first. After that, equally penalize outqueue length
# and num bundles processing for backpressure.
return min(
ops,
key=lambda op: (
not op.throttling_disabled(),
len(topology[op].outqueue) + topology[op].num_processing(),
),
)
def _try_to_scale_up_cluster(topology: Topology, execution_id: str):
"""Try to scale up the cluster to accomodate the provided in-progress workload.
This makes a resource request to Ray's autoscaler consisting of the current,
aggregate usage of all operators in the DAG + the incremental usage of all operators
that are ready for dispatch (i.e. that have inputs queued). If the autoscaler were
to grant this resource request, it would allow us to dispatch one task for every
ready operator.
Note that this resource request does not take the global resource limits or the
liveness policy into account; it only tries to make the existing resource usage +
one more task per ready operator feasible in the cluster.
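Illustrative request shape: if one operator has two active tasks that each
need 1 CPU and no further queued input, while another operator has queued
input, no active tasks, and needs 1 GPU per task, the bundles sent to the
autoscaler would be ``[{"CPU": 1}, {"CPU": 1}, {"GPU": 1}]``.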
Args:
topology: The execution state of the in-progress workload for which we wish to
request more resources.
"""
# Get resource usage for all ops + additional resources needed to launch one more
# task for each ready op.
resource_request = []
def to_bundle(resource: ExecutionResources) -> Dict:
req = {}
if resource.cpu:
req["CPU"] = math.ceil(resource.cpu)
if resource.gpu:
req["GPU"] = math.ceil(resource.gpu)
return req
for op, state in topology.items():
per_task_resource = op.incremental_resource_usage()
task_bundle = to_bundle(per_task_resource)
resource_request.extend([task_bundle] * op.num_active_tasks())
# Only include incremental resource usage for ops that are ready for
# dispatch.
if state.num_queued() > 0:
# TODO(Clark): Scale up more aggressively by adding incremental resource
# usage for more than one bundle in the queue for this op?
resource_request.append(task_bundle)
# Make autoscaler resource request.
actor = get_or_create_autoscaling_requester_actor()
actor.request_resources.remote(resource_request, execution_id)
def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) -> bool:
"""Return whether an operator is allowed to execute given resource usage.
Operators are throttled globally based on CPU and GPU limits for the stream.
For an N operator DAG, we only throttle the kth operator (in the source-to-sink
ordering) on object store utilization if the cumulative object store utilization
for the kth operator and every operator downstream from it is greater than
k/N * global_limit; i.e., the N - k operator sub-DAG is using more object store
memory than its share.
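Worked example (illustrative, using the fraction stated above): with N = 4
operators and a 1 GiB global object store limit, operator k = 2 is throttled
on memory only when it and its downstream operators together hold more than
2/4 * 1 GiB = 512 MiB of objects.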
Args:
op: The operator to check.
resource_manager: The ResourceManager of the current dataset.
Returns:
Whether the op is allowed to run.
"""
if op.throttling_disabled():
return True
global_usage = resource_manager.get_global_usage()
global_limits = resource_manager.get_global_limits()
# To avoid starvation problems when dealing with fractional resource types,
# convert all quantities to integer (0 or 1) for deciding admissibility. This
# allows operators with non-integral requests to slightly overshoot the limit.
global_floored = ExecutionResources(
cpu=math.floor(global_usage.cpu or 0),
gpu=math.floor(global_usage.gpu or 0),
object_store_memory=global_usage.object_store_memory,
)
inc = op.incremental_resource_usage()
if inc.cpu and inc.gpu:
raise NotImplementedError(
"Operator incremental resource usage cannot specify both CPU "
"and GPU at the same time, since it may cause deadlock."
)
# Ignore the scale of CPU and GPU requests, i.e., treat each as either 1 or 0.
# This ensures operators don't get starved due to the shape of their resource
# requests.
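# For example, an operator that requests 0.5 CPUs per task is counted as
# 1 CPU here, so it is admitted or throttled on the same terms as an
# operator requesting whole CPUs.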
inc_indicator = ExecutionResources(
cpu=1 if inc.cpu else 0,
gpu=1 if inc.gpu else 0,
object_store_memory=inc.object_store_memory
if DataContext.get_current().use_runtime_metrics_scheduling
else None,
)
# Under global limits; always allow.
new_usage = global_floored.add(inc_indicator)
if new_usage.satisfies_limit(global_limits):
return True
# We're over global limits, but execution may still be allowed if memory is the
# only bottleneck and this wouldn't impact downstream memory limits. This avoids
# stalling the execution for memory bottlenecks that occur upstream.
# See for more context: https://github.com/ray-project/ray/pull/32673
global_limits_sans_memory = ExecutionResources(
cpu=global_limits.cpu, gpu=global_limits.gpu
)
global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory)
downstream_memory = resource_manager.get_downstream_object_store_memory(op)
if (
DataContext.get_current().use_runtime_metrics_scheduling
and inc.object_store_memory
):
downstream_memory += inc.object_store_memory
downstream_limit = global_limits.scale(resource_manager.get_downstream_fraction(op))
downstream_memory_ok = ExecutionResources(
object_store_memory=downstream_memory
).satisfies_limit(downstream_limit)
# If completing a task decreases the overall object store memory usage, allow it
# even if we're over the global limit.
if (
DataContext.get_current().use_runtime_metrics_scheduling
and global_ok_sans_memory
and op.metrics.average_bytes_change_per_task is not None
and op.metrics.average_bytes_change_per_task <= 0
):
return True
return global_ok_sans_memory and downstream_memory_ok