Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for marshalable objects, including Observer<T> #65

Merged
merged 16 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected virtual void Dispose(bool disposing)
private async Task HandleIncomingConnectionAsync(Stream stream, Guid requestId, IDuplexPipe servicePipe)
{
// Once a connection is made (or fails), it is no longer cancelable.
ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable _);
ImmutableInterlocked.TryRemove(ref this.remoteServiceRequests, requestId, out IAsyncDisposable? _);

// Link the two pipes so that all incoming/outgoing calls get forwarded
await Task.WhenAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public async Task<RemoteServiceConnectionInfo> RequestServiceChannelAsync(Servic
};
MultiplexingStream.Channel outerChannel = this.multiplexingStreamWithClient.CreateChannel(channelOptions);
Assumes.True(this.channelsOfferedToClient.TryAdd(requestId, outerChannel));
outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel _), TaskScheduler.Default).Forget();
outerChannel.Acceptance.ContinueWith(_ => this.channelsOfferedToClient.TryRemove(requestId, out MultiplexingStream.Channel? _), TaskScheduler.Default).Forget();

return new RemoteServiceConnectionInfo
{
Expand Down
9 changes: 9 additions & 0 deletions src/servicebroker-npm/src/CancellationTokenAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import CancellationToken from 'cancellationtoken'
import { CancellationToken as vscodeCancellationToken, CancellationTokenSource as vscodeCancellationTokenSource } from 'vscode-jsonrpc'

function isProxy(value: any): boolean {
// It turns out that it's really hard to detect a proxy in general:
// https://stackoverflow.com/questions/36372611/how-to-test-if-an-object-is-a-proxy
// Our strategy here is to at least detect pass-through proxies that claim that everything exists.
// So we make up a property name that should never exist. If it does, then we know it must be a proxy.
return value && value.ImAProxy !== undefined
}

export class CancellationTokenAdapters {
/** Tests whether an object satisfies the {@link vscodeCancellationToken} interface. */
static isVSCode(value: any): value is vscodeCancellationToken {
Expand All @@ -11,6 +19,7 @@ export class CancellationTokenAdapters {
static isCancellationToken(value: any): value is CancellationToken {
return (
value &&
!isProxy(value) &&
typeof value.throwIfCancelled === 'function' &&
typeof value.onCancelled === 'function' &&
value.isCancelled !== undefined &&
Expand Down
87 changes: 5 additions & 82 deletions src/servicebroker-npm/src/ServiceJsonRpcDescriptor.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Channel } from 'nerdbank-streams'
import { MessageConnection, CancellationToken as vscodeCancellationToken, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc'
import { MessageConnection, createMessageConnection, Message, ParameterStructures } from 'vscode-jsonrpc/node'
import { Formatters, MessageDelimiters } from './constants'
import { ServiceMoniker } from './ServiceMoniker'
import { RpcConnection, RpcEventServer, ServiceRpcDescriptor } from './ServiceRpcDescriptor'
import { clone, constructMessageConnection, isChannel } from './utilities'
import { IDisposable } from './IDisposable'
import { BE32MessageReader, BE32MessageWriter } from './BigEndianInt32LengthHeaderMessageHandler'
import * as msgpack from 'msgpack-lite'
import { CancellationTokenAdapters } from './CancellationTokenAdapter'
import { MultiplexingStream, MultiplexingStreamOptions } from 'nerdbank-streams'
import { EventEmitter } from 'stream'
import { NodeStreamMessageReader, NodeStreamMessageWriter } from './NodeStreamMessageWrappers'
import { invokeRpc, registerInstanceMethodsAsRpcTargets } from './jsonRpc/rpcUtilities'

/**
* Constructs a JSON RPC message connection to a service
Expand Down Expand Up @@ -194,74 +194,19 @@ const rpcProxy = {
default:
return function () {
const methodName = property.toString()

if (arguments.length > 0) {
if (vscodeCancellationToken.is(arguments[arguments.length - 1])) {
const ct = arguments[arguments.length - 1]
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct)
} else if (CancellationTokenAdapters.isCancellationToken(arguments[arguments.length - 1])) {
const ct = CancellationTokenAdapters.cancellationTokenToVSCode(arguments[arguments.length - 1])
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args, ct)
} else if (arguments[arguments.length - 1] === undefined) {
// The last arg is most likely a `CancellationToken?` that was propagated to the RPC call from another method that made it optional.
// We can't tell, but we mustn't claim it's a CancellationToken nor an ordinary argument or else an RPC server
// may fail to match the RPC call to a method because of an extra argument.
// If this truly was a value intended to propagate, they should use `null` as the argument.
const args = validateNoUndefinedElements(Array.prototype.slice.call(arguments, 0, arguments.length - 1))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...args)
}
}

const validatedArgs = validateNoUndefinedElements(Array.prototype.slice.call(arguments))
return target.messageConnection.sendRequest(methodName, ParameterStructures.byPosition, ...validatedArgs)
return invokeRpc(property.toString(), arguments, target.messageConnection)
}
}
},
}

function validateNoUndefinedElements<T>(values: T[]): T[] {
for (let i = 0; i < values.length; i++) {
if (values[i] === undefined) {
throw new Error(`Argument at 0-based index ${i} is set as undefined, which is not a valid JSON value.`)
}
}

return values
}

export class JsonRpcConnection extends RpcConnection {
constructor(public readonly messageConnection: MessageConnection) {
super()
}

public addLocalRpcTarget(rpcTarget: any | RpcEventServer): void {
function wrapCancellationTokenIfPresent(args: any[]): any[] {
if (args.length > 0 && CancellationTokenAdapters.isVSCode(args[args.length - 1])) {
const adaptedCancellationToken = CancellationTokenAdapters.vscodeToCancellationToken(args[args.length - 1])
args[args.length - 1] = adaptedCancellationToken
}

return args
}

function registerRequestAndNotification(connection: MessageConnection, methodName: string, method: any) {
connection.onRequest(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args)))
connection.onNotification(methodName, (...args: []) => method.apply(rpcTarget, wrapCancellationTokenIfPresent(args)))
}

JsonRpcConnection.getInstanceMethodNames(rpcTarget, Object.prototype).forEach(methodName => {
if (methodName !== 'dispose') {
const method = rpcTarget[methodName]
registerRequestAndNotification(this.messageConnection, methodName, method)

// Add an alias for the method so that we support with and without the Async suffix.
const suffix = 'Async'
const alias = methodName.endsWith(suffix) ? methodName.substring(0, methodName.length - suffix.length) : `${methodName}${suffix}`
registerRequestAndNotification(this.messageConnection, alias, method)
}
})
registerInstanceMethodsAsRpcTargets(rpcTarget, this.messageConnection)

// If the RPC target is an event emitter, hook up a handler that forwards all events across RPC.
if (RpcConnection.IsRpcEventServer(rpcTarget)) {
Expand Down Expand Up @@ -295,31 +240,9 @@ export class JsonRpcConnection extends RpcConnection {
}

public dispose(): void {}

private static isMethod(obj: object, name: string): boolean {
const desc = Object.getOwnPropertyDescriptor(obj, name)
return !!desc && typeof desc.value === 'function'
}

private static getInstanceMethodNames(obj: object, stopPrototype?: any): string[] {
const array: string[] = []
let proto = Object.getPrototypeOf(obj)
while (proto && proto !== stopPrototype) {
Object.getOwnPropertyNames(proto).forEach(name => {
if (name !== 'constructor') {
if (JsonRpcConnection.isMethod(proto, name)) {
array.push(name)
}
}
})
proto = Object.getPrototypeOf(proto)
}

return array
}
}

interface IProxyTarget {
export interface IProxyTarget {
messageConnection: MessageConnection
eventEmitter: EventEmitter
}
2 changes: 2 additions & 0 deletions src/servicebroker-npm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export { IJsonRpcClientProxy } from './IJsonRpcClientProxy'
export { IpcRelayServiceBroker } from './IpcRelayServiceBroker'
export { IRemoteServiceBroker } from './IRemoteServiceBroker'
export { IServiceBroker, ServiceBrokerEvents } from './IServiceBroker'
export { RpcMarshalable, MarshaledObjectLifetime } from './jsonRpc/MarshalableObject'
export { MultiplexingRelayServiceBroker } from './MultiplexingRelayServiceBroker'
export { ProtectedOperation } from './ProtectedOperation'
export { RemoteServiceBroker } from './RemoteServiceBroker'
Expand All @@ -17,4 +18,5 @@ export { ServiceBrokerClientMetadata } from './ServiceBrokerClientMetadata'
export { ServiceJsonRpcDescriptor, JsonRpcConnection } from './ServiceJsonRpcDescriptor'
export { ServiceMoniker } from './ServiceMoniker'
export { ServiceRpcDescriptor, RpcEventServer } from './ServiceRpcDescriptor'
export { IObserver, Observer } from './jsonRpc/Observer'
export * from './container'
Loading