-
-
Notifications
You must be signed in to change notification settings - Fork 314
/
layout.py
379 lines (314 loc) · 13.3 KB
/
layout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
from __future__ import annotations
import abc
import asyncio
from functools import wraps
from logging import getLogger
from typing import Any, AsyncIterator, Dict, Iterator, List, NamedTuple, Set, cast
from jsonpatch import apply_patch, make_patch
from typing_extensions import TypedDict
from idom.config import IDOM_DEBUG_MODE
from .component import AbstractComponent
from .events import EventHandler
from .hooks import LifeCycleHook
from .utils import CannotAccessResource, HasAsyncResources, async_resource
from .vdom import VdomDict, validate_vdom
logger = getLogger(__name__)
class LayoutUpdate(NamedTuple):
"""An object describing an update to a :class:`Layout`"""
path: str
changes: List[Dict[str, Any]]
def apply_to(self, model: Any) -> Any:
"""Return the model resulting from the changes in this update"""
return apply_patch(
model, [{**c, "path": self.path + c["path"]} for c in self.changes]
)
@classmethod
def create_from(cls, source: Any, target: Any) -> "LayoutUpdate":
return cls("", make_patch(source, target).patch)
class LayoutEvent(NamedTuple):
target: str
"""The ID of the event handler."""
data: List[Any]
"""A list of event data passed to the event handler."""
class Layout(HasAsyncResources):
__slots__ = ["root", "_event_handlers"]
if not hasattr(abc.ABC, "__weakref__"): # pragma: no cover
__slots__.append("__weakref__")
def __init__(self, root: "AbstractComponent") -> None:
super().__init__()
if not isinstance(root, AbstractComponent):
raise TypeError("Expected an AbstractComponent, not %r" % root)
self.root = root
self._event_handlers: Dict[str, EventHandler] = {}
def update(self, component: "AbstractComponent") -> None:
try:
self._rendering_queue.put(component)
except CannotAccessResource:
logger.info(f"Did not update {component} - resources of {self} are closed")
async def dispatch(self, event: LayoutEvent) -> None:
# It is possible for an element in the frontend to produce an event
# associated with a backend model that has been deleted. We only handle
# events if the element and the handler exist in the backend. Otherwise
# we just ignore the event.
handler = self._event_handlers.get(event.target)
if handler is not None:
await handler(event.data)
else:
logger.info(
f"Ignored event - handler {event.target!r} does not exist or its component unmounted"
)
async def render(self) -> LayoutUpdate:
while True:
component = await self._rendering_queue.get()
if self._has_component_state(component):
return self._create_layout_update(component)
if IDOM_DEBUG_MODE.get():
# If in debug mode inject a function that ensures all returned updates
# contain valid VDOM models. We only do this in debug mode in order to
# avoid unnecessarily impacting performance.
_debug_render = render
@wraps(_debug_render)
async def render(self) -> LayoutUpdate:
# Ensure that the model is valid VDOM on each render
result = await self._debug_render()
validate_vdom(self._component_states[id(self.root)].model)
return result
@async_resource
async def _rendering_queue(self) -> AsyncIterator["_ComponentQueue"]:
queue = _ComponentQueue()
queue.put(self.root)
yield queue
@async_resource
async def _component_states(self) -> AsyncIterator[Dict[int, _ComponentState]]:
root_component_state = self._create_component_state(
self.root, "", "", save=False
)
try:
yield {root_component_state.component_id: root_component_state}
finally:
self._delete_component_state(root_component_state)
def _create_layout_update(self, component: AbstractComponent) -> LayoutUpdate:
component_state = self._get_component_state(component)
component_state.life_cycle_hook.component_will_render()
for state in self._iter_component_states_from_root(
component_state,
include_root=False,
):
state.life_cycle_hook.component_will_unmount()
self._clear_component_state_event_handlers(component_state)
self._delete_component_state_children(component_state)
old_model = component_state.model.copy() # we copy because it will be mutated
new_model = self._render_component(component_state)
changes = make_patch(old_model, new_model).patch
for state in self._iter_component_states_from_root(
component_state,
include_root=True,
):
state.life_cycle_hook.component_did_render()
return LayoutUpdate(path=component_state.patch_path, changes=changes)
def _render_component(self, component_state: _ComponentState) -> Dict[str, Any]:
component_obj = component_state.component_obj
try:
component_state.life_cycle_hook.set_current()
try:
raw_model = component_obj.render()
finally:
component_state.life_cycle_hook.unset_current()
assert "key" not in raw_model, "Component must not return VDOM with a 'key'"
key = getattr(component_obj, "key", "")
if key:
raw_model = cast(VdomDict, {**raw_model, "key": key})
component_state.model.update(
self._render_model(
component_state,
raw_model,
component_state.patch_path,
component_state.key_path,
)
)
except Exception as error:
logger.exception(f"Failed to render {component_obj}")
component_state.model.update({"tagName": "div", "__error__": str(error)})
# We need to return the model from the `component_state` so that the model
# between all `_ComponentState` objects within a `Layout` are shared.
return component_state.model
def _render_model(
self,
component_state: _ComponentState,
model: VdomDict,
patch_path: str,
key_path: str,
) -> Dict[str, Any]:
serialized_model: Dict[str, Any] = {}
event_handlers = self._render_model_event_targets(
component_state, model, key_path
)
if event_handlers:
serialized_model["eventHandlers"] = event_handlers
if "children" in model:
serialized_model["children"] = self._render_model_children(
component_state, model, patch_path, key_path
)
return {**model, **serialized_model}
def _render_model_children(
self,
component_state: _ComponentState,
model: VdomDict,
patch_path: str,
key_path: str,
) -> List[Any]:
children = model["children"]
resolved_children: List[Any] = []
for index, child in enumerate(
children if isinstance(children, (list, tuple)) else [children]
):
if isinstance(child, dict):
resolved_children.append(
self._render_model(
component_state,
cast(VdomDict, child),
patch_path=f"{patch_path}/children/{index}",
key_path=f"{key_path}/{child.get('key') or hex(id(child))[2:]}",
)
)
elif isinstance(child, AbstractComponent):
resolved_children.append(
self._render_component(
self._create_component_state(
child,
patch_path=f"{patch_path}/children/{index}",
parent_key_path=key_path,
save=True,
)
)
)
component_state.child_component_ids.append(id(child))
else:
resolved_children.append(str(child))
return resolved_children
def _render_model_event_targets(
self,
component_state: _ComponentState,
model: VdomDict,
key_path: str,
) -> Dict[str, _EventTarget]:
handlers_by_event: Dict[str, EventHandler] = {}
if "eventHandlers" in model:
handlers_by_event.update(model["eventHandlers"])
if "attributes" in model:
attrs = model["attributes"]
for k, v in list(attrs.items()):
if callable(v):
if not isinstance(v, EventHandler):
h = handlers_by_event[k] = EventHandler()
h.add(attrs.pop(k))
else:
h = attrs.pop(k)
handlers_by_event[k] = h
handlers_by_target: Dict[str, EventHandler] = {}
model_event_targets: Dict[str, _EventTarget] = {}
for event, handler in handlers_by_event.items():
target = f"{key_path}/{event}"
handlers_by_target[target] = handler
model_event_targets[event] = {
"target": target,
"preventDefault": handler.prevent_default,
"stopPropagation": handler.stop_propagation,
}
component_state.event_handler_ids.clear()
component_state.event_handler_ids.update(handlers_by_target)
self._event_handlers.update(handlers_by_target)
return model_event_targets
def _get_component_state(self, component: AbstractComponent) -> _ComponentState:
return self._component_states[id(component)]
def _has_component_state(self, component: AbstractComponent) -> bool:
return id(component) in self._component_states
def _create_component_state(
self,
component: AbstractComponent,
patch_path: str,
parent_key_path: str,
save: bool,
) -> _ComponentState:
component_id = id(component)
state = _ComponentState(
model={},
patch_path=patch_path,
key_path=f"{parent_key_path}/{_get_component_key(component)}",
component_id=component_id,
component_obj=component,
event_handler_ids=set(),
child_component_ids=[],
life_cycle_hook=LifeCycleHook(component, self.update),
)
if save:
self._component_states[component_id] = state
return state
def _delete_component_state(self, component_state: _ComponentState) -> None:
self._clear_component_state_event_handlers(component_state)
self._delete_component_state_children(component_state)
del self._component_states[component_state.component_id]
def _clear_component_state_event_handlers(
self, component_state: _ComponentState
) -> None:
for handler_id in component_state.event_handler_ids:
del self._event_handlers[handler_id]
component_state.event_handler_ids.clear()
def _delete_component_state_children(
self, component_state: _ComponentState
) -> None:
for e_id in component_state.child_component_ids:
self._delete_component_state(self._component_states[e_id])
component_state.child_component_ids.clear()
def _iter_component_states_from_root(
self,
root_component_state: _ComponentState,
include_root: bool,
) -> Iterator[_ComponentState]:
if include_root:
pending = [root_component_state]
else:
pending = [
self._component_states[i]
for i in root_component_state.child_component_ids
]
while pending:
visited_component_state = pending.pop(0)
yield visited_component_state
pending.extend(
self._component_states[i]
for i in visited_component_state.child_component_ids
)
def __repr__(self) -> str:
return f"{type(self).__name__}({self.root})"
class _ComponentState(NamedTuple):
model: Dict[str, Any]
patch_path: str
key_path: str
component_id: int
component_obj: AbstractComponent
event_handler_ids: Set[str]
child_component_ids: List[int]
life_cycle_hook: LifeCycleHook
class _EventTarget(TypedDict):
target: str
preventDefault: bool # noqa
stopPropagation: bool # noqa
def _get_component_key(component: AbstractComponent) -> str:
return getattr(component, "key", "") or hex(id(component))[2:]
class _ComponentQueue:
__slots__ = "_loop", "_queue", "_pending"
def __init__(self) -> None:
self._loop = asyncio.get_event_loop()
self._queue: "asyncio.Queue[AbstractComponent]" = asyncio.Queue()
self._pending: Set[int] = set()
def put(self, component: AbstractComponent) -> None:
component_id = id(component)
if component_id not in self._pending:
self._pending.add(component_id)
self._loop.call_soon_threadsafe(self._queue.put_nowait, component)
return None
async def get(self) -> AbstractComponent:
component = await self._queue.get()
self._pending.remove(id(component))
return component