Skip to content

Commit

Permalink
Add missing hooks and tracing for graphql operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Clark committed Jul 7, 2020
1 parent 6bf4ced commit 45f11b5
Show file tree
Hide file tree
Showing 2 changed files with 401 additions and 87 deletions.
142 changes: 103 additions & 39 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,44 @@ module.exports = function(mixinOptions) {
};
},
},
context: {
visibility: "private",
tracing: false,
handler(ctx) {
const { req, connection } = ctx.params;
let context = {
dataLoaders: new Map(), // create an empty map to load DataLoader instances into
};

if (req) {
context = {
...context,
ctx: req.$ctx,
service: req.$service,
params: req.$params,
};
} else if (connection) {
context = {
...context,
ctx,
connectionCtx: connection.context.$ctx,
service: connection.context.$service,
params: connection.context.$params,
};
} else {
throw new Error("Unrecognized request type for context action");
}

return context;
},
},
actionOptions: {
visibility: "private",
tracing: false,
handler() {
return {};
},
},
},

events: {
Expand Down Expand Up @@ -95,7 +133,7 @@ module.exports = function(mixinOptions) {

if (service.version != null)
return (
(typeof service.version == "number"
(typeof service.version === "number"
? "v" + service.version
: service.version) +
"." +
Expand Down Expand Up @@ -155,10 +193,18 @@ module.exports = function(mixinOptions) {
params: staticParams = {},
rootParams = {},
fileUploadArg = null,
fieldName = "",
typeName = "",
} = def;
const rootKeys = Object.keys(rootParams);

return async (root, args, context) => {
// Record the span if possible
let operationSpan;
if (context.ctx)
operationSpan = context.ctx.startSpan(`GQL ${typeName} ${fieldName}`);
let result;

try {
if (useDataLoader) {
const dataLoaderMapKey = this.getDataLoaderMapKey(
Expand Down Expand Up @@ -205,33 +251,41 @@ module.exports = function(mixinOptions) {
}

if (dataLoaderKey == null) {
return null;
result = null;
} else {
result = Array.isArray(dataLoaderKey)
? await dataLoader.loadMany(dataLoaderKey)
: await dataLoader.load(dataLoaderKey);
}

return Array.isArray(dataLoaderKey)
? await dataLoader.loadMany(dataLoaderKey)
: await dataLoader.load(dataLoaderKey);
} else if (fileUploadArg != null && args[fileUploadArg] != null) {
if (Array.isArray(args[fileUploadArg])) {
return await Promise.all(
result = await Promise.all(
args[fileUploadArg].map(async uploadPromise => {
const {
createReadStream,
...$fileInfo
} = await uploadPromise;
const stream = createReadStream();
return context.ctx.call(actionName, stream, {
meta: { $fileInfo },
});
return context.ctx.call(
actionName,
stream,
{ meta: { $fileInfo } },
await this.actions.actionOptions(root, args, context)
);
})
);
} else {
const { createReadStream, ...$fileInfo } = await args[
fileUploadArg
];
const stream = createReadStream();
result = await context.ctx.call(
actionName,
stream,
{ meta: { $fileInfo } },
await this.actions.actionOptions(root, args, context)
);
}

const { createReadStream, ...$fileInfo } = await args[fileUploadArg];
const stream = createReadStream();
return await context.ctx.call(actionName, stream, {
meta: { $fileInfo },
});
} else {
const params = {};
if (root && rootKeys) {
Expand All @@ -240,12 +294,17 @@ module.exports = function(mixinOptions) {
});
}

return await context.ctx.call(
result = await context.ctx.call(
actionName,
_.defaultsDeep({}, args, params, staticParams)
_.defaultsDeep({}, args, params, staticParams),
{},
await this.actions.actionOptions(root, args, context)
);
}
if (operationSpan) operationSpan.finish();
return result;
} catch (err) {
if (operationSpan) operationSpan.finish();
if (nullIfError) {
return null;
}
Expand Down Expand Up @@ -324,17 +383,29 @@ module.exports = function(mixinOptions) {
subscribe: filter
? withFilter(
() => this.pubsub.asyncIterator(tags),
async (payload, params, { ctx }) =>
payload !== undefined
? ctx.call(filter, { ...params, payload })
: false
async (payload, params) => {
return payload !== undefined
? this.createAsyncIteratorContext().call(filter, {
...params,
payload,
})
: false;
}
)
: () => this.pubsub.asyncIterator(tags),
resolve: (payload, params, { ctx }) =>
ctx.call(actionName, { ...params, payload }),
resolve: (payload, params) => {
return this.createAsyncIteratorContext().call(actionName, {
...params,
payload,
});
},
};
},

createAsyncIteratorContext() {
return this.broker.ContextFactory.create(this.broker, null, {}, {});
},

/**
* Generate GraphQL Schema
*
Expand Down Expand Up @@ -442,7 +513,11 @@ module.exports = function(mixinOptions) {
const name = this.getFieldName(query);
queries.push(query);
resolver.Query[name] = this.createActionResolver(
action.name
action.name,
{
typeName: "query",
fieldName: name,
}
);
});
}
Expand All @@ -457,6 +532,8 @@ module.exports = function(mixinOptions) {
action.name,
{
fileUploadArg: def.fileUploadArg,
typeName: "mutation",
fieldName: name,
}
);
});
Expand Down Expand Up @@ -607,20 +684,7 @@ module.exports = function(mixinOptions) {
this.apolloServer = new ApolloServer({
schema,
..._.defaultsDeep({}, mixinOptions.serverOptions, {
context: ({ req, connection }) => ({
...(req
? {
ctx: req.$ctx,
service: req.$service,
params: req.$params,
}
: {
ctx: connection.context.$ctx,
service: connection.context.$service,
params: connection.context.$params,
}),
dataLoaders: new Map(), // create an empty map to load DataLoader instances into
}),
context: integrationContext => this.actions.context(integrationContext),
subscriptions: {
onConnect: (connectionParams, socket) =>
this.actions.ws({ connectionParams, socket }),
Expand Down

0 comments on commit 45f11b5

Please sign in to comment.