-
Notifications
You must be signed in to change notification settings - Fork 142
/
runtime.py
1550 lines (1371 loc) · 56.8 KB
/
runtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2024 Marimo. All rights reserved.
from __future__ import annotations
import asyncio
import builtins
import contextlib
import dataclasses
import io
import itertools
import os
import pathlib
import signal
import sys
import threading
import time
import traceback
from multiprocessing import connection
from typing import TYPE_CHECKING, Any, Callable, Iterator, Optional
from marimo import _loggers
from marimo._ast.cell import CellConfig, CellId_t
from marimo._ast.compiler import compile_cell
from marimo._ast.visitor import Name, is_local
from marimo._config.config import MarimoConfig
from marimo._messaging.cell_output import CellChannel
from marimo._messaging.errors import (
Error,
MarimoAncestorStoppedError,
MarimoExceptionRaisedError,
MarimoInterruptionError,
MarimoSyntaxError,
UnknownError,
)
from marimo._messaging.ops import (
Alert,
CellOp,
CompletedRun,
FunctionCallResult,
HumanReadableStatus,
InstallingPackageAlert,
MissingPackageAlert,
PackageStatusType,
RemoveUIElements,
VariableDeclaration,
Variables,
VariableValue,
VariableValues,
)
from marimo._messaging.streams import (
ThreadSafeStderr,
ThreadSafeStdin,
ThreadSafeStdout,
ThreadSafeStream,
)
from marimo._messaging.tracebacks import write_traceback
from marimo._messaging.types import (
KernelMessage,
Stderr,
Stdin,
Stdout,
Stream,
)
from marimo._output import formatting
from marimo._output.rich_help import mddoc
from marimo._plugins.core.web_component import JSONType
from marimo._plugins.ui._core.ui_element import MarimoConvertValueException
from marimo._runtime import (
cell_runner,
dataflow,
handlers,
marimo_pdb,
patches,
)
from marimo._runtime.complete import complete, completion_worker
from marimo._runtime.context import (
ContextNotInitializedError,
ExecutionContext,
get_context,
get_global_context,
)
from marimo._runtime.context.kernel_context import initialize_kernel_context
from marimo._runtime.control_flow import MarimoInterrupt, MarimoStopError
from marimo._runtime.input_override import input_override
from marimo._runtime.packages.module_registry import ModuleRegistry
from marimo._runtime.packages.package_manager import PackageManager
from marimo._runtime.packages.package_managers import create_package_manager
from marimo._runtime.packages.utils import is_python_isolated
from marimo._runtime.params import CLIArgs, QueryParams
from marimo._runtime.redirect_streams import redirect_streams
from marimo._runtime.reload.autoreload import ModuleReloader
from marimo._runtime.reload.module_watcher import ModuleWatcher
from marimo._runtime.requests import (
AppMetadata,
CompletionRequest,
ControlRequest,
CreationRequest,
DeleteRequest,
ExecuteMultipleRequest,
ExecuteStaleRequest,
ExecutionRequest,
FunctionCallRequest,
InstallMissingPackagesRequest,
SetCellConfigRequest,
SetUIElementValueRequest,
SetUserConfigRequest,
StopRequest,
)
from marimo._runtime.state import State
from marimo._runtime.validate_graph import check_for_errors
from marimo._server.types import QueueType
from marimo._utils.platform import is_pyodide
from marimo._utils.signals import restore_signals
from marimo._utils.typed_connection import TypedConnection
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
LOGGER = _loggers.marimo_logger()
@mddoc
def defs() -> tuple[str, ...]:
    """Get the names defined by the cell that is currently executing.

    **Returns**:

    - a sorted tuple of the executing cell's definitions; the tuple is
      empty when no runtime context is installed or no cell is running.
    """
    try:
        ctx = get_context()
    except ContextNotInitializedError:
        return tuple()

    exec_ctx = ctx.execution_context
    if exec_ctx is None:
        return tuple()

    cell_defs = ctx.graph.cells[exec_ctx.cell_id].defs
    return tuple(sorted(cell_defs))
