Skip to content

Commit

Permalink
Merge pull request #65 from AArnott/marshalableObjects
Browse files Browse the repository at this point in the history
Add support for marshalable objects, including `Observer<T>`
  • Loading branch information
AArnott authored Mar 14, 2023
2 parents 851ebe1 + 1bc115d commit 2ad3ea8
Show file tree
Hide file tree
Showing 20 changed files with 977 additions and 103 deletions.
5 changes: 5 additions & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Documentation

## `@microsoft/servicehub-framework` NPM package documentation

[Documentation specific to this package is available elsewhere](../src/servicebroker-npm/doc/index.md).
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected virtual void Dispose(bool disposing)
private async Task HandleIncomingConnectionAsync(Stream stream, Guid requestId, IDuplexPipe servicePipe)
{
// Once a connection is made (or fails), it is no longer cancelable.
ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable _);
ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable? _);

// Link the two pipes so that all incoming/outgoing calls get forwarded
await Task.WhenAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public async Task<RemoteServiceConnectionInfo> RequestServiceChannelAsync(Servic
};
MultiplexingStream.Channel outerChannel = this.multiplexingStreamWithClient.CreateChannel(channelOptions);
Assumes.True(this.channelsOfferedToClient.TryAdd(requestId, outerChannel));
outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel _), TaskScheduler.Default).Forget();
outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel? _), TaskScheduler.Default).Forget();

