/
SectionConnection.js
234 lines (218 loc) · 9.32 KB
/
SectionConnection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/**
* This file implements the Section Layer of the Transport Protocol.
* See Wiki for Protocol definition. Please note that this implementation
* does not 100% conform to the defition in the Wiki page.
* Here, SectionConnection implements the actual Section Layer, which
* transports instances of Section. Different kinds of Section are not
* implemented by inheriting Section, instead, applications should use or
* inherit SectionCoder and its subclasses.
* See index.js for examples.
*/
const { Buffer } = require('buffer');
const log = require('debug')('bilibili-danmaku-client/SectionConnection');
const { isEqual } = require('lodash');
const { CascadeConnection } = require('../util/connection');
const WebSocketConnection = require('./WebSocketConnection');
const protoVer = 0x10;
const encoding = 'utf8';
/**
* Section is sent through the Section Layer.
* Section is a simple object. Its 'coder' property specifies the type of
* this Section, while its 'data' property is the data is contains.
* Sections can contain any kind of data, however data that cannot be
* transformed by its coder will be useless and problematic. Therefore,
* whoever specifies the coder should be responsible specifying the data.
* Section should not be subclassed. A typical usage is:
* const coder = new JsonCoder({ controlFlag: true, opCode: 10, binaryFlag: false });
* const section = new Section(coder, { foo: 1 });
*/
class Section {
/**
* Constructs a new Section.
* @param {SectionCoder} coder The coder of this Section.
* @param {any} data The data of this Section.
*/
constructor(coder, data) {
this.coder = coder;
this.data = data;
}
}
/**
* SectionCoder is used to encode and decode Sections to and from Buffers.
* Meanwhile, SectionCoder also contains a 'header' property which specifies
* the header that should be used for all Sections with this coder.
* SectionCoder should be subclassed to support encoding and decoding of
* different types of data, however only new instances should be used to support
* different headers.
* Meanwhile, SectionCoder instances should be reused across Sections. Since
* section.coder === this is used in hasConstructed(), only one SectionCoder with
* the same header should be used in the same application.
*/
class SectionCoder {
/**
* Constructs a new SectionCoder with the given header.
* Header MUST contain the following properties:
* controlFlag: boolean,
* opCode: Number,
* binaryFlag: boolean.
* Detailed definitions can be found in Wiki page 'Transport Protocol'.
* @param {Object} header The header object of this SectionCoder.
*/
constructor(header) { this.header = header; }
/**
* Encode the data of the Section into Buffer.
* By default, the data is kept as-is. Therefore non-Buffer data might lead to
* an error.
* @param {any} data The data to encode.
* @returns {Buffer} The encoded buffer.
*/
encode(data) { return data; }
/**
* Decode the Buffer back to the Section data.
* By default, the Buffer is kept as-is.
* Typically construct() is used to construct a new Section with decoded data
* and this coder.
* @param {Buffer} buf The buffer to decode.
* @returns {any} The decoded data.
*/
decode(buf) { return buf; }
/**
* Return whether the Section is constructed by this SectionCoder.
* section is constructed by an coder by section = coder.construct() or
* by section = new Section(coder, data).
* @param {Section} section The section to check.
* @return {boolean} Whether the Section is constructed by this SectionCoder.
*/
hasConstructed(section) { return section.coder === this; }
/**
* Construct a Section with this SectionCoder and decoded data.
* @param {Buffer} data The data to decode.
* @returns {Section} The constructed Section.
*/
construct(data) { return new Section(this, this.decode(data)); }
}
/**
* Implementation of SectionCoder that encodes and decodes strings.
*/
class StringCoder extends SectionCoder {
encode(str) { return Buffer.from(str, encoding); }
decode(buf) { return buf.toString(encoding); }
}
/**
* Implementation of SectionCoder that encodes and decodes JSONs.
*/
class JsonCoder extends StringCoder {
encode(json) { return super.encode(JSON.stringify(json)); }
decode(buf) { return JSON.parse(super.decode(buf)); }
}
/**
* SectionConnection implements the Section Layer of the Transport Protocol.
* It uses WebSocketConnection as implementation of the Connection Layer and uses
* CascadeConnection to wrap over it.
* As specified in the Transport Protocol, SectionConnection is stateless, and
* inherits events from and delegates method to the Connection Layer.
* As a flaw in the Transport Protocol, Sections can contain arbitrary types
* of data, while the convertion from Buffer to the data is done in the Data Layer.
* However, Sections have to be constructed in the Section layer, so in the
* implementation, the debundling process of Section Layer and the convertion
* process of the Data Layer are combinded together to form the transform() and
* detransform() methods of CascadeConnection.
*/
class SectionConnection extends CascadeConnection {
/**
* Construct a new SectionConnection.
* @param {SectionCoder[]} coders The list of coders to use.
* @param {String} url The url to connect to.
* @param {String | String[] | undefined} protocols The WebSocket protocols to use.
* @param {Object | undefined} options The options used to configure WebSocketConnection.
*/
constructor(coders, url, { protocols, options } = {}) {
super(new WebSocketConnection(url, protocols, options));
this.coders = coders;
}
/**
* Transform given sections to Buffer. See super method documentation.
* This method implements the bundling process.
* Note that the SectionConnection actually transports arrays of Section, so
* connection.send(new Section(...)) will result in an error.
* @param {Section[]} sections The sections to transform to Buffer.
* @returns {Buffer} The transformed Buffer.
*/
transform(sections) {
return Buffer.concat(sections.map(this.encodeSection.bind(this)));
}
/**
* Detransform given Buffer back to a list of Sections. See super method documentation.
* This method implements the debundling process.
* Note that the SectionConnection actuallt transports arrays of Section so
* the 'message' event will be emitted with a Section[] as argument.
* @param {BufferEncoding} buf The Buffer to detransform.
* @returns {Section[]} The detransformed Sections.
*/
detransform(buf) {
const sections = [];
for (let off = 0; off < buf.length; off = this.decodeSection(sections, buf, off));
return sections;
}
encodeSection(section) {
try {
const { coder, data } = section;
const content = coder.encode(data);
const header = Buffer.alloc(16);
header.writeInt32BE(content.length + 16, 0);
header.writeInt16BE(protoVer, 4);
header[7] = coder.header.controlFlag ? 0x01 : 0x00;
header.writeInt32BE(coder.header.opCode, 8);
header[15] = coder.header.binaryFlag ? 0x01 : 0x00;
return Buffer.concat([header, content]);
} catch (e) {
log(`Unable to encode section: section=${section}, error=${e}.`);
return Buffer.alloc(0);
}
}
decodeHeader(buf, offset) {
if (buf.length < offset + 16) throw new Error(`Buffer too short: offset=${offset}, length=${buf.length}.`);
const length = buf.readInt32BE(offset); // length = CONTENT length + 16
if (length < 16) throw new Error(`Invalid section length: ${length}.`);
if (length + offset > buf.length) throw new Error(`Section too long: end=${length + offset}, length=${buf.length}.`);
const sectionProtoVer = buf.readInt16BE(offset + 4);
if (sectionProtoVer !== protoVer) throw new Error(`Invalid section header: protoVer=${sectionProtoVer}, expected=${protoVer}.`);
return {
length,
header: {
controlFlag: buf[offset + 7] === 0x01,
opCode: buf.readInt32BE(offset + 8),
binaryFlag: buf[offset + 15] === 0x01,
},
};
}
decodeSection(sections, buf, offset) {
let header;
let length;
try {
({ header, length } = this.decodeHeader(buf, offset));
} catch (e) {
log('Unable to decoder header: %s', e);
return buf.length; // stop debundling process
}
const coder = this.coders.find(c => isEqual(c.header, header));
if (typeof coder === 'undefined') {
log('No matching coder found: header=%s.', header);
return offset + length; // skip this section
}
const content = buf.slice(offset + 16, offset + length);
try {
sections.push(coder.construct(content));
} catch (e) {
log('Unable to decode section: content=%s, coder=%s.', content, coder);
}
return offset + length; // proceed to next section
}
}
module.exports = {
Section,
SectionCoder,
StringCoder,
JsonCoder,
SectionConnection,
};