-
-
Notifications
You must be signed in to change notification settings - Fork 174
/
util.py
143 lines (99 loc) · 4.42 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import asyncio
from datetime import datetime
from random import choices
from typing import Literal, Union
import pandas as pd
class Pane:
    """Base object bound to a window.

    Stores the window and its ``run_script`` callable on the instance and
    makes sure the instance carries an ``id``.
    """

    def __init__(self, window):
        """Bind this pane to *window*.

        An ``id`` is requested from ``Window._id_gen`` only when the instance
        does not already expose an ``id`` attribute, so an id that was set
        before this initializer runs is kept.
        """
        from lightweight_charts import Window

        self.win: Window = window
        self.run_script = window.run_script
        if not hasattr(self, 'id'):
            self.id = Window._id_gen.generate()
class IDGen(list):
    """List of already issued identifiers that can mint new unique ones.

    Every generated 8-letter name is appended to the list itself, which is
    what later calls consult to avoid handing out duplicates.
    """

    # Alphabet the random names are drawn from.
    ascii = 'abcdefghijklmnopqrstuvwxyz'

    def generate(self):
        """Return a fresh ``'window.<name>'`` identifier.

        The bare ``<name>`` (without the ``window.`` prefix) is recorded in
        this list. On a collision a new candidate is drawn until an unused
        one is found, so the method always returns a string (the previous
        recursive retry dropped its result and returned ``None``).
        """
        while True:
            var = ''.join(choices(self.ascii, k=8))
            if var not in self:
                self.append(var)
                return f'window.{var}'
def parse_event_message(window, string):
    """Split an event message into its handler and its argument list.

    The message has the form ``<name>_~_<arg>;;;<arg>...``. Only the first
    ``_~_`` separates the name from the arguments, so argument text that
    itself contains ``_~_`` is passed through intact.

    :param window: object whose ``handlers`` mapping is indexed by event name.
    :param string: raw message text.
    :return: ``(handler, args)`` where ``args`` is a list of strings.
    :raises ValueError: if *string* contains no ``_~_`` separator.
    :raises KeyError: if no handler is registered under the parsed name.
    """
    # maxsplit=1 keeps the two-way unpack valid when the payload repeats the
    # separator, while a message without any separator still raises.
    name, args = string.split('_~_', 1)
    args = args.split(';;;')
    func = window.handlers[name]
    return func, args
def js_data(data: Union[pd.DataFrame, pd.Series]):
    """Serialize *data* to a JSON string via ``to_json``.

    A Series is written with ``orient='columns'``; anything else is written
    with ``orient='records'``. Objects the serializer cannot handle natively
    are passed to a fallback that yields ``'null'`` for missing values and
    the object itself otherwise.
    """
    def _null_if_missing(value):
        return 'null' if pd.isna(value) else value

    if isinstance(data, pd.Series):
        orient = 'columns'
    else:
        orient = 'records'
    return data.to_json(orient=orient, default_handler=_null_if_missing)
def jbool(b: bool):
    """Return the JavaScript literal for a Python bool.

    ``True`` gives ``'true'`` and ``False`` gives ``'false'``. The checks are
    identity based, so any other value (including ``1`` and ``0``) gives
    ``None``.
    """
    if b is True:
        return 'true'
    if b is False:
        return 'false'
    return None
# Accepted line style names; line_style() turns them into
# LightweightCharts.LineStyle members.
LINE_STYLE = Literal['solid', 'dotted', 'dashed', 'large_dashed', 'sparse_dotted']
# Accepted marker positions; marker_position() maps them to their JS names.
MARKER_POSITION = Literal['above', 'below', 'inside']
# Accepted marker shapes; marker_shape() converts them to camelCase.
MARKER_SHAPE = Literal['arrow_up', 'arrow_down', 'circle', 'square']
# Accepted crosshair modes; crosshair_mode() turns them into
# LightweightCharts.CrosshairMode members.
CROSSHAIR_MODE = Literal['normal', 'magnet']
# Accepted price scale modes; price_scale_mode() turns them into
# LightweightCharts.PriceScaleMode members.
PRICE_SCALE_MODE = Literal['normal', 'logarithmic', 'percentage', 'index100']
# A point in time given as a datetime, a pandas Timestamp or a string.
TIME = Union[datetime, pd.Timestamp, str]
# Any plain number.
NUM = Union[float, int]
# One of the four sides.
# NOTE(review): presumably the docking side of a UI element - confirm against usages.
FLOAT = Literal['left', 'right', 'top', 'bottom']
def line_style(line: LINE_STYLE):
    """Return the ``LightweightCharts.LineStyle`` expression for *line*.

    The name is cut at its first underscore and both pieces are title-cased
    and concatenated (``'large_dashed'`` -> ``...LineStyle.LargeDashed``).
    A name without an underscore is simply title-cased.
    """
    prefix = 'LightweightCharts.LineStyle.'
    # partition() yields an empty tail when there is no underscore, so the
    # same expression covers both the one-word and the two-word case.
    head, _, tail = line.partition('_')
    return prefix + head.title() + tail.title()
def crosshair_mode(mode: CROSSHAIR_MODE):
    """Return the ``LightweightCharts.CrosshairMode`` expression for *mode*.

    A falsy *mode* (``None`` or an empty string) gives ``None``.
    """
    if not mode:
        return None
    return f'LightweightCharts.CrosshairMode.{mode.title()}'
