Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/subscriptions meta delete #1101

Merged
merged 7 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/rfcs/rfc-007-subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ class Neo4jGraphQLSubscriptionsPlugin {
this.events = new EventEmitter();
}

abstract public publish(eventMeta: EventMeta);
abstract public publish(eventMeta: SubscriptionsEvent);
}
```

The "local" implementation of this will look something like:

```ts
class Neo4jGraphQLSubscriptionsLocalPlugin extends Neo4jGraphQLSubscriptionsPlugin {
public publish(eventMeta: EventMeta) {
public publish(eventMeta: SubscriptionsEvent) {
this.events.emit(eventMeta);
}
}
Expand All @@ -192,7 +192,7 @@ And in rough pseudocode, an implementation of this using an AMQP broker would lo
class Neo4jGraphQLSubscriptionsAMQPPlugin extends Neo4jGraphQLSubscriptionsPlugin {
private amqpConnection;

public publish(eventMeta: EventMeta) {
public publish(eventMeta: SubscriptionsEvent) {
amqpConnection.publish(eventMeta);
}

Expand Down
36 changes: 2 additions & 34 deletions packages/graphql/src/schema/resolvers/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import { translateCreate } from "../../translate";
import { Node } from "../../classes";
import { Context } from "../../types";
import getNeo4jResolveTree from "../../utils/get-neo4j-resolve-tree";
import { EventMeta, RawEventMeta } from "../../subscriptions/event-meta";
import { serializeNeo4jValue } from "../../utils/neo4j-serializers";
import { publishEventsToPlugin } from "./subscriptions/publish-events-to-plugin";

export default function createResolver({ node }: { node: Node }) {
async function resolve(_root: any, args: any, _context: unknown, info: GraphQLResolveInfo) {
Expand All @@ -44,15 +43,7 @@ export default function createResolver({ node }: { node: Node }) {
) as FieldNode;
const nodeKey = nodeProjection?.alias ? nodeProjection.alias.value : nodeProjection?.name?.value;

const subscriptionsPlugin = context.plugins?.subscriptions;
if (subscriptionsPlugin) {
const metaData: RawEventMeta[] = executeResult.records[0]?.meta || [];
for (const meta of metaData) {
const serializedMeta = serializeEventMeta(meta);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
subscriptionsPlugin.publish(serializedMeta);
}
}
publishEventsToPlugin(executeResult, context.plugins?.subscriptions);

return {
info: {
Expand All @@ -69,26 +60,3 @@ export default function createResolver({ node }: { node: Node }) {
args: { input: `[${node.name}CreateInput!]!` },
};
}

function serializeProperties(properties: Record<string, any> | undefined): Record<string, any> | undefined {
if (!properties) {
return undefined;
}

return Object.entries(properties).reduce((serializedProps, [k, v]) => {
serializedProps[k] = serializeNeo4jValue(v);
return serializedProps;
}, {} as Record<string, any>);
}

function serializeEventMeta(event: RawEventMeta): EventMeta {
return {
id: serializeNeo4jValue(event.id),
timestamp: serializeNeo4jValue(event.timestamp),
event: event.event,
properties: {
old: serializeProperties(event.properties.old),
new: serializeProperties(event.properties.new),
},
} as EventMeta;
}
3 changes: 3 additions & 0 deletions packages/graphql/src/schema/resolvers/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { execute } from "../../utils";
import { translateDelete } from "../../translate";
import { Context } from "../../types";
import { Node } from "../../classes";
import { publishEventsToPlugin } from "./subscriptions/publish-events-to-plugin";

export default function deleteResolver({ node }: { node: Node }) {
async function resolve(_root: any, args: any, _context: unknown, info: GraphQLResolveInfo) {
Expand All @@ -36,6 +37,8 @@ export default function deleteResolver({ node }: { node: Node }) {
context,
});

publishEventsToPlugin(executeResult, context.plugins?.subscriptions);

return { bookmark: executeResult.bookmark, ...executeResult.statistics };
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ExecuteResult } from "../../../utils/execute";
import { serializeNeo4jValue } from "../../../utils/neo4j-serializers";
import { Neo4jGraphQLSubscriptionsPlugin } from "../../../types";
import { EventMeta, SubscriptionsEvent } from "../../../subscriptions/subscriptions-event";

export function publishEventsToPlugin(
executeResult: ExecuteResult,
plugin: Neo4jGraphQLSubscriptionsPlugin | undefined
): void {
if (plugin) {
const metadata: EventMeta[] = executeResult.records[0]?.meta || [];

for (const rawEvent of metadata) {
const subscriptionsEvent = serializeEvent(rawEvent);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
plugin.publish(subscriptionsEvent);
}
}
}

function serializeEvent(event: EventMeta): SubscriptionsEvent {
return {
id: serializeNeo4jValue(event.id),
timestamp: serializeNeo4jValue(event.timestamp),
event: event.event,
properties: {
old: serializeProperties(event.properties.old),
new: serializeProperties(event.properties.new),
},
} as SubscriptionsEvent; // Casting here because ts is not smart enough to get the difference between create|update|delete
}

function serializeProperties(properties: Record<string, any> | undefined): Record<string, any> | undefined {
if (!properties) {
return undefined;
}

return Object.entries(properties).reduce((serializedProps, [k, v]) => {
serializedProps[k] = serializeNeo4jValue(v);
return serializedProps;
}, {} as Record<string, any>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import * as neo4j from "neo4j-driver";

export type RawEventMeta = {
export type EventMeta = {
event: "create" | "update" | "delete";
properties: {
old: Record<string, any>;
Expand All @@ -29,7 +29,7 @@ export type RawEventMeta = {
timestamp: neo4j.Integer | string | number;
};

export type EventMeta = (
export type SubscriptionsEvent = (
| {
event: "create";
properties: {
Expand Down
20 changes: 17 additions & 3 deletions packages/graphql/src/translate/create-delete-and-params.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import { Node, Relationship } from "../classes";
import { Context } from "../types";
import createAuthAndParams from "./create-auth-and-params";
import createConnectionWhereAndParams from "./where/create-connection-where-and-params";
import { AUTH_FORBIDDEN_ERROR } from "../constants";
import { AUTH_FORBIDDEN_ERROR, META_CYPHER_VARIABLE } from "../constants";
import { createEventMetaObject } from "./subscriptions/create-event-meta";
import { filterMetaVariable } from "./subscriptions/filter-meta-variable";

interface Res {
strs: string[];
Expand Down Expand Up @@ -208,11 +210,23 @@ function createDeleteAndParams({
}
}

const nodeToDelete = `${_varName}_to_delete`;
res.strs.push(
`WITH ${[...withVars, `collect(DISTINCT ${_varName}) as ${_varName}_to_delete`].join(", ")}`
`WITH ${[...withVars, `collect(DISTINCT ${_varName}) as ${nodeToDelete}`].join(", ")}`
);
res.strs.push(`FOREACH(x IN ${_varName}_to_delete | DETACH DELETE x)`);

if (context.subscriptionsEnabled) {
const metaObjectStr = createEventMetaObject({
event: "delete",
nodeVariable: "n",
});
const reduceStr = `REDUCE(m=${META_CYPHER_VARIABLE}, n IN ${nodeToDelete} | m + ${metaObjectStr}) AS ${META_CYPHER_VARIABLE}`;
res.strs.push(
`WITH ${[...filterMetaVariable(withVars), nodeToDelete].join(", ")}, ${reduceStr}`
);
}

res.strs.push(`FOREACH(x IN ${_varName}_to_delete | DETACH DELETE x)`);
// TODO - relationship validation
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

import { META_CYPHER_VARIABLE } from "../../constants";

export type EventMetaType = "create" | "update" | "delete";
export type SubscriptionsEventType = "create" | "update" | "delete";

export function createEventMeta({ event, nodeVariable }: { event: EventMetaType; nodeVariable: string }): string {
const properties = createEventMetaProperties({ event, nodeVariable });
export function createEventMeta({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
return `${META_CYPHER_VARIABLE} + ${createEventMetaObject({ event, nodeVariable })} AS ${META_CYPHER_VARIABLE}`;
}

return `${META_CYPHER_VARIABLE} + { event: "${event}", id: id(${nodeVariable}), ${properties}, timestamp: timestamp() } AS ${META_CYPHER_VARIABLE}`;
export function createEventMetaObject({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
const properties = createEventMetaProperties({ event, nodeVariable });
return `{ event: "${event}", id: id(${nodeVariable}), ${properties}, timestamp: timestamp() }`;
}

function createEventMetaProperties({ event, nodeVariable }: { event: EventMetaType; nodeVariable: string }): string {
function createEventMetaProperties({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
let oldProps: string;
let newProps: string;

Expand Down
39 changes: 33 additions & 6 deletions packages/graphql/src/translate/translate-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import { Node } from "../classes";
import { Context } from "../types";
import { AUTH_FORBIDDEN_ERROR } from "../constants";
import { AUTH_FORBIDDEN_ERROR, META_CYPHER_VARIABLE } from "../constants";
import createAuthAndParams from "./create-auth-and-params";
import createDeleteAndParams from "./create-delete-and-params";
import translateTopLevelMatch from "./translate-top-level-match";
import { createEventMeta } from "./subscriptions/create-event-meta";

function translateDelete({ context, node }: { context: Context; node: Node }): [string, any] {
export default function translateDelete({ context, node }: { context: Context; node: Node }): [string, any] {
const { resolveTree } = context;
const deleteInput = resolveTree.args.delete;
const varName = "this";
Expand All @@ -33,6 +34,12 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
let deleteStr = "";
let cypherParams: { [k: string]: any } = {};

const withVars = [varName];

if (context.subscriptionsEnabled) {
withVars.push(META_CYPHER_VARIABLE);
}

const topLevelMatch = translateTopLevelMatch({ node, context, varName, operation: "DELETE" });
matchAndWhereStr = topLevelMatch[0];
cypherParams = { ...cypherParams, ...topLevelMatch[1] };
Expand All @@ -48,7 +55,9 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
});
if (allowAuth[0]) {
cypherParams = { ...cypherParams, ...allowAuth[1] };
allowStr = `WITH ${varName}\nCALL apoc.util.validate(NOT(${allowAuth[0]}), "${AUTH_FORBIDDEN_ERROR}", [0])`;
allowStr = `WITH ${withVars.join(", ")}\nCALL apoc.util.validate(NOT(${
allowAuth[0]
}), "${AUTH_FORBIDDEN_ERROR}", [0])`;
}

if (deleteInput) {
Expand All @@ -58,7 +67,7 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
deleteInput,
varName,
parentVar: varName,
withVars: [varName],
withVars,
parameterPrefix: `${varName}_${resolveTree.name}.args.delete`,
});
[deleteStr] = deleteAndParams;
Expand All @@ -71,9 +80,27 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
};
}

const cypher = [matchAndWhereStr, deleteStr, allowStr, `DETACH DELETE ${varName}`];
const eventMeta = createEventMeta({ event: "delete", nodeVariable: varName });

const cypher = [
...(context.subscriptionsEnabled ? [`WITH [] AS ${META_CYPHER_VARIABLE}`] : []),
matchAndWhereStr,
...(context.subscriptionsEnabled ? [`WITH ${varName}, ${eventMeta}`] : []),
deleteStr,
allowStr,
`DETACH DELETE ${varName}`,
...getDeleteReturn(context),
];

return [cypher.filter(Boolean).join("\n"), cypherParams];
}

export default translateDelete;
function getDeleteReturn(context: Context): Array<string> {
return context.subscriptionsEnabled
? [
`WITH ${META_CYPHER_VARIABLE}`,
`UNWIND ${META_CYPHER_VARIABLE} AS m`,
`RETURN collect(DISTINCT m) AS ${META_CYPHER_VARIABLE}`,
]
: [];
}
4 changes: 2 additions & 2 deletions packages/graphql/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ResolveTree } from "graphql-parse-resolve-info";
import { Driver, Integer } from "neo4j-driver";
import { Node, Relationship } from "./classes";
import { RelationshipQueryDirectionOption } from "./constants";
import { EventMeta } from "./subscriptions/event-meta";
import { SubscriptionsEvent } from "./subscriptions/subscriptions-event";

export type DriverConfig = {
database?: string;
Expand Down Expand Up @@ -346,7 +346,7 @@ export interface Neo4jGraphQLAuthPlugin {
export interface Neo4jGraphQLSubscriptionsPlugin {
events: EventEmitter;

publish(eventMeta: EventMeta): Promise<void>;
publish(eventMeta: SubscriptionsEvent): Promise<void>;
}

export interface Neo4jGraphQLPlugins {
Expand Down
8 changes: 4 additions & 4 deletions packages/graphql/src/utils/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import environment from "../environment";

const debug = Debug(DEBUG_EXECUTE);

interface ExecuteResult {
export interface ExecuteResult {
bookmark: string | null;
result: QueryResult;
statistics: Record<string, number>;
Expand Down Expand Up @@ -100,9 +100,9 @@ async function execute(input: {
try {
debug("%s", `About to execute Cypher:\nCypher:\n${cypher}\nParams:\n${JSON.stringify(input.params, null, 2)}`);

const result: QueryResult = await session[
`${input.defaultAccessMode.toLowerCase()}Transaction`
]((tx: Transaction) => tx.run(cypher, input.params));
const result: QueryResult = await session[`${input.defaultAccessMode.toLowerCase()}Transaction`](
(tx: Transaction) => tx.run(cypher, input.params)
);

const records = result.records.map((r) => r.toObject());

Expand Down
Loading