From ea8481987544518cbd4eed09aa5a27e9431bd832 Mon Sep 17 00:00:00 2001 From: Jason Nall Date: Sat, 14 Jan 2023 19:29:14 -0500 Subject: [PATCH] Add subscriptions support. --- .changeset/beige-islands-type.md | 5 + README.md | 6 +- examples/basic/electron/api.ts | 18 +++ examples/basic/electron/index.ts | 4 +- examples/basic/src/index.tsx | 5 + examples/basic/tsconfig.json | 2 +- .../main/__tests__/handleIPCOperation.test.ts | 83 ++++++++++++ .../main/__tests__/resolveIPCResponse.test.ts | 56 -------- .../src/main/createIPCHandler.ts | 56 ++++++-- .../src/main/exposeElectronTRPC.ts | 4 +- .../src/main/handleIPCOperation.ts | 107 ++++++++++++++++ packages/electron-trpc/src/main/index.ts | 1 - .../src/main/resolveIPCResponse.ts | 109 ---------------- packages/electron-trpc/src/main/utils.ts | 15 +-- packages/electron-trpc/src/renderer/index.ts | 1 - .../electron-trpc/src/renderer/ipcLink.ts | 120 +++++++++++++++--- packages/electron-trpc/src/types.ts | 6 - 17 files changed, 384 insertions(+), 214 deletions(-) create mode 100644 .changeset/beige-islands-type.md create mode 100644 packages/electron-trpc/src/main/__tests__/handleIPCOperation.test.ts delete mode 100644 packages/electron-trpc/src/main/__tests__/resolveIPCResponse.test.ts create mode 100644 packages/electron-trpc/src/main/handleIPCOperation.ts delete mode 100644 packages/electron-trpc/src/main/resolveIPCResponse.ts delete mode 100644 packages/electron-trpc/src/types.ts diff --git a/.changeset/beige-islands-type.md b/.changeset/beige-islands-type.md new file mode 100644 index 00000000..788a1f26 --- /dev/null +++ b/.changeset/beige-islands-type.md @@ -0,0 +1,5 @@ +--- +'electron-trpc': minor +--- + +Added support for subscriptions. diff --git a/README.md b/README.md index 28c1f2d3..4ab8cb88 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ - Expose APIs from Electron's main process to one or more render processes. - Build fully type-safe IPC. - Secure alternative to opening servers on localhost. -- _Subscription support coming soon_. +- Full support for queries, mutations, and subscriptions. ## Installation @@ -44,14 +44,14 @@ npm install --save electron-trpc import { router } from './api'; app.on('ready', () => { - createIPCHandler({ router }); - const win = new BrowserWindow({ webPreferences: { // Replace this path with the path to your preload file (see next step) preload: 'path/to/preload.js', }, }); + + createIPCHandler({ router, windows: [win] }); }); ``` diff --git a/examples/basic/electron/api.ts b/examples/basic/electron/api.ts index 4c82f1be..1e3e6d92 100644 --- a/examples/basic/electron/api.ts +++ b/examples/basic/electron/api.ts @@ -1,5 +1,9 @@ import z from 'zod'; import { initTRPC } from '@trpc/server'; +import { observable } from '@trpc/server/observable'; +import { EventEmitter } from 'events'; + +const ee = new EventEmitter(); const t = initTRPC.create({ isServer: true }); @@ -7,10 +11,24 @@ export const router = t.router({ greeting: t.procedure.input(z.object({ name: z.string() })).query((req) => { const { input } = req; + ee.emit('greeting', `Greeted ${input.name}`); return { text: `Hello ${input.name}` as const, }; }), + subscription: t.procedure.subscription(() => { + return observable((emit) => { + function onGreet(text: string) { + emit.next({ text }); + } + + ee.on('greeting', onGreet); + + return () => { + ee.off('greeting', onGreet); + }; + }); + }), }); export type AppRouter = typeof router; diff --git a/examples/basic/electron/index.ts b/examples/basic/electron/index.ts index d850ac0b..62973aff 100644 --- a/examples/basic/electron/index.ts +++ b/examples/basic/electron/index.ts @@ -10,14 +10,14 @@ const preload = path.join(__dirname, './preload.js'); const url = process.env['VITE_DEV_SERVER_URL']; app.on('ready', () => { - createIPCHandler({ router }); - const win = new BrowserWindow({ webPreferences: { preload, }, }); + createIPCHandler({ router, windows: [win] }); + if (url) { win.loadURL(url); } else { diff --git a/examples/basic/src/index.tsx b/examples/basic/src/index.tsx index 668c764a..d71a4168 100644 --- a/examples/basic/src/index.tsx +++ b/examples/basic/src/index.tsx @@ -26,6 +26,11 @@ function App() { function HelloElectron() { const { data } = trpcReact.greeting.useQuery({ name: 'Electron' }); + trpcReact.subscription.useSubscription(undefined, { + onData: (data) => { + console.log(data); + }, + }); if (!data) { return null; diff --git a/examples/basic/tsconfig.json b/examples/basic/tsconfig.json index acc541af..1a26c4e1 100644 --- a/examples/basic/tsconfig.json +++ b/examples/basic/tsconfig.json @@ -6,7 +6,7 @@ "jsx": "react", "lib": ["dom", "esnext"], "module": "esnext", - "moduleResolution": "node", + "moduleResolution": "node16", "noEmit": true, "noFallthroughCasesInSwitch": true, "noUnusedLocals": true, diff --git a/packages/electron-trpc/src/main/__tests__/handleIPCOperation.test.ts b/packages/electron-trpc/src/main/__tests__/handleIPCOperation.test.ts new file mode 100644 index 00000000..ba2d0533 --- /dev/null +++ b/packages/electron-trpc/src/main/__tests__/handleIPCOperation.test.ts @@ -0,0 +1,83 @@ +import { describe, expect, test, vi } from 'vitest'; +import { z } from 'zod'; +import * as trpc from '@trpc/server'; +import { observable } from '@trpc/server/observable'; +import { EventEmitter } from 'events'; +import { handleIPCOperation } from '../handleIPCOperation'; + +const ee = new EventEmitter(); + +const t = trpc.initTRPC.create(); +const testRouter = t.router({ + testQuery: t.procedure + .input( + z.object({ + id: z.string(), + }) + ) + .query(({ input }) => { + return { id: input.id, isTest: true }; + }), + testSubscription: t.procedure.subscription(() => { + return observable((emit) => { + function testResponse() { + emit.next('test response'); + } + + ee.on('test', testResponse); + return () => ee.off('test', testResponse); + }); + }), +}); + +describe('api', () => { + test('can manually call into API', async () => { + const respond = vi.fn(); + await handleIPCOperation({ + createContext: async () => ({}), + operation: { context: {}, id: 1, input: { id: 'test-id' }, path: 'testQuery', type: 'query' }, + router: testRouter, + respond, + }); + + expect(respond).toHaveBeenCalledOnce(); + expect(respond.mock.lastCall[0]).toMatchObject({ + id: 1, + result: { + data: { + id: 'test-id', + isTest: true, + }, + }, + }); + }); + + test('does not handle subscriptions', async () => { + const respond = vi.fn(); + + await handleIPCOperation({ + createContext: async () => ({}), + operation: { + context: {}, + id: 1, + input: undefined, + path: 'testSubscription', + type: 'subscription', + }, + router: testRouter, + respond, + }); + + expect(respond).not.toHaveBeenCalled(); + + ee.emit('test'); + + expect(respond).toHaveBeenCalledOnce(); + expect(respond.mock.lastCall[0]).toMatchObject({ + id: 1, + result: { + data: 'test response', + }, + }); + }); +}); diff --git a/packages/electron-trpc/src/main/__tests__/resolveIPCResponse.test.ts b/packages/electron-trpc/src/main/__tests__/resolveIPCResponse.test.ts deleted file mode 100644 index 8f21412b..00000000 --- a/packages/electron-trpc/src/main/__tests__/resolveIPCResponse.test.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { describe, expect, test } from 'vitest'; -import { z } from 'zod'; -import { resolveIPCResponse } from '../resolveIPCResponse'; -import * as trpc from '@trpc/server'; - -const t = trpc.initTRPC.create(); -const testRouter = t.router({ - test: t.procedure - .input( - z.object({ - id: z.string(), - }) - ) - .query(({ input }) => { - return { id: input.id, isTest: true }; - }), -}); - -describe('api', () => { - test('can manually call into API', async () => { - const resolved = await resolveIPCResponse({ - createContext: async () => ({}), - operation: { context: {}, id: 1, input: { id: 'test-id' }, path: 'test', type: 'query' }, - router: testRouter, - }); - - expect(resolved).toMatchObject({ - response: { - result: { - data: { - id: 'test-id', - isTest: true, - }, - }, - }, - }); - }); - - test('does not handle subscriptions', async () => { - resolveIPCResponse({ - createContext: async () => ({}), - operation: { - context: {}, - id: 1, - input: { id: 'test-id' }, - path: 'test', - type: 'subscription', - }, - router: testRouter, - }).catch((cause) => { - expect(cause.name).toBe('TRPCError'); - expect(cause.message).toBe('Subscriptions should use wsLink'); - expect(cause.code).toBe('METHOD_NOT_SUPPORTED'); - }); - }); -}); diff --git a/packages/electron-trpc/src/main/createIPCHandler.ts b/packages/electron-trpc/src/main/createIPCHandler.ts index 57fb554b..ce7ebe38 100644 --- a/packages/electron-trpc/src/main/createIPCHandler.ts +++ b/packages/electron-trpc/src/main/createIPCHandler.ts @@ -1,18 +1,58 @@ import type { Operation } from '@trpc/client'; import type { AnyRouter, inferRouterContext } from '@trpc/server'; +import type { TRPCResponseMessage } from '@trpc/server/rpc'; import { ipcMain } from 'electron'; -import type { IpcMainInvokeEvent } from 'electron'; -import { resolveIPCResponse } from './resolveIPCResponse'; +import type { BrowserWindow, IpcMainInvokeEvent } from 'electron'; +import { handleIPCOperation } from './handleIPCOperation'; import { ELECTRON_TRPC_CHANNEL } from '../constants'; -export function createIPCHandler({ +class IPCHandler { + #windows: BrowserWindow[]; + + constructor({ + createContext, + router, + windows = [], + }: { + createContext?: () => Promise>; + router: TRouter; + windows?: BrowserWindow[]; + }) { + this.#windows = windows; + + ipcMain.on(ELECTRON_TRPC_CHANNEL, (_event: IpcMainInvokeEvent, args: Operation) => { + handleIPCOperation({ + router, + createContext, + operation: args, + respond: (response) => this.#sendToAllWindows(response), + }); + }); + } + + #sendToAllWindows(response: TRPCResponseMessage) { + this.#windows.forEach((win) => { + win.webContents.send(ELECTRON_TRPC_CHANNEL, response); + }); + } + + attachWindow(win: BrowserWindow) { + this.#windows.push(win); + } + + detachWindow(win: BrowserWindow) { + this.#windows = this.#windows.filter((w) => w !== win); + } +} + +export const createIPCHandler = ({ createContext, router, + windows = [], }: { createContext?: () => Promise>; router: TRouter; -}) { - ipcMain.handle(ELECTRON_TRPC_CHANNEL, (_event: IpcMainInvokeEvent, args: Operation) => { - return resolveIPCResponse({ router, createContext, operation: args }); - }); -} + windows?: Electron.BrowserWindow[]; +}) => { + return new IPCHandler({ createContext, router, windows }); +}; diff --git a/packages/electron-trpc/src/main/exposeElectronTRPC.ts b/packages/electron-trpc/src/main/exposeElectronTRPC.ts index f7de43a2..9177822b 100644 --- a/packages/electron-trpc/src/main/exposeElectronTRPC.ts +++ b/packages/electron-trpc/src/main/exposeElectronTRPC.ts @@ -1,9 +1,11 @@ import type { Operation } from '@trpc/client'; +import type { TRPCResponseMessage } from '@trpc/server/rpc'; import { ipcRenderer, contextBridge } from 'electron'; import { ELECTRON_TRPC_CHANNEL } from '../constants'; export const exposeElectronTRPC = () => { contextBridge.exposeInMainWorld('electronTRPC', { - rpc: (args: Operation) => ipcRenderer.invoke(ELECTRON_TRPC_CHANNEL, args), + sendMessage: (args: Operation) => ipcRenderer.send(ELECTRON_TRPC_CHANNEL, args), + onMessage: (callback: (args: TRPCResponseMessage) => void) => ipcRenderer.on(ELECTRON_TRPC_CHANNEL, (_event, args) => callback(args)), }); }; diff --git a/packages/electron-trpc/src/main/handleIPCOperation.ts b/packages/electron-trpc/src/main/handleIPCOperation.ts new file mode 100644 index 00000000..0489fe74 --- /dev/null +++ b/packages/electron-trpc/src/main/handleIPCOperation.ts @@ -0,0 +1,107 @@ +import { callProcedure, TRPCError } from '@trpc/server'; +import type { AnyRouter, inferRouterContext } from '@trpc/server'; +import type { TRPCResponseMessage } from '@trpc/server/rpc'; +import { isObservable } from '@trpc/server/observable'; +import { Operation } from '@trpc/client'; +import { getTRPCErrorFromUnknown, transformTRPCResponseItem } from './utils'; + +export async function handleIPCOperation({ + router, + createContext, + operation, + respond, +}: { + router: TRouter; + createContext?: () => Promise>; + operation: Operation; + respond: (response: TRPCResponseMessage) => void; +}) { + const { type, input: serializedInput, id, path } = operation; + const input = router._def._config.transformer.input.deserialize(serializedInput); + + // type TSuccessResponse = TRPCSuccessResponse>; + // type TErrorResponse = TRPCErrorResponse>; + + const ctx = (await createContext?.()) ?? {}; + + try { + const result = await callProcedure({ + ctx, + path, + procedures: router._def.procedures, + rawInput: input, + type, + }); + + if (type !== 'subscription') { + const response = transformTRPCResponseItem(router, { + id, + result: { + type: 'data', + data: result, + }, + }); + + respond(response); + return; + } else { + // result is an observable + if (!isObservable(result)) { + throw new TRPCError({ + message: `Subscription ${path} did not return an observable`, + code: 'INTERNAL_SERVER_ERROR', + }); + } + } + + const subscription = result.subscribe({ + next(data) { + respond({ + id, + result: { + type: 'data', + data, + }, + }); + }, + error(err) { + const error = getTRPCErrorFromUnknown(err); + // opts.onError?.({ error, path, type, ctx, req, input }); + respond({ + id, + error: router.getErrorShape({ + error, + type, + path, + input, + ctx, + }), + }); + }, + complete() { + respond({ + id, + result: { + type: 'stopped', + }, + }); + }, + }); + + void subscription; + } catch (cause) { + const error: TRPCError = getTRPCErrorFromUnknown(cause); + + const response = transformTRPCResponseItem(router, { + error: router.getErrorShape({ + error, + type, + path, + input, + ctx, + }), + }); + + return { response }; + } +} diff --git a/packages/electron-trpc/src/main/index.ts b/packages/electron-trpc/src/main/index.ts index 3ff99370..290af0eb 100644 --- a/packages/electron-trpc/src/main/index.ts +++ b/packages/electron-trpc/src/main/index.ts @@ -1,4 +1,3 @@ export * from '../constants'; -export * from '../types'; export * from './createIPCHandler'; export * from './exposeElectronTRPC'; diff --git a/packages/electron-trpc/src/main/resolveIPCResponse.ts b/packages/electron-trpc/src/main/resolveIPCResponse.ts deleted file mode 100644 index 5b63e8fd..00000000 --- a/packages/electron-trpc/src/main/resolveIPCResponse.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { callProcedure, TRPCError } from '@trpc/server'; -import type { AnyRouter, inferRouterContext, inferRouterError } from '@trpc/server'; -import type { TRPCResponse } from '@trpc/server/rpc'; -import type { IPCResponse } from '../types'; -import { Operation } from '@trpc/client'; -import { getTRPCErrorFromUnknown, transformTRPCResponseItem } from './utils' - -export async function resolveIPCResponse({ - router, - createContext, - operation, -}: { - router: TRouter; - createContext?: () => Promise>; - operation: Operation; -}): Promise { - const { type, input: serializedInput } = operation; - const { transformer } = router._def._config; - const deserializedInput = transformer.input.deserialize(serializedInput) as unknown; - - type TRouterError = inferRouterError; - type TRouterResponse = TRPCResponse; - - const ctx = await createContext?.() ?? {}; - - if (type === 'subscription') { - throw new TRPCError({ - message: 'Subscriptions should use wsLink', - code: 'METHOD_NOT_SUPPORTED', - }); - } - - type RawResult = - | { input: unknown; path: string; data: unknown } - | { input: unknown; path: string; error: TRPCError }; - - async function getRawResult(ctx: inferRouterContext): Promise { - const { path, type } = operation; - const { procedures } = router._def; - - try { - const output = await callProcedure({ - ctx, - path, - procedures, - rawInput: deserializedInput, - type, - }); - return { - input: deserializedInput, - path, - data: output, - }; - } catch (cause) { - const error = getTRPCErrorFromUnknown(cause); - return { - input: deserializedInput, - path, - error, - }; - } - } - - function getResultEnvelope(rawResult: RawResult): TRouterResponse { - const { path, input } = rawResult; - - if ('error' in rawResult) { - return { - error: router.getErrorShape({ - error: rawResult.error, - type, - path, - input, - ctx, - }), - }; - } else { - return { - result: { - data: rawResult.data, - }, - }; - } - } - - function getEndResponse(envelope: TRouterResponse): IPCResponse { - const transformed = transformTRPCResponseItem(router, envelope); - - return { - response: transformed, - }; - } - - try { - const rawResult = await getRawResult(ctx); - const resultEnvelope = getResultEnvelope(rawResult); - - return getEndResponse(resultEnvelope); - } catch (cause) { - const { input, path } = operation; - // we get here if - // - `createContext()` throws - // - input deserialization fails - const error = getTRPCErrorFromUnknown(cause); - const resultEnvelope = getResultEnvelope({ input, path, error }); - - return getEndResponse(resultEnvelope); - } -} diff --git a/packages/electron-trpc/src/main/utils.ts b/packages/electron-trpc/src/main/utils.ts index 1ba1e647..5bea1221 100644 --- a/packages/electron-trpc/src/main/utils.ts +++ b/packages/electron-trpc/src/main/utils.ts @@ -44,25 +44,24 @@ function getMessageFromUnkownError(err: unknown, fallback: string): string { } // from @trpc/server/src/internals/transformTRPCResonse -export function transformTRPCResponseItem< - TResponseItem extends TRPCResponse | TRPCResponseMessage ->(router: AnyRouter, item: TResponseItem): TResponseItem { +export function transformTRPCResponseItem( + router: AnyRouter, + item: TResponseItem +): TResponseItem { // explicitly use appRouter instead of router argument: https://github.com/trpc/trpc/issues/2804 - if ("error" in item) { + if ('error' in item) { return { ...item, error: router._def._config.transformer.output.serialize(item.error) as unknown, }; } - if ("data" in item.result) { + if ('data' in item.result) { return { ...item, result: { ...item.result, - data: router._def._config.transformer.output.serialize( - item.result.data - ) as unknown, + data: router._def._config.transformer.output.serialize(item.result.data) as unknown, }, }; } diff --git a/packages/electron-trpc/src/renderer/index.ts b/packages/electron-trpc/src/renderer/index.ts index 14c9a68d..5363e0cc 100644 --- a/packages/electron-trpc/src/renderer/index.ts +++ b/packages/electron-trpc/src/renderer/index.ts @@ -1,3 +1,2 @@ export * from '../constants'; -export * from '../types'; export * from './ipcLink'; diff --git a/packages/electron-trpc/src/renderer/ipcLink.ts b/packages/electron-trpc/src/renderer/ipcLink.ts index ee6557a1..1e6d7539 100644 --- a/packages/electron-trpc/src/renderer/ipcLink.ts +++ b/packages/electron-trpc/src/renderer/ipcLink.ts @@ -1,33 +1,117 @@ -import { TRPCClientError, TRPCLink } from '@trpc/client'; -import type { AnyRouter } from '@trpc/server'; -import { observable } from '@trpc/server/observable'; -import { IPCResponse } from '../types'; +import { Operation, TRPCClientError, TRPCLink } from '@trpc/client'; +import type { AnyRouter, inferRouterContext, ProcedureType } from '@trpc/server'; +import type { TRPCResponse, TRPCResponseMessage } from '@trpc/server/rpc'; +import { observable, Observer } from '@trpc/server/observable'; import { transformResult } from './utils'; +type IPCCallbackResult = TRPCResponseMessage< + unknown, + inferRouterContext +>; + +type IPCCallbacks = Observer< + IPCCallbackResult, + TRPCClientError +>; + +type IPCRequest = { + type: ProcedureType; + callbacks: IPCCallbacks; + op: Operation; +}; + +class IPCClient { + #pendingRequests = new Map(); + + constructor() { + (window as any).electronTRPC.onMessage((response: TRPCResponseMessage) => { + this.#handleResponse(response); + }); + } + + #handleResponse(response: TRPCResponseMessage) { + const request = response.id && this.#pendingRequests.get(response.id); + if (!request) { + return; + } + + request.callbacks.next(response); + + if ('result' in response && response.result.type === 'stopped') { + request.callbacks.complete(); + } + } + + request(op: Operation, callbacks: IPCCallbacks) { + const { id, type } = op; + + this.#pendingRequests.set(id, { + type, + callbacks, + op, + }); + + (window as any).electronTRPC.sendMessage(op) as Promise; + + return () => { + const callbacks = this.#pendingRequests.get(op.id)?.callbacks; + + this.#pendingRequests.delete(op.id); + + callbacks?.complete(); + + (window as any).electronTRPC.sendMessage({ + id, + method: 'subscription.stop', + }); + }; + } +} + export function ipcLink(): TRPCLink { - return (runtime) => - ({ op }) => { - return observable((observer) => { - const promise = (window as any).electronTRPC.rpc(op) as Promise; + return (runtime) => { + const client = new IPCClient(); - promise - .then((res) => { - const transformed = transformResult(res.response, runtime); + return ({ op }) => { + return observable((observer) => { + let isDone = false; + const unsubscribe = client.request(op, { + error(err) { + isDone = true; + observer.error(err as TRPCClientError); + unsubscribe(); + }, + complete() { + if (!isDone) { + isDone = true; + observer.error(TRPCClientError.from(new Error('Operation ended prematurely'))); + } else { + observer.complete(); + } + }, + next(response) { + const transformed = transformResult(response, runtime); if (!transformed.ok) { observer.error(TRPCClientError.from(transformed.error)); return; } - observer.next({ - result: transformed.result, - }); - observer.complete(); - }) - .catch((cause: Error) => observer.error(TRPCClientError.from(cause))); + + observer.next({ result: transformed.result }); + + if (op.type !== 'subscription') { + isDone = true; + unsubscribe(); + observer.complete(); + } + }, + }); return () => { - // cancel promise here + isDone = true; + unsubscribe(); }; }); }; + }; } diff --git a/packages/electron-trpc/src/types.ts b/packages/electron-trpc/src/types.ts deleted file mode 100644 index b6c943e4..00000000 --- a/packages/electron-trpc/src/types.ts +++ /dev/null @@ -1,6 +0,0 @@ -import type { TRPCResponse } from '@trpc/server/rpc'; - -export interface IPCResponse { - response: TRPCResponse; -} -