-
Notifications
You must be signed in to change notification settings - Fork 33
/
SessionTransport.ts
66 lines (59 loc) · 2.18 KB
/
SessionTransport.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import { GRPCError } from '@viamrobotics/rpc'
import type SessionManager from './SessionManager'
import { grpc } from '@improbable-eng/grpc-web'
/**
 * A `grpc.Transport` that wraps another transport and ties each request to the
 * session tracked by a `SessionManager`.
 *
 * - Before the wrapped transport is started, the metadata returned by
 *   `sessionManager.getSessionMetadata()` is copied into the request metadata.
 * - `sendMessage` / `finishSend` calls made before that metadata is available
 *   are deferred and forwarded, in call order, once the wrapped transport has
 *   been started.
 * - When a response reports `InvalidArgument` together with the message
 *   `SESSION_EXPIRED` (through the error handed to `onEnd`, or through the
 *   `grpc-status` / `grpc-message` headers), the session manager is reset.
 */
export default class SessionTransport implements grpc.Transport {
  /** Copy of the caller's options carrying the session-aware callbacks. */
  private readonly opts: grpc.TransportOptions
  protected readonly transport: grpc.Transport
  protected readonly sessionManager: SessionManager
  /** Settles once the wrapped transport has been started with session metadata. */
  private readonly mdProm: Promise<void>
  private mdPromResolve: (() => void) | undefined
  /** Set by `cancel()` so that a pending `start()` does not fire afterwards. */
  private cancelled = false

  /**
   * @param opts - Transport options supplied by the gRPC client. The object is
   *   not modified; the wrapped transport receives a shallow copy whose
   *   `onEnd` / `onHeaders` run the session checks and then call the originals.
   * @param innerFactory - Factory that creates the transport being wrapped.
   * @param sessionManager - Source of session metadata; reset on expiry.
   */
  constructor (opts: grpc.TransportOptions, innerFactory: grpc.TransportFactory, sessionManager: SessionManager) {
    this.sessionManager = sessionManager

    // Capture the caller's callbacks up front so the wrappers below always
    // delegate to exactly what was passed in.
    const actualOnEnd = opts.onEnd
    const actualOnHeaders = opts.onHeaders

    this.opts = {
      ...opts,
      onEnd: (err?: Error) => {
        if (
          err instanceof GRPCError &&
          err.code === grpc.Code.InvalidArgument &&
          err.grpcMessage === 'SESSION_EXPIRED'
        ) {
          this.sessionManager.reset()
        }
        actualOnEnd(err)
      },
      onHeaders: (headers: grpc.Metadata, status: number) => {
        // The expiry signal is also looked for in the response headers, not
        // only in the error passed to onEnd.
        const gStatus = headers.has('grpc-status') ? headers.get('grpc-status') : undefined
        if (gStatus && gStatus.length === 1 && gStatus[0] === `${grpc.Code.InvalidArgument}`) {
          const gMsg = headers.has('grpc-message') ? headers.get('grpc-message') : undefined
          if (gMsg && gMsg.length === 1 && gMsg[0] === 'SESSION_EXPIRED') {
            this.sessionManager.reset()
          }
        }
        actualOnHeaders(headers, status)
      }
    }

    this.mdProm = new Promise<void>((resolve) => {
      this.mdPromResolve = resolve
    })
    this.transport = innerFactory(this.opts)
  }

  /**
   * Fetches the session metadata, merges it into `metadata` and then starts
   * the wrapped transport. If the transport was cancelled in the meantime,
   * nothing is started and no error is reported. Any other failure (fetching
   * the metadata or starting the wrapped transport) is reported via `onEnd`.
   *
   * @param metadata - Request metadata; session entries are written into it.
   */
  public start (metadata: grpc.Metadata): void {
    void this.sessionManager.getSessionMetadata().then((md) => {
      if (this.cancelled) {
        // The request was abandoned while the metadata was being fetched.
        return
      }
      md.forEach((key: string, values: string | string[]) => {
        metadata.set(key, values)
      })
      this.transport.start(metadata)
      // Release every sendMessage/finishSend call that was waiting for start.
      this.mdPromResolve?.()
    })
      .catch((err) => {
        if (!this.cancelled) {
          this.opts.onEnd(err)
        }
      })
  }

  /**
   * Forwards a message to the wrapped transport once it has been started.
   *
   * @param msgBytes - Encoded message to send.
   */
  public sendMessage (msgBytes: Uint8Array): void {
    this.runAfterStart(() => { this.transport.sendMessage(msgBytes) })
  }

  /** Signals end-of-send to the wrapped transport once it has been started. */
  public finishSend (): void {
    this.runAfterStart(() => { this.transport.finishSend() })
  }

  /**
   * Cancels the wrapped transport and prevents a still-pending `start()` or
   * deferred send from reaching it afterwards.
   */
  public cancel (): void {
    this.cancelled = true
    this.transport.cancel()
  }

  /**
   * Runs `action` after the wrapped transport has been started, unless the
   * transport was cancelled first. Exceptions thrown by `action` are reported
   * through `onEnd` instead of surfacing as unhandled promise rejections.
   */
  private runAfterStart (action: () => void): void {
    void this.mdProm
      .then(() => {
        if (!this.cancelled) {
          action()
        }
      })
      .catch((err) => {
        this.opts.onEnd(err)
      })
  }
}