@mddoc
def refs() -> tuple[str, ...]:
    """Get the names referenced by the cell that is currently executing.

    **Returns**:

    - a sorted tuple of the executing cell's references. Python builtins
      are left out unless some cell defines a name that shadows them. The
      tuple is empty when no runtime context is installed or no cell is
      running.
    """
    try:
        ctx = get_context()
    except ContextNotInitializedError:
        return tuple()

    # A builtin only counts as a reference when a cell has redefined it;
    # otherwise names like `print` or `len` would just be noise.
    user_defined = set(ctx.graph.definitions.keys())
    unshadowed_builtins = {
        name for name in builtins.__dict__.keys() if name not in user_defined
    }

    exec_ctx = ctx.execution_context
    if exec_ctx is None:
        return tuple()

    cell_refs = ctx.graph.cells[exec_ctx.cell_id].refs
    return tuple(
        sorted(ref for ref in cell_refs if ref not in unshadowed_builtins)
    )
@mddoc
def query_params() -> QueryParams:
    """Get the query parameters of a marimo app.

    **Examples**:

    Keep the text input in sync with the URL query parameters.

    ```python3
    # In its own cell
    query_params = mo.query_params()

    # In another cell
    search = mo.ui.text(
        value=query_params["search"] or "",
        on_change=lambda value: query_params.set("search", value),
    )
    search
    ```

    You can also set the query parameters reactively:

    ```python3
    toggle = mo.ui.switch(label="Toggle me")
    toggle

    # In another cell
    query_params["is_enabled"] = toggle.value
    ```

    **Returns**:

    - A `QueryParams` object containing the query parameters.
      You can directly interact with this object like a dictionary.
      If you mutate this object, changes will be persisted to the frontend
      query parameters and any other cells referencing the query parameters
      will automatically re-run.
    """
    return get_context().query_params
@mddoc
def cli_args() -> CLIArgs:
    """Get the command line arguments of a marimo notebook.

    **Examples**:

    `marimo edit notebook.py -- -size 10`

    ```python3
    # Access the command line arguments
    size = mo.cli_args().get("size") or 100
    for i in range(size):
        print(i)
    ```

    **Returns**:

    - A read-only dictionary of the notebook's command line arguments
      (in the example above, the arguments following `--`). It cannot be
      mutated.
    """
    ctx = get_context()
    return ctx.cli_args
@mddoc
def running_in_notebook() -> bool:
    """Returns True if running in a marimo notebook, False otherwise.

    Concretely: True when a runtime context is installed and it is a
    kernel runtime context.
    """
    # Imported lazily, as in the original implementation.
    from marimo._runtime.context.kernel_context import KernelRuntimeContext

    try:
        return isinstance(get_context(), KernelRuntimeContext)
    except ContextNotInitializedError:
        return False
@dataclasses.dataclass
class CellMetadata:
    """Per-cell data the kernel holds on to independently of the graph.

    An entry outlives the cell's presence in the dataflow graph. It is kept
    when a cell is removed from the graph, and it exists even when the
    cell's code could not be turned into a cell (e.g. a syntax error).
    """

    # The cell's configuration; each instance gets its own default
    # CellConfig unless one is supplied.
    config: CellConfig = dataclasses.field(default_factory=CellConfig)
