diff --git a/libraries/botbuilder-stdlib/src/index.ts b/libraries/botbuilder-stdlib/src/index.ts index d18983fa22..e595b7207a 100644 --- a/libraries/botbuilder-stdlib/src/index.ts +++ b/libraries/botbuilder-stdlib/src/index.ts @@ -5,3 +5,4 @@ export * as assertExt from './assertExt'; export * from './types'; export { delay } from './delay'; export { maybeCast } from './maybeCast'; +export { retry } from './retry'; diff --git a/libraries/botbuilder-stdlib/src/retry.ts b/libraries/botbuilder-stdlib/src/retry.ts new file mode 100644 index 0000000000..741e19e706 --- /dev/null +++ b/libraries/botbuilder-stdlib/src/retry.ts @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * Retry a given promise with gradually increasing delay. + * + * @param promise a function that returns a promise to retry + * @param maxRetries the maximum number of times to retry + * @param initialDelay the initial value to delay before retrying (in milliseconds) + * @returns a promise resolving to the result of the promise from the promise generating function, or undefined + */ +export async function retry( + promise: (n: number) => Promise, + maxRetries: number, + initialDelay = 500 +): Promise { + let delay = initialDelay, + n = 1, + maybeError: Error | undefined; + + // Take care of negative or zero + maxRetries = Math.max(maxRetries, 1); + + while (n <= maxRetries) { + try { + // Note: return await intentional so we can catch errors + return await promise(n); + } catch (err) { + maybeError = err; + + await new Promise((resolve) => setTimeout(resolve, delay)); + + delay *= n; + n++; + } + } + + if (maybeError) { + throw maybeError; + } +} diff --git a/libraries/botbuilder-stdlib/tests/retry.test.js b/libraries/botbuilder-stdlib/tests/retry.test.js new file mode 100644 index 0000000000..2629e41224 --- /dev/null +++ b/libraries/botbuilder-stdlib/tests/retry.test.js @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +const assert = require('assert'); +const sinon = require('sinon'); +const { retry } = require('../'); + +describe('retry', function () { + it('succeeds on first try', async function () { + const fake = sinon.fake((n) => Promise.resolve(n)); + assert.strictEqual(await retry(fake, 3, 0), 1); + assert.strictEqual(fake.callCount, 1); + }); + + it('handles zero retries', async function () { + const fake = sinon.fake((n) => Promise.resolve(n)); + assert.strictEqual(await retry(fake, 0, 0), 1); + assert.strictEqual(fake.callCount, 1); + }); + + it('handles negative retries', async function () { + const fake = sinon.fake((n) => Promise.resolve(n)); + assert.strictEqual(await retry(fake, -10, 0), 1); + assert.strictEqual(fake.callCount, 1); + }); + + it('succeeds eventually', async function () { + const fake = sinon.fake((n) => (n < 3 ? Promise.reject() : Promise.resolve(10))); + assert.strictEqual(await retry(fake, 3, 0), 10); + assert.strictEqual(fake.callCount, 3); + }); + + it('yields error if never succeeds', async function () { + const fake = sinon.fake(() => Promise.reject(new Error('oh no'))); + await assert.rejects(retry(fake, 3, 0), { + name: 'Error', + message: 'oh no', + }); + assert.strictEqual(fake.callCount, 3); + }); +}); diff --git a/libraries/botbuilder/src/botFrameworkAdapter.ts b/libraries/botbuilder/src/botFrameworkAdapter.ts index 585391126d..fa3b5e4493 100644 --- a/libraries/botbuilder/src/botFrameworkAdapter.ts +++ b/libraries/botbuilder/src/botFrameworkAdapter.ts @@ -83,7 +83,7 @@ import { import { BotFrameworkHttpAdapter } from './botFrameworkHttpAdapter'; import { BotLogic, ConnectorClientBuilder, Emitter, Request, Response, WebRequest, WebResponse } from './interfaces'; -import { delay } from 'botbuilder-stdlib'; +import { delay, retry } from 'botbuilder-stdlib'; import { userAgentPolicy } from '@azure/ms-rest-js'; import { validateAndFixActivity } from './activityValidator'; @@ -1835,12 +1835,17 @@ export class BotFrameworkAdapter /** * Connects the handler to a Named Pipe server and begins listening for incoming requests. - * @param pipeName The name of the named pipe to use when creating the server. + * * @param logic The logic that will handle incoming requests. + * @param pipeName The name of the named pipe to use when creating the server. + * @param retryCount Number of times to attempt to bind incoming and outgoing pipe + * @param onListen Optional callback that fires once when server is listening on both incoming and outgoing pipe */ public async useNamedPipe( logic: (context: TurnContext) => Promise, - pipeName: string = defaultPipeName + pipeName = defaultPipeName, + retryCount = 7, + onListen?: () => void ): Promise { if (!logic) { throw new Error('Bot logic needs to be provided to `useNamedPipe`'); @@ -1862,7 +1867,8 @@ export class BotFrameworkAdapter } this.logic = logic; - await this.startNamedPipeServer(pipeName); + + await retry(() => this.startNamedPipeServer(pipeName, onListen), retryCount); } /** @@ -1900,12 +1906,12 @@ export class BotFrameworkAdapter await this.startWebSocket(nodeWebSocket); } - private async startNamedPipeServer(pipeName: string): Promise { + private async startNamedPipeServer(pipeName: string, onListen?: () => void): Promise { this.namedPipeName = pipeName; this.streamingServer = new NamedPipeServer(pipeName, this); try { - await this.streamingServer.start(); + await this.streamingServer.start(onListen); } finally { this.namedPipeName = undefined; } diff --git a/libraries/botframework-streaming/package.json b/libraries/botframework-streaming/package.json index b63f59ab9c..d8dea588ee 100644 --- a/libraries/botframework-streaming/package.json +++ b/libraries/botframework-streaming/package.json @@ -49,8 +49,13 @@ "clean": "rimraf _ts3.4 es5 lib", "lint": "eslint . --ext .js,.ts", "postbuild": "downlevel-dts lib _ts3.4/lib --checksum", - "test": "yarn build && nyc mocha tests/", - "test:compat": "api-extractor run --verbose" + "test": "npm-run-all build test:mocha", + "test:compat": "api-extractor run --verbose", + "test:mocha": "nyc mocha tests" + }, + "mocha": { + "checkLeaks": true, + "exit": true }, "files": [ "_ts3.4", diff --git a/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts b/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts index 98a03c729f..1fde657e85 100644 --- a/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts +++ b/libraries/botframework-streaming/src/interfaces/IStreamingTransportServer.ts @@ -13,7 +13,7 @@ import { StreamingRequest } from '../streamingRequest'; * Example possible implementations include WebSocket transport server or NamedPipe transport server. */ export interface IStreamingTransportServer { - start(): Promise; + start(onListen?: () => void): Promise; disconnect(): void; send(request: StreamingRequest): Promise; isConnected?: boolean; diff --git a/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts b/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts index bf169abe16..be50474f22 100644 --- a/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts +++ b/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts @@ -5,13 +5,13 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. */ +import { INodeServer, IStreamingTransportServer, IReceiveResponse } from '../interfaces'; +import { NamedPipeTransport } from './namedPipeTransport'; +import { PayloadReceiver, PayloadSender } from '../payloadTransport'; import { ProtocolAdapter } from '../protocolAdapter'; import { RequestHandler } from '../requestHandler'; -import { StreamingRequest } from '../streamingRequest'; import { RequestManager } from '../payloads'; -import { PayloadReceiver, PayloadSender } from '../payloadTransport'; -import { NamedPipeTransport } from './namedPipeTransport'; -import { INodeServer, INodeSocket, IStreamingTransportServer, IReceiveResponse } from '../interfaces'; +import { StreamingRequest } from '../streamingRequest'; import { createNodeServer } from '../utilities/createNodeServer'; /** @@ -20,86 +20,107 @@ import { createNodeServer } from '../utilities/createNodeServer'; export class NamedPipeServer implements IStreamingTransportServer { private _outgoingServer: INodeServer; private _incomingServer: INodeServer; - private readonly _baseName: string; - private readonly _requestHandler: RequestHandler; - private readonly _sender: PayloadSender; - private readonly _receiver: PayloadReceiver; - private readonly _requestManager: RequestManager; + + private readonly _sender = new PayloadSender(); + private readonly _receiver = new PayloadReceiver(); private readonly _protocolAdapter: ProtocolAdapter; - private readonly _autoReconnect: boolean; - private _isDisconnecting: boolean; /** * Creates a new instance of the [NamedPipeServer](xref:botframework-streaming.NamedPipeServer) class. * * @param baseName The named pipe to connect to. * @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this client. - * @param autoReconnect Optional setting to determine if the client should attempt to reconnect automatically on disconnection events. Defaults to true. + * @param autoReconnect Deprecated: Automatic reconnection is the default behavior. */ - public constructor(baseName: string, requestHandler?: RequestHandler, autoReconnect = true) { + public constructor(private readonly baseName: string, requestHandler?: RequestHandler, autoReconnect?: boolean) { if (!baseName) { throw new TypeError('NamedPipeServer: Missing baseName parameter'); } - this._baseName = baseName; - this._requestHandler = requestHandler; - this._autoReconnect = autoReconnect; - this._requestManager = new RequestManager(); + if (autoReconnect != null) { + console.warn('NamedPipeServer: The autoReconnect parameter is deprecated'); + } + this._sender = new PayloadSender(); this._receiver = new PayloadReceiver(); - this._protocolAdapter = new ProtocolAdapter( - this._requestHandler, - this._requestManager, - this._sender, - this._receiver - ); - this._sender.disconnected = this.onConnectionDisconnected.bind(this); - this._receiver.disconnected = this.onConnectionDisconnected.bind(this); + this._protocolAdapter = new ProtocolAdapter(requestHandler, new RequestManager(), this._sender, this._receiver); } /** - * Returns true if currently connected. + * Get connected status + * + * @returns true if currently connected. */ public get isConnected(): boolean { - return !!(this._receiver.isConnected && this._sender.isConnected); + return this._receiver.isConnected && this._sender.isConnected; } /** * Used to establish the connection used by this server and begin listening for incoming messages. * + * @param onListen Optional callback that fires once when server is listening on both incoming and outgoing pipe * @returns A promised string that will not resolve as long as the server is running. */ - public async start(): Promise { - if (this._receiver.isConnected || this._sender.isConnected || this._incomingServer || this._outgoingServer) { - this.disconnect(); - } + public async start(onListen?: () => void): Promise { + const { PipePath, ServerIncomingPath, ServerOutgoingPath } = NamedPipeTransport; + + // The first promise resolves as soon as the server is listening. The second resolves when the server + // closes, or an error occurs. Wrapping with an array ensures the initial await only waits for the listening + // promise. + // + // We want to ensure we are listening to the servers in series so that, if two processes start at the same + // time, only one is able to listen on both the incoming and outgoing sockets. + const [incoming] = await new Promise<[Promise]>((resolveListening, rejectListening) => { + const server = createNodeServer((socket) => { + if (this._receiver.isConnected) { + return; + } - const incoming = new Promise((resolve) => { - this._incomingServer = createNodeServer((socket: INodeSocket): void => { this._receiver.connect(new NamedPipeTransport(socket)); - resolve(); + }).once('error', rejectListening); + + this._incomingServer = server; + + const isListening = new Promise((resolveClosed, rejectClosed) => { + // Only register rejection once the server is actually listening + server.once('listening', () => server.once('error', rejectClosed)); + server.once('closed', resolveClosed); }); + + server.once('listening', () => resolveListening([isListening])); + + server.listen(PipePath + this.baseName + ServerIncomingPath); }); - const outgoing = new Promise((resolve) => { - this._outgoingServer = createNodeServer((socket: INodeSocket): void => { + // Now that we absolutely have the incoming socket, bind the outgoing socket as well + const [outgoing] = await new Promise<[Promise]>((resolveListening, rejectListening) => { + const server = createNodeServer((socket) => { + if (this._sender.isConnected) { + return; + } + + // Note: manually disconnect sender if client closes socket. This ensures that + // reconnections are allowed this._sender.connect(new NamedPipeTransport(socket)); - resolve(); + socket.once('close', () => this._sender.disconnect()); + }).once('error', rejectListening); + + this._outgoingServer = server; + + const isListening = new Promise((resolveClosed, rejectClosed) => { + // Only register rejection once the server is actually listening + server.once('listening', () => server.once('error', rejectClosed)); + server.once('closed', resolveClosed); }); - }); - // These promises will only resolve when the underlying connection has terminated. - // Anything awaiting on them will be blocked for the duration of the session, - // which is useful when detecting premature terminations, but requires an unawaited - // promise during the process of establishing the connection. - Promise.all([incoming, outgoing]); + server.once('listening', () => resolveListening([isListening])); - const { PipePath, ServerIncomingPath, ServerOutgoingPath } = NamedPipeTransport; - const incomingPipeName = PipePath + this._baseName + ServerIncomingPath; - const outgoingPipeName = PipePath + this._baseName + ServerOutgoingPath; + server.listen(PipePath + this.baseName + ServerOutgoingPath); + }); + + onListen?.(); - this._incomingServer.listen(incomingPipeName); - this._outgoingServer.listen(outgoingPipeName); + await Promise.all([incoming, outgoing]); return 'connected'; } @@ -108,18 +129,13 @@ export class NamedPipeServer implements IStreamingTransportServer { * Allows for manually disconnecting the server. */ public disconnect(): void { - this._sender.disconnect(); this._receiver.disconnect(); + this._incomingServer?.close(); + this._incomingServer = null; - if (this._incomingServer) { - this._incomingServer.close(); - this._incomingServer = null; - } - - if (this._outgoingServer) { - this._outgoingServer.close(); - this._outgoingServer = null; - } + this._sender.disconnect(); + this._outgoingServer?.close(); + this._outgoingServer = null; } /** @@ -131,30 +147,4 @@ export class NamedPipeServer implements IStreamingTransportServer { public async send(request: StreamingRequest): Promise { return this._protocolAdapter.sendRequest(request); } - - /** - * @private - */ - private onConnectionDisconnected(): void { - if (!this._isDisconnecting) { - this._isDisconnecting = true; - try { - if (this._sender.isConnected) { - this._sender.disconnect(); - } - - if (this._receiver.isConnected) { - this._receiver.disconnect(); - } - - if (this._autoReconnect) { - this.start().catch((err): void => { - throw new Error(`Unable to reconnect: ${err.message}`); - }); - } - } finally { - this._isDisconnecting = false; - } - } - } } diff --git a/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts b/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts index 415ed7216a..c56328a13a 100644 --- a/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts +++ b/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts @@ -19,27 +19,33 @@ import { INodeBuffer } from '../interfaces/INodeBuffer'; * Payload receiver for streaming. */ export class PayloadReceiver { - public isConnected: boolean; - public disconnected: TransportDisconnectedEventHandler = function (sender, events) {}; + public disconnected?: TransportDisconnectedEventHandler; + private _receiver: ITransportReceiver; private _receiveHeaderBuffer: INodeBuffer; private _receivePayloadBuffer: INodeBuffer; + private _getStream: (header: IHeader) => SubscribableStream; private _receiveAction: (header: IHeader, stream: SubscribableStream, length: number) => void; + /** + * Get current connected state + * + * @returns true if connected to a transport sender. + */ + public get isConnected(): boolean { + return this._receiver != null; + } + /** * Connects to a transport receiver * * @param receiver The [ITransportReceiver](xref:botframework-streaming.ITransportReceiver) object to pull incoming data from. + * @returns a promise that resolves when the receiver is complete */ - public connect(receiver: ITransportReceiver): void { - if (this.isConnected) { - throw new Error('Already connected.'); - } else { - this._receiver = receiver; - this.isConnected = true; - this.runReceive(); - } + public connect(receiver: ITransportReceiver): Promise { + this._receiver = receiver; + return this.receivePackets(); } /** @@ -59,42 +65,25 @@ export class PayloadReceiver { /** * Force this receiver to disconnect. * - * @param e Event arguments to include when broadcasting disconnection event. + * @param event Event arguments to include when broadcasting disconnection event. */ - public disconnect(e?: TransportDisconnectedEvent): void { - let didDisconnect; - try { - if (this.isConnected) { - this._receiver.close(); - didDisconnect = true; - this.isConnected = false; - } - } catch (error) { - this.isConnected = false; - this.disconnected(this, new TransportDisconnectedEvent(error.message)); + public disconnect(event = TransportDisconnectedEvent.Empty): void { + if (!this.isConnected) { + return; } - this._receiver = null; - this.isConnected = false; - if (didDisconnect) { - this.disconnected(this, e || TransportDisconnectedEvent.Empty); + try { + this._receiver.close(); + this.disconnected?.(this, event); + } catch (err) { + this.disconnected?.(this, new TransportDisconnectedEvent(err.message)); + } finally { + this._receiver = null; } } - /** - * @private - */ - private runReceive(): void { - this.receivePackets().catch(); - } - - /** - * @private - */ private async receivePackets(): Promise { - let isClosed; - - while (this.isConnected && !isClosed) { + while (this.isConnected) { try { let readSoFar = 0; while (readSoFar < PayloadConstants.MaxHeaderLength) { @@ -137,9 +126,8 @@ export class PayloadReceiver { this._receiveAction(header, contentStream, bytesActuallyRead); } } - } catch (error) { - isClosed = true; - this.disconnect(new TransportDisconnectedEvent(error.message)); + } catch (err) { + this.disconnect(new TransportDisconnectedEvent(err.message)); } } } diff --git a/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts b/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts index db85ba09fd..ccc0b4085a 100644 --- a/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts +++ b/libraries/botframework-streaming/src/payloadTransport/payloadSender.ts @@ -7,28 +7,29 @@ */ import { HeaderSerializer } from '../payloads/headerSerializer'; -import { SubscribableStream } from '../subscribableStream'; +import { IHeader } from '../interfaces/IHeader'; +import { ISendPacket } from '../interfaces/ISendPacket'; +import { ITransportSender } from '../interfaces/ITransportSender'; import { PayloadConstants } from '../payloads/payloadConstants'; +import { SubscribableStream } from '../subscribableStream'; import { TransportDisconnectedEvent } from './transportDisconnectedEvent'; import { TransportDisconnectedEventHandler } from './transportDisconnectedEventHandler'; -import { ITransportSender } from '../interfaces/ITransportSender'; -import { IHeader } from '../interfaces/IHeader'; -import { ISendPacket } from '../interfaces/ISendPacket'; /** * Streaming payload sender. */ export class PayloadSender { public disconnected?: TransportDisconnectedEventHandler; - private sender: ITransportSender; + + private _sender: ITransportSender; /** - * Tests whether the transport sender is connected. + * Get current connected state * * @returns true if connected to a transport sender. */ public get isConnected(): boolean { - return !!this.sender; + return this._sender != null; } /** @@ -37,7 +38,7 @@ export class PayloadSender { * @param sender The transport sender to connect this payload sender to. */ public connect(sender: ITransportSender): void { - this.sender = sender; + this._sender = sender; } /** @@ -45,7 +46,7 @@ export class PayloadSender { * * @param header The header to attach to the outgoing payload. * @param payload The stream of buffered data to send. - * @param sentCalback The function to execute when the send has completed. + * @param sentCallback The function to execute when the send has completed. */ public sendPayload(header: IHeader, payload?: SubscribableStream, sentCallback?: () => Promise): void { const packet: ISendPacket = { header, payload, sentCallback }; @@ -55,22 +56,23 @@ export class PayloadSender { /** * Disconnects this payload sender. * - * @param e The disconnected event arguments to include in the disconnected event broadcast. + * @param event The disconnected event arguments to include in the disconnected event broadcast. */ - public disconnect(e?: TransportDisconnectedEvent): void { - if (this.isConnected) { - this.sender.close(); - this.sender = null; + public disconnect(event = TransportDisconnectedEvent.Empty): void { + if (!this.isConnected) { + return; + } - if (this.disconnected) { - this.disconnected(this, e || TransportDisconnectedEvent.Empty); - } + try { + this._sender.close(); + this.disconnected?.(this, event); + } catch (err) { + this.disconnected?.(this, new TransportDisconnectedEvent(err.message)); + } finally { + this._sender = null; } } - /** - * @private - */ private writePacket(packet: ISendPacket): void { try { if (packet.header.payloadLength > 0 && packet.payload) { @@ -88,9 +90,9 @@ export class PayloadSender { HeaderSerializer.serialize(header, sendHeaderBuffer); - this.sender.send(sendHeaderBuffer); + this._sender.send(sendHeaderBuffer); - this.sender.send(chunk); + this._sender.send(chunk); leftOver -= chunk.length; } @@ -98,8 +100,8 @@ export class PayloadSender { packet.sentCallback(); } } - } catch (e) { - this.disconnect(new TransportDisconnectedEvent(e.message)); + } catch (err) { + this.disconnect(new TransportDisconnectedEvent(err.message)); } } } diff --git a/libraries/botframework-streaming/src/utilities/createNodeServer.ts b/libraries/botframework-streaming/src/utilities/createNodeServer.ts index d609cd6ee1..a27dae8935 100644 --- a/libraries/botframework-streaming/src/utilities/createNodeServer.ts +++ b/libraries/botframework-streaming/src/utilities/createNodeServer.ts @@ -8,39 +8,54 @@ import { INodeServer, INodeSocket } from '../interfaces'; -export const createNodeServer = function (callback?: (socket: INodeSocket) => void): INodeServer { +type ConnectionListener = (socket: INodeSocket) => void; + +/** + * Create a Node 'net' server + * + * @param callback Optional connection listener + * @returns a Node 'net' server instance + */ +export function createNodeServer(callback?: ConnectionListener): INodeServer { if (callback && typeof callback !== 'function') { - throw new TypeError(`Invalid callback; callback parameter must be a function to create Node 'net' Server.`); + throw new TypeError("Invalid callback; callback parameter must be a function to create Node 'net' Server."); } - try { - const server = getServerFactory()(callback); - if (isNetServer(server)) { - return server; - } - } catch (error) { - throw error; + const server = getServerFactory()(callback); + if (!isNetServer(server)) { + throw new Error("Unable to create Node 'net' server"); } -}; -export const getServerFactory = function (): Function { + return server; +} + +/** + * Get a function that creates a Node 'net' server instance + * + * @returns a server factory function + */ +export function getServerFactory(): (callback?: ConnectionListener) => INodeServer { if (typeof require !== undefined) { + // eslint-disable-next-line @typescript-eslint/no-var-requires return require('net').Server; } throw TypeError( `require is undefined. Must be in a Node module to require 'net' dynamically in order to fetch Server factory.` ); -}; +} +// eslint-disable-next-line @typescript-eslint/no-explicit-any function isNetServer(o: any): o is INodeServer { - return hasCloseMethod && hasListenMethod ? true : false; + return hasCloseMethod(o) && hasListenMethod(o); } +// eslint-disable-next-line @typescript-eslint/no-explicit-any function hasCloseMethod(o: any): o is INodeServer { - return o.close && typeof o.close === 'function' ? true : false; + return o.close && typeof o.close === 'function'; } +// eslint-disable-next-line @typescript-eslint/no-explicit-any function hasListenMethod(o: any): o is INodeServer { - return o.listen && typeof o.listen === 'function' ? true : false; + return o.listen && typeof o.listen === 'function'; } diff --git a/libraries/botframework-streaming/tests/NamedPipe.test.js b/libraries/botframework-streaming/tests/NamedPipe.test.js index 96a476c021..921e1a738c 100644 --- a/libraries/botframework-streaming/tests/NamedPipe.test.js +++ b/libraries/botframework-streaming/tests/NamedPipe.test.js @@ -1,9 +1,10 @@ +const assert = require('assert'); +const chai = require('chai'); +const expect = chai.expect; const np = require('../lib'); const npt = require('../lib/namedPipe/namedPipeTransport'); -const { createNodeServer, getServerFactory } = require('../lib/utilities/createNodeServer'); const protocol = require('../lib'); -const chai = require('chai'); -const expect = chai.expect; +const { createNodeServer, getServerFactory } = require('../lib/utilities/createNodeServer'); class FauxSock { constructor(contentString) { @@ -317,30 +318,52 @@ describe('Streaming Extensions NamedPipe Library Tests', function () { .then(done()); }); - it('handles being disconnected', function (done) { + it('handles being disconnected', function () { const server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), false); expect(server).to.be.instanceOf(np.NamedPipeServer); server.start(); - try { - server.onConnectionDisconnected(); - } catch (err) { - expect(err).to.equal(`address already in use \\.\pipe\pipeA.incoming`); - } expect(server.disconnect()).to.not.throw; - done(); }); - it('handles being disconnected and tries to reconnect', function (done) { - const server = new np.NamedPipeServer('pipeA', new protocol.RequestHandler(), true); - expect(server).to.be.instanceOf(np.NamedPipeServer); - server.start(); + it('ensures that two servers cannot get into a split brain scenario', async function () { + const servers = [ + new np.NamedPipeServer('pipeA', new protocol.RequestHandler()), + new np.NamedPipeServer('pipeA', new protocol.RequestHandler()), + ]; + try { - server.onConnectionDisconnected(); - } catch (err) { - expect(err).to.equal(`address already in use \\.\pipe\pipeA.incoming`); + await assert.rejects( + Promise.all( + servers.map(async (server) => { + // introduce jitter which should _not_ matter since we acquire sockets sequentially + await new Promise((resolve) => setTimeout(resolve, Math.ceil(Math.random() * 10) + 10)); + + // start up server, taking care to attach server to thrown errors for later inspection + try { + await new Promise((resolve, reject) => server.start(resolve).catch(reject)); + } catch (err) { + err.server = server; + throw err; + } + }) + ), + (err) => { + // Verify we did get an addr in use error + assert(err.message.indexOf('listen EADDRINUSE: address already in use') === 0); + + assert(err.server, 'server attached to error'); + assert( + err.server._incomingServer && !err.server._incomingServer.listening, + 'incoming server attached but not listening' + ); + assert(!err.server._outgoingServer, 'no outgoing server attached'); + + return true; + } + ); + } finally { + servers.forEach((server) => server.disconnect()); } - expect(server.disconnect()).to.not.throw; - done(); }); it("calling createNodeServer() should throw if passing in a callback that's not a function", function () { diff --git a/libraries/botframework-streaming/tests/PayloadSender.test.js b/libraries/botframework-streaming/tests/PayloadSender.test.js index 879c9e9f40..32f151ba86 100644 --- a/libraries/botframework-streaming/tests/PayloadSender.test.js +++ b/libraries/botframework-streaming/tests/PayloadSender.test.js @@ -64,7 +64,7 @@ describe('PayloadTransport', () => { end: true }; - const psSenderSpy = sinon.spy(ps.sender, 'send'); + const psSenderSpy = sinon.spy(ps._sender, 'send'); expect(ps.sendPayload(header, stream, ()=>done())); expect(psSenderSpy.calledTwice).to.be.true; }); @@ -73,7 +73,7 @@ describe('PayloadTransport', () => { const ps = new PayloadSender.PayloadSender(); ps.connect(new FauxSock); expect(ps.isConnected).to.equal(true); - + const stream = new SubscribableStream.SubscribableStream(); let testString; let count = 250; @@ -81,7 +81,7 @@ describe('PayloadTransport', () => { testString += 'This is a LARGE test stream.'; count--; } - + stream.write(testString); // Max PayloadLength is 4096 const header = { @@ -90,14 +90,14 @@ describe('PayloadTransport', () => { id: '100', end: true }; - const psSenderSpy = sinon.spy(ps.sender, 'send'); - + const psSenderSpy = sinon.spy(ps._sender, 'send'); + expect(ps.sendPayload(header, stream, () => { - // This try-catch is required as chai failures need to be caught and bubbled up via done(). + // This try-catch is required as chai failures need to be caught and bubbled up via done(). try { expect(psSenderSpy.callCount).to.equal(4); done(); - } catch (e) { + } catch (e) { done(e); } })); @@ -152,12 +152,12 @@ describe('PayloadTransport', () => { it('begins disconnected.', () => { let pr = new PayloadReceiver.PayloadReceiver(); - expect(pr.isConnected).to.be.undefined; + expect(pr.isConnected).to.be.false; }); it('connects to and reads a header with no payload from the transport.', () => { let pr = new PayloadReceiver.PayloadReceiver(); - expect(pr.isConnected).to.be.undefined; + expect(pr.isConnected).to.be.false; let sock = new FauxSock(['A.000000.68e999ca-a651-40f4-ad8f-3aaf781862b4.1\n']); sock.setReceiver(pr); @@ -168,7 +168,7 @@ describe('PayloadTransport', () => { it('connects to and reads a header with a stream the transport.', (done) => { let pr = new PayloadReceiver.PayloadReceiver(); - expect(pr.isConnected).to.be.undefined; + expect(pr.isConnected).to.be.false; let sock = new FauxSock(['S.000005.68e999ca-a651-40f4-ad8f-3aaf781862b4.1\n', '12345']); sock.setReceiver(pr); diff --git a/libraries/botframework-streaming/tests/ProtocolAdapter.test.js b/libraries/botframework-streaming/tests/ProtocolAdapter.test.js index 5c341e4367..8427d29ec5 100644 --- a/libraries/botframework-streaming/tests/ProtocolAdapter.test.js +++ b/libraries/botframework-streaming/tests/ProtocolAdapter.test.js @@ -147,7 +147,7 @@ describe('Streaming Extensions ProtocolAdapter', function () { done(); }); - it('payloadreceiver responds with an error when told to connect twice', function () { + it('payloadreceiver ignores duplicate connections', function () { const pa = new PayloadReceiver.PayloadReceiver(); const buffer = Buffer.alloc(Number(PayloadConstants.PayloadConstants.MaxHeaderLength)); buffer.write('A.000168.68e999ca-a651-40f4-ad8f-3aaf781862b4.1\n'); @@ -168,10 +168,8 @@ describe('Streaming Extensions ProtocolAdapter', function () { rp.streams.push(s); pa.connect(receiver); - expect(pa.isConnected).to.be.true; - - expect(() => pa.connect(receiver)).to.throw('Already connected.'); + expect(() => pa.connect(receiver)).to.not.throw(); pa.disconnect(); });