diff --git a/README.md b/README.md index e52d393e..e690702b 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,7 @@ Done in 141.94s. ### Support for GraphQL spec -The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). At this moment, -the only missing feature is support for Subscriptions. +The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). #### Differences to `graphql-js` @@ -91,10 +90,19 @@ if (!isCompiledQuery(compiledQuery)) { #### Execute the Query ```js -const executionResult = await compiledQuery.query(); +const executionResult = await compiledQuery.query(root, context, variables); console.log(executionResult); ``` +#### Subscribe to the Query + +```js +const result = await compiledQuery.subscribe(root, context, variables); +for await (const value of result) { + console.log(value); +} +``` + ## API ### compiledQuery = compileQuery(schema, document, operationName, compilerOptions) @@ -112,14 +120,18 @@ Compiles the `document` AST, using an optional operationName and compiler option for overly expensive serializers - `customJSONSerializer` {boolean, default: false} - Whether to produce also a JSON serializer function using `fast-json-stringify`. The default stringifier function is `JSON.stringify` -#### compiledQuery.compiled(root: any, context: any, variables: Maybe<{ [key: string]: any }>) +#### compiledQuery.query(root: any, context: any, variables: Maybe<{ [key: string]: any }>) the compiled function that can be called with a root value, a context and the required variables. +#### compiledQuery.subscribe(root: any, context: any, variables: Maybe<{ [key: string]: any }>) + +(available for GraphQL Subscription only) the compiled function that can be called with a root value, a context and the required variables to produce either an AsyncIterator (if successful) or an ExecutionResult (error). + #### compiledQuery.stringify(value: any) the compiled function for producing a JSON string. It will be `JSON.stringify` unless `compilerOptions.customJSONSerializer` is true. -The value argument should the return of the compiled GraphQL function. +The value argument should be the return of the compiled GraphQL function. ## LICENSE diff --git a/src/__tests__/execution.test.ts b/src/__tests__/execution.test.ts index 4e15b8da..d3ace172 100644 --- a/src/__tests__/execution.test.ts +++ b/src/__tests__/execution.test.ts @@ -2,6 +2,7 @@ * Based on https://github.com/graphql/graphql-js/blob/master/src/execution/__tests__/execution-test.js */ +import { makeExecutableSchema } from "@graphql-tools/schema"; import { DocumentNode, ExecutableDefinitionNode, @@ -16,7 +17,6 @@ import { parse } from "graphql"; import { CompiledQuery, compileQuery, isCompiledQuery } from "../execution"; -import { makeExecutableSchema } from "@graphql-tools/schema"; function executeArgs(args: any) { const { @@ -67,7 +67,7 @@ describe("Execute: Handles basic execution tasks", () => { }) }); - expect(() => executeQuery(schema)).toThrow("Must provide document"); + expect(() => executeQuery(schema)).toThrow("Must provide document."); }); test("throws if no resolve info enricher is not a function", async () => { const schema = new GraphQLSchema({ diff --git a/src/__tests__/subscription.test.ts b/src/__tests__/subscription.test.ts new file mode 100644 index 00000000..e2220dca --- /dev/null +++ b/src/__tests__/subscription.test.ts @@ -0,0 +1,1057 @@ +/** + * Based on https://github.com/graphql/graphql-js/blob/main/src/subscription/__tests__/subscribe-test.js + * This test suite includes certain deviations from the original: + * graphql-jit does not support the root resolver pattern that this test uses + * so the part must be rewritten to include that root resolver in `subscribe` of + * the GraphQLObject in the schema. + */ + +import { + ExecutionResult, + GraphQLBoolean, + GraphQLInt, + GraphQLList, + GraphQLObjectType, + GraphQLSchema, + GraphQLString, + parse, + SubscriptionArgs +} from "graphql"; +import { compileQuery, isAsyncIterable, isCompiledQuery } from "../execution"; + +type Email = { + from: string; + subject: string; + message: string; + unread: boolean; +}; + +const EmailType = new GraphQLObjectType({ + name: "Email", + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean } + } +}); + +const InboxType = new GraphQLObjectType({ + name: "Inbox", + fields: { + total: { + type: GraphQLInt, + resolve: inbox => inbox.emails.length + }, + unread: { + type: GraphQLInt, + resolve: inbox => inbox.emails.filter((email: any) => email.unread).length + }, + emails: { type: new GraphQLList(EmailType) } + } +}); + +const EmailEventType = new GraphQLObjectType({ + name: "EmailEvent", + fields: { + email: { type: EmailType }, + inbox: { type: InboxType } + } +}); + +async function subscribe({ + schema, + document, + operationName, + rootValue, + contextValue, + variableValues +}: SubscriptionArgs): Promise< + AsyncIterableIterator | ExecutionResult +> { + const prepared = compileQuery(schema, document, operationName || ""); + if (!isCompiledQuery(prepared)) return prepared; + return prepared.subscribe!(rootValue, contextValue, variableValues); +} + +function createSubscription(pubsub: SimplePubSub) { + const document = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + const emails = [ + { + from: "joe@graphql.org", + subject: "Hello", + message: "Hello World", + unread: false + } + ]; + + const inbox = { emails }; + + const QueryType = new GraphQLObjectType({ + name: "Query", + fields: { + inbox: { type: InboxType, resolve: () => emails } + } + }); + + const emailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: EmailEventType, + args: { + priority: { type: GraphQLInt } + }, + // FIXME: we shouldn't use mapAsyncIterator here since it makes tests way more complex + subscribe() { + return pubsub.getSubscriber(newEmail => { + emails.push(newEmail); + + return { + importantEmail: { + email: newEmail, + inbox + } + }; + }); + } + } + } + }) + }); + + return subscribe({ schema: emailSchema, document }); +} + +async function expectPromise(promise: Promise) { + let caughtError: any; + + try { + await promise; + throw new Error("promise should have thrown but did not"); + } catch (error) { + caughtError = error; + } + + return { + toReject() { + expect(caughtError).toBeInstanceOf(Error); + }, + toRejectWith(message: string) { + expect(caughtError).toBeInstanceOf(Error); + expect(caughtError).toHaveProperty("message", message); + } + }; +} + +const DummyQueryType = new GraphQLObjectType({ + name: "Query", + fields: { + dummy: { type: GraphQLString } + } +}); + +// Check all error cases when initializing the subscription. +describe("Subscription Initialization Phase", () => { + it("accepts multiple subscription fields defined in schema", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString, subscribe: fooGenerator }, + bar: { type: GraphQLString } + } + }) + }); + + const subscription = (await subscribe({ + schema, + document: parse("subscription { foo }") + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } } + }); + + // Close subscription + await subscription.return!(); + }); + + it("accepts type definition with sync subscribe function", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe: fooGenerator + } + } + }) + }); + + const subscription = (await subscribe({ + schema, + document: parse("subscription { foo }") + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } } + }); + + // Close subscription + await subscription.return!(); + }); + + it("accepts type definition with async subscribe function", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + async subscribe() { + await resolveOnNextTick(); + return fooGenerator(); + } + } + } + }) + }); + + const subscription = (await subscribe({ + schema, + document: parse("subscription { foo }") + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } } + }); + + // Close subscription + await subscription.return!(); + }); + + it("should only resolve the first field of invalid multi-field", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + let didResolveFoo = false; + let didResolveBar = false; + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe() { + didResolveFoo = true; + return fooGenerator(); + } + }, + bar: { + type: GraphQLString, + subscribe() { + didResolveBar = true; + } + } + } + }) + }); + + const subscription = (await subscribe({ + schema, + document: parse("subscription { foo bar }") + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(didResolveFoo).toBe(true); + expect(didResolveBar).toBe(false); + + expect(await subscription.next()).toHaveProperty("done", false); + + // Close subscription + await subscription.return!(); + }); + + it("throws an error if some of required arguments are missing", async () => { + const document = parse("subscription { foo }"); + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString } + } + }) + }); + + // @ts-ignore + (await expectPromise(subscribe({ schema: null, document }))).toRejectWith( + "Expected null to be a GraphQL schema." + ); + + // @ts-ignore + (await expectPromise(subscribe({ document }))).toRejectWith( + "Expected undefined to be a GraphQL schema." + ); + + // @ts-ignore + (await expectPromise(subscribe({ schema, document: null }))).toRejectWith( + "Must provide document." + ); + + // @ts-ignore + (await expectPromise(subscribe({ schema }))).toRejectWith( + "Must provide document." + ); + }); + + it("resolves to an error for unknown subscription field", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString } + } + }) + }); + const document = parse("subscription { unknownField }"); + + const result = await subscribe({ schema, document }); + expect(result).toEqual({ + errors: [ + { + message: 'The subscription field "unknownField" is not defined.', + locations: [{ line: 1, column: 16 }] + } + ] + }); + }); + + it("should pass through unexpected errors thrown in subscribe", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString } + } + }) + }); + + // @ts-ignore + (await expectPromise(subscribe({ schema, document: {} }))).toReject(); + }); + + it("throws an error if subscribe does not return an iterator", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe: () => "test" + } + } + }) + }); + + const document = parse("subscription { foo }"); + + (await expectPromise(subscribe({ schema, document }))).toRejectWith( + 'Subscription field must return Async Iterable. Received: "test".' + ); + }); + + it("resolves to an error for subscription resolver errors", async () => { + async function subscribeWithFn(subscribeFn: () => any) { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString, subscribe: subscribeFn } + } + }) + }); + const document = parse("subscription { foo }"); + const result = await subscribe({ schema, document }); + + return result; + } + + const expectedResult = { + errors: [ + { + message: "test error", + locations: [{ line: 1, column: 16 }], + path: ["foo"] + } + ] + }; + + expect( + // Returning an error + await subscribeWithFn(() => new Error("test error")) + ).toEqual(expectedResult); + + expect( + // Throwing an error + await subscribeWithFn(() => { + throw new Error("test error"); + }) + ).toEqual(expectedResult); + + expect( + // Resolving to an error + await subscribeWithFn(() => Promise.resolve(new Error("test error"))) + ).toEqual(expectedResult); + + expect( + // Rejecting with an error + await subscribeWithFn(() => Promise.reject(new Error("test error"))) + ).toEqual(expectedResult); + }); + + it("resolves to an error if variables were wrong type", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + args: { arg: { type: GraphQLInt } } + } + } + }) + }); + + const variableValues = { arg: "meow" }; + const document = parse(` + subscription ($arg: Int) { + foo(arg: $arg) + } + `); + + // If we receive variables that cannot be coerced correctly, subscribe() will + // resolve to an ExecutionResult that contains an informative error description. + const result = await subscribe({ schema, document, variableValues }); + expect(result).toEqual({ + errors: [ + { + message: + // DIFF: 'Variable "$arg" got invalid value "meow"; Int cannot represent non-integer value: "meow"', + 'Variable "$arg" got invalid value "meow"; Expected type Int; Int cannot represent non-integer value: "meow"', + locations: [{ line: 2, column: 21 }] + } + ] + }); + }); +}); + +// Once a subscription returns a valid AsyncIterator, it can still yield errors. +describe("Subscription Publish Phase", () => { + it("produces a payload for multiple subscribe in same subscription", async () => { + const pubsub = new SimplePubSub(); + + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + const secondSubscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(secondSubscription)).toBeTruthy(); + + const payload1 = subscription.next(); + const payload2 = secondSubscription.next(); + + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + const expectedPayload = { + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }; + + expect(await payload1).toEqual(expectedPayload); + expect(await payload2).toEqual(expectedPayload); + }); + + it("produces a payload per subscription event", async () => { + const pubsub = new SimplePubSub(); + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + // The previously waited on payload now has a value. + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + // Another new email arrives, before subscription.next() is called. + expect( + pubsub.emit({ + from: "hyo@graphql.org", + subject: "Tools", + message: "I <3 making things", + unread: true + }) + ).toBe(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "hyo@graphql.org", + subject: "Tools" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + + // The client decides to disconnect. + expect(await subscription.return!()).toEqual({ + done: true, + value: undefined + }); + + // Which may result in disconnecting upstream services as well. + expect( + pubsub.emit({ + from: "adam@graphql.org", + subject: "Important", + message: "Read me please", + unread: true + }) + ).toBe(false); // No more listeners. + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).toEqual({ + done: true, + value: undefined + }); + }); + + it("produces a payload when there are multiple events", async () => { + const pubsub = new SimplePubSub(); + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright 2" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + }); + + it("should not trigger when subscription is already done", async () => { + const pubsub = new SimplePubSub(); + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + payload = subscription.next(); + await subscription.return!(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(false); + + expect(await payload).toEqual({ + done: true, + value: undefined + }); + }); + + it("should not trigger when subscription is thrown", async () => { + const pubsub = new SimplePubSub(); + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright" + }, + inbox: { + unread: 1, + total: 2 + } + } + } + } + }); + + payload = subscription.next(); + + // Throw error + let caughtError; + try { + await subscription.throw!("ouch"); + } catch (e) { + caughtError = e; + } + expect(caughtError).toBe("ouch"); + + expect(await payload).toEqual({ + done: true, + value: undefined + }); + }); + + it("event order is correct for multiple publishes", async () => { + const pubsub = new SimplePubSub(); + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Message", + message: "Tests are good", + unread: true + }) + ).toBe(true); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Message 2", + message: "Tests are good 2", + unread: true + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + + payload = subscription.next(); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message 2" + }, + inbox: { + unread: 2, + total: 3 + } + } + } + } + }); + }); + + it("should handle error during execution of source event", async () => { + async function* generateMessages() { + yield "Hello"; + yield "Goodbye"; + yield "Bonjour"; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + newMessage: { + type: GraphQLString, + subscribe: generateMessages, + resolve(message) { + if (message === "Goodbye") { + throw new Error("Never leave."); + } + return message; + } + } + } + }) + }); + + const document = parse("subscription { newMessage }"); + const subscription = (await subscribe({ + schema, + document + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Hello" } + } + }); + + // An error in execution is presented as such. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: null }, + errors: [ + { + message: "Never leave.", + locations: [{ line: 1, column: 16 }], + path: ["newMessage"] + } + ] + } + }); + + // However that does not close the response event stream. + // Subsequent events are still executed. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Bonjour" } + } + }); + }); + + it("should pass through error thrown in source event stream", async () => { + async function* generateMessages() { + yield "Hello"; + throw new Error("test error"); + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + newMessage: { + type: GraphQLString, + resolve: message => message, + subscribe: generateMessages + } + } + }) + }); + + const document = parse("subscription { newMessage }"); + const subscription = (await subscribe({ + schema, + document + })) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Hello" } + } + }); + + (await expectPromise(subscription.next())).toRejectWith("test error"); + + expect(await subscription.next()).toEqual({ + done: true, + value: undefined + }); + }); +}); + +class SimplePubSub { + _subscribers: Set<(value: T) => void>; + + constructor() { + this._subscribers = new Set(); + } + + emit(event: T): boolean { + for (const subscriber of this._subscribers) { + subscriber(event); + } + return this._subscribers.size > 0; + } + + getSubscriber(transform: (value: T) => R): AsyncGenerator { + const pullQueue: Array<(result: IteratorResult) => void> = []; + const pushQueue: any[] = []; + let listening = true; + this._subscribers.add(pushValue); + + const emptyQueue = () => { + listening = false; + this._subscribers.delete(pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + }; + + return { + next() { + if (!listening) { + return Promise.resolve({ value: undefined, done: true }); + } + + if (pushQueue.length > 0) { + return Promise.resolve({ value: pushQueue.shift(), done: false }); + } + return new Promise(resolve => pullQueue.push(resolve)); + }, + return(): Promise> { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: any) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + } + }; + + function pushValue(event: T): void { + const value: R = transform(event); + if (pullQueue.length > 0) { + pullQueue.shift()!({ value, done: false }); + } else { + pushQueue.push(value); + } + } + } +} + +function resolveOnNextTick(): Promise { + return Promise.resolve(undefined); +} diff --git a/src/execution.ts b/src/execution.ts index 29c5ce54..11ca2b1f 100644 --- a/src/execution.ts +++ b/src/execution.ts @@ -27,9 +27,11 @@ import { isObjectType, isSpecifiedScalarType, Kind, + locatedError, TypeNameMetaFieldDef } from "graphql"; import { ExecutionContext as GraphQLContext } from "graphql/execution/execute"; +import { pathToArray } from "graphql/jsutils/Path"; import { FieldNode, OperationDefinitionNode } from "graphql/language/ast"; import { GraphQLTypeResolver } from "graphql/type/definition"; import { @@ -167,6 +169,11 @@ export interface CompiledQuery { context: any, variables: Maybe<{ [key: string]: any }> ) => Promise | ExecutionResult; + subscribe?: ( + root: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ) => Promise | ExecutionResult>; stringify: (v: any) => string; } @@ -192,7 +199,7 @@ export function compileQuery( throw new Error(`Expected ${schema} to be a GraphQL schema.`); } if (!document) { - throw new Error("Must provide document"); + throw new Error("Must provide document."); } if ( @@ -232,7 +239,17 @@ export function compileQuery( context.operation.variableDefinitions || [] ); - const functionBody = compileOperation(context); + const type = getOperationRootType(context.schema, context.operation); + const fieldMap = collectFields( + context, + type, + context.operation.selectionSet, + Object.create(null), + Object.create(null) + ); + + const functionBody = compileOperation(context, type, fieldMap); + const compiledQuery: InternalCompiledQuery = { query: createBoundQuery( context, @@ -245,6 +262,24 @@ export function compileQuery( ), stringify }; + + if (context.operation.operation === "subscription") { + compiledQuery.subscribe = createBoundSubscribe( + context, + document, + compileSubscriptionOperation( + context, + type, + fieldMap, + compiledQuery.query + ), + getVariables, + context.operation.name != null + ? context.operation.name.value + : undefined + ); + } + if ((options as any).debug) { // result of the compilation useful for debugging issues // and visualization tools like try-jit. @@ -369,16 +404,12 @@ function postProcessResult({ * @param {CompilationContext} context compilation context with the execution context * @returns {string} a function body to be instantiated together with the header, footer */ -function compileOperation(context: CompilationContext) { - const type = getOperationRootType(context.schema, context.operation); +function compileOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes +) { const serialExecution = context.operation.operation === "mutation"; - const fieldMap = collectFields( - context, - type, - context.operation.selectionSet, - Object.create(null), - Object.create(null) - ); const topLevel = compileObjectType( context, type, @@ -1637,3 +1668,214 @@ function getParentArgIndexes(context: CompilationContext) { function getJsFieldName(fieldName: string) { return `${LOCAL_JS_FIELD_NAME_PREFIX}${fieldName}`; } + +export function isAsyncIterable( + val: unknown +): val is AsyncIterableIterator { + return typeof Object(val)[Symbol.asyncIterator] === "function"; +} + +function compileSubscriptionOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes, + queryFn: CompiledQuery["query"] +) { + const fieldNodes = Object.values(fieldMap)[0]; + const fieldNode = fieldNodes[0]; + const fieldName = fieldNode.name.value; + + const field = resolveFieldDef(context, type, fieldNodes); + + if (!field) { + throw new GraphQLError( + `The subscription field "${fieldName}" is not defined.`, + fieldNodes + ); + } + + const responsePath = addPath(undefined, fieldName); + const resolveInfoName = createResolveInfoName(responsePath); + + const subscriber = field.subscribe; + + async function executeSubscription(executionContext: ExecutionContext) { + const resolveInfo = executionContext.resolveInfos[resolveInfoName]( + executionContext.rootValue, + executionContext.variables, + responsePath + ); + + try { + const eventStream = await subscriber!( + executionContext.rootValue, + executionContext.variables, + executionContext.context, + resolveInfo + ); + if (eventStream instanceof Error) { + throw eventStream; + } + return eventStream; + } catch (error) { + throw locatedError( + error, + resolveInfo.fieldNodes, + pathToArray(resolveInfo.path) + ); + } + } + + async function createSourceEventStream(executionContext: ExecutionContext) { + try { + const eventStream = await executeSubscription(executionContext); + + // Assert field returned an event stream, otherwise yield an error. + if (!isAsyncIterable(eventStream)) { + throw new Error( + "Subscription field must return Async Iterable. " + + `Received: ${inspect(eventStream)}.` + ); + } + + return eventStream; + } catch (error) { + // If it is a GraphQLError, report it as an ExecutionResult, containing only errors and no data. + // Otherwise treat the error as a system-class error and re-throw it. + if (error instanceof GraphQLError) { + return { errors: [error] }; + } + throw error; + } + } + + return async function subscribe(executionContext: ExecutionContext) { + const resultOrStream = await createSourceEventStream(executionContext); + + if (!isAsyncIterable(resultOrStream)) { + return resultOrStream; + } + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification. The `execute` function provides the + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the + // "ExecuteQuery" algorithm, for which `execute` is also used. + // We use our `query` function in place of `execute` + const mapSourceToResponse = (payload: any) => + queryFn(payload, executionContext.context, executionContext.variables); + + return mapAsyncIterator(resultOrStream, mapSourceToResponse); + }; +} + +function createBoundSubscribe( + compilationContext: CompilationContext, + document: DocumentNode, + func: ( + context: ExecutionContext + ) => Promise | ExecutionResult>, + getVariableValues: (inputs: { [key: string]: any }) => CoercedVariableValues, + operationName: string | undefined +): CompiledQuery["subscribe"] { + const { + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos + } = compilationContext; + const trimmer = createNullTrimmer(compilationContext); + const fnName = operationName ? operationName : "subscribe"; + + const ret = { + async [fnName]( + rootValue: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ): Promise | ExecutionResult> { + // this can be shared across in a batch request + const parsedVariables = getVariableValues(variables || {}); + + // Return early errors if variable coercing failed. + if (failToParseVariables(parsedVariables)) { + return { errors: parsedVariables.errors }; + } + + const executionContext: ExecutionContext = { + rootValue, + context, + variables: parsedVariables.coerced, + safeMap, + inspect, + GraphQLError: GraphqlJitError, + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos, + trimmer, + promiseCounter: 0, + nullErrors: [], + errors: [], + data: {} + }; + + return func.call(null, executionContext); + } + }; + + return ret[fnName]; +} + +/** + * Given an AsyncIterable and a callback function, return an AsyncIterator + * which produces values mapped via calling the callback function. + */ +function mapAsyncIterator( + iterable: AsyncGenerator | AsyncIterable, + callback: (value: T) => U | Promise +): AsyncGenerator { + const iterator = iterable[Symbol.asyncIterator](); + + async function mapResult( + result: IteratorResult + ): Promise> { + if (result.done) { + return result; + } + + try { + return { value: await callback(result.value), done: false }; + } catch (error) { + if (typeof iterator.return === "function") { + try { + await iterator.return(); + } catch (e) { + /* ignore error */ + } + } + throw error; + } + } + + return { + async next() { + return mapResult(await iterator.next()); + }, + async return(): Promise> { + return typeof iterator.return === "function" + ? mapResult(await iterator.return()) + : { value: (undefined as unknown) as R, done: true }; + }, + async throw(error?: Error) { + return typeof iterator.throw === "function" + ? mapResult(await iterator.throw(error)) + : Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + } + }; +}