__init__.py
import duckdb
import anywidget
import traitlets
import pyarrow as pa
import pathlib
import time
import logging

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

_DEV = False  # switch to False for production

if _DEV:
    # from `npm run dev`
    ESM = "http://localhost:5173/src/index.js?anywidget"
    CSS = ""
else:
    # from `npm run build`
    bundled_assets_dir = pathlib.Path(__file__).parent / "static"
    ESM = (bundled_assets_dir / "index.js").read_text()
    CSS = (bundled_assets_dir / "style.css").read_text()
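
# ESM and CSS above are the front-end assets that anywidget loads into the
# notebook: in dev mode they come from the local `npm run dev` server, and in
# production the bundled files from the package's static/ directory are read
# in as strings.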

class MosaicWidget(anywidget.AnyWidget):
    _esm = ESM
    _css = CSS

    # The Mosaic specification
    spec = traitlets.Dict({}).tag(sync=True)

    # The current selections
    selections = traitlets.List([]).tag(sync=True)

    # Whether data cube indexes should be created as temp tables
    temp_indexes = traitlets.Bool().tag(sync=True)

    def __init__(
        self,
        spec: dict = {},
        con=duckdb.connect(),
        temp_indexes=True,
        data={},
        *args,
        **kwargs,
    ):
        """Create a Mosaic widget.

        Args:
            spec (dict, optional): The initial Mosaic specification. Defaults to {}.
            con (connection, optional): A DuckDB connection.
                Defaults to duckdb.connect().
            data (dict, optional): Pandas DataFrames to add to DuckDB.
                The keys are used as the names of the tables. Defaults to {}.
            temp_indexes (bool, optional): Whether data cube indexes should be
                created as temp tables. Defaults to True.
        """
        super().__init__(*args, **kwargs)
        self.spec = spec
        self.con = con
        self.temp_indexes = temp_indexes
        for name, df in data.items():
            self.con.register(name, df)
        self.on_msg(self._handle_custom_msg)
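
    # Custom messages from the front end carry a query id ("uuid"), a SQL
    # string ("sql"), and a "type" that selects the response encoding:
    # "arrow" streams the result back as Arrow IPC bytes, "exec" runs the SQL
    # for its side effects only, and anything else falls back to JSON records.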
    def _handle_custom_msg(self, data: dict, buffers: list):
        logger.debug(f"{data=}, {buffers=}")
        start = time.time()
        uuid = data["uuid"]
        sql = data["sql"]
        try:
            if data["type"] == "arrow":
                result = self.con.query(sql).arrow()
                sink = pa.BufferOutputStream()
                with pa.ipc.new_stream(sink, result.schema) as writer:
                    writer.write(result)
                buf = sink.getvalue()
                self.send({"type": "arrow", "uuid": uuid}, buffers=[buf.to_pybytes()])
            elif data["type"] == "exec":
                self.con.execute(sql)
                self.send({"type": "exec", "uuid": uuid})
            else:
                result = self.con.query(sql).df()
                json = result.to_dict(orient="records")
                self.send({"type": "json", "uuid": uuid, "result": json})
        except Exception as e:
            logger.error(e)
            self.send({"error": str(e), "uuid": uuid})
        total = round((time.time() - start) * 1_000)
        if total > 5000:
            logger.warning(f"DONE. Slow query { uuid } took { total } ms.\n{ sql }")
        else:
            logger.info(f"DONE. Query { uuid } took { total } ms.\n{ sql }")