Skip to content

Commit

Permalink
Merge 470f93d into 49f1b23
Browse files Browse the repository at this point in the history
  • Loading branch information
joshgummersall committed May 15, 2021
2 parents 49f1b23 + 470f93d commit 6981113
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 207 deletions.
1 change: 1 addition & 0 deletions libraries/botbuilder-stdlib/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * as assertExt from './assertExt';
export * from './types';
export { delay } from './delay';
export { maybeCast } from './maybeCast';
export { retry } from './retry';
41 changes: 41 additions & 0 deletions libraries/botbuilder-stdlib/src/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

/**
* Retry a given promise with gradually increasing delay.
*
* @param promise a function that returns a promise to retry
* @param maxRetries the maximum number of times to retry
* @param initialDelay the initial value to delay before retrying (in milliseconds)
* @returns a promise resolving to the result of the promise from the promise generating function, or undefined
*/
export async function retry<T>(
promise: (n: number) => Promise<T>,
maxRetries: number,
initialDelay = 500
): Promise<T | undefined> {
let delay = initialDelay,
n = 1,
maybeError: Error | undefined;

// Take care of negative or zero
maxRetries = Math.max(maxRetries, 1);

while (n <= maxRetries) {
try {
// Note: return await intentional so we can catch errors
return await promise(n);
} catch (err) {
maybeError = err;

await new Promise((resolve) => setTimeout(resolve, delay));

delay *= n;
n++;
}
}

if (maybeError) {
throw maybeError;
}
}
41 changes: 41 additions & 0 deletions libraries/botbuilder-stdlib/tests/retry.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

const assert = require('assert');
const sinon = require('sinon');
const { retry } = require('../');

