diff --git a/doc/index.md b/doc/index.md new file mode 100644 index 00000000..4646e3b4 --- /dev/null +++ b/doc/index.md @@ -0,0 +1,5 @@ +# Documentation + +## `@microsoft/servicehub-framework` NPM package documentation + +[Documentation specific to this package is available elsewhere](../src/servicebroker-npm/doc/index.md). diff --git a/src/Microsoft.ServiceHub.Framework/IpcRelayServiceBroker.cs b/src/Microsoft.ServiceHub.Framework/IpcRelayServiceBroker.cs index 4ee184af..a19c1d57 100644 --- a/src/Microsoft.ServiceHub.Framework/IpcRelayServiceBroker.cs +++ b/src/Microsoft.ServiceHub.Framework/IpcRelayServiceBroker.cs @@ -138,7 +138,7 @@ protected virtual void Dispose(bool disposing) private async Task HandleIncomingConnectionAsync(Stream stream, Guid requestId, IDuplexPipe servicePipe) { // Once a connection is made (or fails), it is no longer cancelable. - ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable _); + ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable? _); // Link the two pipes so that all incoming/outgoing calls get forwarded await Task.WhenAll( diff --git a/src/Microsoft.ServiceHub.Framework/MultiplexingRelayServiceBroker.cs b/src/Microsoft.ServiceHub.Framework/MultiplexingRelayServiceBroker.cs index ee68f588..b14cdd20 100644 --- a/src/Microsoft.ServiceHub.Framework/MultiplexingRelayServiceBroker.cs +++ b/src/Microsoft.ServiceHub.Framework/MultiplexingRelayServiceBroker.cs @@ -133,7 +133,7 @@ public async Task RequestServiceChannelAsync(Servic }; MultiplexingStream.Channel outerChannel = this.multiplexingStreamWithClient.CreateChannel(channelOptions); Assumes.True(this.channelsOfferedToClient.TryAdd(requestId, outerChannel)); - outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel _), TaskScheduler.Default).Forget(); + outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel? _), TaskScheduler.Default).Forget(); return new RemoteServiceConnectionInfo { diff --git a/src/servicebroker-npm/.npmignore b/src/servicebroker-npm/.npmignore index 01db16c9..946b4437 100644 --- a/src/servicebroker-npm/.npmignore +++ b/src/servicebroker-npm/.npmignore @@ -16,3 +16,4 @@ coverage/ jest.config.ts .eslintrc.json */test/ +doc/ diff --git a/src/servicebroker-npm/doc/index.md b/src/servicebroker-npm/doc/index.md new file mode 100644 index 00000000..dcf52382 --- /dev/null +++ b/src/servicebroker-npm/doc/index.md @@ -0,0 +1,3 @@ +# `@microsoft/servicehub-framework` NPM package documentation + +- [Marshalable objects](marshalable_objects.md) diff --git a/src/servicebroker-npm/doc/marshalable_objects.md b/src/servicebroker-npm/doc/marshalable_objects.md new file mode 100644 index 00000000..2a16ccf9 --- /dev/null +++ b/src/servicebroker-npm/doc/marshalable_objects.md @@ -0,0 +1,61 @@ +# Marshalable objects + +This package implements [the marshalable object protocol](https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/general_marshaled_objects.md) first implemented by StreamJsonRpc. +The overall behavior and wire protocol are defined there. + +The TypeScript/javascript-specific behaviors are outlined here. + +## Sending a marshaled object + +By default, all values passed as arguments or return values are serialized by value. +To marshal an object for remote function invocation instead of serializing by value, the object should implement the `RpcMarshalable` interface. + +For example: + +```ts +interface ICalculator { + add(a: number, b: number): Promise | number +} + +class Calculator implements ICalculator, RpcMarshalable { + readonly _jsonRpcMarshalableLifetime: MarshaledObjectLifetime = 'explicit' + + add(a: number, b: number) { + return a + b + } +} +``` + +Only top-level arguments and return values are tested for marshalability. +This means that the Calculator object must appear as its own argument or return value in order to be marshaled. +If it appears deep in the object graph of an argument or return value, it will simply be serialized. + +If the receiver disposes the proxy, and the real object defines a `dispose` method, the `dispose` method will be invoked on the real object. +This means that when you pass an object across RPC, you are effectively transferring lifetime ownership to the remote party. + +## Receiving a marshaled object + +When it arrives on the remote side, it will no longer be an instance of the `Calculator` class, but instead will be a proxy. +This proxy will relay all function calls to the original object. + +The proxy must be disposed of when done or it will occupy resources for as long as the JSON-RPC connection lasts on both ends of the connection. + +You can leverage Typescript for type safety for this. For example, the calculator might be accepted as an argument like this: + +```ts +interface IServer { + doSomeMath(calc: ICalculator): Promise +} +``` + +The server may be implemented like this: + +```ts +class Server { + doSomeMath(calc: ICalculator & IDisposable): Promise { + const sum = await calc.add(2, 3) + calc.dispose() + return sum // 5 + } +} +``` diff --git a/src/servicebroker-npm/src/CancellationTokenAdapter.ts b/src/servicebroker-npm/src/CancellationTokenAdapter.ts index 79082cf8..e42e70a0 100644 --- a/src/servicebroker-npm/src/CancellationTokenAdapter.ts +++ b/src/servicebroker-npm/src/CancellationTokenAdapter.ts @@ -1,6 +1,14 @@ import CancellationToken from 'cancellationtoken' import { CancellationToken as vscodeCancellationToken, CancellationTokenSource as vscodeCancellationTokenSource } from 'vscode-jsonrpc' +function isProxy(value: any): boolean { + // It turns out that it's really hard to detect a proxy in general: + // https://stackoverflow.com/questions/36372611/how-to-test-if-an-object-is-a-proxy + // Our strategy here is to at least detect pass-through proxies that claim that everything exists. + // So we make up a property name that should never exist. If it does, then we know it must be a proxy. + return value && value.ImAProxy !== undefined +} + export class CancellationTokenAdapters { /** Tests whether an object satisfies the {@link vscodeCancellationToken} interface. */ static isVSCode(value: any): value is vscodeCancellationToken { @@ -11,6 +19,7 @@ export class CancellationTokenAdapters { static isCancellationToken(value: any): value is CancellationToken { return ( value && + !isProxy(value) && typeof value.throwIfCancelled === 'function' && typeof value.onCancelled === 'function' && value.isCancelled !== undefined && diff --git a/src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts b/src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts index e662da3c..33c6685e 100644 --- a/src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts +++ b/src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts @@ -1,5 +1,5 @@ import { Channel } from 'nerdbank-streams' -import { MessageConnection, CancellationToken as vscodeCancellationToken, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc' +import { MessageConnection, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc/node' import { Formatters, MessageDelimiters } from './constants' import { ServiceMoniker } from './ServiceMoniker' import { RpcConnection, RpcEventServer, ServiceRpcDescriptor } from './ServiceRpcDescriptor' @@ -7,10 +7,10 @@ import { clone, constructMessageConnection, isChannel } from './utilities' import { IDisposable } from './IDisposable' import { BE32MessageReader, BE32MessageWriter } from './BigEndianInt32LengthHeaderMessageHandler' import * as msgpack from 'msgpack-lite' -import { CancellationTokenAdapters } from './CancellationTokenAdapter' import { MultiplexingStream, MultiplexingStreamOptions } from 'nerdbank-streams' import { EventEmitter } from 'stream' import { NodeStreamMessageReader, NodeStreamMessageWriter } from './NodeStreamMessageWrappers' +import { invokeRpc, registerInstanceMethodsAsRpcTargets } from './jsonRpc/rpcUtilities' /** * Constructs a JSON RPC message connection to a service @@ -194,74 +194,19 @@ const rpcProxy = { default: return function () { const methodName = property.toString() - - if (arguments.length > 0) { - if (vscodeCancellationToken.is(arguments[arguments.length - 1])) { - const ct = arguments[arguments.length - 1] - const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1)) - return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct) - } else if (CancellationTokenAdapters.isCancellationToken(arguments[arguments.length - 1])) { - const ct = CancellationTokenAdapters.cancellationTokenToVSCode(arguments[arguments.length - 1]) - const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1)) - return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct) - } else if (arguments[arguments.length - 1] === undefined) { - // The last arg is most likely a `CancellationToken?` that was propagated to the RPC call from another method that made it optional. - // We can't tell, but we mustn't claim it's a CancellationToken nor an ordinary argument or else an RPC server - // may fail to match the RPC call to a method because of an extra argument. - // If this truly was a value intended to propagate, they should use `null` as the argument. - const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1)) - return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args) - } - } - - const validatedArgs = validateNoUndefinedElements(Array.prototype.slice.call(arguments)) - return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...validatedArgs) + return invokeRpc(property.toString(), arguments, target.messageConnection) } } }, } -function validateNoUndefinedElements(values: T[]): T[] { - for (let i = 0; i < values.length; i++) { - if (values[i] === undefined) { - throw new Error(`Argument at 0-based index ${i} is set as undefined, which is not a valid JSON value.`) - } - } - - return values -} - export class JsonRpcConnection extends RpcConnection { constructor(public readonly messageConnection: MessageConnection) { super() } public addLocalRpcTarget(rpcTarget: any | RpcEventServer): void { - function wrapCancellationTokenIfPresent(args: any[]): any[] { - if (args.length > 0 && CancellationTokenAdapters.isVSCode(args[args.length - 1])) { - const adaptedCancellationToken = CancellationTokenAdapters.vscodeToCancellationToken(args[args.length - 1]) - args[args.length - 1] = adaptedCancellationToken - } - - return args - } - - function registerRequestAndNotification(connection: MessageConnection, methodName: string, method: any) { - connection.onRequest(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args))) - connection.onNotification(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args))) - } - - JsonRpcConnection.getInstanceMethodNames(rpcTarget, Object.prototype).forEach(methodName => { - if (methodName !== 'dispose') { - const method = rpcTarget[methodName] - registerRequestAndNotification(this.messageConnection, methodName, method) - - // Add an alias for the method so that we support with and without the Async suffix. - const suffix = 'Async' - const alias = methodName.endsWith(suffix) ? methodName.substring(0, methodName.length - suffix.length) : `${methodName}${suffix}` - registerRequestAndNotification(this.messageConnection, alias, method) - } - }) + registerInstanceMethodsAsRpcTargets(rpcTarget, this.messageConnection) // If the RPC target is an event emitter, hook up a handler that forwards all events across RPC. if (RpcConnection.IsRpcEventServer(rpcTarget)) { @@ -295,31 +240,9 @@ export class JsonRpcConnection extends RpcConnection { } public dispose(): void {} - - private static isMethod(obj: object, name: string): boolean { - const desc = Object.getOwnPropertyDescriptor(obj, name) - return !!desc && typeof desc.value === 'function' - } - - private static getInstanceMethodNames(obj: object, stopPrototype?: any): string[] { - const array: string[] = [] - let proto = Object.getPrototypeOf(obj) - while (proto && proto !== stopPrototype) { - Object.getOwnPropertyNames(proto).forEach(name => { - if (name !== 'constructor') { - if (JsonRpcConnection.isMethod(proto, name)) { - array.push(name) - } - } - }) - proto = Object.getPrototypeOf(proto) - } - - return array - } } -interface IProxyTarget { +export interface IProxyTarget { messageConnection: MessageConnection eventEmitter: EventEmitter } diff --git a/src/servicebroker-npm/src/index.ts b/src/servicebroker-npm/src/index.ts index 9fb673ae..b29e193f 100644 --- a/src/servicebroker-npm/src/index.ts +++ b/src/servicebroker-npm/src/index.ts @@ -8,6 +8,7 @@ export { IJsonRpcClientProxy } from './IJsonRpcClientProxy' export { IpcRelayServiceBroker } from './IpcRelayServiceBroker' export { IRemoteServiceBroker } from './IRemoteServiceBroker' export { IServiceBroker, ServiceBrokerEvents } from './IServiceBroker' +export { RpcMarshalable, MarshaledObjectLifetime } from './jsonRpc/MarshalableObject' export { MultiplexingRelayServiceBroker } from './MultiplexingRelayServiceBroker' export { ProtectedOperation } from './ProtectedOperation' export { RemoteServiceBroker } from './RemoteServiceBroker' @@ -17,4 +18,5 @@ export { ServiceBrokerClientMetadata } from './ServiceBrokerClientMetadata' export { ServiceJsonRpcDescriptor, JsonRpcConnection } from './ServiceJsonRpcDescriptor' export { ServiceMoniker } from './ServiceMoniker' export { ServiceRpcDescriptor, RpcEventServer } from './ServiceRpcDescriptor' +export { IObserver, Observer } from './jsonRpc/Observer' export * from './container' diff --git a/src/servicebroker-npm/src/jsonRpc/MarshalableObject.ts b/src/servicebroker-npm/src/jsonRpc/MarshalableObject.ts new file mode 100644 index 00000000..066d576a --- /dev/null +++ b/src/servicebroker-npm/src/jsonRpc/MarshalableObject.ts @@ -0,0 +1,292 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { MessageConnection, ParameterStructures } from 'vscode-jsonrpc' +import { IDisposable } from '../IDisposable' +import { invokeRpc, registerInstanceMethodsAsRpcTargets } from './rpcUtilities' + +export type MarshaledObjectLifetime = 'call' | 'explicit' + +const enum JsonRpcMarshaled { + /** + * A real object is being marshaled. The receiver should generate a new proxy (or retrieve an existing one) that directs all RPC requests back to the sender referencing the value of the `handle` property. + */ + realObject = 1, + + /** + * A marshaled proxy is being sent *back* to its owner. The owner uses the `handle` property to look up the original object and use it as the provided value. + */ + proxyReturned = 0, +} + +/** The method that releases a marshaled object. Use with {@link ReleaseMarshaledObjectArgs}. */ +const releaseMarshaledObjectMethodName = '$/releaseMarshaledObject' + +/** The request type to use when sending {@link releaseMarshaledObjectMethodName} notifications. */ +interface ReleaseMarshaledObjectArgs { + /** The `handle` named parameter (or first positional parameter) is set to the handle of the marshaled object to be released. */ + handle: number + /** The `ownedBySender` named parameter (or second positional parameter) is set to a boolean value indicating whether the party sending this notification is also the party that sent the marshaled object. */ + ownedBySender: boolean +} + +export function registerReleaseMarshaledObjectCallback(connection: MessageConnection) { + connection.onNotification(releaseMarshaledObjectMethodName, (params: ReleaseMarshaledObjectArgs | any[]) => { + const releaseArgs: ReleaseMarshaledObjectArgs = Array.isArray(params) ? { handle: params[0], ownedBySender: params[1] } : params + const connectionExtensions = connection as MessageConnectionWithMarshaldObjectSupport + if (connectionExtensions._marshaledObjectTracker) { + const releaseByHandle = releaseArgs.ownedBySender + ? connectionExtensions._marshaledObjectTracker.theirsByHandle + : connectionExtensions._marshaledObjectTracker.ownByHandle + releaseByHandle[releaseArgs.handle]?.dispose() + } + }) +} + +function constructProxyMethodName(handle: number, method: string, optionalInterface?: number) { + if (optionalInterface === undefined) { + return `$/invokeProxy/${handle}/${method}` + } else { + return `$/invokeProxy/${handle}/${optionalInterface}.${method}` + } +} + +/** + * An interface to be implemented by objects that should be marshaled across RPC instead of serialized. + * An object that also implements {@link IDisposable} will have its {@link IDisposable.dispose} method invoked + * when the receiver disposes of the proxy. + */ +export interface RpcMarshalable { + readonly _jsonRpcMarshalableLifetime: MarshaledObjectLifetime + readonly _jsonRpcOptionalInterfaces?: number[] +} + +export module RpcMarshalable { + export function is(value: any): value is RpcMarshalable { + const candidate = value as RpcMarshalable | undefined + return typeof candidate?._jsonRpcMarshalableLifetime === 'string' + } +} + +/** + * Describes the contract for JSON-RPC marshaled objects as specified by https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/general_marshaled_objects.md + */ +export interface IJsonRpcMarshaledObject { + /** + * A required property that identifies a marshaled object and its significance. + */ + __jsonrpc_marshaled: JsonRpcMarshaled + + /** + * A number that SHOULD be unique within the scope and duration of the entire JSON-RPC connection. + * A single object is assigned a new handle each time it gets marshaled and each handle's lifetime is distinct. + */ + handle: number + + /** + * When set to `call`, the marshaled object may only be invoked until the containing RPC call completes. This value is only allowed when used within a JSON-RPC argument. No explicit release using `$/releaseMarshaledObject` is required. + * When set to `explicit`, the marshaled object may be invoked until `$/releaseMarshaledObject` releases it. **This is the default behavior when the `lifetime` property is omitted.** + */ + lifetime?: MarshaledObjectLifetime + + /** + * Specify that the marshaled object implements additional known interfaces, where each array element represents one of these interfaces. + * Each element is expected to add to some base functionality that is assumed to be present for this object even if `optionalInterfaces` were omitted. + * These integers MUST be within the range of a signed, 32-bit integer. + * Each element in the array SHOULD be unique. + * A receiver MUST NOT consider order of the integers to be significant, and MUST NOT assume they will be sorted. + */ + optionalInterfaces?: number[] +} + +interface MessageConnectionWithMarshaldObjectSupport extends MessageConnection { + _marshaledObjectTracker?: { + counter: number + ownByHandle: { + [key: number]: { + target: {} + dispose: () => void + } + } + theirsByHandle: { + [key: number]: { + proxy: {} + dispose: () => void + } + } + /** The handles previously assigned, indexed by the objects they were created for. */ + ownTrackedObjectHandles: WeakMap<{}, number> + } +} + +export interface MarshaledObjectProxy extends IDisposable { + _jsonrpcMarshaledHandle: number +} + +export module MarshaledObjectProxy { + export function is(value: any): value is MarshaledObjectProxy { + const valueCandidate = value as MarshaledObjectProxy | undefined + return typeof valueCandidate?._jsonrpcMarshaledHandle === 'number' + } +} + +interface MarshaledObjectProxyTarget extends MarshaledObjectProxy { + messageConnection: MessageConnection +} + +const rpcProxy = { + // The proxy is expected to implement MarshaledObjectProxy & T + get: (target: MarshaledObjectProxyTarget, property: PropertyKey) => { + switch (property.toString()) { + case 'dispose': + return target.dispose + + case '_jsonrpcMarshaledHandle': + return target._jsonrpcMarshaledHandle + + case 'then': + // When the proxy is returned from async methods, + // promises look at the return value to see if it too is a promise. + return undefined + + case 'toJSON': + // Tests sometimes fail after trying to call this function, so make it clear it isn't supported. + return undefined + + default: + return function () { + const rpcMethod = constructProxyMethodName(target._jsonrpcMarshaledHandle, property.toString()) + return invokeRpc(rpcMethod, arguments, target.messageConnection) + } + } + }, +} + +function getJsonConnectionMarshalingTracker(messageConnection: MessageConnection) { + const jsonConnectionWithCounter = messageConnection as MessageConnectionWithMarshaldObjectSupport + jsonConnectionWithCounter._marshaledObjectTracker ??= { counter: 0, ownByHandle: {}, theirsByHandle: {}, ownTrackedObjectHandles: new WeakMap() } + return jsonConnectionWithCounter._marshaledObjectTracker +} + +export module IJsonRpcMarshaledObject { + /** + * Tests whether a given object implements {@link IJsonRpcMarshaledObject}. + * @param value the value to be tested. + * @returns true if the object conforms to the contract. + */ + export function is(value: any): value is IJsonRpcMarshaledObject { + const valueCandidate = value as IJsonRpcMarshaledObject | undefined + return typeof valueCandidate?.__jsonrpc_marshaled === 'number' && typeof valueCandidate.handle === 'number' + } + + /** + * Creates a JSON-RPC serializable object that provides the receiver with a means to invoke methods on the object. + * @param value The object to create a JSON-RPC marshalable wrapper around. + */ + export function wrap(value: RpcMarshalable | MarshaledObjectProxy, jsonConnection: MessageConnection): IJsonRpcMarshaledObject { + // Use the JSON-RPC connection itself to track the unique counter for us. + const connectionMarshalingTracker = getJsonConnectionMarshalingTracker(jsonConnection) + + if (MarshaledObjectProxy.is(value)) { + return { + __jsonrpc_marshaled: JsonRpcMarshaled.proxyReturned, + handle: value._jsonrpcMarshaledHandle, + } + } else { + if (value._jsonRpcMarshalableLifetime === 'call') { + throw new Error('Receiving marshaled objects scoped to the lifetime of a single RPC request is not yet supported.') + } + + const alreadyMarshaled = connectionMarshalingTracker.ownTrackedObjectHandles.has(value) + const handle: number = alreadyMarshaled ? connectionMarshalingTracker.ownTrackedObjectHandles.get(value)! : ++connectionMarshalingTracker.counter + if (!alreadyMarshaled) { + // Associate this object and this message connection tuple with the handle so that if we ever wrap it again, we'll use the same handle. + connectionMarshalingTracker.ownTrackedObjectHandles.set(value, handle) + + // Register for requests on the connection to invoke the local object when the receiving side sends requests. + const registration = registerInstanceMethodsAsRpcTargets(value, jsonConnection, methodName => constructProxyMethodName(handle, methodName)) + + // Arrange to release the object and registrations when the remote side sends the release notification. + connectionMarshalingTracker.ownByHandle[handle] = { + target: value, + dispose: () => { + registration.dispose() + delete connectionMarshalingTracker.ownByHandle[handle] + connectionMarshalingTracker.ownTrackedObjectHandles.delete(value) + if ('dispose' in value && typeof value.dispose === 'function') { + value.dispose() + } + }, + } + } + + return { + __jsonrpc_marshaled: JsonRpcMarshaled.realObject, + handle, + lifetime: value._jsonRpcMarshalableLifetime, + optionalInterfaces: value._jsonRpcOptionalInterfaces, + } + } + } + + export function cancelWrap(value: IJsonRpcMarshaledObject, jsonConnection: MessageConnection) { + const connectionMarshalingTracker = getJsonConnectionMarshalingTracker(jsonConnection) + const tracker = connectionMarshalingTracker.ownByHandle[value.handle] + tracker?.dispose() + } + + /** + * Produces a proxy for a marshaled object received over JSON-RPC. + * @param value The value received over JSON-RPC that is expected to contain data for remotely invoking another object. + * @returns An RPC proxy. This should be disposed of when done to release resources held by the remote party. + */ + export function unwrap(value: IJsonRpcMarshaledObject, jsonConnection: MessageConnection): T { + if (value.lifetime === 'call') { + throw new Error('Receiving marshaled objects scoped to the lifetime of a single RPC request is not yet supported.') + } + + const connectionMarshalingTracker = getJsonConnectionMarshalingTracker(jsonConnection) + switch (value.__jsonrpc_marshaled) { + case JsonRpcMarshaled.realObject: + let proxy = connectionMarshalingTracker.theirsByHandle[value.handle]?.proxy + + if (!proxy) { + // A novel object has been provided to us. + const target: MarshaledObjectProxyTarget = { + messageConnection: jsonConnection, + _jsonrpcMarshaledHandle: value.handle, + dispose: () => { + if (connectionMarshalingTracker.theirsByHandle[value.handle]) { + delete connectionMarshalingTracker.theirsByHandle[value.handle] + + // We need to notify the owner of the remote object. + const releaseArgs: ReleaseMarshaledObjectArgs = { + handle: value.handle, + ownedBySender: false, + } + jsonConnection.sendNotification(releaseMarshaledObjectMethodName, ParameterStructures.byName, releaseArgs) + } + }, + } + proxy = new Proxy(target, rpcProxy) + connectionMarshalingTracker.theirsByHandle[value.handle] = { + proxy, + dispose: () => { + // This is invoked when the remote party that owns the object notifies us that the object is destroyed. + delete connectionMarshalingTracker.theirsByHandle[value.handle] + }, + } + } + + return proxy as unknown as T & IDisposable & MarshaledObjectProxy + case JsonRpcMarshaled.proxyReturned: + // A marshaled object that we provided to the remote party has come back to us. + // Make sure we unwrap it as the original object. + const tracker = connectionMarshalingTracker.ownByHandle[value.handle] + if (!tracker) { + throw new Error(`Unrecognized handle ${value.handle}`) + } + return tracker.target as T + default: + throw new Error(`Unsupported value for __jsonrpc_marshaled: ${value.__jsonrpc_marshaled}.`) + } + } +} diff --git a/src/servicebroker-npm/src/jsonRpc/Observer.ts b/src/servicebroker-npm/src/jsonRpc/Observer.ts new file mode 100644 index 00000000..820fb3ff --- /dev/null +++ b/src/servicebroker-npm/src/jsonRpc/Observer.ts @@ -0,0 +1,61 @@ +import { MarshaledObjectLifetime, RpcMarshalable } from './MarshalableObject' + +/** + * An observer of some value production. + */ +export interface IObserver { + /** + * Notifies the observer of the next object in the sequence. + * @param value The next value in the observable sequence. + */ + onNext(value: T): void + + /** + * Notifies the observer that the end of the sequence has been reached, and that no more values will be produced. + */ + onCompleted(): void + + /** + * Notifies the observer that an error occurred at the value source, and that no more values will be produced. + * @param reason The error that occurred at the value source. + */ + onError(reason: any): void +} + +export interface IObservable { + /** + * Adds an observer to an observable object. + * @param observer The observer to receive values. + * @returns A function to call to cancel the subscription. + */ + subscribe(observer: IObserver): () => void +} + +export class Observer implements IObserver, RpcMarshalable { + readonly _jsonRpcMarshalableLifetime: MarshaledObjectLifetime = 'explicit' + error: any + + get completed() { + return this.error !== undefined + } + + constructor(private readonly next: (value: T) => void, private readonly completion?: (error?: any) => void) {} + + onNext(value: T): void { + this.next(value) + } + + onCompleted(): void { + this.error = null + if (this.completion) { + this.completion(null) + } + } + + onError(reason: any): void { + this.error = reason + if (this.completion) { + this.completion(reason) + } + } +} diff --git a/src/servicebroker-npm/src/jsonRpc/rpcUtilities.ts b/src/servicebroker-npm/src/jsonRpc/rpcUtilities.ts new file mode 100644 index 00000000..e7397070 --- /dev/null +++ b/src/servicebroker-npm/src/jsonRpc/rpcUtilities.ts @@ -0,0 +1,149 @@ +import { MessageConnection, CancellationToken as vscodeCancellationToken, ParameterStructures, Disposable } from 'vscode-jsonrpc' +import { CancellationTokenAdapters } from '../CancellationTokenAdapter' +import { IJsonRpcMarshaledObject, MarshaledObjectProxy, RpcMarshalable } from './MarshalableObject' + +export async function invokeRpc(methodName: string, inputArgs: IArguments, messageConnection: MessageConnection): Promise { + if (inputArgs.length > 0) { + if (vscodeCancellationToken.is(inputArgs[inputArgs.length - 1])) { + const ct = inputArgs[inputArgs.length - 1] + const args = filterOutboundArgs(messageConnection, Array.prototype.slice.call(inputArgs, 0, inputArgs.length - 1)) + return messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct) + } else if (CancellationTokenAdapters.isCancellationToken(inputArgs[inputArgs.length - 1])) { + const ct = CancellationTokenAdapters.cancellationTokenToVSCode(inputArgs[inputArgs.length - 1]) + const args = filterOutboundArgs(messageConnection, Array.prototype.slice.call(inputArgs, 0, inputArgs.length - 1)) + return messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct) + } else if (inputArgs[inputArgs.length - 1] === undefined) { + // The last arg is most likely a `CancellationToken?` that was propagated to the RPC call from another method that made it optional. + // We can't tell, but we mustn't claim it's a CancellationToken nor an ordinary argument or else an RPC server + // may fail to match the RPC call to a method because of an extra argument. + // If this truly was a value intended to propagate, they should use `null` as the argument. + const args = filterOutboundArgs(messageConnection, Array.prototype.slice.call(inputArgs, 0, inputArgs.length - 1)) + return messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args) + } + } + + const validatedArgs = filterOutboundArgs(messageConnection, Array.prototype.slice.call(inputArgs)) + try { + const result = await messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...validatedArgs) + return filterInboundResult(messageConnection, result) + } catch (reason) { + // If any args were marshaled objects, dispose of them. + for (const arg of validatedArgs) { + if (IJsonRpcMarshaledObject.is(arg)) { + IJsonRpcMarshaledObject.cancelWrap(arg, messageConnection) + } + } + + throw reason + } +} + +function filterOutboundArgs(connection: MessageConnection, args: any[]): any[] { + return validateNoUndefinedElements(args).map(v => filterOutboundMarshalableObject(connection, v)) +} + +async function filterOutboundResult(connection: MessageConnection, value: any | Promise): Promise { + const unwrappedPromiseValue = await value + return filterOutboundMarshalableObject(connection, unwrappedPromiseValue) +} + +function filterOutboundMarshalableObject(connection: MessageConnection, value: any): any | IJsonRpcMarshaledObject { + if (RpcMarshalable.is(value) || MarshaledObjectProxy.is(value)) { + return IJsonRpcMarshaledObject.wrap(value, connection) + } else { + return value + } +} + +function validateNoUndefinedElements(values: T[]): T[] { + for (let i = 0; i < values.length; i++) { + if (values[i] === undefined) { + throw new Error(`Argument at 0-based index ${i} is set as undefined, which is not a valid JSON value.`) + } + } + + return values +} + +function isMethod(obj: object, name: string): boolean { + const desc = Object.getOwnPropertyDescriptor(obj, name) + return !!desc && typeof desc.value === 'function' +} + +function getInstanceMethodNames(obj: object, stopPrototype?: any): string[] { + const array: string[] = [] + let proto = Object.getPrototypeOf(obj) + while (proto && proto !== stopPrototype) { + Object.getOwnPropertyNames(proto).forEach(name => { + if (name !== 'constructor') { + if (isMethod(proto, name)) { + array.push(name) + } + } + }) + proto = Object.getPrototypeOf(proto) + } + + return array +} + +function wrapCancellationTokenIfPresent(args: any[]): any[] { + if (args.length > 0 && CancellationTokenAdapters.isVSCode(args[args.length - 1])) { + const adaptedCancellationToken = CancellationTokenAdapters.vscodeToCancellationToken(args[args.length - 1]) + args[args.length - 1] = adaptedCancellationToken + } + + return args +} + +function filterInboundMarshalableObject(connection: MessageConnection, value: any | IJsonRpcMarshaledObject): any { + if (IJsonRpcMarshaledObject.is(value)) { + return IJsonRpcMarshaledObject.unwrap(value, connection) + } else { + return value + } +} + +function filterInboundValue(connection: MessageConnection, value: any): any { + return filterInboundMarshalableObject(connection, value) +} + +function filterInboundArguments(connection: MessageConnection, args: any[]): any[] { + return wrapCancellationTokenIfPresent(args).map(v => filterInboundValue(connection, v)) +} + +function filterInboundResult(connection: MessageConnection, value: any): any { + return filterInboundValue(connection, value) +} + +export function registerInstanceMethodsAsRpcTargets( + rpcTarget: any, + connection: MessageConnection, + rpcMethodNameTransform?: (functionName: string) => string +): Disposable { + const disposables: Disposable[] = [] + + function registerRequestAndNotification(methodName: string, method: any) { + const rpcMethodName = rpcMethodNameTransform ? rpcMethodNameTransform(methodName) : methodName + disposables.push( + connection.onRequest(rpcMethodName, (...args: []) => + filterOutboundResult(connection, method.apply(rpcTarget, filterInboundArguments(connection, args))) + ) + ) + disposables.push(connection.onNotification(rpcMethodName, (...args: []) => method.apply(rpcTarget, filterInboundArguments(connection, args)))) + } + + getInstanceMethodNames(rpcTarget, Object.prototype).forEach(methodName => { + if (methodName !== 'dispose') { + const method = rpcTarget[methodName] + registerRequestAndNotification(methodName, method) + + // Add an alias for the method so that we support with and without the Async suffix. + const suffix = 'Async' + const alias = methodName.endsWith(suffix) ? methodName.substring(0, methodName.length - suffix.length) : `${methodName}${suffix}` + registerRequestAndNotification(alias, method) + } + }) + + return Disposable.create(() => disposables.forEach(d => d.dispose())) +} diff --git a/src/servicebroker-npm/src/utilities.ts b/src/servicebroker-npm/src/utilities.ts index 2cc3d10c..a18dfb75 100644 --- a/src/servicebroker-npm/src/utilities.ts +++ b/src/servicebroker-npm/src/utilities.ts @@ -1,6 +1,7 @@ import { Channel } from 'nerdbank-streams' import { MessageConnection } from 'vscode-jsonrpc' import assert from 'assert' +import { registerReleaseMarshaledObjectCallback } from './jsonRpc/MarshalableObject' /** * Constructs a message connection to a given pipe @@ -29,6 +30,8 @@ export function constructMessageConnection( rpc.onClose(() => pipe.dispose()) } + registerReleaseMarshaledObjectCallback(rpc) + return rpc } diff --git a/src/servicebroker-npm/test/ObserverTests.ts b/src/servicebroker-npm/test/ObserverTests.ts new file mode 100644 index 00000000..e155f8ef --- /dev/null +++ b/src/servicebroker-npm/test/ObserverTests.ts @@ -0,0 +1,68 @@ +import assert from 'assert' +import { Observer } from '../src' + +describe('Observer', function () { + it('next callback is fired with values', function () { + const values: number[] = [] + const observer = new Observer(v => values.push(v)) + observer.onNext(1) + observer.onNext(3) + assert.deepEqual(values, [1, 3]) + }) + + it('completion callback is fired on success', function () { + let error + const observer = new Observer( + v => {}, + reason => (error = reason) + ) + observer.onCompleted() + assert.strictEqual(error, null) + }) + + it('completion callback is fired on failure', function () { + let error + const observer = new Observer( + v => {}, + reason => (error = reason) + ) + observer.onError('fail') + assert.strictEqual(error, 'fail') + }) + + describe('completed', function () { + it('is false at start', function () { + assert.strictEqual(false, new Observer(v => {}).completed) + }) + + it('is true at successful end', function () { + const observer = new Observer(v => {}) + observer.onCompleted() + assert.strictEqual(true, observer.completed) + }) + + it('is true at failed end', function () { + const observer = new Observer(v => {}) + observer.onError('failed') + assert.strictEqual(true, observer.completed) + }) + }) + + describe('error', function () { + it('before completion', function () { + assert.strictEqual(undefined, new Observer(v => {}).error) + }) + + it('after successful completion', function () { + const observer = new Observer(v => {}) + observer.onCompleted() + assert.strictEqual(null, observer.error) + }) + + it('after failed completion', function () { + const observer = new Observer(v => {}) + observer.onError('fail') + assert.strictEqual('fail', observer.error) + }) + }) +}) diff --git a/src/servicebroker-npm/test/remoteServiceBrokerTests.ts b/src/servicebroker-npm/test/remoteServiceBrokerTests.ts index 5b241049..9c7918cf 100644 --- a/src/servicebroker-npm/test/remoteServiceBrokerTests.ts +++ b/src/servicebroker-npm/test/remoteServiceBrokerTests.ts @@ -3,12 +3,20 @@ import CancellationToken from 'cancellationtoken' import { CancellationToken as vscodeCancellationToken } from 'vscode-jsonrpc' import { Channel, FullDuplexStream, MultiplexingStream } from 'nerdbank-streams' import { Deferred } from 'nerdbank-streams/js/Deferred' -import { Formatters, MessageDelimiters, RemoteServiceConnections } from '../src/constants' -import { FrameworkServices } from '../src/FrameworkServices' -import { RemoteServiceBroker } from '../src/RemoteServiceBroker' -import { ServiceActivationOptions } from '../src/ServiceActivationOptions' -import { ServiceJsonRpcDescriptor } from '../src/ServiceJsonRpcDescriptor' -import { ServiceMoniker } from '../src/ServiceMoniker' +import { + Formatters, + MessageDelimiters, + RemoteServiceConnections, + FrameworkServices, + RemoteServiceBroker, + ServiceActivationOptions, + ServiceJsonRpcDescriptor, + ServiceMoniker, + BrokeredServicesChangedArgs, + IRemoteServiceBroker, + Observer, + IDisposable, +} from '../src' import { AlwaysThrowingRemoteBroker } from './testAssets/alwaysThrowingBroker' import { CallMeBackClient } from './testAssets/callMeBackClient' import { ICalculatorService, IActivationService, ICallMeBackService } from './testAssets/interfaces' @@ -23,8 +31,6 @@ import { calcDescriptorMsgPackBE32, callBackDescriptor, } from './testAssets/testUtilities' -import { BrokeredServicesChangedArgs } from '../src/BrokeredServicesChangedArgs' -import { IRemoteServiceBroker } from '../src/IRemoteServiceBroker' describe('Service Broker tests', function () { let defaultTokenSource: { @@ -268,6 +274,82 @@ describe('Service Broker tests', function () { mx?.dispose() } }) + + describe('IObserver interop with .NET process', () => { + it('with successful end', async function () { + let mx: MultiplexingStream | null = null + try { + mx = await startCP(defaultToken) + const channel = await mx.acceptChannelAsync('', undefined, defaultToken) + const s = new MultiplexingRemoteServiceBroker(channel) + const broker = await RemoteServiceBroker.connectToMultiplexingRemoteServiceBroker(s, mx, defaultToken) + const proxy = await broker.getProxy(calcDescriptorUtf8BE32, undefined, defaultToken) + const values = await new Promise(async (resolve, reject) => { + const values: number[] = [] + const observer = new Observer( + value => values.push(value), + error => { + if (error) { + reject(error) + } else { + resolve(values) + } + } + ) + let disposed = false + const disposableObservable = observer as unknown as IDisposable + disposableObservable.dispose = () => { + disposed = true + } + await proxy?.observeNumbers(observer, 3, false) + assert(disposed) + }) + console.log(values) + assert.deepEqual(values, [1, 2, 3]) + broker.dispose() + await channel.completion + } finally { + mx?.dispose() + } + }) + + it('with failure end', async function () { + let mx: MultiplexingStream | null = null + try { + mx = await startCP(defaultToken) + const channel = await mx.acceptChannelAsync('', undefined, defaultToken) + const s = new MultiplexingRemoteServiceBroker(channel) + const broker = await RemoteServiceBroker.connectToMultiplexingRemoteServiceBroker(s, mx, defaultToken) + const proxy = await broker.getProxy(calcDescriptorUtf8BE32, undefined, defaultToken) + await assert.rejects( + new Promise(async (resolve, reject) => { + const values: number[] = [] + const observer = new Observer( + value => values.push(value), + error => { + if (error) { + reject(error) + } else { + resolve(values) + } + } + ) + let disposed = false + const disposableObservable = observer as unknown as IDisposable + disposableObservable.dispose = () => { + disposed = true + } + await proxy?.observeNumbers(observer, 3, true) + assert(disposed) + }) + ) + broker.dispose() + await channel.completion + } finally { + mx?.dispose() + } + }) + }) }) describe('Tests that do not start an external process', function () { diff --git a/src/servicebroker-npm/test/serviceJsonRpcDescriptorTests.ts b/src/servicebroker-npm/test/serviceJsonRpcDescriptorTests.ts index 703a10b1..fa283006 100644 --- a/src/servicebroker-npm/test/serviceJsonRpcDescriptorTests.ts +++ b/src/servicebroker-npm/test/serviceJsonRpcDescriptorTests.ts @@ -1,15 +1,21 @@ import assert from 'assert' import CancellationToken from 'cancellationtoken' -import { IRemoteServiceBroker } from '../src/IRemoteServiceBroker' import { FullDuplexStream } from 'nerdbank-streams' -import { Formatters, MessageDelimiters, RemoteServiceConnections } from '../src/constants' -import { ServiceBrokerClientMetadata } from '../src/ServiceBrokerClientMetadata' -import { ServiceJsonRpcDescriptor } from '../src/ServiceJsonRpcDescriptor' -import { ServiceMoniker } from '../src/ServiceMoniker' +import { + IDisposable, + IRemoteServiceBroker, + ServiceMoniker, + ServiceJsonRpcDescriptor, + ServiceBrokerClientMetadata, + Formatters, + MessageDelimiters, + RemoteServiceConnections, + MarshaledObjectLifetime, + RpcMarshalable, +} from '../src' import { Calculator } from './testAssets/calculatorService' import { IAppleTreeService, ApplePickedEventArgs, ICalculatorService, ICallMeBackClient, ICallMeBackService, IWaitToBeCanceled } from './testAssets/interfaces' import { TestRemoteServiceBroker } from './testAssets/testRemoteServiceBroker' -import { IDisposable } from '../src/IDisposable' import { appleTreeDescriptor, calcDescriptorUtf8Http, callBackDescriptor, cancellationWaiter } from './testAssets/testUtilities' import { CallMeBackService } from './testAssets/callMeBackService' import { CallMeBackClient } from './testAssets/callMeBackClient' @@ -150,6 +156,177 @@ describe('ServiceJsonRpcDescriptor', function () { assert(!info2.equals(info3a), 'Should not be equal with different formatter and message delimiter') assert(!info2.equals(info3b), 'Should not be equal with different message delimiter') }) + + describe('general marshalable objects', function () { + interface IPhone extends IDisposable { + placeCall(callerName: string): Promise + } + + interface IServer { + callMeBack(clientPhone: IPhone): Promise + callingAllPhones(...clientPhones: IPhone[]): Promise + callMeLater(clientPhone: IPhone): Promise | void + providePhone(): IPhone | Promise + providePhoneWithCallLifetime(): IPhone | Promise + isThisYourPhone(phone: IPhone): IPhone | null | Promise + throwInside(phone: IPhone): Promise + } + + class Server implements IServer { + public clientReadyForCall: Promise | undefined + public deferredClientCallResult: Promise | undefined + readonly serverPhone = new Phone('server', 'explicit') + + async callMeBack(clientPhone: IPhone): Promise { + const response = await clientPhone.placeCall('server') + return response + } + + async callingAllPhones(...clientPhonesAndCT: (IPhone | CancellationToken)[]): Promise { + const clientPhones = clientPhonesAndCT.slice(0, clientPhonesAndCT.length - 1) as IPhone[] + return Promise.all(clientPhones.map((phone, i) => phone.placeCall(`server ${i + 1}`))) + } + + callMeLater(clientPhone: IPhone) { + this.deferredClientCallResult = new Promise(async (resolve, reject) => { + try { + await this.clientReadyForCall + const result = await clientPhone.placeCall('server') + clientPhone.dispose() + resolve(result) + } catch (reason) { + reject(reason) + } + }) + } + + providePhone() { + return this.serverPhone + } + + providePhoneWithCallLifetime() { + // This will always fail, because returning call-lifetime object is not allowed. + return new Phone('server', 'call') + } + + isThisYourPhone(phone: IPhone) { + return phone === this.serverPhone ? phone : null + } + + async throwInside() { + throw new Error('throwing as requested.') + } + } + + class Phone implements IPhone, RpcMarshalable { + readonly _jsonRpcMarshalableLifetime: MarshaledObjectLifetime + readonly disposed: Promise + private disposalSource?: () => void + + constructor(public readonly owner: string, lifetime?: MarshaledObjectLifetime) { + this._jsonRpcMarshalableLifetime = lifetime ?? 'explicit' // call lifetime isn't supported yet. + this.disposed = new Promise(resolve => (this.disposalSource = resolve)) + } + + placeCall(callerName: string): Promise { + return Promise.resolve(`Hi, ${this.owner}. This is ${callerName}.`) + } + + dispose() { + this.disposalSource!() + } + } + + let server: Server + let rpc: IServer & IDisposable + + const serverDescriptor = new ServiceJsonRpcDescriptor(ServiceMoniker.create('test'), Formatters.Utf8, MessageDelimiters.HttpLikeHeaders) + + beforeEach(function () { + server = new Server() + + const pipes = FullDuplexStream.CreatePair() + serverDescriptor.constructRpc(server, pipes.first) + rpc = serverDescriptor.constructRpc(pipes.second) + }) + + it('as arguments', async function () { + const response = await rpc.callMeBack(new Phone('client')) + assert.strictEqual(response, 'Hi, client. This is server.') + }) + + it('as return value', async function () { + const serverPhone = await rpc.providePhone() + const response = await serverPhone.placeCall('client') + assert.strictEqual(response, 'Hi, server. This is client.') + }) + + it('disposal of proxy releases memory', async function () { + const serverPhone = await rpc.providePhone() + serverPhone.dispose() + await assert.rejects(serverPhone.placeCall('client')) + await server.serverPhone.disposed + }) + + it('call lifetime is not yet supported', async function () { + const phone = new Phone('client', 'call') + await assert.rejects(rpc.callMeBack(phone)) + }) + + it.skip('lifetime is scoped to the call', async function () { + const phone = new Phone('client', 'call') + server.clientReadyForCall = new Promise(async (resolve, reject) => { + try { + await rpc.callMeLater(phone) + resolve() + } catch (reason) { + reject(reason) + } + }) + await server.clientReadyForCall + await assert.rejects(server.deferredClientCallResult!) + }) + + it('lifetime exceeds scope of the call', async function () { + const phone = new Phone('client', 'explicit') + server.clientReadyForCall = new Promise(async (resolve, reject) => { + try { + await rpc.callMeLater(phone) + resolve() + } catch (reason) { + reject(reason) + } + }) + await server.clientReadyForCall + const response = await server.deferredClientCallResult + assert.strictEqual(response, 'Hi, client. This is server.') + }) + + it('can pass the proxy back and forth', async function () { + const serverPhone = await rpc.providePhone() + const result = await rpc.isThisYourPhone(serverPhone) + assert.strictEqual(result, serverPhone) + }) + + it('lifetime of call in return value is disallowed', async function () { + await assert.rejects(async () => await rpc.providePhoneWithCallLifetime()) + }) + + it('multiple marshaled objects', async function () { + const phone1 = new Phone('client 1') + const phone2 = new Phone('client 2') + const responses = await rpc.callingAllPhones(phone1, phone2) + assert.deepEqual(responses, ['Hi, client 1. This is server 1.', 'Hi, client 2. This is server 2.']) + }) + + it('resources released when server throws', async function () { + const phone = new Phone('client') + await assert.rejects(rpc.throwInside(phone)) + await phone.disposed + }) + }) + + describe('IObserver', function () {}) }) describe('Various formatters and delimiters', function () { diff --git a/src/servicebroker-npm/test/testAssets/calculatorService.ts b/src/servicebroker-npm/test/testAssets/calculatorService.ts index fc5a4b0f..788b3251 100644 --- a/src/servicebroker-npm/test/testAssets/calculatorService.ts +++ b/src/servicebroker-npm/test/testAssets/calculatorService.ts @@ -1,5 +1,6 @@ import { ICalculatorService } from './interfaces' import CancellationToken from 'cancellationtoken' +import { IDisposable, IObserver } from '../../src' export class Calculator implements ICalculatorService { public isDisposed: boolean = false @@ -10,15 +11,32 @@ export class Calculator implements ICalculatorService { this.disposed = new Promise(resolve => (this.notifyDisposed = resolve)) } - public add(a: number, b: number, cancellationToken?: CancellationToken): Promise { + add(a: number, b: number, cancellationToken?: CancellationToken): Promise { return Promise.resolve(a + b) } - public add5(a: number, cancellationToken?: CancellationToken): Promise { + add5(a: number, cancellationToken?: CancellationToken): Promise { return Promise.resolve(a + 5) } - public dispose(): void { + async observeNumbers(observer: IObserver & IDisposable, length: number, failAtEnd: boolean = false): Promise { + for (let i = 0; i <= length; i++) { + await Promise.resolve() + observer.onNext(i) + } + + if (failAtEnd) { + observer.onError('Requested failure.') + } else { + observer.onCompleted() + } + + if ('dispose' in observer) { + observer.dispose() + } + } + + dispose(): void { this.isDisposed = true this.notifyDisposed!() } diff --git a/src/servicebroker-npm/test/testAssets/interfaces.ts b/src/servicebroker-npm/test/testAssets/interfaces.ts index 4419809c..393e8262 100644 --- a/src/servicebroker-npm/test/testAssets/interfaces.ts +++ b/src/servicebroker-npm/test/testAssets/interfaces.ts @@ -1,6 +1,7 @@ import CancellationToken from 'cancellationtoken' import { EventEmitter } from 'events' import StrictEventEmitter from 'strict-event-emitter-types' +import { IObserver } from '../../src' export interface IFakeService { fakeProperty: string @@ -10,6 +11,7 @@ export interface IFakeService { export interface ICalculatorService { add(a: number, b: number, cancellationToken?: CancellationToken): Promise add5(a: number, cancellationToken?: CancellationToken): Promise + observeNumbers(observer: IObserver, length: number, failAtEnd: boolean): Promise } export interface ICallMeBackService { diff --git a/test/ServiceBrokerTest/Calculator.cs b/test/ServiceBrokerTest/Calculator.cs index 54f271b6..3c825b0c 100644 --- a/test/ServiceBrokerTest/Calculator.cs +++ b/test/ServiceBrokerTest/Calculator.cs @@ -18,4 +18,23 @@ public async Task Add5Async(int a) await Task.Yield(); return a + 5; } + + [StreamJsonRpc.JsonRpcMethod("observeNumbers")] + public async Task ObserveNumbersAsync(IObserver observer, int length, bool failAtEnd) + { + for (int i = 1; i <= length; i++) + { + await Task.Yield(); + observer.OnNext(i); + } + + if (failAtEnd) + { + observer.OnError(new InvalidOperationException("Requested failure.")); + } + else + { + observer.OnCompleted(); + } + } } diff --git a/test/ServiceBrokerTest/Program.cs b/test/ServiceBrokerTest/Program.cs index 1686895c..38e1bbed 100644 --- a/test/ServiceBrokerTest/Program.cs +++ b/test/ServiceBrokerTest/Program.cs @@ -2,8 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System.CommandLine; -using System.Runtime.InteropServices; -using Microsoft; +using System.Diagnostics; using Microsoft.ServiceHub.Framework; using Nerdbank.Streams;