Skip to content

Commit 8472a3e

Browse files
committed
feat: Implement Push-Based WebSocket Integration in Data Pipeline (#9454)
1 parent 2a9910d commit 8472a3e

4 files changed

Lines changed: 140 additions & 7 deletions

File tree

src/data/connection/WebSocket.mjs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ class Socket extends Base {
6565
* @member {String|null} serverAddress=null
6666
*/
6767
serverAddress = null
68+
/**
69+
* @member {Object} streamCallbacks={}
70+
* @protected
71+
*/
72+
streamCallbacks = {}
6873

6974
/**
7075
* @param {Object} config
@@ -217,9 +222,13 @@ class Socket extends Base {
217222

218223
me.fire('message', {data});
219224

220-
if (data.mId) {
225+
if (data.mId && me.messageCallbacks[data.mId]) {
221226
me.messageCallbacks[data.mId].resolve(data.data);
222227
delete me.messageCallbacks[data.mId]
228+
} else if (data.method && me.streamCallbacks[data.method]) {
229+
me.streamCallbacks[data.method](data.data || data)
230+
} else if (data.stream && me.streamCallbacks[data.stream]) {
231+
me.streamCallbacks[data.stream](data.data || data)
223232
}
224233
}
225234

@@ -246,6 +255,26 @@ class Socket extends Base {
246255
})
247256
}
248257

258+
/**
259+
* @param {Object} data
260+
* @param {Function} callback
261+
*/
262+
registerStream(data, callback) {
263+
let me = this,
264+
streamId = data.method; // Based on remotes-api.json, the key is the method name
265+
266+
me.streamCallbacks[streamId] = callback;
267+
me.sendMessage(data)
268+
}
269+
270+
/**
271+
* @param {Object} data
272+
*/
273+
unregisterStream(data) {
274+
let streamId = data.method;
275+
delete this.streamCallbacks[streamId]
276+
}
277+
249278
/**
250279
* @param {Object} data
251280
*/

src/manager/rpc/Api.mjs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,30 @@ class Api extends Base {
2525
*/
2626
registerApi(api) {
2727
Object.entries(api.services).forEach(([service, serviceValue]) => {
28-
Object.entries(serviceValue.methods).forEach(([method, methodValue]) => {
28+
Object.entries(serviceValue.methods || {}).forEach(([method, methodValue]) => {
2929
this.register({
30-
id : `${service}.${method}`,
30+
id : `${service}.${method}`,
3131
method,
3232
service,
33-
type: methodValue.type || serviceValue.type || api.type || 'ajax',
34-
url : methodValue.url || serviceValue.url || api.url
33+
type : methodValue.type || serviceValue.type || api.type || 'ajax',
34+
url : methodValue.url || serviceValue.url || api.url,
35+
parser : methodValue.parser,
36+
normalizer: methodValue.normalizer,
37+
pipeline : methodValue.pipeline
38+
})
39+
});
40+
41+
Object.entries(serviceValue.streams || {}).forEach(([stream, streamValue]) => {
42+
this.register({
43+
id : `${service}.${stream}`,
44+
isStream : true,
45+
method : stream,
46+
service,
47+
type : streamValue.type || serviceValue.type || api.type || 'websocket',
48+
url : streamValue.url || serviceValue.url || api.url,
49+
parser : streamValue.parser,
50+
normalizer: streamValue.normalizer,
51+
pipeline : streamValue.pipeline
3552
})
3653
})
3754
})

src/manager/rpc/Message.mjs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class Message extends Base {
5555
transactionId = 1
5656

5757
/**
58-
*
5958
* @param {Object} msg
6059
* @returns {Promise<any>}
6160
*/
@@ -65,6 +64,32 @@ class Message extends Base {
6564
return this[`onMessage${Neo.capitalize(api.type)}`](msg, api);
6665
}
6766

67+
/**
68+
* @param {Object} msg
69+
* @param {Function} callback
70+
*/
71+
onMessageStream(msg, callback) {
72+
let api = Neo.manager.rpc.Api.get(`${msg.service}.${msg.method}`);
73+
74+
if (api.type !== 'websocket') {
75+
console.error('onMessageStream is only supported for websocket connections', msg);
76+
return
77+
}
78+
79+
this.onMessageStreamWebsocket(msg, api, callback)
80+
}
81+
82+
/**
83+
* @param {Object} msg
84+
*/
85+
onMessageStreamUnsubscribe(msg) {
86+
let api = Neo.manager.rpc.Api.get(`${msg.service}.${msg.method}`);
87+
88+
if (api?.type === 'websocket') {
89+
this.onMessageStreamUnsubscribeWebsocket(msg, api)
90+
}
91+
}
92+
6893
/**
6994
*
7095
* @param {Object} msg
@@ -101,7 +126,6 @@ class Message extends Base {
101126
}
102127

103128
/**
104-
*
105129
* @param {Object} msg
106130
* @param {Object} api
107131
* @protected
@@ -121,6 +145,39 @@ class Message extends Base {
121145
return await connection.promiseMessage(msg)
122146
}
123147

148+
/**
149+
* @param {Object} msg
150+
* @param {Object} api
151+
* @param {Function} callback
152+
* @protected
153+
* @returns {Promise<any>}
154+
*/
155+
async onMessageStreamWebsocket(msg, api, callback) {
156+
let me = this,
157+
{url} = api,
158+
connection = me.socketConnections[url];
159+
160+
if (!connection) {
161+
let module = await import('../../data/connection/WebSocket.mjs');
162+
163+
me.socketConnections[url] = connection = Neo.create(module.default, {serverAddress: url})
164+
}
165+
166+
connection.registerStream(msg, callback)
167+
}
168+
169+
/**
170+
* @param {Object} msg
171+
* @param {Object} api
172+
* @protected
173+
*/
174+
onMessageStreamUnsubscribeWebsocket(msg, api) {
175+
let connection = this.socketConnections[api.url];
176+
if (connection) {
177+
connection.unregisterStream(msg)
178+
}
179+
}
180+
124181
/**
125182
* @param {String} url
126183
*/

src/worker/App.mjs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ class App extends Base {
5656
singleton: true
5757
}
5858

59+
/**
60+
* @member {Object} rpcStreamCallbacks={}
61+
* @protected
62+
*/
63+
rpcStreamCallbacks = {}
5964
/**
6065
* We are storing the params of insertThemeFiles() calls here, in case the method does get triggered
6166
* before the json theme structure got loaded.
@@ -649,6 +654,18 @@ class App extends Base {
649654
Neo.apps[msg.windowId]?.fire('orientationchange', msg.data)
650655
}
651656

657+
/**
658+
* @param {Object} msg
659+
* @param {Object} msg.data
660+
* @param {String} msg.pipeline The origin pipeline ID
661+
*/
662+
onPipelinePush(msg) {
663+
let pipeline = Neo.manager.Instance.get(msg.pipeline);
664+
if (pipeline) {
665+
pipeline.fire('push', msg.data)
666+
}
667+
}
668+
652669
/**
653670
* @param {Object} msg
654671
*/
@@ -722,6 +739,19 @@ class App extends Base {
722739
me.channelPorts[msg.origin] = port
723740
}
724741

742+
/**
743+
* @param {Object} msg
744+
* @param {String} msg.callbackId
745+
* @param {Object} msg.data
746+
*/
747+
onRpcStreamData(msg) {
748+
let callback = this.rpcStreamCallbacks?.[msg.callbackId];
749+
750+
if (typeof callback === 'function') {
751+
callback(msg.data)
752+
}
753+
}
754+
725755
/**
726756
* @param {Object} msg
727757
* @param {Object} msg.data

0 commit comments

Comments
 (0)