/
node-server-websocket.ts
87 lines (77 loc) · 2.8 KB
/
node-server-websocket.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import { ObservableResource, ObservableResources, GetOutput, SubscriptionSocket, GetInput } from "../src";
import { Observable } from "rxjs";
import * as WebSocket from "ws"
export class NodeSubscriptionServerWebsocket<
ServerResources extends ObservableResources = any,
ClientResources extends ObservableResources = any,
StorageType extends { [Name in string]?: any } = any
> extends SubscriptionSocket<ServerResources, ClientResources> {
private messageQeue: Array<string> = []
constructor(
protected websocket: WebSocket,
public storage: StorageType,
subscribeableResources: ToObservableServerResources<ServerResources, NodeSubscriptionServerWebsocket<ServerResources, ClientResources, StorageType>>,
onError: (socket: NodeSubscriptionServerWebsocket<ServerResources, ClientResources, StorageType>, error: any) => void,
onClose: (socket: NodeSubscriptionServerWebsocket<ServerResources, ClientResources, StorageType>) => void
) {
super(
error => onError(this, error),
() => onClose(this),
Object.entries(subscribeableResources).reduce((prev, [name, resource]) => {
prev[name as keyof ServerResources] = ((input: any) => resource(input, this)) as ServerResources[keyof ServerResources]
return prev
}, <ServerResources>{})
)
}
protected send(message: string): void {
this.messageQeue.push(message)
if(this.messageQeue.length === 1) {
this.sendQeue()
}
}
private sendQeue(): void {
if(this.messageQeue.length > 0) {
let message = this.messageQeue.shift()!
this.sendMessage(message)
.catch(error => this.onError(error))
.then(this.sendQeue.bind(this))
}
}
private sendMessage(message: string): Promise<void> {
return new Promise((resolve, reject) => {
if(this.websocket.readyState === WebSocket.OPEN) {
this.websocket.send(message, error => {
if(error != null) {
reject(error)
} else {
resolve()
}
})
}
})
}
}
export type ToObservableServerResources<
ServerResources extends ObservableResources = any,
Socket extends NodeSubscriptionServerWebsocket<any, any, any> = any
> = {
[Name in keyof ServerResources]: ToObservableServerResource<
ServerResources[Name],
Socket
>
}
export type ToObservableServerResource<
ServerResource extends ObservableResource,
Socket extends NodeSubscriptionServerWebsocket<any, any, any> = any
> = ObservableServerResource<
GetInput<ServerResource>,
GetOutput<ServerResource>,
Socket
>
export type ObservableServerResource<
Input = any,
Output = any,
Socket extends NodeSubscriptionServerWebsocket<any, any, any> = any
> =
(input: Input, socket: Socket) =>
Observable<Output>