forked from bokeh/bokeh
-
Notifications
You must be signed in to change notification settings - Fork 12
/
utils.py
149 lines (126 loc) · 3.75 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import logging
logger = logging.getLogger(__name__)
from functools import reduce
import math
import platform
import sys
import uuid
from six.moves.urllib.parse import urljoin as sys_urljoin
# Counter backing the "simple id" mode of make_id(); bumped before each use.
_simple_id = 1000


def make_id():
    """Return a fresh identifier as a string.

    When ``settings.simple_ids(False)`` is truthy the module-level counter
    ``_simple_id`` is incremented and its new value is returned; otherwise
    a random UUID (``uuid.uuid4()``) is returned.
    """
    global _simple_id

    # Resolved at call time rather than at module import.
    from . import settings

    if not settings.simple_ids(False):
        return str(uuid.uuid4())

    _simple_id += 1
    return str(_simple_id)
def urljoin(*args):
    """Join any number of URL pieces, left to right.

    Each piece is combined with the result so far using the standard
    two-argument ``urljoin`` (imported here as ``sys_urljoin``). A single
    argument is returned unchanged.
    """
    def join_pair(base, piece):
        return sys_urljoin(base, piece)

    return reduce(join_pair, args)
def get_json(request):
    """Return the JSON payload of a ``requests`` response object.

    Handles backwards compatibility with requests < 1.0: if ``request.json``
    has a ``__call__`` attribute it is invoked and its result returned,
    otherwise the attribute value itself is returned.
    """
    return request.json() if hasattr(request.json, '__call__') else request.json
def encode_utf8(u):
    """Encode *u* as UTF-8 on Python 2; return it unchanged on other versions."""
    running_py2 = sys.version_info[0] == 2
    return u.encode('utf-8') if running_py2 else u
def decode_utf8(u):
    """Decode *u* from UTF-8 on Python 2; return it unchanged on other versions."""
    running_py2 = sys.version_info[0] == 2
    return u.decode('utf-8') if running_py2 else u
# Multipliers that convert a duration in seconds into the unit found at the
# same index of ``_units``.
_scales = [1e0, 1e3, 1e6, 1e9]
_units = ['s', 'ms', 'us', 'ns']


