Permalink
sugar-web/bus.js
Newer
100644
223 lines (170 sloc)
5.71 KB
11
function WebSocketClient(environment) {
12
this.queue = [];
13
this.socket = null;
17
env.getEnvironment(function (error, environment) {
19
var socket = new WebSocket("ws://0.0.0.0:" + port);
21
socket.binaryType = "arraybuffer";
22
23
socket.onopen = function () {
24
var params = [environment.activityId,
25
environment.apiSocketKey];
26
27
socket.send(JSON.stringify({
28
"method": "authenticate",
29
"id": "authenticate",
30
"params": params
31
}));
32
33
while (that.queue.length > 0) {
34
socket.send(that.queue.shift());
46
WebSocketClient.prototype.send = function (data) {
47
if (this.socket && this.socket.readyState == WebSocket.OPEN) {
48
this.socket.send(data);
52
};
53
54
/**
 * Close the underlying socket, if there is one.
 *
 * The constructor initialises `this.socket` to null, so this method can be
 * reached while no socket object exists. In that case it is a no-op rather
 * than a TypeError, mirroring the null check performed by send().
 */
WebSocketClient.prototype.close = function () {
    if (this.socket) {
        this.socket.close();
    }
};
67
bus.sendMessage("open_stream", [], function (error, result) {
68
that.streamId = result[0];
69
inputStreams[that.streamId] = that;
71
});
72
};
73
74
InputStream.prototype.read = function (count, callback) {
80
81
var buffer = new ArrayBuffer(8);
82
83
var headerView = new Uint8Array(buffer, 0, 1);
84
headerView[0] = this.streamId;
85
86
var bodyView = new Uint32Array(buffer, 4, 1);
87
bodyView[0] = count;
88
100
/**
 * Ask the bus to close this stream and stop routing incoming data to it.
 *
 * Sends a "close_stream" request carrying this stream's id. When the reply
 * handler runs, the stream is removed from the `inputStreams` registry and
 * the optional callback is invoked.
 *
 * @param {function(Error)} [callback] - Called with the error value passed
 *     to the reply handler. May be omitted.
 */
InputStream.prototype.close = function (callback) {
    // onStreamClosed is a plain nested function, so `this` is not the stream
    // inside it; capture the id up front.
    var streamId = this.streamId;

    function onStreamClosed(error, result) {
        // Unregister before notifying, so a throwing callback cannot leave a
        // stale entry behind in the registry.
        delete inputStreams[streamId];

        if (callback) {
            callback(error);
        }
    }

    bus.sendMessage("close_stream", [streamId], onStreamClosed);
};
112
113
/**
 * Constructor for an outgoing stream.
 *
 * A freshly constructed instance has no stream id yet: `streamId` is null.
 */
function OutputStream() {
    var unassigned = null;

    this.streamId = unassigned;
}
116
117
/**
 * Ask the bus to open a stream and remember the id it returns.
 *
 * Sends an "open_stream" request with no parameters. On a successful reply
 * the first element of the result is stored as `this.streamId`; on failure
 * `streamId` is left untouched. Either way the optional callback is invoked
 * with the error value.
 *
 * @param {function(Error)} [callback] - Called with the error value passed
 *     to the reply handler (null on success). May be omitted.
 */
OutputStream.prototype.open = function (callback) {
    var that = this;

    bus.sendMessage("open_stream", [], function (error, result) {
        // The bus dispatcher passes a null result alongside an error, so only
        // index into the result when the request succeeded.
        if (!error && result) {
            that.streamId = result[0];
        }

        if (callback) {
            callback(error);
        }
    });
};
124
125
OutputStream.prototype.write = function (data) {
126
var buffer = new ArrayBuffer(data.byteLength + 1);
127
128
var bufferView = new Uint8Array(buffer);
129
bufferView[0] = this.streamId;
130
bufferView.set(new Uint8Array(data), 1);
131
135
OutputStream.prototype.close = function (callback) {
136
bus.sendMessage("close_stream", [this.streamId], callback);
151
"params": params,
152
"jsonrpc": "2.0"
153
};
154
155
if (callback) {
156
callbacks[lastId] = callback;
157
}
158
164
/**
 * Register the handler for a notification method.
 *
 * The handler is stored in `notificationCallbacks` under the method name, so
 * registering the same method again replaces the previous handler.
 *
 * @param {string} method - Notification method name used as the lookup key.
 * @param {function} callback - Handler stored for that method.
 */
bus.onNotification = function (method, callback) {
    var handlers = notificationCallbacks;

    handlers[method] = callback;
};
167
172
/**
 * Attach the bus to a transport and start dispatching incoming messages.
 *
 * The transport's `onMessage` property is overwritten with a dispatcher that
 * handles three kinds of message:
 *   - non-string data: a binary frame whose first byte is a stream id; the
 *     remaining bytes are handed to the matching entry in `inputStreams`;
 *   - JSON with a `method` member: a notification, forwarded to the handler
 *     registered for that method (if any);
 *   - any other JSON: a reply, forwarded to the callback stored under its
 *     `id` in `callbacks` (if any), which is then forgotten.
 *
 * @param {Object} [customClient] - Transport to use. When omitted, a new
 *     WebSocketClient is created.
 */
bus.listen = function (customClient) {
    if (customClient) {
        client = customClient;
    } else {
        client = new WebSocketClient();
    }

    client.onMessage = function (message) {
        if (typeof message.data !== "string") {
            var dataView = new Uint8Array(message.data);
            var streamId = dataView[0];

            // Frames for unknown streams are dropped silently.
            if (streamId in inputStreams) {
                var inputStream = inputStreams[streamId];
                inputStream.gotData(message.data.slice(1));
            }

            return;
        }

        var parsed = JSON.parse(message.data);

        if (parsed.method) {
            var notificationCallback = notificationCallbacks[parsed.method];
            if (notificationCallback !== undefined) {
                notificationCallback(parsed.params);
            }
            return;
        }

        var responseId = parsed.id;

        // Requests may be sent without a callback, so a reply with no
        // registered (own) callback is expected and must be ignored rather
        // than crash the dispatcher.
        if (!Object.prototype.hasOwnProperty.call(callbacks, responseId)) {
            return;
        }

        var callback = callbacks[responseId];

        // Forget the entry before invoking it, so a throwing callback cannot
        // leave it registered.
        delete callbacks[responseId];

        if (typeof callback !== "function") {
            return;
        }

        var failure = parsed.error;

        // JSON-RPC 2.0 omits `error` on success, while older peers send an
        // explicit null; treat both as success.
        if (failure === null || failure === undefined) {
            callback(null, parsed.result);
            return;
        }

        // A structured error object carries its text in `message`; anything
        // else is used as the message directly.
        var reason = failure;
        if (typeof failure === "object" && failure.message !== undefined) {
            reason = failure.message;
        }

        callback(new Error(reason), null);
    };
};