class Kernel:
"""Kernel that manages the dependency graph and its execution.
Args:
- cell_configs: initial configuration for each cell
- app_metadata: metadata about the notebook
- user_config: the initial user configuration
- stream: object used to communicate with the server/outside world
- stdout: replacement for sys.stdout
- stderr: replacement for sys.stderr
- stdin: replacement for sys.stdin
- input_override: a function that overrides the builtin input() function
- debugger_override: a replacement for the built-in Pdb
- execute_stale_cells_callback: callback to enqueue a stale cells request
"""
def __init__(
    self,
    cell_configs: dict[CellId_t, CellConfig],
    app_metadata: AppMetadata,
    user_config: MarimoConfig,
    stream: Stream,
    stdout: Stdout | None,
    stderr: Stderr | None,
    stdin: Stdin | None,
    execute_stale_cells_callback: Callable[[], None],
    input_override: Callable[[Any], str] = input_override,
    debugger_override: marimo_pdb.MarimoPdb | None = None,
) -> None:
    """Initialize kernel state from the notebook's metadata and config.

    Side effects performed here:
    - patches pdb when `debugger_override` is given;
    - patches the main module, whose ``__dict__`` becomes the kernel's
      globals;
    - prepends the notebook's directory to ``sys.path`` when the app has a
      filename (failures are logged, not raised);
    - applies `user_config` (package manager, module autoreload);
    - calls ``patches.patch_micropip`` on the globals when not running
      under Pyodide;
    - runs ``sys.path.append('')`` and ``import marimo as __marimo__``
      inside the globals.
    """
    self.app_metadata = app_metadata
    self.query_params = QueryParams(app_metadata.query_params)
    self.cli_args = CLIArgs(app_metadata.cli_args)
    self.stream = stream
    self.stdout = stdout
    self.stderr = stderr
    self.stdin = stdin
    # Must be set before _update_runtime_from_user_config, which hands it
    # to the ModuleWatcher.
    self.execute_stale_cells_callback = execute_stale_cells_callback
    self.debugger = debugger_override
    if self.debugger is not None:
        patches.patch_pdb(self.debugger)

    self._module = patches.patch_main_module(
        file=self.app_metadata.filename, input_override=input_override
    )
    if self.app_metadata.filename is not None:
        try:
            notebook_directory = str(
                pathlib.Path(self.app_metadata.filename).parent.absolute()
            )
            if notebook_directory not in sys.path:
                sys.path.insert(0, notebook_directory)
        except Exception as e:
            # `%s`, not `%e`: `%e` is a float conversion and makes the
            # logging call itself fail when formatting the exception.
            LOGGER.warning("Failed to add directory to path (error %s)", e)

    self.graph = dataflow.DirectedGraph()
    self.cell_metadata: dict[CellId_t, CellMetadata] = {
        cell_id: CellMetadata(config=config)
        for cell_id, config in cell_configs.items()
    }
    self.module_registry = ModuleRegistry(
        self.graph, excluded_modules=set()
    )
    # These must exist (as None) before the config is applied, since
    # _update_runtime_from_user_config compares against them.
    self.package_manager: PackageManager | None = None
    self.module_reloader: ModuleReloader | None = None
    self.module_watcher: ModuleWatcher | None = None
    # Load runtime settings from user config
    self._update_runtime_from_user_config(user_config)

    # Set up the execution context
    self.execution_context: Optional[ExecutionContext] = None
    # initializers to override construction of ui elements
    self.ui_initializers: dict[str, Any] = {}
    # errored cells
    self.errors: dict[CellId_t, tuple[Error, ...]] = {}
    # Mapping from state to the cell when its setter
    # was invoked. New state updates evict older ones.
    self.state_updates: dict[State[Any], CellId_t] = {}

    if not is_pyodide():
        patches.patch_micropip(self.globals)
    # an empty string represents the current directory
    exec("import sys; sys.path.append(''); del sys", self.globals)
    exec("import marimo as __marimo__", self.globals)
def _update_runtime_from_user_config(self, config: MarimoConfig) -> None:
package_manager = config["package_management"]["manager"]
autoreload_mode = config["runtime"]["auto_reload"]
if (
self.package_manager is None
or package_manager != self.package_manager.name
):
self.package_manager = create_package_manager(package_manager)
if autoreload_mode == "detect" or autoreload_mode == "autorun":
if self.module_reloader is None:
self.module_reloader = ModuleReloader()
if (
self.module_watcher is not None
and self.module_watcher.mode != autoreload_mode
):
self.module_watcher.stop()
self.module_watcher = None
if self.module_watcher is None:
self.module_watcher = ModuleWatcher(
self.graph,
enqueue_run_stale_cells=self.execute_stale_cells_callback,
mode=autoreload_mode,
stream=self.stream,
)
else:
self.module_reloader = None
if self.module_watcher is not None:
self.module_watcher.stop()
self.user_config = config
@property
def globals(self) -> dict[Any, Any]:
    """Namespace that cell code runs in: the patched main module's dict."""
    module = self._module
    return module.__dict__
def start_completion_worker(
    self, completion_queue: QueueType[CompletionRequest]
) -> None:
    """Start a daemon thread running `completion_worker`.

    The worker receives `completion_queue`, the kernel's graph, and the
    current runtime context's stream. The context must therefore already
    be initialized when this is called.
    """
    worker = threading.Thread(
        target=completion_worker,
        args=(completion_queue, self.graph, get_context().stream),
        daemon=True,
    )
    worker.start()
