Skip to content

Commit

Permalink
improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jdiehl committed Dec 28, 2018
1 parent ae16bf1 commit a35c3eb
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 25 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "smsi",
"version": "1.0.4",
"version": "1.0.5",
"description": "Simple MicroService Interface",
"main": "index.js",
"types": "index.d.ts",
Expand Down
6 changes: 3 additions & 3 deletions src/SMSIClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class SMSIClient extends EventEmitter {
const ws = new WebSocket(this.url)
this.transport = new SMSITransport(ws)
this.transport.on('error', err => this.emit('error', err))
this.transport.on('close', () => this.restart())
this.transport.on('close', () => this.onClose())
this.transport.on('open', () => resolve())
})
}
Expand Down Expand Up @@ -68,7 +68,7 @@ export class SMSIClient extends EventEmitter {

// private methods

private async restart(): Promise<void> {
// todo
private onClose(): void {
this.transport = undefined
}
}
2 changes: 1 addition & 1 deletion src/SMSIServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class SMSIServer extends EventEmitter {
if (handlers[service][event]) return transport.sendError(`Already subscribed to event: ${service}#${event}`, id)

handlers[service][event] = (...params: any[]) => {
transport.sendEvent(service, event, params)
if (transport.connected) transport.sendEvent(service, event, params)
}
s.on(event, handlers[service][event])
transport.sendResponse(id)
Expand Down
63 changes: 43 additions & 20 deletions src/SMSITransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ import * as WebSocket from 'ws'
export class SMSITransport extends EventEmitter {
private handlers: Record<string, Function> = {}
private subscriptions: Record<string, Record<string, Function[]>> = {}
private requestRejections: Function[] = []

constructor(private socket: WebSocket) {
super()
this.socket.on('error', err => this.emit('error', err))
this.socket.on('open', () => this.emit('open'))
this.socket.on('close', () => this.emit('close'))
this.socket.on('close', () => this.onClose())
this.socket.on('message', message => this.onMessage(message))
}

get connected(): boolean {
return this.socket.readyState === 1
}

async sendExec(service: string, method: string, params: any[]): Promise<any[]> {
const type = 'exec'
return this.sendRequest({ type, service, method, params })
Expand Down Expand Up @@ -46,21 +51,21 @@ export class SMSITransport extends EventEmitter {
await this.sendRequest({ type, service, event })
}

sendResponse(id: string, response?: any): void {
sendResponse(id: string, response?: any): Promise<void> {
const type = 'response'
this.send({ id, type, response })
return this.send({ id, type, response })
}

sendEvent(service: string, event: string, params: any[]): void {
sendEvent(service: string, event: string, params: any[]): Promise<void> {
const type = 'event'
this.send({ type, service, event, params })
return this.send({ type, service, event, params })
}

// send error
sendError(error: string | Error, id?: string): void {
sendError(error: string | Error, id?: string): Promise<void> {
if (error instanceof Error) error = error.message
const type = 'error'
this.send({ id, type, error })
return this.send({ id, type, error })
}

async close(): Promise<void> {
Expand All @@ -73,49 +78,58 @@ export class SMSITransport extends EventEmitter {
// private methods

// send a message
private send(data: any): void {
if (this.socket.readyState !== 1) return
private send(data: any): Promise<void> {
if (!this.connected) return Promise.reject('Not connected')
const message = JSON.stringify(data)
this.socket.send(message)
return new Promise((resolve, reject) => {
this.socket.send(message, err => {
if (err) return reject(err)
resolve()
})
})
}

// send a request
private async sendRequest(data: any): Promise<any> {
const id = uuid()
data.id = id
this.send(data)
await this.send(data)
return new Promise<any>((resolve, reject) => {
this.requestRejections.push(reject)
this.handlers[id] = (err: string, response: any) => {
const i = this.requestRejections.indexOf(reject)
this.requestRejections.splice(i, 1)
err ? reject(err) : resolve(response)
delete this.handlers[id]
}
})
}

private findIdForHandler(handler: () => void): string | undefined {
for (const id of Object.keys(this.handlers)) {
if (this.handlers[id] === handler) return id
}
}

private onMessage(data: WebSocket.Data): void {
let message: any

// parse message
try {
message = JSON.parse(data.toString())
} catch (err) {
return this.sendError(`Invalid message: parse error`)
this.sendError(`Invalid message: parse error`)
return
}

// validate message
const error = this.validateMessage(message)
if (error) return this.sendError(`Invalid message: ${error}`)
if (error) {
this.sendError(`Invalid message: ${error}`)
return
}

switch (message.type) {
case 'response': {
const handler = this.handlers[message.id]
if (!handler) return this.sendError(`unknown id ${message.id}`)
if (!handler) {
this.sendError(`unknown id ${message.id}`)
return
}
handler(null, message.response)
return
}
Expand Down Expand Up @@ -178,4 +192,13 @@ export class SMSITransport extends EventEmitter {
}
}

// reject all outstanding requests
private onClose(): void {
this.emit('close')
for (const reject of this.requestRejections) {
reject('Connection closed')
}
this.requestRejections = []
}

}
13 changes: 13 additions & 0 deletions src/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,16 @@ test('should remove all listeners when disconnected', async () => {
await wait()
expect(handler).toHaveBeenCalledTimes(0)
})

test('should reject a request on a closing connection', async () => {
await server.stop()
await expect(client.exec('s1', 'm1')).rejects.toBe('Connection closed')
})

test('should reconnect automatically', async () => {
await server.stop()
await server.start()
await wait()
await client.exec('s1', 'm1')
expect(s1.m1).toBeCalledTimes(1)
})

0 comments on commit a35c3eb

Please sign in to comment.