From 6395af30e37417cef94628b248aac566bf14e8dd Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 13:58:21 +0700 Subject: [PATCH 1/6] Implement subscription --- src/execution.ts | 243 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 233 insertions(+), 10 deletions(-) diff --git a/src/execution.ts b/src/execution.ts index 29c5ce54..f09cd6a5 100644 --- a/src/execution.ts +++ b/src/execution.ts @@ -27,10 +27,13 @@ import { isObjectType, isSpecifiedScalarType, Kind, + locatedError, TypeNameMetaFieldDef } from "graphql"; import { ExecutionContext as GraphQLContext } from "graphql/execution/execute"; +import { pathToArray } from "graphql/jsutils/Path"; import { FieldNode, OperationDefinitionNode } from "graphql/language/ast"; +import mapAsyncIterator from "graphql/subscription/mapAsyncIterator"; import { GraphQLTypeResolver } from "graphql/type/definition"; import { addPath, @@ -167,6 +170,11 @@ export interface CompiledQuery { context: any, variables: Maybe<{ [key: string]: any }> ) => Promise | ExecutionResult; + subscribe?: ( + root: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ) => Promise | ExecutionResult>; stringify: (v: any) => string; } @@ -232,7 +240,17 @@ export function compileQuery( context.operation.variableDefinitions || [] ); - const functionBody = compileOperation(context); + const type = getOperationRootType(context.schema, context.operation); + const fieldMap = collectFields( + context, + type, + context.operation.selectionSet, + Object.create(null), + Object.create(null) + ); + + const functionBody = compileOperation(context, type, fieldMap); + const compiledQuery: InternalCompiledQuery = { query: createBoundQuery( context, @@ -245,6 +263,24 @@ export function compileQuery( ), stringify }; + + if (context.operation.operation === "subscription") { + compiledQuery.subscribe = createBoundSubscribe( + context, + document, + compileSubscriptionOperation( + context, + type, + fieldMap, + compiledQuery.query + ), + getVariables, + context.operation.name != null + ? context.operation.name.value + : undefined + ); + } + if ((options as any).debug) { // result of the compilation useful for debugging issues // and visualization tools like try-jit. @@ -369,16 +405,12 @@ function postProcessResult({ * @param {CompilationContext} context compilation context with the execution context * @returns {string} a function body to be instantiated together with the header, footer */ -function compileOperation(context: CompilationContext) { - const type = getOperationRootType(context.schema, context.operation); +function compileOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes +) { const serialExecution = context.operation.operation === "mutation"; - const fieldMap = collectFields( - context, - type, - context.operation.selectionSet, - Object.create(null), - Object.create(null) - ); const topLevel = compileObjectType( context, type, @@ -1637,3 +1669,194 @@ function getParentArgIndexes(context: CompilationContext) { function getJsFieldName(fieldName: string) { return `${LOCAL_JS_FIELD_NAME_PREFIX}${fieldName}`; } + +/** + * Subscription + * Implements the "CreateSourceEventStream" algorithm described in the + * GraphQL specification, resolving the subscription source event stream. + * + * Returns a Promise which resolves to either an AsyncIterable (if successful) + * or an ExecutionResult (error). The promise will be rejected if the schema or + * other arguments to this function are invalid, or if the resolved event stream + * is not an async iterable. + * + * If the client-provided arguments to this function do not result in a + * compliant subscription, a GraphQL Response (ExecutionResult) with + * descriptive errors and no data will be returned. + * + * If the the source stream could not be created due to faulty subscription + * resolver logic or underlying systems, the promise will resolve to a single + * ExecutionResult containing `errors` and no `data`. + * + * If the operation succeeded, the promise resolves to the AsyncIterable for the + * event stream returned by the resolver. + * + * A Source Event Stream represents a sequence of events, each of which triggers + * a GraphQL execution for that event. + * + * This may be useful when hosting the stateful subscription service in a + * different process or machine than the stateless GraphQL execution engine, + * or otherwise separating these two steps. For more on this, see the + * "Supporting Subscriptions at Scale" information in the GraphQL specification. + * + * Since createSourceEventStream only builds execution context and reports errors + * in doing so, which we did, we simply call directly the later called + * executeSubscription. + */ + +function isAsyncIterable( + val: unknown +): val is AsyncIterableIterator { + return typeof Object(val)[Symbol.asyncIterator] === "function"; +} + +function compileSubscriptionOperation( + context: CompilationContext, + type: GraphQLObjectType, + fieldMap: FieldsAndNodes, + queryFn: CompiledQuery["query"] +) { + function reportGraphQLError(error: any): ExecutionResult { + if (error instanceof GraphQLError) { + return { + errors: [error] + }; + } + throw error; + } + + const fieldNodes = Object.values(fieldMap)[0]; + const fieldNode = fieldNodes[0]; + const fieldName = fieldNode.name.value; + + const field = resolveFieldDef(context, type, fieldNodes); + + if (!field) { + throw new GraphQLError( + `The subscription field "${fieldName}" is not defined.`, + fieldNodes + ); + } + + const responsePath = addPath(undefined, fieldName); + const resolveInfoName = createResolveInfoName(responsePath); + + // Call the `subscribe()` resolver or the default resolver to produce an + // AsyncIterable yielding raw payloads. + const subscriber = field.subscribe; + + return async function subscribe(executionContext: ExecutionContext) { + const resolveInfo = executionContext.resolveInfos[resolveInfoName]( + executionContext.rootValue, + executionContext.variables, + responsePath + ); + + let resultOrStream: AsyncIterableIterator; + + try { + try { + resultOrStream = + subscriber && + (await subscriber( + executionContext.rootValue, + executionContext.variables, + executionContext.context, + resolveInfo + )); + if (resultOrStream instanceof Error) { + throw resultOrStream; + } + } catch (error) { + throw locatedError( + error, + resolveInfo.fieldNodes, + pathToArray(resolveInfo.path) + ); + } + if (!isAsyncIterable(resultOrStream)) { + throw new Error( + "Subscription field must return Async Iterable. " + + `Received: ${inspect(resultOrStream)}.` + ); + } + } catch (e) { + return reportGraphQLError(e); + } + + // For each payload yielded from a subscription, map it over the normal + // GraphQL `execute` function, with `payload` as the rootValue. + // This implements the "MapSourceToResponseEvent" algorithm described in + // the GraphQL specification. The `execute` function provides the + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the + // "ExecuteQuery" algorithm, for which `execute` is also used. + // We use our `query` function in place of `execute` + const mapSourceToResponse = (payload: any) => + queryFn(payload, executionContext.context, executionContext.variables); + + return mapAsyncIterator( + resultOrStream, + mapSourceToResponse, + reportGraphQLError + ); + }; +} + +function createBoundSubscribe( + compilationContext: CompilationContext, + document: DocumentNode, + func: ( + context: ExecutionContext + ) => Promise | ExecutionResult>, + getVariableValues: (inputs: { [key: string]: any }) => CoercedVariableValues, + operationName: string | undefined +): CompiledQuery["subscribe"] { + const { + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos + } = compilationContext; + const trimmer = createNullTrimmer(compilationContext); + const fnName = operationName ? operationName : "subscribe"; + + const ret = { + async [fnName]( + rootValue: any, + context: any, + variables: Maybe<{ [key: string]: any }> + ): Promise | ExecutionResult> { + // this can be shared across in a batch request + const parsedVariables = getVariableValues(variables || {}); + + // Return early errors if variable coercing failed. + if (failToParseVariables(parsedVariables)) { + return { errors: parsedVariables.errors }; + } + + const executionContext: ExecutionContext = { + rootValue, + context, + variables: parsedVariables.coerced, + safeMap, + inspect, + GraphQLError: GraphqlJitError, + resolvers, + typeResolvers, + isTypeOfs, + serializers, + resolveInfos, + trimmer, + promiseCounter: 0, + nullErrors: [], + errors: [], + data: {} + }; + + return func.call(null, executionContext); + } + }; + + return ret[fnName]; +} From 61df1c767961b57a69221c32c8fbf74bfae53134 Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 14:12:56 +0700 Subject: [PATCH 2/6] Add original subscription test --- src/__tests__/subscription.test.ts | 1043 ++++++++++++++++++++++++++++ 1 file changed, 1043 insertions(+) create mode 100644 src/__tests__/subscription.test.ts diff --git a/src/__tests__/subscription.test.ts b/src/__tests__/subscription.test.ts new file mode 100644 index 00000000..80517637 --- /dev/null +++ b/src/__tests__/subscription.test.ts @@ -0,0 +1,1043 @@ +/** + * Based on https://github.com/graphql/graphql-js/blob/main/src/subscription/__tests__/subscribe-test.js + * This test suite includes certain deviations from the original: + * graphql-jit does not support the root resolver pattern that this test uses + * so the part must be rewritten to include that root resolver in `subscribe` of + * the GraphQLObject in the schema. + */ + +import { + ExecutionResult, + GraphQLBoolean, + GraphQLInt, + GraphQLList, + GraphQLObjectType, + GraphQLSchema, + GraphQLString, + parse, + SubscriptionArgs +} from "graphql"; +import { compileQuery, isCompiledQuery } from "../execution"; + +type Email = { + from: string, + subject: string, + message: string, + unread: boolean, +}; + +const EmailType = new GraphQLObjectType({ + name: "Email", + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean }, + }, +}); + +const InboxType = new GraphQLObjectType({ + name: "Inbox", + fields: { + total: { + type: GraphQLInt, + resolve: (inbox) => inbox.emails.length, + }, + unread: { + type: GraphQLInt, + resolve: (inbox) => inbox.emails.filter((email) => email.unread).length, + }, + emails: { type: new GraphQLList(EmailType) }, + }, +}); + +const QueryType = new GraphQLObjectType({ + name: "Query", + fields: { + inbox: { type: InboxType }, + }, +}); + +const EmailEventType = new GraphQLObjectType({ + name: "EmailEvent", + fields: { + email: { type: EmailType }, + inbox: { type: InboxType }, + }, +}); + +const emailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + importantEmail: { + type: EmailEventType, + args: { + priority: { type: GraphQLInt }, + }, + }, + }, + }), +}); + +async function subscribe({ + schema, + document, + operationName, + rootValue, + contextValue, + variableValues +}: Partial): Promise< + AsyncIterableIterator | ExecutionResult +> { + const prepared = compileQuery(schema, document, operationName || ""); + if (!isCompiledQuery(prepared)) return prepared; + return prepared.subscribe!(rootValue, contextValue, variableValues); +} + +function createSubscription(pubsub: SimplePubSub) { + const document = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + const emails = [ + { + from: "joe@graphql.org", + subject: "Hello", + message: "Hello World", + unread: false, + }, + ]; + + const data = { + inbox: { emails }, + // FIXME: we shouldn't use mapAsyncIterator here since it makes tests way more complex + importantEmail: pubsub.getSubscriber((newEmail) => { + emails.push(newEmail); + + return { + importantEmail: { + email: newEmail, + inbox: data.inbox, + }, + }; + }), + }; + + return subscribe({ schema: emailSchema, document, rootValue: data }); +} + +async function expectPromise(promise: Promise) { + let caughtError; + + try { + await promise; + throw new Error("promise should have thrown but did not"); + } catch (error) { + caughtError = error; + } + + return { + toReject() { + expect(caughtError).toBeInstanceOf(Error); + }, + toRejectWith(message) { + expect(caughtError).toBeInstanceOf(Error); + expect(caughtError).toHaveProperty("message", message); + }, + }; +} + +const DummyQueryType = new GraphQLObjectType({ + name: "Query", + fields: { + dummy: { type: GraphQLString }, + }, +}); + +// Check all error cases when initializing the subscription. +describe("Subscription Initialization Phase", () => { + it("accepts multiple subscription fields defined in schema", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString }, + bar: { type: GraphQLString }, + }, + }), + }); + + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const subscription = await subscribe({ + schema, + document: parse("subscription { foo }"), + rootValue: { foo: fooGenerator }, + }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } }, + }); + + // Close subscription + await subscription.return(); + }); + + it("accepts type definition with sync subscribe function", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe: fooGenerator, + }, + }, + }), + }); + + const subscription = await subscribe({ + schema, + document: parse("subscription { foo }"), + }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } }, + }); + + // Close subscription + await subscription.return(); + }); + + it("accepts type definition with async subscribe function", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + async subscribe() { + await resolveOnNextTick(); + return fooGenerator(); + }, + }, + }, + }), + }); + + const subscription = await subscribe({ + schema, + document: parse("subscription { foo }"), + }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { data: { foo: "FooValue" } }, + }); + + // Close subscription + await subscription.return(); + }); + + it("should only resolve the first field of invalid multi-field", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + + let didResolveFoo = false; + let didResolveBar = false; + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe() { + didResolveFoo = true; + return fooGenerator(); + }, + }, + bar: { + type: GraphQLString, + // istanbul ignore next (Shouldn't be called) + subscribe() { + didResolveBar = true; + }, + }, + }, + }), + }); + + const subscription = await subscribe({ + schema, + document: parse("subscription { foo bar }"), + }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(didResolveFoo).toBe(true); + expect(didResolveBar).toBe(false); + + expect(await subscription.next()).toHaveProperty("done", false); + + // Close subscription + await subscription.return(); + }); + + it("throws an error if some of required arguments are missing", async () => { + const document = parse("subscription { foo }"); + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString }, + }, + }), + }); + + // @ts-ignore + (await expectPromise(subscribe({ schema: null, document }))).toRejectWith( + "Expected null to be a GraphQL schema." + ); + + // @ts-ignore + (await expectPromise(subscribe({ document }))).toRejectWith( + "Expected undefined to be a GraphQL schema." + ); + + // @ts-ignore + (await expectPromise(subscribe({ schema, document: null }))).toRejectWith( + "Must provide document." + ); + + // @ts-ignore + (await expectPromise(subscribe({ schema }))).toRejectWith( + "Must provide document." + ); + }); + + it("resolves to an error for unknown subscription field", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString }, + }, + }), + }); + const document = parse("subscription { unknownField }"); + + const result = await subscribe({ schema, document }); + expect(result).toEqual({ + errors: [ + { + message: 'The subscription field "unknownField" is not defined.', + locations: [{ line: 1, column: 16 }], + }, + ], + }); + }); + + it("should pass through unexpected errors thrown in subscribe", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString }, + }, + }), + }); + + // @ts-ignore + (await expectPromise(subscribe({ schema, document: {} }))).toReject(); + }); + + it("throws an error if subscribe does not return an iterator", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + subscribe: () => "test", + }, + }, + }), + }); + + const document = parse("subscription { foo }"); + + (await expectPromise(subscribe({ schema, document }))).toRejectWith( + 'Subscription field must return Async Iterable. Received: "test".' + ); + }); + + it("resolves to an error for subscription resolver errors", async () => { + async function subscribeWithFn(subscribeFn: () => any) { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { type: GraphQLString, subscribe: subscribeFn }, + }, + }), + }); + const document = parse("subscription { foo }"); + const result = await subscribe({ schema, document }); + + return result; + } + + const expectedResult = { + errors: [ + { + message: "test error", + locations: [{ line: 1, column: 16 }], + path: ["foo"], + }, + ], + }; + + expect( + // Returning an error + await subscribeWithFn(() => new Error("test error")) + ).toEqual(expectedResult); + + expect( + // Throwing an error + await subscribeWithFn(() => { + throw new Error("test error"); + }) + ).toEqual(expectedResult); + + expect( + // Resolving to an error + await subscribeWithFn(() => Promise.resolve(new Error("test error"))) + ).toEqual(expectedResult); + + expect( + // Rejecting with an error + await subscribeWithFn(() => Promise.reject(new Error("test error"))) + ).toEqual(expectedResult); + }); + + it("resolves to an error if variables were wrong type", async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + foo: { + type: GraphQLString, + args: { arg: { type: GraphQLInt } }, + }, + }, + }), + }); + + const variableValues = { arg: "meow" }; + const document = parse(` + subscription ($arg: Int) { + foo(arg: $arg) + } + `); + + // If we receive variables that cannot be coerced correctly, subscribe() will + // resolve to an ExecutionResult that contains an informative error description. + const result = await subscribe({ schema, document, variableValues }); + expect(result).toEqual({ + errors: [ + { + message: + 'Variable "$arg" got invalid value "meow"; Int cannot represent non-integer value: "meow"', + locations: [{ line: 2, column: 21 }], + }, + ], + }); + }); +}); + +// Once a subscription returns a valid AsyncIterator, it can still yield errors. +describe("Subscription Publish Phase", () => { + it("produces a payload for multiple subscribe in same subscription", async () => { + const pubsub = new SimplePubSub(); + + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + const secondSubscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(secondSubscription)).toBeTruthy(); + + const payload1 = subscription.next(); + const payload2 = secondSubscription.next(); + + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + const expectedPayload = { + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright", + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }; + + expect(await payload1).toEqual(expectedPayload); + expect(await payload2).toEqual(expectedPayload); + }); + + it("produces a payload per subscription event", async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + // The previously waited on payload now has a value. + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright", + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + // Another new email arrives, before subscription.next() is called. + expect( + pubsub.emit({ + from: "hyo@graphql.org", + subject: "Tools", + message: "I <3 making things", + unread: true, + }) + ).toBe(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "hyo@graphql.org", + subject: "Tools", + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + // The client decides to disconnect. + expect(await subscription.return()).toEqual({ + done: true, + value: undefined, + }); + + // Which may result in disconnecting upstream services as well. + expect( + pubsub.emit({ + from: "adam@graphql.org", + subject: "Important", + message: "Read me please", + unread: true, + }) + ).toBe(false); // No more listeners. + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).toEqual({ + done: true, + value: undefined, + }); + }); + + it("produces a payload when there are multiple events", async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright", + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true, + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright 2", + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it("should not trigger when subscription is already done", async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright", + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + await subscription.return(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright 2", + message: "Tests are good 2", + unread: true, + }) + ).toBe(false); + + expect(await payload).toEqual({ + done: true, + value: undefined, + }); + }); + + it("should not trigger when subscription is thrown", async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Alright", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Alright", + }, + inbox: { + unread: 1, + total: 2, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + // Throw error + let caughtError; + try { + await subscription.throw("ouch"); + } catch (e) { + caughtError = e; + } + expect(caughtError).toBe("ouch"); + + expect(await payload).toEqual({ + done: true, + value: undefined, + }); + }); + + it("event order is correct for multiple publishes", async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + let payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Message", + message: "Tests are good", + unread: true, + }) + ).toBe(true); + + // A new email arrives! + expect( + pubsub.emit({ + from: "yuzhi@graphql.org", + subject: "Message 2", + message: "Tests are good 2", + unread: true, + }) + ).toBe(true); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message", + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + + payload = subscription.next(); + + expect(await payload).toEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: "yuzhi@graphql.org", + subject: "Message 2", + }, + inbox: { + unread: 2, + total: 3, + }, + }, + }, + }, + }); + }); + + it("should handle error during execution of source event", async () => { + async function* generateMessages() { + yield "Hello"; + yield "Goodbye"; + yield "Bonjour"; + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + newMessage: { + type: GraphQLString, + subscribe: generateMessages, + resolve(message) { + if (message === "Goodbye") { + throw new Error("Never leave."); + } + return message; + }, + }, + }, + }), + }); + + const document = parse("subscription { newMessage }"); + const subscription = await subscribe({ schema, document }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Hello" }, + }, + }); + + // An error in execution is presented as such. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: null }, + errors: [ + { + message: "Never leave.", + locations: [{ line: 1, column: 16 }], + path: ["newMessage"], + }, + ], + }, + }); + + // However that does not close the response event stream. + // Subsequent events are still executed. + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Bonjour" }, + }, + }); + }); + + it("should pass through error thrown in source event stream", async () => { + async function* generateMessages() { + yield "Hello"; + throw new Error("test error"); + } + + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { + newMessage: { + type: GraphQLString, + resolve: (message) => message, + subscribe: generateMessages, + }, + }, + }), + }); + + const document = parse("subscription { newMessage }"); + const subscription = await subscribe({ schema, document }) as AsyncIterableIterator; + expect(isAsyncIterable(subscription)).toBeTruthy(); + + expect(await subscription.next()).toEqual({ + done: false, + value: { + data: { newMessage: "Hello" }, + }, + }); + + (await expectPromise(subscription.next())).toRejectWith("test error"); + + expect(await subscription.next()).toEqual({ + done: true, + value: undefined, + }); + }); +}); + + +class SimplePubSub { + _subscribers: Set<(value: T) => void>; + + constructor() { + this._subscribers = new Set(); + } + + emit(event: T): boolean { + for (const subscriber of this._subscribers) { + subscriber(event); + } + return this._subscribers.size > 0; + } + + getSubscriber(transform: (value: T) => R): AsyncGenerator { + const pullQueue: Array<(result: IteratorResult) => void> = []; + const pushQueue = []; + let listening = true; + this._subscribers.add(pushValue); + + const emptyQueue = () => { + listening = false; + this._subscribers.delete(pushValue); + for (const resolve of pullQueue) { + resolve({ value: undefined, done: true }); + } + pullQueue.length = 0; + pushQueue.length = 0; + }; + + return { + next() { + if (!listening) { + return Promise.resolve({ value: undefined, done: true }); + } + + if (pushQueue.length > 0) { + return Promise.resolve({ value: pushQueue.shift(), done: false }); + } + return new Promise((resolve) => pullQueue.push(resolve)); + }, + return(): Promise> { + emptyQueue(); + return Promise.resolve({ value: undefined, done: true }); + }, + throw(error: any) { + emptyQueue(); + return Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + + function pushValue(event: T): void { + const value: R = transform(event); + if (pullQueue.length > 0) { + pullQueue.shift()({ value, done: false }); + } else { + pushQueue.push(value); + } + } + } +} + +function isAsyncIterable(maybeAsyncIterable) { + return typeof maybeAsyncIterable?.[Symbol.asyncIterator] === 'function'; +} + +function resolveOnNextTick(): Promise { + return Promise.resolve(undefined); +} From 29b7241f5ee9216eba1ea7469e09c701cc09de1f Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 15:15:20 +0700 Subject: [PATCH 3/6] Refactor implementation --- src/__tests__/execution.test.ts | 4 +- src/execution.ts | 175 ++++++++++++++++++-------------- 2 files changed, 99 insertions(+), 80 deletions(-) diff --git a/src/__tests__/execution.test.ts b/src/__tests__/execution.test.ts index 4e15b8da..d3ace172 100644 --- a/src/__tests__/execution.test.ts +++ b/src/__tests__/execution.test.ts @@ -2,6 +2,7 @@ * Based on https://github.com/graphql/graphql-js/blob/master/src/execution/__tests__/execution-test.js */ +import { makeExecutableSchema } from "@graphql-tools/schema"; import { DocumentNode, ExecutableDefinitionNode, @@ -16,7 +17,6 @@ import { parse } from "graphql"; import { CompiledQuery, compileQuery, isCompiledQuery } from "../execution"; -import { makeExecutableSchema } from "@graphql-tools/schema"; function executeArgs(args: any) { const { @@ -67,7 +67,7 @@ describe("Execute: Handles basic execution tasks", () => { }) }); - expect(() => executeQuery(schema)).toThrow("Must provide document"); + expect(() => executeQuery(schema)).toThrow("Must provide document."); }); test("throws if no resolve info enricher is not a function", async () => { const schema = new GraphQLSchema({ diff --git a/src/execution.ts b/src/execution.ts index f09cd6a5..11ca2b1f 100644 --- a/src/execution.ts +++ b/src/execution.ts @@ -33,7 +33,6 @@ import { import { ExecutionContext as GraphQLContext } from "graphql/execution/execute"; import { pathToArray } from "graphql/jsutils/Path"; import { FieldNode, OperationDefinitionNode } from "graphql/language/ast"; -import mapAsyncIterator from "graphql/subscription/mapAsyncIterator"; import { GraphQLTypeResolver } from "graphql/type/definition"; import { addPath, @@ -200,7 +199,7 @@ export function compileQuery( throw new Error(`Expected ${schema} to be a GraphQL schema.`); } if (!document) { - throw new Error("Must provide document"); + throw new Error("Must provide document."); } if ( @@ -1670,41 +1669,7 @@ function getJsFieldName(fieldName: string) { return `${LOCAL_JS_FIELD_NAME_PREFIX}${fieldName}`; } -/** - * Subscription - * Implements the "CreateSourceEventStream" algorithm described in the - * GraphQL specification, resolving the subscription source event stream. - * - * Returns a Promise which resolves to either an AsyncIterable (if successful) - * or an ExecutionResult (error). The promise will be rejected if the schema or - * other arguments to this function are invalid, or if the resolved event stream - * is not an async iterable. - * - * If the client-provided arguments to this function do not result in a - * compliant subscription, a GraphQL Response (ExecutionResult) with - * descriptive errors and no data will be returned. - * - * If the the source stream could not be created due to faulty subscription - * resolver logic or underlying systems, the promise will resolve to a single - * ExecutionResult containing `errors` and no `data`. - * - * If the operation succeeded, the promise resolves to the AsyncIterable for the - * event stream returned by the resolver. - * - * A Source Event Stream represents a sequence of events, each of which triggers - * a GraphQL execution for that event. - * - * This may be useful when hosting the stateful subscription service in a - * different process or machine than the stateless GraphQL execution engine, - * or otherwise separating these two steps. For more on this, see the - * "Supporting Subscriptions at Scale" information in the GraphQL specification. - * - * Since createSourceEventStream only builds execution context and reports errors - * in doing so, which we did, we simply call directly the later called - * executeSubscription. - */ - -function isAsyncIterable( +export function isAsyncIterable( val: unknown ): val is AsyncIterableIterator { return typeof Object(val)[Symbol.asyncIterator] === "function"; @@ -1716,15 +1681,6 @@ function compileSubscriptionOperation( fieldMap: FieldsAndNodes, queryFn: CompiledQuery["query"] ) { - function reportGraphQLError(error: any): ExecutionResult { - if (error instanceof GraphQLError) { - return { - errors: [error] - }; - } - throw error; - } - const fieldNodes = Object.values(fieldMap)[0]; const fieldNode = fieldNodes[0]; const fieldName = fieldNode.name.value; @@ -1741,47 +1697,63 @@ function compileSubscriptionOperation( const responsePath = addPath(undefined, fieldName); const resolveInfoName = createResolveInfoName(responsePath); - // Call the `subscribe()` resolver or the default resolver to produce an - // AsyncIterable yielding raw payloads. const subscriber = field.subscribe; - return async function subscribe(executionContext: ExecutionContext) { + async function executeSubscription(executionContext: ExecutionContext) { const resolveInfo = executionContext.resolveInfos[resolveInfoName]( executionContext.rootValue, executionContext.variables, responsePath ); - let resultOrStream: AsyncIterableIterator; - try { - try { - resultOrStream = - subscriber && - (await subscriber( - executionContext.rootValue, - executionContext.variables, - executionContext.context, - resolveInfo - )); - if (resultOrStream instanceof Error) { - throw resultOrStream; - } - } catch (error) { - throw locatedError( - error, - resolveInfo.fieldNodes, - pathToArray(resolveInfo.path) - ); + const eventStream = await subscriber!( + executionContext.rootValue, + executionContext.variables, + executionContext.context, + resolveInfo + ); + if (eventStream instanceof Error) { + throw eventStream; } - if (!isAsyncIterable(resultOrStream)) { + return eventStream; + } catch (error) { + throw locatedError( + error, + resolveInfo.fieldNodes, + pathToArray(resolveInfo.path) + ); + } + } + + async function createSourceEventStream(executionContext: ExecutionContext) { + try { + const eventStream = await executeSubscription(executionContext); + + // Assert field returned an event stream, otherwise yield an error. + if (!isAsyncIterable(eventStream)) { throw new Error( "Subscription field must return Async Iterable. " + - `Received: ${inspect(resultOrStream)}.` + `Received: ${inspect(eventStream)}.` ); } - } catch (e) { - return reportGraphQLError(e); + + return eventStream; + } catch (error) { + // If it is a GraphQLError, report it as an ExecutionResult, containing only errors and no data. + // Otherwise treat the error as a system-class error and re-throw it. + if (error instanceof GraphQLError) { + return { errors: [error] }; + } + throw error; + } + } + + return async function subscribe(executionContext: ExecutionContext) { + const resultOrStream = await createSourceEventStream(executionContext); + + if (!isAsyncIterable(resultOrStream)) { + return resultOrStream; } // For each payload yielded from a subscription, map it over the normal @@ -1794,11 +1766,7 @@ function compileSubscriptionOperation( const mapSourceToResponse = (payload: any) => queryFn(payload, executionContext.context, executionContext.variables); - return mapAsyncIterator( - resultOrStream, - mapSourceToResponse, - reportGraphQLError - ); + return mapAsyncIterator(resultOrStream, mapSourceToResponse); }; } @@ -1860,3 +1828,54 @@ function createBoundSubscribe( return ret[fnName]; } + +/** + * Given an AsyncIterable and a callback function, return an AsyncIterator + * which produces values mapped via calling the callback function. + */ +function mapAsyncIterator( + iterable: AsyncGenerator | AsyncIterable, + callback: (value: T) => U | Promise +): AsyncGenerator { + const iterator = iterable[Symbol.asyncIterator](); + + async function mapResult( + result: IteratorResult + ): Promise> { + if (result.done) { + return result; + } + + try { + return { value: await callback(result.value), done: false }; + } catch (error) { + if (typeof iterator.return === "function") { + try { + await iterator.return(); + } catch (e) { + /* ignore error */ + } + } + throw error; + } + } + + return { + async next() { + return mapResult(await iterator.next()); + }, + async return(): Promise> { + return typeof iterator.return === "function" + ? mapResult(await iterator.return()) + : { value: (undefined as unknown) as R, done: true }; + }, + async throw(error?: Error) { + return typeof iterator.throw === "function" + ? mapResult(await iterator.throw(error)) + : Promise.reject(error); + }, + [Symbol.asyncIterator]() { + return this; + } + }; +} From 6c8cfc621d4c09abf8db13b02406b66205ac4936 Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 15:27:19 +0700 Subject: [PATCH 4/6] Update original subscription test to use resolver instead of rootValue --- src/__tests__/subscription.test.ts | 90 ++++++++++++++---------------- 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/src/__tests__/subscription.test.ts b/src/__tests__/subscription.test.ts index 80517637..52d3b298 100644 --- a/src/__tests__/subscription.test.ts +++ b/src/__tests__/subscription.test.ts @@ -1,5 +1,6 @@ /** * Based on https://github.com/graphql/graphql-js/blob/main/src/subscription/__tests__/subscribe-test.js + * (commit 39b69da863fca34f0e921bb9ac6a1b99797e17cf) * This test suite includes certain deviations from the original: * graphql-jit does not support the root resolver pattern that this test uses * so the part must be rewritten to include that root resolver in `subscribe` of @@ -17,7 +18,7 @@ import { parse, SubscriptionArgs } from "graphql"; -import { compileQuery, isCompiledQuery } from "../execution"; +import { compileQuery, isAsyncIterable, isCompiledQuery } from "../execution"; type Email = { from: string, @@ -51,13 +52,6 @@ const InboxType = new GraphQLObjectType({ }, }); -const QueryType = new GraphQLObjectType({ - name: "Query", - fields: { - inbox: { type: InboxType }, - }, -}); - const EmailEventType = new GraphQLObjectType({ name: "EmailEvent", fields: { @@ -66,21 +60,6 @@ const EmailEventType = new GraphQLObjectType({ }, }); -const emailSchema = new GraphQLSchema({ - query: QueryType, - subscription: new GraphQLObjectType({ - name: "Subscription", - fields: { - importantEmail: { - type: EmailEventType, - args: { - priority: { type: GraphQLInt }, - }, - }, - }, - }), -}); - async function subscribe({ schema, document, @@ -121,26 +100,48 @@ function createSubscription(pubsub: SimplePubSub) { }, ]; - const data = { - inbox: { emails }, - // FIXME: we shouldn't use mapAsyncIterator here since it makes tests way more complex - importantEmail: pubsub.getSubscriber((newEmail) => { - emails.push(newEmail); + const inbox = { emails }; + + const QueryType = new GraphQLObjectType({ + name: "Query", + fields: { + inbox: { type: InboxType, resolve: () => emails }, + }, + }); - return { + const emailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: "Subscription", + fields: { importantEmail: { - email: newEmail, - inbox: data.inbox, + type: EmailEventType, + args: { + priority: { type: GraphQLInt }, + }, + // FIXME: we shouldn't use mapAsyncIterator here since it makes tests way more complex + subscribe() { + return pubsub.getSubscriber((newEmail) => { + emails.push(newEmail); + + return { + importantEmail: { + email: newEmail, + inbox, + }, + }; + }) + } }, - }; + }, }), - }; + }); - return subscribe({ schema: emailSchema, document, rootValue: data }); + return subscribe({ schema: emailSchema, document }); } async function expectPromise(promise: Promise) { - let caughtError; + let caughtError: any; try { await promise; @@ -153,7 +154,7 @@ async function expectPromise(promise: Promise) { toReject() { expect(caughtError).toBeInstanceOf(Error); }, - toRejectWith(message) { + toRejectWith(message: string) { expect(caughtError).toBeInstanceOf(Error); expect(caughtError).toHaveProperty("message", message); }, @@ -170,25 +171,24 @@ const DummyQueryType = new GraphQLObjectType({ // Check all error cases when initializing the subscription. describe("Subscription Initialization Phase", () => { it("accepts multiple subscription fields defined in schema", async () => { + async function* fooGenerator() { + yield { foo: "FooValue" }; + } + const schema = new GraphQLSchema({ query: DummyQueryType, subscription: new GraphQLObjectType({ name: "Subscription", fields: { - foo: { type: GraphQLString }, + foo: { type: GraphQLString, subscribe: fooGenerator }, bar: { type: GraphQLString }, }, }), }); - async function* fooGenerator() { - yield { foo: "FooValue" }; - } - const subscription = await subscribe({ schema, document: parse("subscription { foo }"), - rootValue: { foo: fooGenerator }, }) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); @@ -486,7 +486,7 @@ describe("Subscription Initialization Phase", () => { errors: [ { message: - 'Variable "$arg" got invalid value "meow"; Int cannot represent non-integer value: "meow"', + 'Variable "$arg" got invalid value "meow"; Expected type Int; Int cannot represent non-integer value: "meow"', locations: [{ line: 2, column: 21 }], }, ], @@ -1034,10 +1034,6 @@ class SimplePubSub { } } -function isAsyncIterable(maybeAsyncIterable) { - return typeof maybeAsyncIterable?.[Symbol.asyncIterator] === 'function'; -} - function resolveOnNextTick(): Promise { return Promise.resolve(undefined); } From 6c3cf5194172dff3df06a6ee405a33db4d2bef88 Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 15:29:11 +0700 Subject: [PATCH 5/6] Fix TS errors and format --- src/__tests__/subscription.test.ts | 418 +++++++++++++++-------------- 1 file changed, 218 insertions(+), 200 deletions(-) diff --git a/src/__tests__/subscription.test.ts b/src/__tests__/subscription.test.ts index 52d3b298..e2220dca 100644 --- a/src/__tests__/subscription.test.ts +++ b/src/__tests__/subscription.test.ts @@ -1,6 +1,5 @@ /** * Based on https://github.com/graphql/graphql-js/blob/main/src/subscription/__tests__/subscribe-test.js - * (commit 39b69da863fca34f0e921bb9ac6a1b99797e17cf) * This test suite includes certain deviations from the original: * graphql-jit does not support the root resolver pattern that this test uses * so the part must be rewritten to include that root resolver in `subscribe` of @@ -21,10 +20,10 @@ import { import { compileQuery, isAsyncIterable, isCompiledQuery } from "../execution"; type Email = { - from: string, - subject: string, - message: string, - unread: boolean, + from: string; + subject: string; + message: string; + unread: boolean; }; const EmailType = new GraphQLObjectType({ @@ -33,8 +32,8 @@ const EmailType = new GraphQLObjectType({ from: { type: GraphQLString }, subject: { type: GraphQLString }, message: { type: GraphQLString }, - unread: { type: GraphQLBoolean }, - }, + unread: { type: GraphQLBoolean } + } }); const InboxType = new GraphQLObjectType({ @@ -42,22 +41,22 @@ const InboxType = new GraphQLObjectType({ fields: { total: { type: GraphQLInt, - resolve: (inbox) => inbox.emails.length, + resolve: inbox => inbox.emails.length }, unread: { type: GraphQLInt, - resolve: (inbox) => inbox.emails.filter((email) => email.unread).length, + resolve: inbox => inbox.emails.filter((email: any) => email.unread).length }, - emails: { type: new GraphQLList(EmailType) }, - }, + emails: { type: new GraphQLList(EmailType) } + } }); const EmailEventType = new GraphQLObjectType({ name: "EmailEvent", fields: { email: { type: EmailType }, - inbox: { type: InboxType }, - }, + inbox: { type: InboxType } + } }); async function subscribe({ @@ -67,7 +66,7 @@ async function subscribe({ rootValue, contextValue, variableValues -}: Partial): Promise< +}: SubscriptionArgs): Promise< AsyncIterableIterator | ExecutionResult > { const prepared = compileQuery(schema, document, operationName || ""); @@ -96,8 +95,8 @@ function createSubscription(pubsub: SimplePubSub) { from: "joe@graphql.org", subject: "Hello", message: "Hello World", - unread: false, - }, + unread: false + } ]; const inbox = { emails }; @@ -105,8 +104,8 @@ function createSubscription(pubsub: SimplePubSub) { const QueryType = new GraphQLObjectType({ name: "Query", fields: { - inbox: { type: InboxType, resolve: () => emails }, - }, + inbox: { type: InboxType, resolve: () => emails } + } }); const emailSchema = new GraphQLSchema({ @@ -117,24 +116,24 @@ function createSubscription(pubsub: SimplePubSub) { importantEmail: { type: EmailEventType, args: { - priority: { type: GraphQLInt }, + priority: { type: GraphQLInt } }, // FIXME: we shouldn't use mapAsyncIterator here since it makes tests way more complex subscribe() { - return pubsub.getSubscriber((newEmail) => { + return pubsub.getSubscriber(newEmail => { emails.push(newEmail); return { importantEmail: { email: newEmail, - inbox, - }, + inbox + } }; - }) + }); } - }, - }, - }), + } + } + }) }); return subscribe({ schema: emailSchema, document }); @@ -157,15 +156,15 @@ async function expectPromise(promise: Promise) { toRejectWith(message: string) { expect(caughtError).toBeInstanceOf(Error); expect(caughtError).toHaveProperty("message", message); - }, + } }; } const DummyQueryType = new GraphQLObjectType({ name: "Query", fields: { - dummy: { type: GraphQLString }, - }, + dummy: { type: GraphQLString } + } }); // Check all error cases when initializing the subscription. @@ -181,24 +180,24 @@ describe("Subscription Initialization Phase", () => { name: "Subscription", fields: { foo: { type: GraphQLString, subscribe: fooGenerator }, - bar: { type: GraphQLString }, - }, - }), + bar: { type: GraphQLString } + } + }) }); - const subscription = await subscribe({ + const subscription = (await subscribe({ schema, - document: parse("subscription { foo }"), - }) as AsyncIterableIterator; + document: parse("subscription { foo }") + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(await subscription.next()).toEqual({ done: false, - value: { data: { foo: "FooValue" } }, + value: { data: { foo: "FooValue" } } }); // Close subscription - await subscription.return(); + await subscription.return!(); }); it("accepts type definition with sync subscribe function", async () => { @@ -213,25 +212,25 @@ describe("Subscription Initialization Phase", () => { fields: { foo: { type: GraphQLString, - subscribe: fooGenerator, - }, - }, - }), + subscribe: fooGenerator + } + } + }) }); - const subscription = await subscribe({ + const subscription = (await subscribe({ schema, - document: parse("subscription { foo }"), - }) as AsyncIterableIterator; + document: parse("subscription { foo }") + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(await subscription.next()).toEqual({ done: false, - value: { data: { foo: "FooValue" } }, + value: { data: { foo: "FooValue" } } }); // Close subscription - await subscription.return(); + await subscription.return!(); }); it("accepts type definition with async subscribe function", async () => { @@ -249,25 +248,25 @@ describe("Subscription Initialization Phase", () => { async subscribe() { await resolveOnNextTick(); return fooGenerator(); - }, - }, - }, - }), + } + } + } + }) }); - const subscription = await subscribe({ + const subscription = (await subscribe({ schema, - document: parse("subscription { foo }"), - }) as AsyncIterableIterator; + document: parse("subscription { foo }") + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(await subscription.next()).toEqual({ done: false, - value: { data: { foo: "FooValue" } }, + value: { data: { foo: "FooValue" } } }); // Close subscription - await subscription.return(); + await subscription.return!(); }); it("should only resolve the first field of invalid multi-field", async () => { @@ -288,23 +287,22 @@ describe("Subscription Initialization Phase", () => { subscribe() { didResolveFoo = true; return fooGenerator(); - }, + } }, bar: { type: GraphQLString, - // istanbul ignore next (Shouldn't be called) subscribe() { didResolveBar = true; - }, - }, - }, - }), + } + } + } + }) }); - const subscription = await subscribe({ + const subscription = (await subscribe({ schema, - document: parse("subscription { foo bar }"), - }) as AsyncIterableIterator; + document: parse("subscription { foo bar }") + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(didResolveFoo).toBe(true); @@ -313,7 +311,7 @@ describe("Subscription Initialization Phase", () => { expect(await subscription.next()).toHaveProperty("done", false); // Close subscription - await subscription.return(); + await subscription.return!(); }); it("throws an error if some of required arguments are missing", async () => { @@ -323,9 +321,9 @@ describe("Subscription Initialization Phase", () => { subscription: new GraphQLObjectType({ name: "Subscription", fields: { - foo: { type: GraphQLString }, - }, - }), + foo: { type: GraphQLString } + } + }) }); // @ts-ignore @@ -355,9 +353,9 @@ describe("Subscription Initialization Phase", () => { subscription: new GraphQLObjectType({ name: "Subscription", fields: { - foo: { type: GraphQLString }, - }, - }), + foo: { type: GraphQLString } + } + }) }); const document = parse("subscription { unknownField }"); @@ -366,9 +364,9 @@ describe("Subscription Initialization Phase", () => { errors: [ { message: 'The subscription field "unknownField" is not defined.', - locations: [{ line: 1, column: 16 }], - }, - ], + locations: [{ line: 1, column: 16 }] + } + ] }); }); @@ -378,9 +376,9 @@ describe("Subscription Initialization Phase", () => { subscription: new GraphQLObjectType({ name: "Subscription", fields: { - foo: { type: GraphQLString }, - }, - }), + foo: { type: GraphQLString } + } + }) }); // @ts-ignore @@ -395,10 +393,10 @@ describe("Subscription Initialization Phase", () => { fields: { foo: { type: GraphQLString, - subscribe: () => "test", - }, - }, - }), + subscribe: () => "test" + } + } + }) }); const document = parse("subscription { foo }"); @@ -415,9 +413,9 @@ describe("Subscription Initialization Phase", () => { subscription: new GraphQLObjectType({ name: "Subscription", fields: { - foo: { type: GraphQLString, subscribe: subscribeFn }, - }, - }), + foo: { type: GraphQLString, subscribe: subscribeFn } + } + }) }); const document = parse("subscription { foo }"); const result = await subscribe({ schema, document }); @@ -430,9 +428,9 @@ describe("Subscription Initialization Phase", () => { { message: "test error", locations: [{ line: 1, column: 16 }], - path: ["foo"], - }, - ], + path: ["foo"] + } + ] }; expect( @@ -466,10 +464,10 @@ describe("Subscription Initialization Phase", () => { fields: { foo: { type: GraphQLString, - args: { arg: { type: GraphQLInt } }, - }, - }, - }), + args: { arg: { type: GraphQLInt } } + } + } + }) }); const variableValues = { arg: "meow" }; @@ -486,10 +484,11 @@ describe("Subscription Initialization Phase", () => { errors: [ { message: + // DIFF: 'Variable "$arg" got invalid value "meow"; Int cannot represent non-integer value: "meow"', 'Variable "$arg" got invalid value "meow"; Expected type Int; Int cannot represent non-integer value: "meow"', - locations: [{ line: 2, column: 21 }], - }, - ], + locations: [{ line: 2, column: 21 }] + } + ] }); }); }); @@ -499,10 +498,14 @@ describe("Subscription Publish Phase", () => { it("produces a payload for multiple subscribe in same subscription", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); - const secondSubscription = await createSubscription(pubsub) as AsyncIterableIterator; + const secondSubscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(secondSubscription)).toBeTruthy(); const payload1 = subscription.next(); @@ -513,7 +516,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -524,15 +527,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright", + subject: "Alright" }, inbox: { unread: 1, - total: 2, - }, - }, - }, - }, + total: 2 + } + } + } + } }; expect(await payload1).toEqual(expectedPayload); @@ -541,7 +544,9 @@ describe("Subscription Publish Phase", () => { it("produces a payload per subscription event", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); // Wait for the next subscription payload. @@ -553,7 +558,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -565,15 +570,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright", + subject: "Alright" }, inbox: { unread: 1, - total: 2, - }, - }, - }, - }, + total: 2 + } + } + } + } }); // Another new email arrives, before subscription.next() is called. @@ -582,7 +587,7 @@ describe("Subscription Publish Phase", () => { from: "hyo@graphql.org", subject: "Tools", message: "I <3 making things", - unread: true, + unread: true }) ).toBe(true); @@ -594,21 +599,21 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "hyo@graphql.org", - subject: "Tools", + subject: "Tools" }, inbox: { unread: 2, - total: 3, - }, - }, - }, - }, + total: 3 + } + } + } + } }); // The client decides to disconnect. - expect(await subscription.return()).toEqual({ + expect(await subscription.return!()).toEqual({ done: true, - value: undefined, + value: undefined }); // Which may result in disconnecting upstream services as well. @@ -617,20 +622,22 @@ describe("Subscription Publish Phase", () => { from: "adam@graphql.org", subject: "Important", message: "Read me please", - unread: true, + unread: true }) ).toBe(false); // No more listeners. // Awaiting a subscription after closing it results in completed results. expect(await subscription.next()).toEqual({ done: true, - value: undefined, + value: undefined }); }); it("produces a payload when there are multiple events", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); let payload = subscription.next(); @@ -641,7 +648,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -652,15 +659,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright", + subject: "Alright" }, inbox: { unread: 1, - total: 2, - }, - }, - }, - }, + total: 2 + } + } + } + } }); payload = subscription.next(); @@ -671,7 +678,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright 2", message: "Tests are good 2", - unread: true, + unread: true }) ).toBe(true); @@ -682,21 +689,23 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright 2", + subject: "Alright 2" }, inbox: { unread: 2, - total: 3, - }, - }, - }, - }, + total: 3 + } + } + } + } }); }); it("should not trigger when subscription is already done", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); let payload = subscription.next(); @@ -707,7 +716,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -718,19 +727,19 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright", + subject: "Alright" }, inbox: { unread: 1, - total: 2, - }, - }, - }, - }, + total: 2 + } + } + } + } }); payload = subscription.next(); - await subscription.return(); + await subscription.return!(); // A new email arrives! expect( @@ -738,19 +747,21 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright 2", message: "Tests are good 2", - unread: true, + unread: true }) ).toBe(false); expect(await payload).toEqual({ done: true, - value: undefined, + value: undefined }); }); it("should not trigger when subscription is thrown", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); let payload = subscription.next(); @@ -761,7 +772,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Alright", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -772,15 +783,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Alright", + subject: "Alright" }, inbox: { unread: 1, - total: 2, - }, - }, - }, - }, + total: 2 + } + } + } + } }); payload = subscription.next(); @@ -788,7 +799,7 @@ describe("Subscription Publish Phase", () => { // Throw error let caughtError; try { - await subscription.throw("ouch"); + await subscription.throw!("ouch"); } catch (e) { caughtError = e; } @@ -796,13 +807,15 @@ describe("Subscription Publish Phase", () => { expect(await payload).toEqual({ done: true, - value: undefined, + value: undefined }); }); it("event order is correct for multiple publishes", async () => { const pubsub = new SimplePubSub(); - const subscription = await createSubscription(pubsub) as AsyncIterableIterator; + const subscription = (await createSubscription( + pubsub + )) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); let payload = subscription.next(); @@ -813,7 +826,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Message", message: "Tests are good", - unread: true, + unread: true }) ).toBe(true); @@ -823,7 +836,7 @@ describe("Subscription Publish Phase", () => { from: "yuzhi@graphql.org", subject: "Message 2", message: "Tests are good 2", - unread: true, + unread: true }) ).toBe(true); @@ -834,15 +847,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Message", + subject: "Message" }, inbox: { unread: 2, - total: 3, - }, - }, - }, - }, + total: 3 + } + } + } + } }); payload = subscription.next(); @@ -854,15 +867,15 @@ describe("Subscription Publish Phase", () => { importantEmail: { email: { from: "yuzhi@graphql.org", - subject: "Message 2", + subject: "Message 2" }, inbox: { unread: 2, - total: 3, - }, - }, - }, - }, + total: 3 + } + } + } + } }); }); @@ -886,21 +899,24 @@ describe("Subscription Publish Phase", () => { throw new Error("Never leave."); } return message; - }, - }, - }, - }), + } + } + } + }) }); const document = parse("subscription { newMessage }"); - const subscription = await subscribe({ schema, document }) as AsyncIterableIterator; + const subscription = (await subscribe({ + schema, + document + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(await subscription.next()).toEqual({ done: false, value: { - data: { newMessage: "Hello" }, - }, + data: { newMessage: "Hello" } + } }); // An error in execution is presented as such. @@ -912,10 +928,10 @@ describe("Subscription Publish Phase", () => { { message: "Never leave.", locations: [{ line: 1, column: 16 }], - path: ["newMessage"], - }, - ], - }, + path: ["newMessage"] + } + ] + } }); // However that does not close the response event stream. @@ -923,8 +939,8 @@ describe("Subscription Publish Phase", () => { expect(await subscription.next()).toEqual({ done: false, value: { - data: { newMessage: "Bonjour" }, - }, + data: { newMessage: "Bonjour" } + } }); }); @@ -941,34 +957,36 @@ describe("Subscription Publish Phase", () => { fields: { newMessage: { type: GraphQLString, - resolve: (message) => message, - subscribe: generateMessages, - }, - }, - }), + resolve: message => message, + subscribe: generateMessages + } + } + }) }); const document = parse("subscription { newMessage }"); - const subscription = await subscribe({ schema, document }) as AsyncIterableIterator; + const subscription = (await subscribe({ + schema, + document + })) as AsyncIterableIterator; expect(isAsyncIterable(subscription)).toBeTruthy(); expect(await subscription.next()).toEqual({ done: false, value: { - data: { newMessage: "Hello" }, - }, + data: { newMessage: "Hello" } + } }); (await expectPromise(subscription.next())).toRejectWith("test error"); expect(await subscription.next()).toEqual({ done: true, - value: undefined, + value: undefined }); }); }); - class SimplePubSub { _subscribers: Set<(value: T) => void>; @@ -985,7 +1003,7 @@ class SimplePubSub { getSubscriber(transform: (value: T) => R): AsyncGenerator { const pullQueue: Array<(result: IteratorResult) => void> = []; - const pushQueue = []; + const pushQueue: any[] = []; let listening = true; this._subscribers.add(pushValue); @@ -1008,7 +1026,7 @@ class SimplePubSub { if (pushQueue.length > 0) { return Promise.resolve({ value: pushQueue.shift(), done: false }); } - return new Promise((resolve) => pullQueue.push(resolve)); + return new Promise(resolve => pullQueue.push(resolve)); }, return(): Promise> { emptyQueue(); @@ -1020,13 +1038,13 @@ class SimplePubSub { }, [Symbol.asyncIterator]() { return this; - }, + } }; function pushValue(event: T): void { const value: R = transform(event); if (pullQueue.length > 0) { - pullQueue.shift()({ value, done: false }); + pullQueue.shift()!({ value, done: false }); } else { pushQueue.push(value); } From 27c197cd2bf5c82e6a1e0999e4373b21af99499a Mon Sep 17 00:00:00 2001 From: Hoang Vo Date: Sun, 23 May 2021 15:39:17 +0700 Subject: [PATCH 6/6] Update README --- README.md | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e52d393e..e690702b 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,7 @@ Done in 141.94s. ### Support for GraphQL spec -The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). At this moment, -the only missing feature is support for Subscriptions. +The goal is to support the [June 2018 version of the GraphQL spec](https://facebook.github.io/graphql/June2018/). #### Differences to `graphql-js` @@ -91,10 +90,19 @@ if (!isCompiledQuery(compiledQuery)) { #### Execute the Query ```js -const executionResult = await compiledQuery.query(); +const executionResult = await compiledQuery.query(root, context, variables); console.log(executionResult); ``` +#### Subscribe to the Query + +```js +const result = await compiledQuery.subscribe(root, context, variables); +for await (const value of result) { + console.log(value); +} +``` + ## API ### compiledQuery = compileQuery(schema, document, operationName, compilerOptions) @@ -112,14 +120,18 @@ Compiles the `document` AST, using an optional operationName and compiler option for overly expensive serializers - `customJSONSerializer` {boolean, default: false} - Whether to produce also a JSON serializer function using `fast-json-stringify`. The default stringifier function is `JSON.stringify` -#### compiledQuery.compiled(root: any, context: any, variables: Maybe<{ [key: string]: any }>) +#### compiledQuery.query(root: any, context: any, variables: Maybe<{ [key: string]: any }>) the compiled function that can be called with a root value, a context and the required variables. +#### compiledQuery.subscribe(root: any, context: any, variables: Maybe<{ [key: string]: any }>) + +(available for GraphQL Subscription only) the compiled function that can be called with a root value, a context and the required variables to produce either an AsyncIterator (if successful) or an ExecutionResult (error). + #### compiledQuery.stringify(value: any) the compiled function for producing a JSON string. It will be `JSON.stringify` unless `compilerOptions.customJSONSerializer` is true. -The value argument should the return of the compiled GraphQL function. +The value argument should be the return of the compiled GraphQL function. ## LICENSE