Skip to content

Commit 664299b

Browse files
authored
feat(client, server): message port transfer (#1113)
This PR introduces a new `transfer` feature that allows `message port adapter` to utilize their own `transfer/structureClone layer`. Currently, messages are encoded to string/binary before being sent. By using this feature, oRPC can achieve near-native performance and support additional instances that can't be serialized/described, such as `OffscreenCanvas`. - [x] draft - [x] minify + transform message before send (can't clone standard request/response's `URL`) - [x] test - [x] docs Closes: #1101 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Experimental MessagePort transfer support — optional transferable payloads for faster postMessage transfers; client/server handlers and links can opt into transfer-aware messaging. * Expanded serialization API — new serialize/deserialize helpers and codec improvements to handle richer payloads (Blobs, streams, event iterators). * **Documentation** * Added "Transfer" docs with examples, guidance, structured-clone warning, and JSON-serializer tip. * **Tests** * New tests covering transfer flows and serialization/deserialization scenarios. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent ad7f4d9 commit 664299b

12 files changed

Lines changed: 788 additions & 129 deletions

File tree

apps/content/docs/adapters/message-port.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,38 @@ clientPort.start()
4848
:::info
4949
This only shows how to configure the link. For full client examples, see [Client-Side Clients](/docs/client/client-side).
5050
:::
51+
52+
## Transfer
53+
54+
By default, oRPC serializes request/response messages to string/binary data before sending over message port. If needed, you can define the `transfer` option to utilize full power of [MessagePort: postMessage() method](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage), such as transferring ownership of objects to the other side or support unserializable objects like `OffscreenCanvas`.
55+
56+
::: code-group
57+
58+
```ts [handler]
59+
const handler = new RPCHandler(router, {
60+
experimental_transfer: (message, port) => {
61+
const transfer = deepFindTransferableObjects(message) // implement your own logic
62+
return transfer.length ? transfer : null // only enable when needed
63+
}
64+
})
65+
```
66+
67+
```ts [link]
68+
const link = new RPCLink({
69+
port: clientPort,
70+
experimental_transfer: (message) => {
71+
const transfer = deepFindTransferableObjects(message) // implement your own logic
72+
return transfer.length ? transfer : null // only enable when needed
73+
}
74+
})
75+
```
76+
77+
:::
78+
79+
::: warning
80+
When `transfer` returns an array, messages using [the structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm) for sending, which doesn't support all data types such as [Event Iterator's Metadata](/docs/event-iterator#last-event-id-event-metadata). So I recommend you only enable this when needed.
81+
:::
82+
83+
::: tip
84+
The `transfer` option run after [RPC JSON Serializer](/docs/advanced/rpc-json-serializer) so you can combine them together to support more data types.
85+
:::

packages/client/src/adapters/message-port/link-client.ts

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,61 @@
1+
import type { Promisable, Value } from '@orpc/shared'
12
import type { StandardLazyResponse, StandardRequest } from '@orpc/standard-server'
3+
import type { DecodedRequestMessage, serializeResponseMessage } from '@orpc/standard-server-peer'
24
import type { ClientContext, ClientOptions } from '../../types'
35
import type { StandardLinkClient } from '../standard'
46
import type { SupportedMessagePort } from './message-port'
5-
import { ClientPeer } from '@orpc/standard-server-peer'
7+
import { isObject, value } from '@orpc/shared'
8+
import { experimental_ClientPeerWithoutCodec as ClientPeerWithoutCodec, decodeResponseMessage, deserializeResponseMessage, encodeRequestMessage, serializeRequestMessage } from '@orpc/standard-server-peer'
69
import { onMessagePortClose, onMessagePortMessage, postMessagePortMessage } from './message-port'
710

811
export interface LinkMessagePortClientOptions {
912
port: SupportedMessagePort
13+
14+
/**
15+
* By default, oRPC serializes request/response messages to string/binary data before sending over message port.
16+
* If needed, you can define the this option to utilize full power of [MessagePort: postMessage() method](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage),
17+
* such as transferring ownership of objects to the other side or support unserializable objects like `OffscreenCanvas`.
18+
*
19+
* @remarks
20+
* - return null | undefined to disable this feature
21+
*
22+
* @warning Make sure your message port supports `transfer` before using this feature.
23+
* @example
24+
* ```ts
25+
* experimental_transfer: (message, port) => {
26+
* const transfer = deepFindTransferableObjects(message) // implement your own logic
27+
* return transfer.length ? transfer : null // only enable when needed
28+
* }
29+
* ```
30+
*
31+
* @see {@link https://orpc.unnoq.com/docs/adapters/message-port#transfer Message Port Transfer Docs}
32+
*/
33+
experimental_transfer?: Value<Promisable<object[] | null | undefined>, [message: DecodedRequestMessage, port: SupportedMessagePort]>
1034
}
1135

1236
export class LinkMessagePortClient<T extends ClientContext> implements StandardLinkClient<T> {
13-
private readonly peer: ClientPeer
37+
private readonly peer: ClientPeerWithoutCodec
1438

1539
constructor(options: LinkMessagePortClientOptions) {
16-
this.peer = new ClientPeer((message) => {
17-
return postMessagePortMessage(options.port, message)
40+
this.peer = new ClientPeerWithoutCodec(async (message) => {
41+
const [id, type, payload] = message
42+
const transfer = await value(options.experimental_transfer, message, options.port)
43+
44+
if (transfer) {
45+
postMessagePortMessage(options.port, serializeRequestMessage(id, type, payload), transfer)
46+
}
47+
else {
48+
postMessagePortMessage(options.port, await encodeRequestMessage(id, type, payload))
49+
}
1850
})
1951

2052
onMessagePortMessage(options.port, async (message) => {
21-
await this.peer.message(message)
53+
if (isObject(message)) {
54+
await this.peer.message(deserializeResponseMessage(message as any as ReturnType<typeof serializeResponseMessage>))
55+
}
56+
else {
57+
await this.peer.message(await decodeResponseMessage(message))
58+
}
2259
})
2360

2461
onMessagePortClose(options.port, () => {

packages/client/src/adapters/message-port/message-port.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ describe('postMessagePortMessage', () => {
1111
expect(mockPort.postMessage).toBeCalledTimes(1)
1212
expect(mockPort.postMessage).toHaveBeenCalledWith(data)
1313
})
14+
15+
it('calls postMessage on the port with transfer', () => {
16+
const mockPort = {
17+
addEventListener: vi.fn(),
18+
postMessage: vi.fn(),
19+
}
20+
const data = new Uint8Array([1, 2, 3])
21+
const transfer = [data.buffer]
22+
postMessagePortMessage(mockPort, data, transfer)
23+
expect(mockPort.postMessage).toBeCalledTimes(1)
24+
expect(mockPort.postMessage).toHaveBeenCalledWith(data, transfer)
25+
})
1426
})
1527

1628
describe('onMessagePortMessage', () => {

packages/client/src/adapters/message-port/message-port.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
export interface MessagePortMainLike {
55
on: <T extends string>(event: T, callback: (event?: { data: any }) => void) => void
6-
postMessage: (data: any) => void
6+
postMessage: (data: any, transfer?: any[]) => void
77
}
88

99
/**
@@ -24,10 +24,19 @@ export type SupportedMessagePort = Pick<MessagePort, 'addEventListener' | 'postM
2424
/**
2525
* Message port can support [The structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
2626
*/
27-
export type SupportedMessagePortData = string | ArrayBufferLike | Uint8Array
27+
export type SupportedMessagePortData = any
2828

29-
export function postMessagePortMessage(port: SupportedMessagePort, data: SupportedMessagePortData): void {
30-
port.postMessage(data)
29+
export function postMessagePortMessage(
30+
port: SupportedMessagePort,
31+
data: SupportedMessagePortData,
32+
transfer?: any[],
33+
): void {
34+
if (transfer) {
35+
port.postMessage(data, transfer)
36+
}
37+
else {
38+
port.postMessage(data)
39+
}
3140
}
3241

3342
export function onMessagePortMessage(port: SupportedMessagePort, callback: (data: SupportedMessagePortData) => void): void {

packages/client/src/adapters/message-port/rpc-link.test.ts

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import { MessageChannel } from 'node:worker_threads'
2-
import { decodeRequestMessage, encodeResponseMessage, MessageType } from '@orpc/standard-server-peer'
2+
import { isObject } from '@orpc/shared'
3+
import { decodeRequestMessage, deserializeRequestMessage, encodeResponseMessage, MessageType, serializeResponseMessage } from '@orpc/standard-server-peer'
34
import { createORPCClient } from '../../client'
45
import { RPCLink } from './rpc-link'
56

67
describe('rpcLink', () => {
78
let orpc: any
8-
let sentMessages: any[]
9+
let receivedMessages: any[]
910
let clientPort: any
1011
let serverPort: any
12+
let transfer: ReturnType<typeof vi.fn>
1113

1214
beforeEach(() => {
1315
const channel = new MessageChannel()
@@ -17,22 +19,24 @@ describe('rpcLink', () => {
1719
clientPort.start()
1820
serverPort.start()
1921

20-
sentMessages = []
22+
receivedMessages = []
2123
serverPort.addEventListener('message', (event: any) => {
22-
sentMessages.push(event.data)
24+
receivedMessages.push(event.data)
2325
})
2426

27+
transfer = vi.fn()
2528
orpc = createORPCClient(new RPCLink({
2629
port: clientPort,
30+
experimental_transfer: transfer,
2731
}))
2832
})
2933

3034
it('on success', async () => {
3135
expect(orpc.ping('input')).resolves.toEqual('pong')
3236

33-
await vi.waitFor(() => expect(sentMessages.length).toBe(1))
37+
await vi.waitFor(() => expect(receivedMessages.length).toBe(1))
3438

35-
const [id, , payload] = (await decodeRequestMessage(sentMessages[0]))
39+
const [id, , payload] = (await decodeRequestMessage(receivedMessages[0]))
3640

3741
expect(id).toBeTypeOf('string')
3842
expect(payload).toEqual({
@@ -50,9 +54,9 @@ describe('rpcLink', () => {
5054
it('on success with blob', async () => {
5155
expect(orpc.ping(new Blob(['input']))).resolves.toEqual('pong')
5256

53-
await vi.waitFor(() => expect(sentMessages.length).toBe(1))
57+
await vi.waitFor(() => expect(receivedMessages.length).toBe(1))
5458

55-
const [id, , payload] = (await decodeRequestMessage(sentMessages[0]))
59+
const [id, , payload] = (await decodeRequestMessage(receivedMessages[0]))
5660

5761
expect(id).toBeTypeOf('string')
5862
expect(payload).toEqual({
@@ -67,6 +71,42 @@ describe('rpcLink', () => {
6771
)
6872
})
6973

74+
it('on success with transfer', async () => {
75+
const array = new Uint8Array([1, 2, 3])
76+
77+
transfer.mockResolvedValueOnce([array.buffer])
78+
79+
const promise = expect(orpc.ping(array)).resolves.toEqual('pong')
80+
81+
await vi.waitFor(() => expect(receivedMessages.length).toBe(1))
82+
expect(receivedMessages[0]).toSatisfy(isObject)
83+
const [id, type, payload] = deserializeRequestMessage(receivedMessages[0])
84+
85+
expect(array.byteLength).toBe(0) // transferred so length is 0
86+
87+
expect(transfer).toHaveBeenCalledTimes(1)
88+
expect(transfer).toHaveBeenCalledWith([id, type, expect.objectContaining({
89+
url: new URL('orpc://localhost/ping'),
90+
body: { json: expect.toBeOneOf([array]) },
91+
headers: {},
92+
method: 'POST',
93+
})], expect.toBeOneOf([clientPort]))
94+
95+
expect(id).toBeTypeOf('string')
96+
expect(payload).toEqual({
97+
url: new URL('orpc://localhost/ping'),
98+
body: { json: expect.toSatisfy(v => v !== array && v instanceof Uint8Array && v.byteLength === 3) },
99+
headers: {},
100+
method: 'POST',
101+
})
102+
103+
serverPort.postMessage(
104+
serializeResponseMessage(id, MessageType.RESPONSE, { body: { json: 'pong' }, status: 200, headers: {} }),
105+
)
106+
107+
await promise
108+
})
109+
70110
it('on close', async () => {
71111
expect(orpc.ping('input')).rejects.toThrow(/aborted/)
72112

packages/server/src/adapters/message-port/handler.ts

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,80 @@
11
import type { SupportedMessagePort } from '@orpc/client/message-port'
2-
import type { MaybeOptionalOptions } from '@orpc/shared'
2+
import type { MaybeOptionalOptions, Promisable, Value } from '@orpc/shared'
3+
import type { DecodedResponseMessage, serializeRequestMessage } from '@orpc/standard-server-peer'
34
import type { Context } from '../../context'
45
import type { StandardHandler } from '../standard'
56
import type {
67
HandleStandardServerPeerMessageOptions,
78
} from '../standard-peer'
89
import { onMessagePortClose, onMessagePortMessage, postMessagePortMessage } from '@orpc/client/message-port'
9-
import { resolveMaybeOptionalOptions } from '@orpc/shared'
10-
import { ServerPeer } from '@orpc/standard-server-peer'
10+
import { isObject, resolveMaybeOptionalOptions, value } from '@orpc/shared'
11+
import { decodeRequestMessage, deserializeRequestMessage, encodeResponseMessage, serializeResponseMessage, experimental_ServerPeerWithoutCodec as ServerPeerWithoutCodec } from '@orpc/standard-server-peer'
1112
import { createServerPeerHandleRequestFn } from '../standard-peer'
1213

14+
export interface MessagePortHandlerOptions<_T extends Context> {
15+
/**
16+
* By default, oRPC serializes request/response messages to string/binary data before sending over message port.
17+
* If needed, you can define the this option to utilize full power of [MessagePort: postMessage() method](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/postMessage),
18+
* such as transferring ownership of objects to the other side or support unserializable objects like `OffscreenCanvas`.
19+
*
20+
* @remarks
21+
* - return null | undefined to disable this feature
22+
*
23+
* @warning Make sure your message port supports `transfer` before using this feature.
24+
* @example
25+
* ```ts
26+
* experimental_transfer: (message, port) => {
27+
* const transfer = deepFindTransferableObjects(message) // implement your own logic
28+
* return transfer.length ? transfer : null // only enable when needed
29+
* }
30+
* ```
31+
*
32+
* @see {@link https://orpc.unnoq.com/docs/adapters/message-port#transfer Message Port Transfer Docs}
33+
*/
34+
experimental_transfer?: Value<Promisable<object[] | null | undefined>, [message: DecodedResponseMessage, port: SupportedMessagePort]>
35+
}
36+
1337
export class MessagePortHandler<T extends Context> {
38+
private readonly transfer: MessagePortHandlerOptions<T>['experimental_transfer']
39+
1440
constructor(
1541
private readonly standardHandler: StandardHandler<T>,
42+
options: NoInfer<MessagePortHandlerOptions<T>> = {},
1643
) {
44+
this.transfer = options.experimental_transfer
1745
}
1846

1947
upgrade(
2048
port: SupportedMessagePort,
2149
...rest: MaybeOptionalOptions<HandleStandardServerPeerMessageOptions<T>>
2250
): void {
23-
const peer = new ServerPeer((message) => {
24-
return postMessagePortMessage(port, message)
51+
const peer = new ServerPeerWithoutCodec(async (message) => {
52+
const [id, type, payload] = message
53+
const transfer = await value(this.transfer, message, port)
54+
55+
if (transfer) {
56+
postMessagePortMessage(port, serializeResponseMessage(id, type, payload), transfer)
57+
}
58+
else {
59+
postMessagePortMessage(port, await encodeResponseMessage(id, type, payload))
60+
}
2561
})
2662

2763
onMessagePortMessage(port, async (message) => {
28-
await peer.message(
29-
message,
30-
createServerPeerHandleRequestFn(this.standardHandler, resolveMaybeOptionalOptions(rest)),
31-
)
64+
const handleFn = createServerPeerHandleRequestFn(this.standardHandler, resolveMaybeOptionalOptions(rest))
65+
66+
if (isObject(message)) {
67+
await peer.message(
68+
deserializeRequestMessage(message as any as ReturnType<typeof serializeRequestMessage>),
69+
handleFn,
70+
)
71+
}
72+
else {
73+
await peer.message(
74+
await decodeRequestMessage(message),
75+
handleFn,
76+
)
77+
}
3278
})
3379

3480
onMessagePortClose(port, () => {

0 commit comments

Comments
 (0)