def code_completion(
    self, request: CompletionRequest, docstrings_limit: int
) -> None:
    """Handle one completion request synchronously.

    Calls `complete` with `request`, the kernel's graph, the runtime
    context's stream, and `docstrings_limit`.
    """
    graph = self.graph
    complete(request, graph, get_context().stream, docstrings_limit)
@contextlib.contextmanager
def _install_execution_context(
    self, cell_id: CellId_t, setting_element_value: bool = False
) -> Iterator[ExecutionContext]:
    """Install an ExecutionContext for `cell_id` for the duration of a block.

    While the block runs:
    - UI element ids are provided under ``str(cell_id)``;
    - the kernel's stream/stdout/stderr/stdin are redirected for this cell
      via `redirect_streams`;
    - if a module reloader is configured, it first reloads modules that
      have changed.

    On exit (including on exception), ``self.execution_context`` is reset
    to None. If a reloader is configured, it also records modules that
    were first imported while the block ran.

    Yields the newly created ExecutionContext.
    """
    # NOTE(review): the context is assigned before the `with`/`try`; if
    # entering `provide_ui_ids`/`redirect_streams` raises, it is never
    # reset to None. Confirm whether that can happen in practice.
    self.execution_context = ExecutionContext(
        cell_id, setting_element_value
    )
    with get_context().provide_ui_ids(str(cell_id)), redirect_streams(
        cell_id,
        stream=self.stream,
        stdout=self.stdout,
        stderr=self.stderr,
        stdin=self.stdin,
    ):
        # Snapshot of sys.modules taken before the block runs; stays None
        # when autoreload is disabled.
        modules = None
        try:
            if self.module_reloader is not None:
                # Reload modules if they have changed
                modules = set(sys.modules)
                self.module_reloader.check(
                    modules=sys.modules, reload=True
                )
            yield self.execution_context
        finally:
            self.execution_context = None
            if self.module_reloader is not None and modules is not None:
                # Note timestamps for newly loaded modules
                new_modules = set(sys.modules) - modules
                self.module_reloader.check(
                    modules={m: sys.modules[m] for m in new_modules},
                    reload=False,
                )
def _try_registering_cell(
    self, cell_id: CellId_t, code: str
) -> Optional[Error]:
    """Attempt to compile `code` and register it in the graph as `cell_id`.

    Precondition: a cell with the supplied id must not already exist in the
    graph.

    Cell metadata is created for `cell_id` if none exists. If metadata
    already exists and compilation succeeds, the stored config is
    re-applied to the freshly compiled cell.

    Returns:
    - None if the cell was compiled and registered;
    - a MarimoSyntaxError if the code has a syntax error, whose message
      starts at the ``line N`` part of the traceback;
    - an UnknownError carrying the full traceback for any other
      compilation failure.
    """
    error: Optional[Error] = None
    try:
        cell = compile_cell(code, cell_id=cell_id)
    except Exception as e:
        cell = None
        if isinstance(e, SyntaxError):
            tmpio = io.StringIO()
            traceback.print_exc(file=tmpio, limit=0)
            syntax_error = tmpio.getvalue().split("\n")
            # First line has the form `File XXX, line XXX`; keep only the
            # `line XXX` part. Search from the right so that a file path
            # containing "line" (e.g. ".../pipeline/...") is not mistaken
            # for the marker, and leave the line intact if there is no
            # marker at all (find's -1 would otherwise keep only the last
            # character).
            line_marker = syntax_error[0].rfind("line")
            if line_marker != -1:
                syntax_error[0] = syntax_error[0][line_marker:]
            error = MarimoSyntaxError(msg="\n".join(syntax_error))
        else:
            tmpio = io.StringIO()
            traceback.print_exc(file=tmpio)
            error = UnknownError(msg=tmpio.getvalue())

    if cell_id in self.cell_metadata and cell is not None:
        # If we already have a config for this cell id, restore it
        # This can happen when a cell was previously deactivated (due to a
        # syntax error or multiple definition error, for example) and then
        # re-registered
        cell.configure(self.cell_metadata[cell_id].config)
    elif cell_id not in self.cell_metadata:
        self.cell_metadata[cell_id] = CellMetadata()

    if cell is not None:
        self.graph.register_cell(cell_id, cell)
        LOGGER.debug("registered cell %s", cell_id)
        LOGGER.debug("parents: %s", self.graph.parents[cell_id])
        LOGGER.debug("children: %s", self.graph.children[cell_id])

    return error