def price_scale_mode(mode: PRICE_SCALE_MODE):
    """Return the ``LightweightCharts.PriceScaleMode`` expression for *mode*.

    ``'index100'`` maps to ``IndexedTo100``; any other non-empty name is
    title-cased. A falsy *mode* is formatted as the text ``None``, so the
    result is always a string.
    """
    if mode == 'index100':
        member = 'IndexedTo100'
    elif mode:
        member = mode.title()
    else:
        member = None
    return f"LightweightCharts.PriceScaleMode.{member}"
def marker_shape(shape: MARKER_SHAPE):
    """Convert a snake_case marker shape to its camelCase form.

    Text before the first underscore is kept as is and the text after it is
    title-cased (``'arrow_up'`` -> ``'arrowUp'``). A name without an
    underscore is returned unchanged.
    """
    # With no underscore the tail is empty and the head is the whole name.
    head, _, tail = shape.partition('_')
    return head + tail.title()
def marker_position(p: MARKER_POSITION):
    """Map a marker position name to its JS counterpart.

    ``None`` maps to ``None``. Any other unknown key raises ``KeyError``.
    """
    js_names = dict(above='aboveBar', below='belowBar', inside='inBar')
    js_names[None] = None
    return js_names[p]
class Emitter:
    """Holds a single callback that is assigned with ``+=``."""

    def __init__(self):
        self._callable = None

    def __iadd__(self, other):
        """Make *other* the callback, replacing any previous one."""
        self._callable = other
        return self

    def _emit(self, *args):
        """Call the stored callback with *args*; do nothing if it is unset."""
        handler = self._callable
        if handler:
            handler(*args)
class JSEmitter:
    """Emitter whose callback is registered in the chart window's handlers.

    ``+=`` stores a dispatcher under ``name`` in ``chart.win.handlers`` and
    then calls ``on_iadd`` with the user callback.
    """

    def __init__(self, chart, name, on_iadd, wrapper=None):
        """Remember the chart, the handler name, the hook and the wrapper.

        :param chart: object passed as first argument to the callback; its
            ``win.handlers`` mapping receives the dispatcher.
        :param name: key used in ``chart.win.handlers``.
        :param on_iadd: called with the callback each time one is assigned.
        :param wrapper: optional callable invoked as
            ``wrapper(callback, chart, *args)`` instead of the callback.
        """
        self._on_iadd = on_iadd
        self._chart = chart
        self._name = name
        self._wrapper = wrapper

    def __iadd__(self, other):
        """Register *other* as the handler for this emitter's event name.

        A coroutine function gets an awaiting dispatcher, anything else a
        plain one. Whether a wrapper is used is decided on each dispatch.
        """
        def _dispatch(*arg):
            if self._wrapper:
                self._wrapper(other, self._chart, *arg)
            else:
                other(self._chart, *arg)

        async def _dispatch_async(*arg):
            if self._wrapper:
                await self._wrapper(other, self._chart, *arg)
            else:
                await other(self._chart, *arg)

        if asyncio.iscoroutinefunction(other):
            handler = _dispatch_async
        else:
            handler = _dispatch
        self._chart.win.handlers[self._name] = handler
        self._on_iadd(other)
        return self
class Events:
    """Groups the event emitters created for one chart.

    Attributes set in ``__init__``:
      * ``new_bar`` - a plain ``Emitter``.
      * ``search`` - a ``JSEmitter`` registered under ``search<chart.id>``.
      * ``range_change`` - a ``JSEmitter`` registered under
        ``range_change<chart.id>``.
    """

    def __init__(self, chart):
        """Create the emitters for *chart*.

        The scripts below are built and sent through ``chart.run_script``
        only when a callback is assigned to the matching emitter with ``+=``.
        """
        # Python-side emitter; assigning to it runs no script.
        self.new_bar = Emitter()
        # NOTE(review): imported locally, presumably to avoid a circular
        # import with lightweight_charts.abstract - confirm.
        from lightweight_charts.abstract import JS
        # On assignment: inject the JS['callback'] snippet, then call
        # makeSpinner and makeSearchBox for this chart on the JS side.
        self.search = JSEmitter(chart, f'search{chart.id}',
            lambda o: chart.run_script(f'''
{JS['callback']}
makeSpinner({chart.id})
{chart.id}.search = makeSearchBox({chart.id})
''')
        )
        # On assignment: subscribe a JS listener to visible-logical-range
        # changes. Each time it fires, the listener unsubscribes itself, posts
        # "range_change<id>_~_<barsBefore>;;;<barsAfter>" through
        # window.callbackFunction when bar info is available, and subscribes
        # again after a 50 ms timeout. The wrapper converts the received
        # string arguments to floats before calling the user callback.
        self.range_change = JSEmitter(chart, f'range_change{chart.id}',
            lambda o: chart.run_script(f'''
let checkLogicalRange = (logical) => {{
{chart.id}.chart.timeScale().unsubscribeVisibleLogicalRangeChange(checkLogicalRange)
let barsInfo = {chart.id}.series.barsInLogicalRange(logical)
if (barsInfo) window.callbackFunction(`range_change{chart.id}_~_${{barsInfo.barsBefore}};;;${{barsInfo.barsAfter}}`)
setTimeout(() => {chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange), 50)
}}
{chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange)
'''),
            wrapper=lambda o, c, *arg: o(c, *[float(a) for a in arg])
        )