-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamlit_app.py
106 lines (87 loc) · 3 KB
/
streamlit_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import asyncio
import logging
from typing import Any
import nest_asyncio
import streamlit as st
from ecc.connection import EdgeDBCloudConn
from st_comps import (
_display_big_red_btn_and_db_calls,
_display_res,
_display_sidebar,
_get_query_form,
_render_exception,
_render_result,
)
from st_utils import (
_create_task_from_form,
_routine_clean,
generate_token,
get_conn_dict,
get_cur_ts,
get_loop_dict,
load_st_toml,
)
# Patch asyncio before anything else runs; nest_asyncio exists to let an
# event loop be re-entered while it is already running.
nest_asyncio.apply()

st.set_page_config(
    layout='centered',
    page_title='Streamlit EdgeDB Cloud Connection',
)

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# A token is generated only when the session state does not hold one yet,
# so later runs that share this session state reuse the stored value.
if 'token' not in st.session_state:
    token = generate_token()
    logging.info(f'Generating token: {token}')
    st.session_state['token'] = token
async def main(
    tg: asyncio.TaskGroup,
    conn: EdgeDBCloudConn,
    token: str,
    tasks: set[asyncio.Task[Any]],
) -> None:
    """Render the page and, when the query form was submitted, schedule it.

    Args:
        tg: Task group forwarded to ``_create_task_from_form``.
        conn: Connection forwarded to the task factory and to the
            button/DB-calls widget.
        token: Session token forwarded to the button/DB-calls widget.
        tasks: Shared task set forwarded to ``_create_task_from_form``
            (presumably the created task is registered in it -- confirm
            in ``st_utils``).
    """
    _display_sidebar()

    query_form = _get_query_form()
    if query_form.submitted:
        await _create_task_from_form(tg, conn, query_form, tasks)

    # Rendered on every run, whether or not the form was submitted.
    _display_big_red_btn_and_db_calls(conn, token)
async def run(algo, conn: EdgeDBCloudConn, token: str) -> None:
# https://youtu.be/-CzqsgaXUM8?list=PLhNSoGM2ik6SIkVGXWBwerucXjgP1rHmB&t=2375
top_name = 'top'
tasks: set[asyncio.Task[Any]] = set()
try:
async with asyncio.TaskGroup() as tg:
task = tg.create_task(algo(tg, conn, token, tasks), name=top_name)
tasks.add(task)
except* Exception as ex:
for exc in ex.exceptions:
st.warning(f'Exception: {type(exc).__name__}')
_render_exception(exc)
else:
for task in tasks:
if (task_name := task.get_name()) != top_name:
st.write(f'task_name: {task_name}')
_render_result(task.result())
def _prepare_loop(cur_ts: int, token: str) -> asyncio.AbstractEventLoop:
    """Return the event loop registered for ``token``, creating it if absent.

    The mapping returned by ``get_loop_dict()`` stores ``(loop, timestamp)``
    pairs keyed by token; the entry is rewritten on every call so that its
    timestamp becomes ``cur_ts``.

    Args:
        cur_ts: Timestamp to record next to the loop.
        token: Key identifying the entry in the loop mapping.

    Returns:
        The existing loop for ``token``, or a newly created one.
    """
    registry = get_loop_dict()

    if token in registry:
        # Only the loop is reused; the old timestamp is discarded.
        loop, _ = registry[token]
    else:
        loop = asyncio.new_event_loop()

    registry[token] = (loop, cur_ts)
    return loop
def _prepare_conn(cur_ts: int, token: str) -> EdgeDBCloudConn:
    """Return the connection registered for ``token``, creating it if absent.

    The mapping returned by ``get_conn_dict()`` stores
    ``(connection, timestamp)`` pairs keyed by token; the entry is rewritten
    on every call so that its timestamp becomes ``cur_ts``.

    Args:
        cur_ts: Timestamp to record next to the connection.
        token: Key identifying the entry in the connection mapping.

    Returns:
        The existing connection for ``token``, or a new ``EdgeDBCloudConn``
        built from the keyword arguments returned by ``load_st_toml()``.
    """
    registry = get_conn_dict()

    if token in registry:
        # Only the connection is reused; the old timestamp is discarded.
        conn, _ = registry[token]
    else:
        settings = load_st_toml()
        conn = EdgeDBCloudConn(**settings)

    registry[token] = (conn, cur_ts)
    return conn
if __name__ == '__main__':
    # TODO
    # How to gracefully close the db connection if the Streamlit server
    # is shut down?
    cur_ts = get_cur_ts()
    token = st.session_state.token

    # The current session's token is what the display and cleanup helpers
    # receive as the list of tokens to exclude.
    excluded_tokens = [token]

    # Fetch (or create) this token's loop and connection; both entries get
    # their timestamp set to ``cur_ts``.
    loop, conn = _prepare_loop(cur_ts, token), _prepare_conn(cur_ts, token)

    _display_res(token, loop, conn, excluded_tokens)
    _routine_clean(excluded_tokens)

    # Make this token's loop the current one, then drive the page on it.
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run(main, conn, token))