forked from bpot/node-msgpack-rpc
-
Notifications
You must be signed in to change notification settings - Fork 8
/
packetizer.iced
153 lines (117 loc) · 4.21 KB
/
packetizer.iced
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
{unpack,pack} = require 'msgpack2'
{Ring} = require './ring'
##=======================================================================
# This is a hack of sorts, in which we've taken the important
# parts of the Msgpack spec for reading an int from the string.
msgpack_frame_len = (buf) ->
  # Look only at the first byte of `buf` (the msgpack type byte) and report
  # how many bytes the encoded non-negative integer occupies in total,
  # type byte included. Returns 0 when the byte does not start one of the
  # integer encodings recognized here.
  lead = buf[0]
  # Positive fixint: the value is stored in the type byte itself.
  return 1 if lead < 0x80
  switch lead
    when 0xcc then 2   # uint 8:  type byte + 1 payload byte
    when 0xcd then 3   # uint 16: type byte + 2 payload bytes
    when 0xce then 5   # uint 32: type byte + 4 payload bytes
    else 0
##=======================================================================
# Report whether `candidate` is a JavaScript array. Anything whose typeof is
# not 'object' is rejected before Array.isArray is consulted.
is_array = (candidate) ->
  return false unless typeof candidate is 'object'
  Array.isArray candidate
##=======================================================================
exports.Packetizer = class Packetizer
  """
  A packetizer that is used to read and write to an underlying stream
  (like a TcpTransport below).  Should be inherited by such a class.
  The subclasses should implement:

    @_raw_write(msg,enc) - write this msg to the stream with the
      given encoding. Typically handled at the transport level
      (2 classes higher in the inheritance graph)

    @_packetize_error(err) - report an error with the stream.  Typically
      calls up to the Transport class (2 classes higher).

    @_dispatch(msg) - emit a packetized incoming message. Typically
      handled by the Dispatcher (1 class higher in the inheritance
      graph).

  The subclass should call @packetize_data(m) whenever it has data to stuff
  into the packetizer's input path, and call @send(m) whenever it wants
  to stuff data into the packetizer's output path.
  """

  # The two states we can be in: waiting for a length frame, or waiting
  # for the message body whose length the frame announced.
  FRAME : 1
  DATA  : 2

  # Results of trying to read a frame or a message:
  #   OK   - one item was read; the read loop should keep going
  #   WAIT - not enough buffered data yet; try again when more arrives
  #   ERR  - malformed input; already reported via @_packetize_error
  OK   : 0
  WAIT : 1
  ERR  : -1

  ##-----------------------------------------

  # Start with an empty input ring, expecting a length frame first.
  constructor : ->
    @_ring = new Ring()
    @_state = @FRAME
    @_next_msg_len = 0

  ##-----------------------------------------

  # Write `msg` to the output path as two consecutive msgpack values: the
  # packed byte length of the payload, then the packed payload itself.
  # Each buffer is handed to @_raw_write as a 'binary'-encoded string.
  # Always returns true.
  send : (msg) ->
    b2 = pack msg
    b1 = pack b2.length
    enc = 'binary'
    for b in [ b1, b2 ]
      @_raw_write b.toString(enc), enc
    return true

  ##-----------------------------------------

  # Try to read the length frame at the head of the ring.
  # Returns @OK (frame consumed, state is now DATA and @_next_msg_len set),
  # @WAIT (need more bytes), or @ERR (bad frame; @_packetize_error called).
  _get_frame : () ->
    # We need at least one byte to get started
    return @WAIT unless @_ring.len() > 0

    # First get the frame's framing byte!  This will tell us
    # how many more bytes we need to grab.  This is a bit of
    # an abstraction violation, but one worth it for implementation
    # simplicity and efficiency.
    # NOTE(review): grab appears to peek without removing bytes, since
    # consume is called separately below -- confirm against Ring.
    f0 = @_ring.grab 1
    return @WAIT unless f0

    frame_len = msgpack_frame_len f0

    unless frame_len
      @_packetize_error "Bad frame header received"
      return @ERR

    # We now know how many bytes to suck in just to get the frame
    # header.  If we can't get that much, we'll just have to wait!
    return @WAIT unless (f_full = @_ring.grab frame_len)?

    r = unpack f_full

    res = switch (typ = typeof r)
      when 'number'
        # See implementation of msgpack_frame_len above; this shouldn't
        # happen
        throw new Error "Negative len #{r} should not have happened" if r < 0

        # Only now drop the frame bytes from the ring and switch to
        # reading the message body.
        @_ring.consume frame_len
        @_next_msg_len = r
        @_state = @DATA
        @OK
      when 'undefined'
        @WAIT
      else
        @_packetize_error "bad frame; got type=#{typ}, which is wrong"
        @ERR

    return res

  ##-----------------------------------------

  # Try to read a message body of @_next_msg_len bytes from the ring.
  # On success the bytes are consumed, the state returns to FRAME, the
  # decoded array is passed to @_dispatch, and @OK is returned. Returns
  # @WAIT when the ring does not yet hold enough bytes, and @ERR (after
  # calling @_packetize_error) when the payload fails to decode or is
  # not an array.
  _get_msg: () ->
    l = @_next_msg_len

    ret = if l > @_ring.len() or not (b = @_ring.grab l)?
      @WAIT
    else if not (msg = unpack b)?
      @_packetize_error "bad encoding found in data/payload; len=#{l}"
      @ERR
    else if not is_array msg
      @_packetize_error "non-array found in data stream: #{JSON.stringify msg}"
      @ERR
    else
      @_ring.consume l
      @_state = @FRAME
      # Call down one level in the class hierarchy to the dispatcher
      @_dispatch msg
      @OK
    return ret

  ##-----------------------------------------

  # Feed incoming data `m` into the ring, then alternate between reading
  # frames and messages for as long as each step reports @OK. The loop
  # stops at the first @WAIT or @ERR.
  packetize_data : (m) ->
    @_ring.buffer m
    go = @OK
    while go is @OK
      go = if @_state is @FRAME then @_get_frame() else @_get_msg()

  ##-----------------------------------------

  # On error we need to flush this guy out: return to the same state the
  # constructor sets up, discarding any buffered input.
  _packetizer_reset : () ->
    @_state = @FRAME
    @_ring = new Ring()
    @_next_msg_len = 0
##=======================================================================