def _maybe_register_cell(
    self, cell_id: CellId_t, code: str
) -> tuple[set[CellId_t], Optional[Error]]:
    """Make sure `cell_id` is registered with exactly `code`.

    Nothing happens when the graph already has this cell cached with this
    code. Otherwise, a cell already registered under `cell_id` is
    deactivated first, and `code` is then compiled and registered under
    the same id.

    Returns:
    - the ids that were children of the replaced cell; this set is empty
      unless an existing cell with different code was replaced;
    - the registration `Error`, or None on success or when nothing needed
      to be done.
    """
    stale_children: set[CellId_t] = set()
    registration_error: Optional[Error] = None

    needs_registration = not self.graph.is_cell_cached(cell_id, code)
    if needs_registration and cell_id in self.graph.cells:
        LOGGER.debug("Deleting cell %s", cell_id)
        stale_children = self._deactivate_cell(cell_id)
    if needs_registration:
        registration_error = self._try_registering_cell(cell_id, code)

    LOGGER.debug(
        "graph:\n\tcell id %s\n\tparents %s\n\tchildren %s\n\tsiblings %s",
        cell_id,
        self.graph.parents,
        self.graph.children,
        self.graph.siblings,
    )
    return stale_children, registration_error
def _delete_names(
self, names: Iterable[Name], exclude_defs: set[Name]
) -> None:
"""Delete `names` from kernel, except for `exclude_defs`"""
for name in names:
if name in exclude_defs:
continue
if name in self.globals:
del self.globals[name]
if (
"__annotations__" in self.globals
and name in self.globals["__annotations__"]
):
del self.globals["__annotations__"][name]
def _invalidate_cell_state(
    self,
    cell_id: CellId_t,
    exclude_defs: Optional[set[Name]] = None,
    deletion: bool = False,
) -> None:
    """Clean up kernel state owned by `cell_id`.

    Steps:
    - removes the cell's defs from the kernel's globals, except for names
      in `exclude_defs`;
    - if that shrinks the set of missing modules, broadcasts an updated
      MissingPackageAlert (only when a package manager is configured);
    - disposes the cell's entries in the context's lifecycle registry,
      forwarding `deletion`;
    - broadcasts RemoveUIElements for the cell.
    """
    missing_before = self.module_registry.missing_modules()

    self._delete_names(
        self.graph.cells[cell_id].defs,
        set() if exclude_defs is None else exclude_defs,
    )

    # Of the previously missing modules, keep those the registry still
    # reports; deleting a cell can make the set of missing packages smaller
    missing_after = missing_before & self.module_registry.modules()
    if self.package_manager is not None and missing_after != missing_before:
        packages = sorted(
            self.package_manager.module_to_package(mod)
            for mod in missing_after
        )
        MissingPackageAlert(
            packages=packages,
            isolated=is_python_isolated(),
        ).broadcast()

    get_context().cell_lifecycle_registry.dispose(cell_id, deletion=deletion)
    RemoveUIElements(cell_id=cell_id).broadcast()