describe('retry', function () {
it('succeeds on first try', async function () {
const fake = sinon.fake((n) => Promise.resolve(n));
assert.strictEqual(await retry(fake, 3, 0), 1);
assert.strictEqual(fake.callCount, 1);
});

it('handles zero retries', async function () {
const fake = sinon.fake((n) => Promise.resolve(n));
assert.strictEqual(await retry(fake, 0, 0), 1);
assert.strictEqual(fake.callCount, 1);
});

it('handles negative retries', async function () {
const fake = sinon.fake((n) => Promise.resolve(n));
assert.strictEqual(await retry(fake, -10, 0), 1);
assert.strictEqual(fake.callCount, 1);
});

it('succeeds eventually', async function () {
const fake = sinon.fake((n) => (n < 3 ? Promise.reject() : Promise.resolve(10)));
assert.strictEqual(await retry(fake, 3, 0), 10);
assert.strictEqual(fake.callCount, 3);
});

it('yields error if never succeeds', async function () {
const fake = sinon.fake(() => Promise.reject(new Error('oh no')));
await assert.rejects(retry(fake, 3, 0), {
name: 'Error',
message: 'oh no',
});
assert.strictEqual(fake.callCount, 3);
});
});
18 changes: 12 additions & 6 deletions libraries/botbuilder/src/botFrameworkAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ import {

import { BotFrameworkHttpAdapter } from './botFrameworkHttpAdapter';
import { BotLogic, ConnectorClientBuilder, Emitter, Request, Response, WebRequest, WebResponse } from './interfaces';
import { delay } from 'botbuilder-stdlib';
import { delay, retry } from 'botbuilder-stdlib';
import { userAgentPolicy } from '@azure/ms-rest-js';
import { validateAndFixActivity } from './activityValidator';

Expand Down Expand Up @@ -1835,12 +1835,17 @@ export class BotFrameworkAdapter

/**
* Connects the handler to a Named Pipe server and begins listening for incoming requests.
* @param pipeName The name of the named pipe to use when creating the server.
*
* @param logic The logic that will handle incoming requests.
* @param pipeName The name of the named pipe to use when creating the server.
* @param retryCount Number of times to attempt to bind incoming and outgoing pipe
* @param onListen Optional callback that fires once when server is listening on both incoming and outgoing pipe
*/
public async useNamedPipe(
logic: (context: TurnContext) => Promise<any>,
pipeName: string = defaultPipeName
pipeName = defaultPipeName,
retryCount = 7,
onListen?: () => void
): Promise<void> {
if (!logic) {
throw new Error('Bot logic needs to be provided to `useNamedPipe`');
Expand All @@ -1862,7 +1867,8 @@ export class BotFrameworkAdapter
}

this.logic = logic;
await this.startNamedPipeServer(pipeName);

await retry(() => this.startNamedPipeServer(pipeName, onListen), retryCount);
}

/**
Expand Down Expand Up @@ -1900,12 +1906,12 @@ export class BotFrameworkAdapter
await this.startWebSocket(nodeWebSocket);
}

private async startNamedPipeServer(pipeName: string): Promise<void> {
private async startNamedPipeServer(pipeName: string, onListen?: () => void): Promise<void> {
this.namedPipeName = pipeName;
this.streamingServer = new NamedPipeServer(pipeName, this);

try {
await this.streamingServer.start();
await this.streamingServer.start(onListen);
} finally {
this.namedPipeName = undefined;
}
Expand Down
9 changes: 7 additions & 2 deletions libraries/botframework-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@
"clean": "rimraf _ts3.4 es5 lib",
"lint": "eslint . --ext .js,.ts",
"postbuild": "downlevel-dts lib _ts3.4/lib --checksum",
"test": "yarn build && nyc mocha tests/",
"test:compat": "api-extractor run --verbose"
"test": "npm-run-all build test:mocha",
"test:compat": "api-extractor run --verbose",
"test:mocha": "nyc mocha tests"
},
"mocha": {
"checkLeaks": true,
"exit": true
},
"files": [
"_ts3.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { StreamingRequest } from '../streamingRequest';
* Example possible implementations include WebSocket transport server or NamedPipe transport server.
*/
export interface IStreamingTransportServer {
start(): Promise<string>;
start(onListen?: () => void): Promise<string>;
disconnect(): void;
send(request: StreamingRequest): Promise<IReceiveResponse>;
isConnected?: boolean;
Expand Down
158 changes: 74 additions & 84 deletions libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import { INodeServer, IStreamingTransportServer, IReceiveResponse } from '../interfaces';
import { NamedPipeTransport } from './namedPipeTransport';
import { PayloadReceiver, PayloadSender } from '../payloadTransport';
import { ProtocolAdapter } from '../protocolAdapter';
import { RequestHandler } from '../requestHandler';
import { StreamingRequest } from '../streamingRequest';
import { RequestManager } from '../payloads';
import { PayloadReceiver, PayloadSender } from '../payloadTransport';
import { NamedPipeTransport } from './namedPipeTransport';
import { INodeServer, INodeSocket, IStreamingTransportServer, IReceiveResponse } from '../interfaces';
import { StreamingRequest } from '../streamingRequest';
import { createNodeServer } from '../utilities/createNodeServer';

/**
Expand All @@ -20,86 +20,107 @@ import { createNodeServer } from '../utilities/createNodeServer';
export class NamedPipeServer implements IStreamingTransportServer {
private _outgoingServer: INodeServer;
private _incomingServer: INodeServer;
private readonly _baseName: string;
private readonly _requestHandler: RequestHandler;
private readonly _sender: PayloadSender;
private readonly _receiver: PayloadReceiver;
private readonly _requestManager: RequestManager;

private readonly _sender = new PayloadSender();
private readonly _receiver = new PayloadReceiver();
private readonly _protocolAdapter: ProtocolAdapter;
private readonly _autoReconnect: boolean;
private _isDisconnecting: boolean;

/**
* Creates a new instance of the [NamedPipeServer](xref:botframework-streaming.NamedPipeServer) class.
*
* @param baseName The named pipe to connect to.
* @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this client.
* @param autoReconnect Optional setting to determine if the client should attempt to reconnect automatically on disconnection events. Defaults to true.
* @param autoReconnect Deprecated: Automatic reconnection is the default behavior.
*/
public constructor(baseName: string, requestHandler?: RequestHandler, autoReconnect = true) {
public constructor(private readonly baseName: string, requestHandler?: RequestHandler, autoReconnect?: boolean) {
if (!baseName) {
throw new TypeError('NamedPipeServer: Missing baseName parameter');
}

this._baseName = baseName;
this._requestHandler = requestHandler;
this._autoReconnect = autoReconnect;
this._requestManager = new RequestManager();
if (autoReconnect != null) {
console.warn('NamedPipeServer: The autoReconnect parameter is deprecated');
}

this._sender = new PayloadSender();
this._receiver = new PayloadReceiver();
this._protocolAdapter = new ProtocolAdapter(
this._requestHandler,
this._requestManager,
this._sender,
this._receiver
);
this._sender.disconnected = this.onConnectionDisconnected.bind(this);
this._receiver.disconnected = this.onConnectionDisconnected.bind(this);
this._protocolAdapter = new ProtocolAdapter(requestHandler, new RequestManager(), this._sender, this._receiver);
}

/**
* Returns true if currently connected.
* Get connected status
*
* @returns true if currently connected.
*/
public get isConnected(): boolean {
return !!(this._receiver.isConnected && this._sender.isConnected);
return this._receiver.isConnected && this._sender.isConnected;
}

/**
* Used to establish the connection used by this server and begin listening for incoming messages.
*
* @param onListen Optional callback that fires once when server is listening on both incoming and outgoing pipe
* @returns A promised string that will not resolve as long as the server is running.
*/
public async start(): Promise<string> {
if (this._receiver.isConnected || this._sender.isConnected || this._incomingServer || this._outgoingServer) {
this.disconnect();
}
public async start(onListen?: () => void): Promise<string> {
const { PipePath, ServerIncomingPath, ServerOutgoingPath } = NamedPipeTransport;

// The first promise resolves as soon as the server is listening. The second resolves when the server
// closes, or an error occurs. Wrapping with an array ensures the initial await only waits for the listening
// promise.
//
// We want to ensure we are listening to the servers in series so that, if two processes start at the same
// time, only one is able to listen on both the incoming and outgoing sockets.
const [incoming] = await new Promise<[Promise<void>]>((resolveListening, rejectListening) => {
const server = createNodeServer((socket) => {
if (this._receiver.isConnected) {
return;
}

const incoming = new Promise((resolve) => {
this._incomingServer = createNodeServer((socket: INodeSocket): void => {
this._receiver.connect(new NamedPipeTransport(socket));
resolve();
}).once('error', rejectListening);

this._incomingServer = server;

const isListening = new Promise<void>((resolveClosed, rejectClosed) => {
// Only register rejection once the server is actually listening
server.once('listening', () => server.once('error', rejectClosed));
server.once('closed', resolveClosed);
});

server.once('listening', () => resolveListening([isListening]));

server.listen(PipePath + this.baseName + ServerIncomingPath);
});

const outgoing = new Promise((resolve) => {
this._outgoingServer = createNodeServer((socket: INodeSocket): void => {
// Now that we absolutely have the incoming socket, bind the outgoing socket as well
const [outgoing] = await new Promise<[Promise<void>]>((resolveListening, rejectListening) => {
const server = createNodeServer((socket) => {
if (this._sender.isConnected) {
return;
}

// Note: manually disconnect sender if client closes socket. This ensures that
// reconnections are allowed
this._sender.connect(new NamedPipeTransport(socket));
resolve();
socket.once('close', () => this._sender.disconnect());
}).once('error', rejectListening);

this._outgoingServer = server;

const isListening = new Promise<void>((resolveClosed, rejectClosed) => {
// Only register rejection once the server is actually listening
server.once('listening', () => server.once('error', rejectClosed));
server.once('closed', resolveClosed);
});
});

// These promises will only resolve when the underlying connection has terminated.
// Anything awaiting on them will be blocked for the duration of the session,
// which is useful when detecting premature terminations, but requires an unawaited
// promise during the process of establishing the connection.
Promise.all([incoming, outgoing]);
server.once('listening', () => resolveListening([isListening]));

const { PipePath, ServerIncomingPath, ServerOutgoingPath } = NamedPipeTransport;
const incomingPipeName = PipePath + this._baseName + ServerIncomingPath;
const outgoingPipeName = PipePath + this._baseName + ServerOutgoingPath;
server.listen(PipePath + this.baseName + ServerOutgoingPath);
});

onListen?.();

this._incomingServer.listen(incomingPipeName);
this._outgoingServer.listen(outgoingPipeName);
await Promise.all([incoming, outgoing]);

return 'connected';
}
Expand All @@ -108,18 +129,13 @@ export class NamedPipeServer implements IStreamingTransportServer {
* Allows for manually disconnecting the server.
*/
public disconnect(): void {
this._sender.disconnect();
this._receiver.disconnect();
this._incomingServer?.close();
this._incomingServer = null;

if (this._incomingServer) {
this._incomingServer.close();
this._incomingServer = null;
}

if (this._outgoingServer) {
this._outgoingServer.close();
this._outgoingServer = null;
}
this._sender.disconnect();
this._outgoingServer?.close();
this._outgoingServer = null;
}

/**
Expand All @@ -131,30 +147,4 @@ export class NamedPipeServer implements IStreamingTransportServer {
public async send(request: StreamingRequest): Promise<IReceiveResponse> {
return this._protocolAdapter.sendRequest(request);
}

/**
* @private
*/
private onConnectionDisconnected(): void {
if (!this._isDisconnecting) {
this._isDisconnecting = true;
try {
if (this._sender.isConnected) {
this._sender.disconnect();
}

if (this._receiver.isConnected) {
this._receiver.disconnect();
}

if (this._autoReconnect) {
this.start().catch((err): void => {
throw new Error(`Unable to reconnect: ${err.message}`);
});
}
} finally {
this._isDisconnecting = false;
}
}
}
}

0 comments on commit 6981113

Please sign in to comment.