Skip to content

Commit

Permalink
Merge pull request #1101 from neo4j/feature/subscriptions-meta-delete
Browse files Browse the repository at this point in the history
Feature/subscriptions meta delete
  • Loading branch information
angrykoala committed Mar 8, 2022
2 parents b9af5b8 + 2b594ef commit e4bcfb4
Show file tree
Hide file tree
Showing 15 changed files with 731 additions and 63 deletions.
6 changes: 3 additions & 3 deletions docs/rfcs/rfc-007-subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ class Neo4jGraphQLSubscriptionsPlugin {
this.events = new EventEmitter();
}

abstract public publish(eventMeta: EventMeta);
abstract public publish(eventMeta: SubscriptionsEvent);
}
```

The "local" implementation of this will look something like:

```ts
class Neo4jGraphQLSubscriptionsLocalPlugin extends Neo4jGraphQLSubscriptionsPlugin {
public publish(eventMeta: EventMeta) {
public publish(eventMeta: SubscriptionsEvent) {
this.events.emit(eventMeta);
}
}
Expand All @@ -192,7 +192,7 @@ And in rough pseudocode, an implementation of this using an AMQP broker would lo
class Neo4jGraphQLSubscriptionsAMQPPlugin extends Neo4jGraphQLSubscriptionsPlugin {
private amqpConnection;

public publish(eventMeta: EventMeta) {
public publish(eventMeta: SubscriptionsEvent) {
amqpConnection.publish(eventMeta);
}

Expand Down
36 changes: 2 additions & 34 deletions packages/graphql/src/schema/resolvers/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import { translateCreate } from "../../translate";
import { Node } from "../../classes";
import { Context } from "../../types";
import getNeo4jResolveTree from "../../utils/get-neo4j-resolve-tree";
import { EventMeta, RawEventMeta } from "../../subscriptions/event-meta";
import { serializeNeo4jValue } from "../../utils/neo4j-serializers";
import { publishEventsToPlugin } from "./subscriptions/publish-events-to-plugin";

export default function createResolver({ node }: { node: Node }) {
async function resolve(_root: any, args: any, _context: unknown, info: GraphQLResolveInfo) {
Expand All @@ -44,15 +43,7 @@ export default function createResolver({ node }: { node: Node }) {
) as FieldNode;
const nodeKey = nodeProjection?.alias ? nodeProjection.alias.value : nodeProjection?.name?.value;

const subscriptionsPlugin = context.plugins?.subscriptions;
if (subscriptionsPlugin) {
const metaData: RawEventMeta[] = executeResult.records[0]?.meta || [];
for (const meta of metaData) {
const serializedMeta = serializeEventMeta(meta);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
subscriptionsPlugin.publish(serializedMeta);
}
}
publishEventsToPlugin(executeResult, context.plugins?.subscriptions);

return {
info: {
Expand All @@ -69,26 +60,3 @@ export default function createResolver({ node }: { node: Node }) {
args: { input: `[${node.name}CreateInput!]!` },
};
}

function serializeProperties(properties: Record<string, any> | undefined): Record<string, any> | undefined {
if (!properties) {
return undefined;
}

return Object.entries(properties).reduce((serializedProps, [k, v]) => {
serializedProps[k] = serializeNeo4jValue(v);
return serializedProps;
}, {} as Record<string, any>);
}

function serializeEventMeta(event: RawEventMeta): EventMeta {
return {
id: serializeNeo4jValue(event.id),
timestamp: serializeNeo4jValue(event.timestamp),
event: event.event,
properties: {
old: serializeProperties(event.properties.old),
new: serializeProperties(event.properties.new),
},
} as EventMeta;
}
3 changes: 3 additions & 0 deletions packages/graphql/src/schema/resolvers/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { execute } from "../../utils";
import { translateDelete } from "../../translate";
import { Context } from "../../types";
import { Node } from "../../classes";
import { publishEventsToPlugin } from "./subscriptions/publish-events-to-plugin";

export default function deleteResolver({ node }: { node: Node }) {
async function resolve(_root: any, args: any, _context: unknown, info: GraphQLResolveInfo) {
Expand All @@ -36,6 +37,8 @@ export default function deleteResolver({ node }: { node: Node }) {
context,
});

publishEventsToPlugin(executeResult, context.plugins?.subscriptions);

return { bookmark: executeResult.bookmark, ...executeResult.statistics };
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ExecuteResult } from "../../../utils/execute";
import { serializeNeo4jValue } from "../../../utils/neo4j-serializers";
import { Neo4jGraphQLSubscriptionsPlugin } from "../../../types";
import { EventMeta, SubscriptionsEvent } from "../../../subscriptions/subscriptions-event";

export function publishEventsToPlugin(
executeResult: ExecuteResult,
plugin: Neo4jGraphQLSubscriptionsPlugin | undefined
): void {
if (plugin) {
const metadata: EventMeta[] = executeResult.records[0]?.meta || [];

for (const rawEvent of metadata) {
const subscriptionsEvent = serializeEvent(rawEvent);
// eslint-disable-next-line @typescript-eslint/no-floating-promises
plugin.publish(subscriptionsEvent);
}
}
}

function serializeEvent(event: EventMeta): SubscriptionsEvent {
return {
id: serializeNeo4jValue(event.id),
timestamp: serializeNeo4jValue(event.timestamp),
event: event.event,
properties: {
old: serializeProperties(event.properties.old),
new: serializeProperties(event.properties.new),
},
} as SubscriptionsEvent; // Casting here because ts is not smart enough to get the difference between create|update|delete
}

function serializeProperties(properties: Record<string, any> | undefined): Record<string, any> | undefined {
if (!properties) {
return undefined;
}

return Object.entries(properties).reduce((serializedProps, [k, v]) => {
serializedProps[k] = serializeNeo4jValue(v);
return serializedProps;
}, {} as Record<string, any>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import * as neo4j from "neo4j-driver";

export type RawEventMeta = {
export type EventMeta = {
event: "create" | "update" | "delete";
properties: {
old: Record<string, any>;
Expand All @@ -29,7 +29,7 @@ export type RawEventMeta = {
timestamp: neo4j.Integer | string | number;
};

export type EventMeta = (
export type SubscriptionsEvent = (
| {
event: "create";
properties: {
Expand Down
20 changes: 17 additions & 3 deletions packages/graphql/src/translate/create-delete-and-params.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import { Node, Relationship } from "../classes";
import { Context } from "../types";
import createAuthAndParams from "./create-auth-and-params";
import createConnectionWhereAndParams from "./where/create-connection-where-and-params";
import { AUTH_FORBIDDEN_ERROR } from "../constants";
import { AUTH_FORBIDDEN_ERROR, META_CYPHER_VARIABLE } from "../constants";
import { createEventMetaObject } from "./subscriptions/create-event-meta";
import { filterMetaVariable } from "./subscriptions/filter-meta-variable";

interface Res {
strs: string[];
Expand Down Expand Up @@ -208,11 +210,23 @@ function createDeleteAndParams({
}
}

const nodeToDelete = `${_varName}_to_delete`;
res.strs.push(
`WITH ${[...withVars, `collect(DISTINCT ${_varName}) as ${_varName}_to_delete`].join(", ")}`
`WITH ${[...withVars, `collect(DISTINCT ${_varName}) as ${nodeToDelete}`].join(", ")}`
);
res.strs.push(`FOREACH(x IN ${_varName}_to_delete | DETACH DELETE x)`);

if (context.subscriptionsEnabled) {
const metaObjectStr = createEventMetaObject({
event: "delete",
nodeVariable: "n",
});
const reduceStr = `REDUCE(m=${META_CYPHER_VARIABLE}, n IN ${nodeToDelete} | m + ${metaObjectStr}) AS ${META_CYPHER_VARIABLE}`;
res.strs.push(
`WITH ${[...filterMetaVariable(withVars), nodeToDelete].join(", ")}, ${reduceStr}`
);
}

res.strs.push(`FOREACH(x IN ${_varName}_to_delete | DETACH DELETE x)`);
// TODO - relationship validation
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

import { META_CYPHER_VARIABLE } from "../../constants";

export type EventMetaType = "create" | "update" | "delete";
export type SubscriptionsEventType = "create" | "update" | "delete";

export function createEventMeta({ event, nodeVariable }: { event: EventMetaType; nodeVariable: string }): string {
const properties = createEventMetaProperties({ event, nodeVariable });
export function createEventMeta({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
return `${META_CYPHER_VARIABLE} + ${createEventMetaObject({ event, nodeVariable })} AS ${META_CYPHER_VARIABLE}`;
}

return `${META_CYPHER_VARIABLE} + { event: "${event}", id: id(${nodeVariable}), ${properties}, timestamp: timestamp() } AS ${META_CYPHER_VARIABLE}`;
export function createEventMetaObject({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
const properties = createEventMetaProperties({ event, nodeVariable });
return `{ event: "${event}", id: id(${nodeVariable}), ${properties}, timestamp: timestamp() }`;
}

function createEventMetaProperties({ event, nodeVariable }: { event: EventMetaType; nodeVariable: string }): string {
function createEventMetaProperties({ event, nodeVariable }: { event: SubscriptionsEventType; nodeVariable: string }): string {
let oldProps: string;
let newProps: string;

Expand Down
39 changes: 33 additions & 6 deletions packages/graphql/src/translate/translate-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import { Node } from "../classes";
import { Context } from "../types";
import { AUTH_FORBIDDEN_ERROR } from "../constants";
import { AUTH_FORBIDDEN_ERROR, META_CYPHER_VARIABLE } from "../constants";
import createAuthAndParams from "./create-auth-and-params";
import createDeleteAndParams from "./create-delete-and-params";
import translateTopLevelMatch from "./translate-top-level-match";
import { createEventMeta } from "./subscriptions/create-event-meta";

function translateDelete({ context, node }: { context: Context; node: Node }): [string, any] {
export default function translateDelete({ context, node }: { context: Context; node: Node }): [string, any] {
const { resolveTree } = context;
const deleteInput = resolveTree.args.delete;
const varName = "this";
Expand All @@ -33,6 +34,12 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
let deleteStr = "";
let cypherParams: { [k: string]: any } = {};

const withVars = [varName];

if (context.subscriptionsEnabled) {
withVars.push(META_CYPHER_VARIABLE);
}

const topLevelMatch = translateTopLevelMatch({ node, context, varName, operation: "DELETE" });
matchAndWhereStr = topLevelMatch[0];
cypherParams = { ...cypherParams, ...topLevelMatch[1] };
Expand All @@ -48,7 +55,9 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
});
if (allowAuth[0]) {
cypherParams = { ...cypherParams, ...allowAuth[1] };
allowStr = `WITH ${varName}\nCALL apoc.util.validate(NOT(${allowAuth[0]}), "${AUTH_FORBIDDEN_ERROR}", [0])`;
allowStr = `WITH ${withVars.join(", ")}\nCALL apoc.util.validate(NOT(${
allowAuth[0]
}), "${AUTH_FORBIDDEN_ERROR}", [0])`;
}

if (deleteInput) {
Expand All @@ -58,7 +67,7 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
deleteInput,
varName,
parentVar: varName,
withVars: [varName],
withVars,
parameterPrefix: `${varName}_${resolveTree.name}.args.delete`,
});
[deleteStr] = deleteAndParams;
Expand All @@ -71,9 +80,27 @@ function translateDelete({ context, node }: { context: Context; node: Node }): [
};
}

const cypher = [matchAndWhereStr, deleteStr, allowStr, `DETACH DELETE ${varName}`];
const eventMeta = createEventMeta({ event: "delete", nodeVariable: varName });

const cypher = [
...(context.subscriptionsEnabled ? [`WITH [] AS ${META_CYPHER_VARIABLE}`] : []),
matchAndWhereStr,
...(context.subscriptionsEnabled ? [`WITH ${varName}, ${eventMeta}`] : []),
deleteStr,
allowStr,
`DETACH DELETE ${varName}`,
...getDeleteReturn(context),
];

return [cypher.filter(Boolean).join("\n"), cypherParams];
}

export default translateDelete;
function getDeleteReturn(context: Context): Array<string> {
return context.subscriptionsEnabled
? [
`WITH ${META_CYPHER_VARIABLE}`,
`UNWIND ${META_CYPHER_VARIABLE} AS m`,
`RETURN collect(DISTINCT m) AS ${META_CYPHER_VARIABLE}`,
]
: [];
}
4 changes: 2 additions & 2 deletions packages/graphql/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ResolveTree } from "graphql-parse-resolve-info";
import { Driver, Integer } from "neo4j-driver";
import { Node, Relationship } from "./classes";
import { RelationshipQueryDirectionOption } from "./constants";
import { EventMeta } from "./subscriptions/event-meta";
import { SubscriptionsEvent } from "./subscriptions/subscriptions-event";

export type DriverConfig = {
database?: string;
Expand Down Expand Up @@ -346,7 +346,7 @@ export interface Neo4jGraphQLAuthPlugin {
export interface Neo4jGraphQLSubscriptionsPlugin {
events: EventEmitter;

publish(eventMeta: EventMeta): Promise<void>;
publish(eventMeta: SubscriptionsEvent): Promise<void>;
}

export interface Neo4jGraphQLPlugins {
Expand Down
8 changes: 4 additions & 4 deletions packages/graphql/src/utils/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import environment from "../environment";

const debug = Debug(DEBUG_EXECUTE);

interface ExecuteResult {
export interface ExecuteResult {
bookmark: string | null;
result: QueryResult;
statistics: Record<string, number>;
Expand Down Expand Up @@ -100,9 +100,9 @@ async function execute(input: {
try {
debug("%s", `About to execute Cypher:\nCypher:\n${cypher}\nParams:\n${JSON.stringify(input.params, null, 2)}`);

const result: QueryResult = await session[
`${input.defaultAccessMode.toLowerCase()}Transaction`
]((tx: Transaction) => tx.run(cypher, input.params));
const result: QueryResult = await session[`${input.defaultAccessMode.toLowerCase()}Transaction`](
(tx: Transaction) => tx.run(cypher, input.params)
);

const records = result.records.map((r) => r.toObject());

Expand Down
Loading

0 comments on commit e4bcfb4

Please sign in to comment.