def _deactivate_cell(self, cell_id: CellId_t) -> set[CellId_t]:
"""Deactivate: remove from graph, invalidate state, but keep metadata
Keeps the cell's config, in case we see the same cell again.
In contrast to deleting a cell, which fully scrubs the cell
from the kernel and graph.
"""
if cell_id not in self.errors:
self._invalidate_cell_state(cell_id, deletion=True)
return self.graph.delete_cell(cell_id)
else:
# An errored cell can be thought of as a cell that's in the graph
# but that has no state in the kernel (because it was never run).
# Its defs may overlap with defs of a non-errored cell, so we MUST
# NOT delete/cleanup its defs from the kernel (i.e., an errored
# cell shouldn't invalidate state of another cell).
self.graph.delete_cell(cell_id)
return set()
def _delete_cell(self, cell_id: CellId_t) -> set[CellId_t]:
"""Delete a cell from the kernel and the graph.
Deletion from the kernel involves removing cell's defs and
de-registering its UI Elements.
Deletion from graph is forwarded to graph object.
"""
del self.cell_metadata[cell_id]
return self._deactivate_cell(cell_id)
def mutate_graph(
    self,
    execution_requests: Sequence[ExecutionRequest],
    deletion_requests: Sequence[DeleteRequest],
) -> set[CellId_t]:
    """Add and remove cells to/from the graph.

    This method adds the cells in `execution_requests` to the kernel's
    graph (deleting old versions of these cells, if any), and removes the
    cells in `deletion_requests` from the kernel's graph.

    The mutations that this method makes to the graph render the
    kernel inconsistent (stale).

    This method does not register errors for cells that were previously
    valid and are not descendants of any of the newly registered cells.
    This is important for multiple definition errors, since a user may
    absent-mindedly redefine an existing name when creating a new cell:
    such a mistake shouldn't invalidate the program state.

    Side effects: replaces `self.errors` with the recomputed error table,
    invalidates state of errored cells, broadcasts cleared outputs for
    cells that recovered from errors, broadcasts errors for all errored
    cells, and broadcasts the graph's variable declarations.

    Returns
    - set of cells that must be run to return kernel to consistent state:
      the transitive closure of the affected roots, excluding cells that
      have errors after the mutation
    """
    LOGGER.debug("Current set of errors: %s", self.errors)
    cells_before_mutation = set(self.graph.cells.keys())
    cells_with_errors_before_mutation = set(self.errors.keys())

    # The set of cells that were successfully registered
    registered_cell_ids: set[CellId_t] = set()

    # The set of cells that need to be re-run due to cells being
    # deleted/re-registered.
    cells_that_were_children_of_mutated_cells: set[CellId_t] = set()

    # Cells that were unable to be added to the graph due to syntax errors
    syntax_errors: dict[CellId_t, Error] = {}

    # Register and delete cells
    for er in execution_requests:
        old_children, error = self._maybe_register_cell(
            er.cell_id, er.code
        )
        cells_that_were_children_of_mutated_cells |= old_children
        if error is None:
            registered_cell_ids.add(er.cell_id)
        else:
            syntax_errors[er.cell_id] = error

    for dr in deletion_requests:
        cells_that_were_children_of_mutated_cells |= self._delete_cell(
            dr.cell_id
        )
    cells_in_graph = set(self.graph.cells.keys())

    # Check for semantic errors, like multiple definition errors, cycle
    # errors, and delete nonlocal errors.
    semantic_errors = check_for_errors(self.graph)
    LOGGER.debug("After mutation, syntax errors %s", syntax_errors)
    LOGGER.debug("Semantic errors %s", semantic_errors)

    # Prune semantic errors: we won't invalidate cells that were previously
    # valid, except for cells we just tried to register
    #
    # We don't want "action at a distance": running
    # a cell shouldn't invalidate cells that were previously valid
    # and weren't requested for execution
    previously_valid_cell_ids = (
        cells_in_graph
        # cells successfully registered
        - registered_cell_ids
        # cells that already had errors
        - cells_with_errors_before_mutation
    )

    # defs that we shouldn't remove from the graph
    keep_alive_defs: set[Name] = set()
    # Iterate over a copy of the keys because entries are deleted below.
    for cid in list(semantic_errors.keys()):
        # If a cell was previously valid, don't invalidate it unless
        # we have to, ie, unless it is a descendant of a just-registered
        # cell that has an error
        #
        # Handles the introduction of a multiple definition error, eg
        #
        # cell 1: x = 0
        # cell 2 (requested for execution): x = 1
        #
        # cell 1 won't be invalidated because cell 1 was previously valid
        # and there's no path from cell 2 to cell 1
        if cid in previously_valid_cell_ids and not any(
            self.graph.get_path(other_cid, cid)
            for other_cid in registered_cell_ids
        ):
            del semantic_errors[cid]
            keep_alive_defs |= self.graph.cells[cid].defs

    all_errors = {**semantic_errors}
    for cid, error in syntax_errors.items():
        # No chance of collision because cells with syntax errors are not
        # in the graph, so can't be in semantic errors
        assert cid not in all_errors
        all_errors[cid] = (error,)

    LOGGER.debug(
        "Final set of errors, after pruning valid cells: %s", all_errors
    )
    cells_with_errors_after_mutation = set(all_errors.keys())

    # Construct sets of cells that will need to be re-run.

    # Cells that previously had errors (eg, multiple definition or cycle)
    # that no longer have errors need to be refreshed.
    cells_that_no_longer_have_errors = (
        cells_with_errors_before_mutation
        - cells_with_errors_after_mutation
    ) & cells_in_graph
    for cid in cells_that_no_longer_have_errors:
        # clear error outputs before running
        CellOp.broadcast_output(
            channel=CellChannel.OUTPUT,
            mimetype="text/plain",
            data="",
            cell_id=cid,
            status=None,
        )

    # Cells that were successfully registered need to be run
    cells_registered_without_error = (
        registered_cell_ids - cells_with_errors_after_mutation
    )

    # Cells that didn't have errors associated with them before the
    # run request but now have errors; these cells' descendants
    # will need to be run. Handles the case where a cell was cached (cell's
    # code didn't change), so its previous children were not added to
    # cells_that_were_children_of_mutated_cells
    cells_transitioned_to_error = (
        cells_with_errors_after_mutation
        - cells_with_errors_before_mutation
    ) & cells_before_mutation

    # Invalidate state defined by error-ed cells, with the exception of
    # names that were defined by valid cells (relevant for multiple
    # definition errors)
    for cid in all_errors:
        if cid not in self.graph.cells:
            # error is a registration error
            continue
        self._invalidate_cell_state(cid, exclude_defs=keep_alive_defs)

    # Roots of the re-run: every affected cell that is still in the graph.
    roots = (
        set(
            itertools.chain(
                cells_registered_without_error,
                cells_that_were_children_of_mutated_cells,
                cells_transitioned_to_error,
                cells_that_no_longer_have_errors,
            )
        )
        & cells_in_graph
    )
    descendants = (
        dataflow.transitive_closure(self.graph, roots)
        # cells with errors can't be run, but are still in the graph
        # so that they can be transitioned out of error if a future
        # run request repairs the graph
        - cells_with_errors_after_mutation
    )

    # Replace the error table wholesale; errors of deleted cells (which are
    # neither in the graph nor in this request's syntax errors) drop out.
    self.errors = all_errors
    for cid in self.errors:
        if (
            # Cells with syntax errors are not in the graph
            cid in self.graph.cells
            and not self.graph.cells[cid].config.disabled
            and self.graph.is_disabled(cid)
        ):
            # this may be the first time we're seeing the cell: set its
            # status
            self.graph.cells[cid].set_status("disabled-transitively")
        CellOp.broadcast_error(
            data=self.errors[cid],
            clear_console=True,
            cell_id=cid,
            status=None,
        )

    # Publish every variable in the graph with its declaring and
    # referring cells.
    Variables(
        variables=[
            VariableDeclaration(
                name=variable,
                declared_by=list(declared_by),
                used_by=list(self.graph.get_referring_cells(variable)),
            )
            for variable, declared_by in self.graph.definitions.items()
        ]
    ).broadcast()

    return descendants
