forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
143 lines (110 loc) · 3.41 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import struct
import msgpack
from ..utils import ensure_bytes, nbytes
# Default maximum frame size in bytes (2 ** 26, i.e. 64 MiB) used by
# frame_split_size when no explicit limit is given.
BIG_BYTES_SHARD_SIZE = 2 ** 26

# Keyword options passed to msgpack: every "max_*_len" limit is set to
# 2 ** 31 - 1.
msgpack_opts = dict.fromkeys(
    ("max_%s_len" % kind for kind in ("str", "bin", "array", "map", "ext")),
    2 ** 31 - 1,
)

# Probe the installed msgpack once at import time: if it accepts ``raw=False``
# keep using it, otherwise (TypeError, old msgpack prior to 0.5.2) fall back
# to the ``encoding`` keyword.
try:
    msgpack.loads(msgpack.dumps(""), raw=False, **msgpack_opts)
except TypeError:
    # Backward compat with old msgpack (prior to 0.5.2)
    msgpack_opts["encoding"] = "utf-8"
else:
    msgpack_opts["raw"] = False
def frame_split_size(frames, n=BIG_BYTES_SHARD_SIZE):
    """
    Split a list of frames into a list of frames of maximum size

    This helps us to avoid passing around very large bytestrings.

    Parameters
    ----------
    frames : list
        Frames to split; each one is measured with ``nbytes``.
    n : int
        Maximum size, in bytes, of a returned frame.  A frame whose single
        items are wider than ``n`` is cut into one-item pieces, which is the
        smallest split possible.

    Returns
    -------
    list
        ``frames`` itself when it is empty or when no frame is larger than
        ``n`` bytes.  Otherwise a new list in which every oversized frame is
        replaced, in order, by consecutive slices of it.  Oversized ``bytes``
        and ``bytearray`` frames are sliced through a ``memoryview``, so
        their pieces are memoryviews rather than copies.

    Raises
    ------
    ValueError
        If some frame has to be split but ``n`` is not positive.

    Examples
    --------
    >>> frame_split_size([b'12345', b'678'], n=3)  # doctest: +SKIP
    [b'123', b'45', b'678']
    """
    if not frames:
        return frames

    # Measure every frame once; the sizes are reused by the loop below.
    sizes = [nbytes(frame) for frame in frames]
    if max(sizes) <= n:
        return frames

    if n <= 0:
        # A non-positive limit cannot be honoured; a negative step would
        # otherwise make range() empty and silently drop the frame.
        raise ValueError("n must be a positive number of bytes, got %r" % (n,))

    out = []
    for frame, size in zip(frames, sizes):
        if size <= n:
            out.append(frame)
            continue

        if isinstance(frame, (bytes, bytearray)):
            # Slice through a memoryview so the pieces do not copy the data.
            frame = memoryview(frame)

        # Frames without an ``itemsize`` attribute are treated as byte-wise.
        itemsize = getattr(frame, "itemsize", 1)
        # Slicing is done in items, not bytes.  Never let the step reach
        # zero (items wider than ``n``), which range() rejects.
        # NOTE(review): indexes along the first axis only; presumably frames
        # are one-dimensional -- confirm against callers.
        step = max(1, n // itemsize)
        out.extend(frame[i : i + step] for i in range(0, size // itemsize, step))
    return out
def merge_frames(header, frames):
    """ Merge frames into original lengths

    Regroups ``frames`` so that the i-th returned frame is
    ``header["lengths"][i]`` bytes long, cutting and joining frames as needed.

    Parameters
    ----------
    header : dict
        Must contain ``"lengths"``, an iterable with the wanted size in bytes
        of each output frame.
    frames : list
        Frames to regroup; each one is measured with ``nbytes``.

    Returns
    -------
    list
        ``frames`` itself when it is empty or already matches the lengths
        one-to-one.  Otherwise a new list with one entry per requested
        length; an entry is an original frame, a memoryview slice of one, or
        a bytes object joined from several pieces.

    Raises
    ------
    AssertionError
        If the requested lengths and the frames do not add up to the same
        number of bytes.

    Examples
    --------
    >>> [bytes(f) for f in merge_frames({'lengths': [3, 3]}, [b'123456'])]
    [b'123', b'456']
    >>> merge_frames({'lengths': [6]}, [b'123', b'456'])
    [b'123456']
    """
    lengths = list(header["lengths"])

    if not frames:
        return frames

    assert sum(lengths) == sum(map(nbytes, frames))

    # Fast path: nothing to do when the frames already line up.  The counts
    # are compared explicitly because zip() stops at the shorter input and
    # would otherwise ignore trailing empty frames or zero lengths.
    if len(frames) == len(lengths) and all(
        nbytes(f) == l for f, l in zip(frames, lengths)
    ):
        return frames

    # Reverse both lists so that pop()/append() work on the logical front.
    frames = frames[::-1]
    lengths = lengths[::-1]

    out = []
    while lengths:
        remaining = lengths.pop()
        pieces = []
        while remaining:
            frame = frames.pop()
            size = nbytes(frame)
            if size <= remaining:
                pieces.append(frame)
                remaining -= size
            else:
                # The frame is too long: keep the head and push the tail back
                # for the next requested length.
                # NOTE(review): the slice index counts items, not bytes;
                # presumably frames that need cutting are byte-oriented
                # (itemsize 1) -- confirm against callers.
                mv = memoryview(frame)
                pieces.append(mv[:remaining])
                frames.append(mv[remaining:])
                remaining = 0
        if len(pieces) == 1:  # no work necessary
            out.extend(pieces)
        else:
            out.append(b"".join(map(ensure_bytes, pieces)))
    return out
def pack_frames_prelude(frames):
    """ Build the length header for a sequence of frames

    The header holds the number of frames followed by the size in bytes of
    each frame (as reported by ``nbytes``), every value packed with the
    native ``"Q"`` struct format.

    Parameters
    ----------
    frames : sequence
        Frames to describe; must support ``len()``.

    Returns
    -------
    bytes
        ``len(frames) + 1`` packed values joined together.
    """
    values = [len(frames)]
    values.extend(nbytes(frame) for frame in frames)
    return b"".join(struct.pack("Q", value) for value in values)
def pack_frames(frames):
    """ Pack frames into a byte-like object

    This prepends length information to the front of the bytes-like object

    Parameters
    ----------
    frames : iterable
        Bytes-like frames to pack.  Any iterable is accepted; it is turned
        into a list first if it is not one already.

    Returns
    -------
    bytes
        The prelude from ``pack_frames_prelude`` followed by every frame.

    See Also
    --------
    unpack_frames
    """
    if not isinstance(frames, list):
        # Materialize before building the prelude: the prelude needs len()
        # and the frames are walked a second time for the final join, which
        # a one-shot iterable could not support.
        frames = list(frames)
    return b"".join([pack_frames_prelude(frames)] + frames)
def unpack_frames(b):
    """ Unpack bytes into a sequence of frames

    This assumes that length information is at the front of the bytestring,
    as performed by pack_frames

    The first 8 bytes hold the number of frames, the following 8-byte words
    hold the length of each frame, and the frame payloads come right after,
    back to back.

    Parameters
    ----------
    b : bytes-like
        Packed buffer; it is only read through slicing.

    Returns
    -------
    list
        One slice of ``b`` per frame, in order.

    See Also
    --------
    pack_frames
    """
    (count,) = struct.unpack("Q", b[:8])

    # Length word k (1-based) sits at bytes [8 * k, 8 * (k + 1)).
    sizes = [
        struct.unpack("Q", b[8 * k : 8 * (k + 1)])[0] for k in range(1, count + 1)
    ]

    # Payloads begin after the count word and all the length words.
    offset = 8 * (count + 1)
    result = []
    for size in sizes:
        end = offset + size
        result.append(b[offset:end])
        offset = end
    return result