-
Notifications
You must be signed in to change notification settings - Fork 0
/
webSocketTransportClient.ts
128 lines (112 loc) · 3.99 KB
/
webSocketTransportClient.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import { log } from "missionlog";
import Sockette from "sockette";
import { EventDispatcher } from "strongly-typed-events";
/**
 * Payload handed to subscribers of the transport's published socket events.
 *
 * `msg` identifies the event; any additional keys carry event-specific data
 * and must be narrowed before use.
 */
export interface EventPayload {
    [key: string]: unknown;
    msg: string;
}
/**
 * A generic transport client for transports that use WebSockets to communicate with
 * a backend.
 *
 * The connection is opened from the constructor through Sockette. Messages whose
 * send throws are queued and re-sent, oldest first, the next time the socket opens.
 * Subclasses supply `processServerMessage` to handle parsed server messages.
 */
export abstract class WebSocketTransportClient {
    /** When set, the socket is closed after a server message has been processed. */
    protected _requestId?: string;
    /** Endpoint this client was asked to connect to. */
    protected host: string;

    /**
     * Stores the request id and host, then opens the socket.
     * @param requestId Optional id; see `_requestId`.
     * @param host WebSocket endpoint; falls back to `defaultHost` when omitted.
     */
    protected constructor(requestId?: string, host?: string) {
        // Set before connecting so the socket handlers never observe a stale value.
        this._requestId = requestId;
        this.host = host ?? WebSocketTransportClient.defaultHost;
        this.webSocket = this.connectTo(this.host);
    }

    /** Serialized messages that could not be sent yet, oldest first. */
    private messageQueue: string[] = [];
    /** Active Sockette instance, or `undefined` if `connectTo` failed. */
    private webSocket?: Sockette;
    /** Endpoint used when no host is supplied. */
    static defaultHost = "ws://localhost:8090/ws";

    /**
     * Sends an object to the server using the current socket connection.
     * @param message The object to serialize and send.
     * @param callback A callback to run after sending.
     */
    sendObj = (message: object, callback?: () => any | void) => {
        this.send(JSON.stringify(message), callback);
    };

    /**
     * Sends an already-serialized message. If the socket throws, the message is
     * queued for the next open and the callback is not invoked.
     * @param message The serialized message.
     * @param callback Optional function invoked once the send did not throw.
     */
    protected send = (message: string, callback?: any) => {
        // Sockette reconnects on its own, but the underlying send can still throw.
        try {
            // NOTE(review): when connectTo failed, webSocket is undefined, nothing is
            // sent, and the callback below still runs — confirm this is intended.
            this.webSocket?.send(message);
        } catch {
            this.messageQueue.push(message);
            return;
        }
        if (typeof callback === 'function') {
            // Kept outside the send try-block: a throwing callback must not cause
            // an already-sent message to be queued and transmitted a second time.
            try {
                callback();
            } catch (e: unknown) {
                log.error('socket', 'Send callback threw an error.', e);
            }
        }
    };

    _onPublishMessage = new EventDispatcher<WebSocketTransportClient, EventPayload>();

    /**
     * Event that is dispatched whenever a socket event occurs.
     */
    get onPublishMessage() {
        return this._onPublishMessage.asEvent();
    }

    /**
     * Returns whether the current transport is ready for messages.
     * Always `true`: `send` queues messages it could not deliver, so callers
     * may send at any time.
     */
    public get ready(): boolean {
        return true;
    }

    /**
     * Creates a Sockette connection and wires up its lifecycle handlers.
     * @param host WebSocket endpoint; falls back to `defaultHost` when omitted.
     * @returns The Sockette instance, or `undefined` if creating it threw.
     */
    protected connectTo = (host?: string): Sockette | undefined => {
        host ??= WebSocketTransportClient.defaultHost;
        try {
            // Re-send queued messages in the order they were originally submitted.
            const flushQueue = (): void => {
                while (this.messageQueue.length > 0) {
                    const next = this.messageQueue[0];
                    try {
                        webSocket.send(next);
                    } catch (e: unknown) {
                        // Leave the message (and everything after it) queued for the next open.
                        log.error('socket', 'Failed to flush queued message.', e);
                        return;
                    }
                    this.messageQueue.shift();
                }
            };

            const handleClose = (e: CloseEvent): void => {
                // NOTE(review): 4999 is presumably an application-defined "intentional
                // close" code alongside the standard 1000 — confirm with the backend.
                if (e.code !== 4999 && e.code !== 1000) {
                    log.error('socket', 'Socket is closed.', e.code, e.reason, e.wasClean);
                    if (!this._requestId) {
                        this._onPublishMessage.dispatch(this, { msg: 'socketClosed' });
                    }
                }
            };

            const handleError = (): void => {
                log.error('socket', 'Socket encountered error: Closing socket');
            };

            const handleMessage = (event: MessageEvent): void => {
                let response: unknown;
                try {
                    response = JSON.parse(event.data);
                } catch (e: unknown) {
                    // A malformed frame must not throw out of the socket handler.
                    log.error('socket', 'Failed to parse server message.', e);
                    return;
                }
                this.processServerMessage(response)
                    ?.then(() => {
                        // Request-scoped clients are done once a message has been processed.
                        if (this._requestId) {
                            webSocket.close(1000);
                        }
                    })
                    .catch((e: unknown) => {
                        log.error('socket', 'Failed to process server message.', e);
                    });
            };

            const webSocket = new Sockette(host, {
                // NOTE(review): Sockette documents `timeout` in milliseconds; 60 ms
                // between reconnect attempts looks short — confirm the intended unit.
                timeout: 60,
                maxAttempts: 10,
                onopen: flushQueue,
                onmessage: handleMessage,
                onreconnect: () => {
                    log.warn('socket', 'reconnecting socket!');
                    this._onPublishMessage.dispatch(this, { msg: 'socketReconnect' });
                },
                onmaximum: (e: unknown) => log.error('socket', 'Socket reconnection failed!', e),
                onclose: handleClose,
                onerror: handleError
            });
            return webSocket;
        } catch (e: unknown) {
            log.warn('socket', "Failed to connect to WebSocket! Cannot connect to the target endpoint.", String(e), e);
            return undefined;
        }
    };

    /**
     * Handles one parsed server message. Implemented by subclasses.
     * The returned promise is awaited before a request-scoped socket is closed.
     */
    abstract processServerMessage: (arg0: any) => Promise<any>;

    /**
     * Closes the current socket transport.
     */
    close = () => {
        this.webSocket?.close(1000);
    };
}