Skip to content

Commit 703c1fe

Browse files
nokomebeneboy
authored andcommitted
feat(Executor): Just-in-time connect to clients
1 parent 4db1436 commit 703c1fe

14 files changed

Lines changed: 328 additions & 30 deletions

src/base/Client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Node } from '@stencila/schema'
22
import { Interface, Method, Manifest } from './Executor'
33
import Request from './Request'
44
import Response from './Response'
5+
import { Address } from './Transports'
56

67
/**
78
* A base client class which acts as a proxy to a remote `Executor`.
@@ -109,3 +110,7 @@ export default abstract class Client implements Interface {
109110
delete this.requests[response.id]
110111
}
111112
}
113+
114+
export interface ClientType {
115+
new (address: Address): Client
116+
}

src/base/Executor.test.ts

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import Executor, { Peer, Manifest, Method } from './Executor'
2+
import DirectServer from '../direct/DirectServer'
3+
import DirectClient from '../direct/DirectClient'
4+
import { ClientType } from './Client'
5+
import { Transport } from './Transports'
6+
import StdioClient from '../stdio/StdioClient'
7+
8+
describe('Peer', () => {
9+
test('capable: no capabilities', () => {
10+
const peer = new Peer(
11+
{
12+
capabilities: {},
13+
addresses: {}
14+
},
15+
[]
16+
)
17+
18+
expect(peer.capable(Method.compile, { foo: 'bar' })).toBe(false)
19+
expect(peer.capable(Method.execute, {})).toBe(false)
20+
})
21+
22+
test('capable: boolean capabilties', () => {
23+
const peer = new Peer(
24+
{
25+
capabilities: {
26+
decode: false,
27+
compile: true,
28+
execute: false
29+
},
30+
addresses: {}
31+
},
32+
[]
33+
)
34+
35+
expect(peer.capable(Method.decode, {})).toBe(false)
36+
expect(peer.capable(Method.compile, {})).toBe(true)
37+
expect(peer.capable(Method.execute, {})).toBe(false)
38+
})
39+
40+
test('capable: schema object capabilties', () => {
41+
const peer = new Peer(
42+
{
43+
capabilities: {
44+
decode: {
45+
required: ['content', 'format'],
46+
properties: {
47+
content: {
48+
type: 'string'
49+
},
50+
format: {
51+
enum: ['julia']
52+
}
53+
}
54+
},
55+
compile: {
56+
type: 'object',
57+
required: ['node'],
58+
properties: {
59+
node: {
60+
type: 'object',
61+
required: ['type', 'programmingLanguage'],
62+
properties: {
63+
type: {
64+
enum: ['CodeChunk', 'CodeExpression']
65+
},
66+
programmingLanguage: {
67+
enum: ['python']
68+
}
69+
}
70+
}
71+
}
72+
}
73+
},
74+
addresses: {}
75+
},
76+
[]
77+
)
78+
79+
expect(peer.capable(Method.decode, {})).toBe(false)
80+
expect(peer.capable(Method.decode, { content: 42 })).toBe(false)
81+
expect(peer.capable(Method.decode, { content: '42' })).toBe(false)
82+
expect(peer.capable(Method.decode, { content: '42', format: 'foo' })).toBe(
83+
false
84+
)
85+
expect(
86+
peer.capable(Method.decode, { content: '42', format: 'julia' })
87+
).toBe(true)
88+
89+
expect(peer.capable(Method.compile, {})).toBe(false)
90+
expect(peer.capable(Method.compile, { node: 42 })).toBe(false)
91+
expect(peer.capable(Method.compile, { node: { type: 'CodeChunk' } })).toBe(
92+
false
93+
)
94+
expect(
95+
peer.capable(Method.compile, {
96+
node: { type: 'CodeChunk', programmingLanguage: 'javascript' }
97+
})
98+
).toBe(false)
99+
expect(
100+
peer.capable(Method.compile, {
101+
node: { type: 'CodeChunk', programmingLanguage: 'python' }
102+
})
103+
).toBe(true)
104+
expect(
105+
peer.capable(Method.compile, {
106+
node: { type: 'CodeExpression', programmingLanguage: 'python' }
107+
})
108+
).toBe(true)
109+
})
110+
111+
test('connect: no addresses', async () => {
112+
const peer = new Peer(
113+
{
114+
capabilities: {},
115+
addresses: {}
116+
},
117+
[DirectClient as ClientType]
118+
)
119+
120+
expect(peer.connect()).toBe(false)
121+
})
122+
123+
test('connect: no client types', async () => {
124+
const peer = new Peer(
125+
{
126+
capabilities: {},
127+
addresses: {}
128+
},
129+
[]
130+
)
131+
132+
expect(peer.connect()).toBe(false)
133+
})
134+
135+
test('connect: no addresses match client types', async () => {
136+
const peer = new Peer(
137+
{
138+
capabilities: {},
139+
addresses: {
140+
http: {
141+
type: Transport.http
142+
}
143+
}
144+
},
145+
[DirectClient as ClientType, StdioClient as ClientType]
146+
)
147+
148+
expect(peer.connect()).toBe(false)
149+
})
150+
151+
test('connect: order of client types equals preference', async () => {
152+
const directServer = new DirectServer()
153+
const manifest: Manifest = {
154+
capabilities: {},
155+
addresses: {
156+
direct: {
157+
type: Transport.direct,
158+
server: directServer
159+
},
160+
stdio: {
161+
type: Transport.stdio,
162+
command: 'echo'
163+
}
164+
}
165+
}
166+
const peer1 = new Peer(manifest, [
167+
DirectClient as ClientType,
168+
StdioClient as ClientType
169+
])
170+
const peer2 = new Peer(manifest, [
171+
StdioClient as ClientType,
172+
DirectClient as ClientType
173+
])
174+
175+
expect(peer1.connect()).toBe(true)
176+
// @ts-ignore
177+
expect(peer1.client instanceof DirectClient).toBe(true)
178+
179+
expect(peer2.connect()).toBe(true)
180+
// @ts-ignore
181+
expect(peer2.client instanceof StdioClient).toBe(true)
182+
})
183+
})

src/base/Executor.ts

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Node } from '@stencila/schema'
22
import { JSONSchema7Definition } from 'json-schema'
33
import Ajv from 'ajv'
4-
import Client from './Client'
4+
import Client, { ClientType } from './Client'
55
import Server from './Server'
66
import { Address, Transport } from './Transports'
77
import { getLogger } from '@stencila/logga'
@@ -132,13 +132,22 @@ const ajv = new Ajv()
132132
* a `WebSocketClient` to an executor running on
133133
* a remote machine.
134134
*/
135-
class Peer {
135+
export class Peer {
136136
/**
137137
* The manifest of the peer executor.
138138
*/
139139
private manifest: Manifest
140140

141-
// private Clients: new ()
141+
/**
142+
* A list of classes, that extend `Client`, and are available
143+
* to connect to peer executors.
144+
*
145+
* This property is used for dependency injection, rather than importing
146+
* clients for all transports into this module when they may
147+
* not be used (e.g. `StdioClient` in a browser hosted `Executor`).
148+
* The order of this list, defines the preference for the transport.
149+
*/
150+
private readonly clientTypes: ClientType[]
142151

143152
/**
144153
* The client for the peer executor.
@@ -156,8 +165,9 @@ class Peer {
156165
*/
157166
private validators: { [key: string]: Ajv.ValidateFunction } = {}
158167

159-
public constructor(manifest: Manifest) {
168+
public constructor(manifest: Manifest, clientTypes: ClientType[]) {
160169
this.manifest = manifest
170+
this.clientTypes = clientTypes
161171
}
162172

163173
/**
@@ -186,25 +196,56 @@ class Peer {
186196
return validator(params) as boolean
187197
}
188198

189-
public connect(): Client {
190-
let client: Client
191-
// @ts-ignore
192-
return client
199+
/**
200+
* Connect to the remote `Executor`.
201+
*
202+
* Finds the first client type that the peer
203+
* executor supports.
204+
*
205+
* @returns A client instance or `undefined` if not able to connect
206+
*/
207+
public connect(): boolean {
208+
for (const ClientType of this.clientTypes) {
209+
// Get the transport for the client type
210+
// There should be a better way to do this
211+
const transportMap: { [key: string]: Transport } = {
212+
DirectClient: Transport.direct,
213+
StdioClient: Transport.stdio,
214+
VsockClient: Transport.vsock,
215+
TcpClient: Transport.tcp,
216+
HttpClient: Transport.http,
217+
WebsocketClient: Transport.ws
218+
}
219+
const transport = transportMap[ClientType.name]
220+
if (transport === undefined)
221+
throw new Error('Wooah! This should not happen!')
222+
223+
// See if the peer has an address the transport
224+
const address = this.manifest.addresses[transport]
225+
if (address !== undefined) {
226+
this.client = new ClientType(address)
227+
return true
228+
}
229+
}
230+
// Unable to connect to the peer
231+
return false
193232
}
194233

195234
/**
196235
* Call a method of a remote `Executor`.
197236
*
237+
* Ensures that there is a connection to the
238+
* executor and then passes the request to it.
239+
*
198240
* @param method The name of the method
199241
* @param params Values of parameters (i.e. arguments)
200242
*/
201243
public async call<Type>(
202244
method: Method,
203245
params: { [key: string]: any } = {}
204246
): Promise<Type> {
205-
if (this.client === undefined) {
206-
this.client = this.connect()
207-
}
247+
if (this.client === undefined)
248+
throw new Error("WTF, no client! You shouldn't be calling this!")
208249
return this.client.call<Type>(method, params)
209250
}
210251
}
@@ -230,9 +271,11 @@ export default class Executor implements Interface {
230271
*/
231272
private servers: Server[] = []
232273

233-
public constructor(peers: Manifest[] = []) {
234-
// Create peers using the manifests provided
235-
this.peers = peers.map(peer => new Peer(peer))
274+
public constructor(
275+
manifests: Manifest[] = [],
276+
clientTypes: ClientType[] = []
277+
) {
278+
this.peers = manifests.map(manifest => new Peer(manifest, clientTypes))
236279
}
237280

238281
/**
@@ -355,7 +398,9 @@ export default class Executor implements Interface {
355398
// Attempt to delegate to a peer
356399
for (const peer of this.peers) {
357400
if (peer.capable(method, params)) {
358-
return peer.call<Type>(method, params)
401+
if (peer.connect()) {
402+
return peer.call<Type>(method, params)
403+
}
359404
}
360405
}
361406
// No peer has necessary capability so resort to fallback

src/base/Server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ export default abstract class Server {
113113
/**
114114
* Start the server
115115
*/
116-
public abstract start(): void
116+
public start(): void {}
117117

118118
/**
119119
* Stop the server
120120
*/
121-
public abstract stop(): void
121+
public stop(): void {}
122122

123123
/**
124124
* Run the server with graceful shutdown on `SIGINT` or `SIGTERM`

src/base/Transports.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
export enum Transport {
2+
direct = 'direct',
23
stdio = 'stdio',
34
vsock = 'vsock',
45
tcp = 'tcp',
56
http = 'http',
67
ws = 'ws'
78
}
89

10+
export interface DirectAddress {
11+
type: Transport.direct
12+
server: any
13+
}
14+
915
export interface StdioAddress {
1016
type: Transport.stdio
1117
command: string
@@ -65,6 +71,7 @@ export interface WebsocketAddress extends HttpAddress {
6571
}
6672

6773
export type Address =
74+
| DirectAddress
6875
| StdioAddress
6976
| VsockAddress
7077
| TcpAddress

src/direct/DirectClient.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import Client from '../base/Client'
2+
import Request from '../base/Request'
3+
import Server from '../base/Server'
4+
import { DirectAddress } from '../base/Transports'
5+
6+
export default class DirectClient extends Client {
7+
private server: Server
8+
9+
public constructor(address: Omit<DirectAddress, 'type'>) {
10+
super()
11+
this.server = address.server
12+
}
13+
14+
protected send(request: Request): void {
15+
// @ts-ignore server.receive is private
16+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
17+
this.server.receive(request).then(response => this.receive(response))
18+
}
19+
}

0 commit comments

Comments
 (0)