return new RemoteServiceConnectionInfo
{
Expand Down
1 change: 1 addition & 0 deletions src/servicebroker-npm/.npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ coverage/
jest.config.ts
.eslintrc.json
*/test/
doc/
3 changes: 3 additions & 0 deletions src/servicebroker-npm/doc/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# `@microsoft/servicehub-framework` NPM package documentation

- [Marshalable objects](marshalable_objects.md)
61 changes: 61 additions & 0 deletions src/servicebroker-npm/doc/marshalable_objects.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Marshalable objects

This package implements [the marshalable object protocol](https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/general_marshaled_objects.md) first implemented by StreamJsonRpc.
The overall behavior and wire protocol are defined there.

The TypeScript/javascript-specific behaviors are outlined here.

## Sending a marshaled object

By default, all values passed as arguments or return values are serialized by value.
To marshal an object for remote function invocation instead of serializing by value, the object should implement the `RpcMarshalable` interface.

For example:

```ts
interface ICalculator {
add(a: number, b: number): Promise<number> | number
}

class Calculator implements ICalculator, RpcMarshalable {
readonly _jsonRpcMarshalableLifetime: MarshaledObjectLifetime = 'explicit'

add(a: number, b: number) {
return a + b
}
}
```

Only top-level arguments and return values are tested for marshalability.
This means that the Calculator object must appear as its own argument or return value in order to be marshaled.
If it appears deep in the object graph of an argument or return value, it will simply be serialized.

If the receiver disposes the proxy, and the real object defines a `dispose` method, the `dispose` method will be invoked on the real object.
This means that when you pass an object across RPC, you are effectively transferring lifetime ownership to the remote party.

## Receiving a marshaled object

When it arrives on the remote side, it will no longer be an instance of the `Calculator` class, but instead will be a proxy.
This proxy will relay all function calls to the original object.

The proxy must be disposed of when done or it will occupy resources for as long as the JSON-RPC connection lasts on both ends of the connection.

You can leverage Typescript for type safety for this. For example, the calculator might be accepted as an argument like this:

```ts
interface IServer {
doSomeMath(calc: ICalculator): Promise<number>
}
```

The server may be implemented like this:

```ts
class Server {
doSomeMath(calc: ICalculator & IDisposable): Promise<number> {
const sum = await calc.add(2, 3)
calc.dispose()
return sum // 5
}
}
```
9 changes: 9 additions & 0 deletions src/servicebroker-npm/src/CancellationTokenAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import CancellationToken from 'cancellationtoken'
import { CancellationToken as vscodeCancellationToken, CancellationTokenSource as vscodeCancellationTokenSource } from 'vscode-jsonrpc'

function isProxy(value: any): boolean {
// It turns out that it's really hard to detect a proxy in general:
// https://stackoverflow.com/questions/36372611/how-to-test-if-an-object-is-a-proxy
// Our strategy here is to at least detect pass-through proxies that claim that everything exists.
// So we make up a property name that should never exist. If it does, then we know it must be a proxy.
return value && value.ImAProxy !== undefined
}

export class CancellationTokenAdapters {
/** Tests whether an object satisfies the {@link vscodeCancellationToken} interface. */
static isVSCode(value: any): value is vscodeCancellationToken {
Expand All @@ -11,6 +19,7 @@ export class CancellationTokenAdapters {
static isCancellationToken(value: any): value is CancellationToken {
return (
value &&
!isProxy(value) &&
typeof value.throwIfCancelled === 'function' &&
typeof value.onCancelled === 'function' &&
value.isCancelled !== undefined &&
Expand Down
87 changes: 5 additions & 82 deletions src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Channel } from 'nerdbank-streams'
import { MessageConnection, CancellationToken as vscodeCancellationToken, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc'
import { MessageConnection, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc/node'
import { Formatters, MessageDelimiters } from './constants'
import { ServiceMoniker } from './ServiceMoniker'
import { RpcConnection, RpcEventServer, ServiceRpcDescriptor } from './ServiceRpcDescriptor'
import { clone, constructMessageConnection, isChannel } from './utilities'
import { IDisposable } from './IDisposable'
import { BE32MessageReader, BE32MessageWriter } from './BigEndianInt32LengthHeaderMessageHandler'
import * as msgpack from 'msgpack-lite'
import { CancellationTokenAdapters } from './CancellationTokenAdapter'
import { MultiplexingStream, MultiplexingStreamOptions } from 'nerdbank-streams'
import { EventEmitter } from 'stream'
import { NodeStreamMessageReader, NodeStreamMessageWriter } from './NodeStreamMessageWrappers'
import { invokeRpc, registerInstanceMethodsAsRpcTargets } from './jsonRpc/rpcUtilities'

/**
* Constructs a JSON RPC message connection to a service
Expand Down Expand Up @@ -194,74 +194,19 @@ const rpcProxy = {
default:
return function () {
const methodName = property.toString()

if (arguments.length > 0) {
if (vscodeCancellationToken.is(arguments[arguments.length - 1])) {
const ct = arguments[arguments.length - 1]
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct)
} else if (CancellationTokenAdapters.isCancellationToken(arguments[arguments.length - 1])) {
const ct = CancellationTokenAdapters.cancellationTokenToVSCode(arguments[arguments.length - 1])
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct)
} else if (arguments[arguments.length - 1] === undefined) {
// The last arg is most likely a `CancellationToken?` that was propagated to the RPC call from another method that made it optional.
// We can't tell, but we mustn't claim it's a CancellationToken nor an ordinary argument or else an RPC server
// may fail to match the RPC call to a method because of an extra argument.
// If this truly was a value intended to propagate, they should use `null` as the argument.
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args)
}
}

const validatedArgs = validateNoUndefinedElements(Array.prototype.slice.call(arguments))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...validatedArgs)
return invokeRpc(property.toString(), arguments, target.messageConnection)
}
}
},
}

function validateNoUndefinedElements<T>(values: T[]): T[] {
for (let i = 0; i < values.length; i++) {
if (values[i] === undefined) {
throw new Error(`Argument at 0-based index ${i} is set as undefined, which is not a valid JSON value.`)
}
}

return values
}

export class JsonRpcConnection extends RpcConnection {
constructor(public readonly messageConnection: MessageConnection) {
super()
}

public addLocalRpcTarget(rpcTarget: any | RpcEventServer): void {
function wrapCancellationTokenIfPresent(args: any[]): any[] {
if (args.length > 0 && CancellationTokenAdapters.isVSCode(args[args.length - 1])) {
const adaptedCancellationToken = CancellationTokenAdapters.vscodeToCancellationToken(args[args.length - 1])
args[args.length - 1] = adaptedCancellationToken
}

return args
}

function registerRequestAndNotification(connection: MessageConnection, methodName: string, method: any) {
connection.onRequest(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args)))
connection.onNotification(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args)))
}

JsonRpcConnection.getInstanceMethodNames(rpcTarget, Object.prototype).forEach(methodName => {
if (methodName !== 'dispose') {
const method = rpcTarget[methodName]
registerRequestAndNotification(this.messageConnection, methodName, method)

// Add an alias for the method so that we support with and without the Async suffix.
const suffix = 'Async'
const alias = methodName.endsWith(suffix) ? methodName.substring(0, methodName.length - suffix.length) : `${methodName}${suffix}`
registerRequestAndNotification(this.messageConnection, alias, method)
}
})
registerInstanceMethodsAsRpcTargets(rpcTarget, this.messageConnection)

// If the RPC target is an event emitter, hook up a handler that forwards all events across RPC.
if (RpcConnection.IsRpcEventServer(rpcTarget)) {
Expand Down Expand Up @@ -295,31 +240,9 @@ export class JsonRpcConnection extends RpcConnection {
}

public dispose(): void {}

private static isMethod(obj: object, name: string): boolean {
const desc = Object.getOwnPropertyDescriptor(obj, name)
return !!desc && typeof desc.value === 'function'
}

private static getInstanceMethodNames(obj: object, stopPrototype?: any): string[] {
const array: string[] = []
let proto = Object.getPrototypeOf(obj)
while (proto && proto !== stopPrototype) {
Object.getOwnPropertyNames(proto).forEach(name => {
if (name !== 'constructor') {
if (JsonRpcConnection.isMethod(proto, name)) {
array.push(name)
}
}
})
proto = Object.getPrototypeOf(proto)
}

return array
}
}

interface IProxyTarget {
export interface IProxyTarget {
messageConnection: MessageConnection
eventEmitter: EventEmitter
}
2 changes: 2 additions & 0 deletions src/servicebroker-npm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export { IJsonRpcClientProxy } from './IJsonRpcClientProxy'
export { IpcRelayServiceBroker } from './IpcRelayServiceBroker'
export { IRemoteServiceBroker } from './IRemoteServiceBroker'
export { IServiceBroker, ServiceBrokerEvents } from './IServiceBroker'
export { RpcMarshalable, MarshaledObjectLifetime } from './jsonRpc/MarshalableObject'
export { MultiplexingRelayServiceBroker } from './MultiplexingRelayServiceBroker'
export { ProtectedOperation } from './ProtectedOperation'
export { RemoteServiceBroker } from './RemoteServiceBroker'
Expand All @@ -17,4 +18,5 @@ export { ServiceBrokerClientMetadata } from './ServiceBrokerClientMetadata'
export { ServiceJsonRpcDescriptor, JsonRpcConnection } from './ServiceJsonRpcDescriptor'
export { ServiceMoniker } from './ServiceMoniker'
export { ServiceRpcDescriptor, RpcEventServer } from './ServiceRpcDescriptor'
export { IObserver, Observer } from './jsonRpc/Observer'
export * from './container'
Loading

0 comments on commit 2ad3ea8

Please sign in to comment.