async def _run_cells(self, cell_ids: set[CellId_t]) -> None:
    """Run `cell_ids`, then repeatedly run the cells that state updates made stale.

    Each round's stale cells are expanded to their transitive closure in
    the graph before the next round; the loop ends once a round reports no
    stale cells.
    """
    # Mitigation for run mode, where kernels run in threads that share
    # sys.modules: re-point the main module once per run. Races remain
    # possible; this could also be done before every cell or before
    # pickle.dump/pickle.dumps() if it proves insufficient.
    patches.patch_sys_module(self._module)

    pending = cell_ids
    while True:
        cells_with_stale_state = await self._run_cells_internal(pending)
        if not cells_with_stale_state:
            break
        LOGGER.debug("Running state updates ...")
        pending = dataflow.transitive_closure(
            self.graph, cells_with_stale_state
        )
    LOGGER.debug("Finished run.")
async def _run_cells_internal(
self, cell_ids: set[CellId_t]
) -> set[CellId_t]:
"""Run cells, send outputs to frontends
Returns set of cells that need to be re-run due to state updates.
"""
LOGGER.debug("preparing to evaluate cells %s", cell_ids)
# Status updates: cells transition to queued, except for
# cells that are disabled (explicitly or implicitly).
for cid in cell_ids:
if self.graph.is_disabled(cid):
self.graph.cells[cid].set_stale(stale=True)
else:
self.graph.cells[cid].set_status(status="queued")
if self.graph.cells[cid].stale:
self.graph.cells[cid].set_stale(stale=False)
# State clean-up: don't leak names, UI elements, ...
#
# Clean-up state for all cells upfront, before running
# cells, to relieve memory pressure
self._invalidate_cell_state(cid)
runner = cell_runner.Runner(
cell_ids=cell_ids,
graph=self.graph,
glbls=self.globals,
debugger=self.debugger,
)
# I/O
#
# TODO(akshayka): ignore stdin (always read empty string)
# TODO(akshayka): redirect input to frontend (override builtins.input)
# or ignore/disallow input, since most users will use a
# marimo UI component anyway
# TODO(akshayka): when no logger is configured, log output is not
# redirected to frontend (it's printed to console),
# which is incorrect
# TODO(akshayka): pdb support
LOGGER.debug("final set of cells to run %s", runner.cells_to_run)
while runner.pending():
cell_id = runner.pop_cell()
if runner.cancelled(cell_id):
continue
cell = self.graph.cells[cell_id]
if cell.stale:
continue
LOGGER.debug("running cell %s", cell_id)
cell.set_status(status="running")
with self._install_execution_context(cell_id) as exc_ctx:
run_result = await runner.run(cell_id)
# Don't rebroadcast an output that was already sent
#
# 1. if run_result.output is not None, need to send it
# 2. otherwise if exc_ctx.output is None, then need to send
# the (empty) output (to clear it)
new_output = (
run_result.output is not None or exc_ctx.output is None
)
values = [
VariableValue(
name=variable,
value=(
self.globals[variable]
if variable in self.globals
else None
),
)
for variable in self.graph.cells[cell_id].defs
]
if values:
VariableValues(variables=values).broadcast()
cell.set_status(status="idle")
if (
run_result.success()
or isinstance(run_result.exception, MarimoStopError)
) and new_output:
with self._install_execution_context(cell_id) as exc_ctx:
formatted_output = formatting.try_format(run_result.output)
if formatted_output.traceback is not None:
with self._install_execution_context(cell_id):
write_traceback(formatted_output.traceback)
CellOp.broadcast_output(
channel=CellChannel.OUTPUT,
mimetype=formatted_output.mimetype,
data=formatted_output.data,
cell_id=cell_id,
status=cell.status,
)
elif isinstance(run_result.exception, MarimoInterrupt):
LOGGER.debug("Cell %s was interrupted", cell_id)
# don't clear console because this cell was running and
# its console outputs are not stale
CellOp.broadcast_error(
data=[MarimoInterruptionError()],
clear_console=False,
cell_id=cell_id,
status=cell.status,
)
elif run_result.exception is not None:
LOGGER.debug(
"Cell %s raised %s",
cell_id,
type(run_result.exception).__name__,
)
# don't clear console because this cell was running and
# its console outputs are not stale
exception_type = type(run_result.exception).__name__
CellOp.broadcast_error(
data=[
MarimoExceptionRaisedError(
msg="This cell raised an exception: %s%s"
% (
exception_type,
(
f"('{str(run_result.exception)}')"
if str(run_result.exception)
else ""
),
),
exception_type=exception_type,
raising_cell=None,
)
],
clear_console=False,
cell_id=cell_id,
status=cell.status,
)
if get_global_context().mpl_installed:
# ensures that every cell gets a fresh axis.
exec("__marimo__._output.mpl.close_figures()", self.globals)
if runner.cells_to_run:
assert runner.interrupted
for cid in runner.cells_to_run:
# `cid` was not run
self.graph.cells[cid].set_status("idle")
CellOp.broadcast_error(
data=[MarimoInterruptionError()],
# these cells are transitioning from queued to stopped
# (interrupted); they didn't get to run, so their consoles
# reflect a previous run and should be cleared
clear_console=True,
cell_id=cid,
status="idle",
)
for raising_cell in runner.cells_cancelled:
for cid in runner.cells_cancelled[raising_cell]:
# `cid` was not run
self.graph.cells[cid].set_status("idle")
exception = runner.exceptions[raising_cell]
data: Error
if isinstance(exception, MarimoStopError):
data = MarimoAncestorStoppedError(
msg=(
"This cell wasn't run because an "
"ancestor was stopped with `mo.stop`: "
),
raising_cell=raising_cell,
)
else:
exception_type = type(
runner.exceptions[raising_cell]
).__name__
data = MarimoExceptionRaisedError(
msg=(