Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming-Preview] Cleanup, refactor & merge StreamingAdapter into BotFrameworkAdapter #1376

Merged
merged 1 commit into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 272 additions & 2 deletions libraries/botbuilder/src/botFrameworkAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import * as os from 'os';

import { Activity, ActivityTypes, BotAdapter, BotCallbackHandlerKey, ChannelAccount, ConversationAccount, ConversationParameters, ConversationReference, ConversationsResult, IUserTokenProvider, ResourceResponse, TokenResponse, TurnContext } from 'botbuilder-core';
import { AuthenticationConstants, ChannelValidation, ConnectorClient, EmulatorApiClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, SimpleCredentialProvider, TokenApiClient, TokenStatus, TokenApiModels } from 'botframework-connector';
import * as os from 'os';
import { IReceiveRequest, ISocket, IStreamingTransportServer, NamedPipeServer, NodeWebSocketFactory, NodeWebSocketFactoryBase, RequestHandler, StreamingResponse, WebSocketServer } from 'botframework-streaming';

import { StreamingHttpClient, TokenResolver } from './streaming';

export enum StatusCodes {
OK = 200,
Expand Down Expand Up @@ -128,6 +133,16 @@ export interface BotFrameworkAdapterSettings {
* Optional. The channel service option for this bot to validate connections from Azure or other channel locations.
*/
channelService?: string;

/**
* Optional. The option to determine if this adapter accepts WebSocket connections
*/
enableWebSockets?: boolean;

/**
* Optional. Used to pass in a NodeWebSocketFactoryBase instance. Allows bot to accept WebSocket connections.
*/
webSocketFactory?: NodeWebSocketFactoryBase;
}

/**
Expand Down Expand Up @@ -160,6 +175,13 @@ const USER_AGENT: string = `Microsoft-BotFramework/3.1 BotBuilder/${ pjson.versi
const OAUTH_ENDPOINT = 'https://api.botframework.com';
const US_GOV_OAUTH_ENDPOINT = 'https://api.botframework.azure.us';

// Streaming-specific constants
const defaultPipeName = 'bfv4.pipes';
const VERSION_PATH: string = '/api/version';
const MESSAGES_PATH: string = '/api/messages';
const GET: string = 'GET';
const POST: string = 'POST';

// This key is exported internally so that the TeamsActivityHandler will not overwrite any already set InvokeResponses.
export const INVOKE_RESPONSE_KEY: symbol = Symbol('invokeResponse');

Expand Down Expand Up @@ -197,13 +219,18 @@ export const INVOKE_RESPONSE_KEY: symbol = Symbol('invokeResponse');
* };
* ```
*/
export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvider {
export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvider, RequestHandler {
protected readonly credentials: MicrosoftAppCredentials;
protected readonly credentialsProvider: SimpleCredentialProvider;
protected readonly settings: BotFrameworkAdapterSettings;

private isEmulatingOAuthCards: boolean;

// Streaming-specific properties:
private logic: (context: TurnContext) => Promise<void>;
private streamingServer: IStreamingTransportServer;
private webSocketFactory: NodeWebSocketFactoryBase;

/**
* Creates a new instance of the [BotFrameworkAdapter](xref:botbuilder.BotFrameworkAdapter) class.
*
Expand Down Expand Up @@ -241,6 +268,16 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
this.credentials.oAuthScope = GovernmentConstants.ToChannelFromBotOAuthScope;
}

// If the developer wants to use WebSockets, but didn't provide a WebSocketFactory,
// create a NodeWebSocketFactory.
if (this.settings.enableWebSockets && !this.settings.webSocketFactory) {
this.webSocketFactory = new NodeWebSocketFactory();
}

if (this.settings.webSocketFactory) {
this.webSocketFactory = this.settings.webSocketFactory;
}

// Relocate the tenantId field used by MS Teams to a new location (from channelData to conversation)
// This will only occur on activities from teams that include tenant info in channelData but NOT in conversation,
// thus should be future friendly. However, once the the transition is complete. we can remove this.
Expand Down Expand Up @@ -718,6 +755,10 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
* ```
*/
public async processActivity(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise<any>): Promise<void> {
if (this.settings.enableWebSockets && req.method === GET && (req.headers.Upgrade || req.headers.upgrade)) {
return this.useWebSocket(req, res, logic);
}

let body: any;
let status: number;
let processError: Error;
Expand Down Expand Up @@ -853,6 +894,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
if (!activity.conversation || !activity.conversation.id) {
throw new Error(`BotFrameworkAdapter.sendActivity(): missing conversation id.`);
}
if (activity && BotFrameworkAdapter.isStreamingServiceUrl(activity.serviceUrl)) {
TokenResolver.checkForOAuthCards(this, context, activity as Activity);
}
const client: ConnectorClient = this.createConnectorClient(activity.serviceUrl);
if (activity.type === 'trace' && activity.channelId !== 'emulator') {
// Just eat activity
Expand Down Expand Up @@ -911,6 +955,23 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
* Override this in a derived class to create a mock connector client for unit testing.
*/
public createConnectorClient(serviceUrl: string): ConnectorClient {
if (BotFrameworkAdapter.isStreamingServiceUrl(serviceUrl)) {

// Check if we have a streaming server. Otherwise, requesting a connector client
// for a non-existent streaming connection results in an error
if (!this.streamingServer) {
throw new Error(`Cannot create streaming connector client for serviceUrl ${serviceUrl} without a streaming connection. Call 'useWebSocket' or 'useNamedPipe' to start a streaming connection.`)
}

return new ConnectorClient(
this.credentials,
{
baseUri: serviceUrl,
userAgent: USER_AGENT,
httpClient: new StreamingHttpClient(this.streamingServer)
});
}

const client: ConnectorClient = new ConnectorClient(this.credentials, { baseUri: serviceUrl, userAgent: USER_AGENT} );
return client;
}
Expand Down Expand Up @@ -988,6 +1049,215 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
protected createContext(request: Partial<Activity>): TurnContext {
return new TurnContext(this as any, request);
}

/**
* Checks the validity of the request and attempts to map it the correct virtual endpoint,
* then generates and returns a response if appropriate.
* @param request A ReceiveRequest from the connected channel.
* @returns A response created by the BotAdapter to be sent to the client that originated the request.
*/
public async processRequest(request: IReceiveRequest): Promise<StreamingResponse> {
let response = new StreamingResponse();

if (!request) {
response.statusCode = StatusCodes.BAD_REQUEST;
response.setBody(`No request provided.`);
return response;
}

if (!request.verb || !request.path) {
response.statusCode = StatusCodes.BAD_REQUEST;
response.setBody(`Request missing verb and/or path. Verb: ${ request.verb }. Path: ${ request.path }`);
return response;
}

if (request.verb.toLocaleUpperCase() !== POST && request.verb.toLocaleUpperCase() !== GET) {
response.statusCode = StatusCodes.METHOD_NOT_ALLOWED;
response.setBody(`Invalid verb received. Only GET and POST are accepted. Verb: ${ request.verb }`);
}

if (request.path.toLocaleLowerCase() === VERSION_PATH) {
return await this.handleVersionRequest(request, response);
}

// Convert the StreamingRequest into an activity the Adapter can understand.
let body: Activity;
try {
body = await this.readRequestBodyAsString(request);

} catch (error) {
response.statusCode = StatusCodes.BAD_REQUEST;
response.setBody(`Request body missing or malformed: ${ error }`);
return response;
}

if (request.path.toLocaleLowerCase() !== MESSAGES_PATH) {
response.statusCode = StatusCodes.NOT_FOUND;
response.setBody(`Path ${ request.path.toLocaleLowerCase() } not not found. Expected ${ MESSAGES_PATH }}.`);
return response;
}

if (request.verb.toLocaleUpperCase() !== POST) {
response.statusCode = StatusCodes.METHOD_NOT_ALLOWED;
response.setBody(`Invalid verb received for ${ request.verb.toLocaleLowerCase() }. Only GET and POST are accepted. Verb: ${ request.verb }`);
return response;
}

try {
let context = new TurnContext(this, body);
await this.runMiddleware(context, this.logic);

if (body.type === ActivityTypes.Invoke) {
let invokeResponse: any = context.turnState.get(INVOKE_RESPONSE_KEY);

if (invokeResponse && invokeResponse.value) {
const value: InvokeResponse = invokeResponse.value;
response.statusCode = value.status;
response.setBody(value.body);
} else {
response.statusCode = StatusCodes.NOT_IMPLEMENTED;
}
} else {
response.statusCode = StatusCodes.OK;
}
} catch (error) {
response.statusCode = StatusCodes.INTERNAL_SERVER_ERROR;
response.setBody(error);
return response;
}

return response;
}

/**
* Connects the handler to a Named Pipe server and begins listening for incoming requests.
* @param pipeName The name of the named pipe to use when creating the server.
* @param logic The logic that will handle incoming requests.
*/
public async useNamedPipe(logic: (context: TurnContext) => Promise<any>, pipeName: string = defaultPipeName): Promise<void> {
if (!logic) {
throw new Error('Bot logic needs to be provided to `useNamedPipe`');
}

this.logic = logic;

this.streamingServer = new NamedPipeServer(pipeName, this);
await this.streamingServer.start();
}

/**
* Process the initial request to establish a long lived connection via a streaming server.
* @param req The connection request.
* @param res The response sent on error or connection termination.
* @param logic The logic that will handle incoming requests.
*/
public async useWebSocket(req: WebRequest, res: WebResponse, logic: (context: TurnContext) => Promise<any>): Promise<void> {
if (!logic) {
throw new Error('Streaming logic needs to be provided to `useWebSocket`');
}

if (!this.webSocketFactory || !this.webSocketFactory.createWebSocket) {
throw new Error('BotFrameworkAdapter must have a WebSocketFactory in order to support streaming.');
}

this.logic = logic;

// Restify-specific check.
stevengum marked this conversation as resolved.
Show resolved Hide resolved
if (typeof((res as any).claimUpgrade) !== 'function') {
throw new Error('ClaimUpgrade is required for creating WebSocket connection.');
}

try {
await this.authenticateConnection(req, this.settings.channelService);
} catch (err) {
// Set the correct status code for the socket to send back to the channel.
res.status(StatusCodes.UNAUTHORIZED);
res.send(err.message);
// Re-throw the error so the developer will know what occurred.
throw err;
}

const upgrade = (res as any).claimUpgrade();
const socket = this.webSocketFactory.createWebSocket(req as IncomingMessage, upgrade.socket, upgrade.head);

await this.startWebSocket(socket);
}

private async authenticateConnection(req: WebRequest, channelService?: string): Promise<void> {
if (!this.credentials.appId) {
// auth is disabled
return;
}

const authHeader: string = req.headers.authorization || req.headers.Authorization || '';
const channelIdHeader: string = req.headers.channelid || req.headers.ChannelId || req.headers.ChannelID || '';
// Validate the received Upgrade request from the channel.
const claims = await JwtTokenValidation.validateAuthHeader(authHeader, this.credentialsProvider, channelService, channelIdHeader);

// Add serviceUrl from claim to static cache to trigger token refreshes.
const serviceUrl = claims.getClaimValue(AuthenticationConstants.ServiceUrlClaim);
MicrosoftAppCredentials.trustServiceUrl(serviceUrl);

if (!claims.isAuthenticated) { throw new Error('Unauthorized Access. Request is not authorized'); }
}

/**
* Connects the handler to a WebSocket server and begins listening for incoming requests.
* @param socket The socket to use when creating the server.
*/
private async startWebSocket(socket: ISocket): Promise<void>{
this.streamingServer = new WebSocketServer(socket, this);
await this.streamingServer.start();
}

private async readRequestBodyAsString(request: IReceiveRequest): Promise<Activity> {
const contentStream = request.streams[0];
return await contentStream.readAsJson<Activity>();
}

private async handleVersionRequest(request: IReceiveRequest, response: StreamingResponse): Promise<StreamingResponse> {
if (request.verb.toLocaleUpperCase() === GET) {
response.statusCode = StatusCodes.OK;

if (!this.credentials.appId) {
response.setBody({ UserAgent: USER_AGENT });
return response;
}

let token = '';
try {
token = await this.credentials.getToken();

} catch (err) {
/**
* In reality a missing BotToken will cause the channel to close the connection,
* but we still send the response and allow the channel to make that decision
* instead of proactively disconnecting. This allows the channel to know why
* the connection has been closed and make the choice not to make endless reconnection
* attempts that will end up right back here.
*/
console.error(err.message);
}
response.setBody({ UserAgent: USER_AGENT, BotToken: token });

} else {
response.statusCode = StatusCodes.METHOD_NOT_ALLOWED;
response.setBody(`Invalid verb received for path: ${ request.path }. Only GET is accepted. Verb: ${ request.verb }`);
}

return response;
}

/**
* Determine if the serviceUrl was sent via an Http/Https connection or Streaming
* This can be determined by looking at the ServiceUrl property:
* (1) All channels that send messages via http/https are not streaming
* (2) Channels that send messages via streaming have a ServiceUrl that does not begin with http/https.
* @param serviceUrl the serviceUrl provided in the resquest.
*/
private static isStreamingServiceUrl(serviceUrl: string): boolean {
return serviceUrl && !serviceUrl.toLowerCase().startsWith('http');
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion libraries/botbuilder/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
export * from './botFrameworkAdapter';
export * from './fileTranscriptStore';
export * from './inspectionMiddleware';
export { StreamingAdapter, StreamingHttpClient, TokenResolver } from './streaming';
export * from './streaming';
export * from './teamsActivityHandler';
export * from './teamsActivityHelpers';
export * from './teamsInfo';
Expand Down
1 change: 0 additions & 1 deletion libraries/botbuilder/src/streaming/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@
* Licensed under the MIT License.
*/

export * from './streamingAdapter';
export * from './streamingHttpClient';
export * from './tokenResolver';
Loading