def scale_delta(time):
    """Scale a duration given in seconds to a convenient unit.

    Parameters
    ----------
    time : float
        Duration in seconds.

    Returns
    -------
    tuple
        ``(scaled_value, unit)`` where ``unit`` is one of ``'s'``, ``'ms'``,
        ``'us'`` or ``'ns'`` and ``scaled_value`` is ``time`` multiplied by
        the matching entry of ``_scales``. Non-positive durations are
        reported in ``'ns'``.
    """
    finest = len(_units) - 1
    if time > 0.0:
        magnitude = int(math.floor(math.log10(time)))
        # Clamp on both sides: durations of 10 s and more yield a negative
        # quotient, which would otherwise index the tables from the end and
        # report seconds as 'ns'/'us'/'ms'.
        order = max(0, min(-magnitude // 3, finest))
    else:
        order = finest
    return time * _scales[order], _units[order]
def is_py3():
    """Return True when the running interpreter's major version is 3."""
    major_version = sys.version_info[0]
    return major_version == 3
def is_pypy():
    """Return True when ``platform.python_implementation()`` reports PyPy."""
    implementation = platform.python_implementation()
    return implementation == "PyPy"
def convert_references(json_obj):
    """Replace model objects nested inside a list/dict structure, in place.

    Every list item or dict value that is a ``PlotObject`` is replaced by
    its ``ref``; any other ``HasProps`` instance is replaced by the result
    of its ``to_dict()``. Replacement values that are themselves lists or
    dicts are walked as well. ``json_obj`` itself is never replaced, only
    its contents.

    Parameters
    ----------
    json_obj : list or dict
        Structure to convert. It is mutated.

    Returns
    -------
    The same ``json_obj`` object that was passed in.
    """
    from .plot_object import PlotObject
    from .properties import HasProps

    def convert(obj):
        # PlotObject is tested first so it wins when an object is both.
        if isinstance(obj, PlotObject):
            return obj.ref
        if isinstance(obj, HasProps):
            return obj.to_dict()
        return obj

    # The walk is done here rather than through json_apply(): that helper
    # takes (fragment, check_func, func) and builds a new structure, whereas
    # this function mutates its argument and hands it back.
    visited = set()  # ids of containers already handled; guards against cycles
    pending = [json_obj]
    while pending:
        node = pending.pop()
        if id(node) in visited:
            continue
        if isinstance(node, list):
            for idx, item in enumerate(node):
                node[idx] = convert(item)
            children = list(node)
        elif isinstance(node, dict):
            # Snapshot the items so values can be reassigned while looping;
            # .items() works on both Python 2 and Python 3.
            for key, item in list(node.items()):
                node[key] = convert(item)
            children = list(node.values())
        else:
            continue
        visited.add(id(node))
        pending.extend(
            child for child in children if isinstance(child, (list, dict))
        )
    return json_obj
def dump(objs, docid):
    """Serialize each object in *objs* into a reference dict with attributes.

    For every object its ``ref`` is taken, the result of ``vm_serialize()``
    is stored under the ``"attributes"`` key, and that attributes mapping is
    extended with the reference's ``"id"`` and with ``"doc"`` set to
    *docid*. Returns the list of these dicts, in input order.
    """
    serialized = []
    for model in objs:
        ref = model.ref
        attributes = model.vm_serialize()
        ref["attributes"] = attributes
        attributes.update(id=ref["id"], doc=docid)
        serialized.append(ref)
    return serialized
def nice_join(seq, sep=", "):
    """Join the items of *seq* into a phrase ending in "or <last item>".

    Items are converted with ``str``. All but the last are joined with
    *sep*, then the last is appended after ``" or "``. Sequences with zero
    or one item are simply joined with *sep*.
    """
    items = list(map(str, seq))
    if len(items) > 1:
        leading = sep.join(items[:-1])
        return "%s or %s" % (leading, items[-1])
    return sep.join(items)
def publish_display_data(data, source='bokeh'):
    """Compatibility wrapper for IPython's ``publish_display_data``.

    Later IPython versions removed the leading ``source`` argument. The
    two-argument form ``(source, data)`` is tried first; if that raises
    ``TypeError`` the call is repeated with ``data`` alone.

    Parameters
    ----------
    data : dict
    source : str
    """
    from IPython.core import displaypub

    publish = displaypub.publish_display_data
    try:
        publish(source, data)
    except TypeError:
        publish(data)
def is_ref(frag):
    """Tell whether *frag* looks like a reference dict.

    The result is truthy only for a dict with truthy ``'type'`` and ``'id'``
    entries. Note that the value is not coerced to ``bool``: ``False`` is
    returned for non-dicts, a falsy ``'type'`` value is returned as is, and
    otherwise the ``'id'`` value (or ``None`` when absent) is returned.
    """
    if not isinstance(frag, dict):
        return False
    kind = frag.get('type')
    if not kind:
        return kind
    return frag.get('id')
def json_apply(fragment, check_func, func):
    """Recursively transform a nested structure of dicts and lists.

    If ``check_func(fragment)`` is truthy, ``func(fragment)`` is returned
    and the fragment is not descended into. Otherwise lists and dicts are
    rebuilt as new containers with every item / value processed the same
    way, and any other value is returned unchanged.
    """
    if check_func(fragment):
        return func(fragment)
    if isinstance(fragment, list):
        return [json_apply(item, check_func, func) for item in fragment]
    if isinstance(fragment, dict):
        return {
            key: json_apply(item, check_func, func)
            for key, item in fragment.items()
        }
    return fragment
def resolve_json(fragment, models):
    """Replace every reference in *fragment* with its model from *models*.

    The structure is walked with ``json_apply``; each sub-fragment for which
    ``is_ref`` is truthy is looked up in *models* by its ``'id'``.

    Parameters
    ----------
    fragment : nested dicts/lists possibly containing reference dicts
    models : mapping from reference id to the object to substitute

    Returns
    -------
    The structure produced by ``json_apply`` with references replaced.
    A reference whose id is not in *models* becomes ``None`` and an error
    is logged.
    """
    def lookup(ref):
        ref_id = ref['id']
        if ref_id in models:
            return models[ref_id]
        # Use the module logger rather than logging.error(): the root-level
        # helper implicitly configures the root logger when it has no
        # handlers, which library code should not do.
        logger.error("model not found for %s", ref)
        return None

    return json_apply(fragment, is_ref, lookup)