Skip to content

Commit

Permalink
feat(server): add hooks for all object created
Browse files Browse the repository at this point in the history
  • Loading branch information
thesophiaxu committed Dec 19, 2021
1 parent de8fdfe commit bc8848e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 14 deletions.
5 changes: 4 additions & 1 deletion packages/unigraph-dev-backend/src/executableManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { PackageDeclaration } from "unigraph-dev-common/lib/types/packages";
import { addHook } from "./hooks";
import Babel from '@babel/core';
import { getRandomInt } from "unigraph-dev-common/lib/utils/utils";
import { mergeWithConcatArray } from "./utils";

export type Executable = {
name?: string,
Expand Down Expand Up @@ -92,17 +93,19 @@ export function buildExecutable(exec: Executable, context: ExecContext, unigraph
}

export function initExecutables(executables: [string, Executable][], context: Partial<ExecContext>, unigraph: Unigraph, schedule: Record<string, cron.ScheduledTask>, states: any) {
let newHooks = {};
executables.forEach(([key, el]) => {
if (key.startsWith("0x") && el.periodic) {
schedule[el["unigraph.id"]]?.stop();
schedule[el["unigraph.id"]] = cron.schedule(el.periodic, () => buildExecutable(el, {...context, definition: el, params: {}}, unigraph, states)())
}
if (key.startsWith("0x") && el.on_hook) {
states.hooks = addHook(states.hooks, el.on_hook, async (params: any) => {
newHooks = addHook(newHooks, el.on_hook, async (params: any) => {
return (buildExecutable(el, {...context, definition: el, params}, unigraph, states))();
})
}
})
states.hooks = _.mergeWith({}, states.defaultHooks, newHooks, mergeWithConcatArray);
}

/** Routines */
Expand Down
34 changes: 33 additions & 1 deletion packages/unigraph-dev-backend/src/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import _ from "lodash";
import { Cache } from "./caches";
import DgraphClient from "./dgraphClient";
import { Subscription } from "./subscriptions";
import { mergeWithConcatArray } from "./utils";

/* eslint-disable */ // TODO: Temporarily appease the linter, remember to fix it later
export type Hooks = Record<string, Function[]>
Expand All @@ -10,7 +12,7 @@ export async function callHooks<T>(hooks: Hooks, type: string, context: T) {
}

export function addHook(hooks: Hooks, type: string, fn: any) {
return _.merge({}, hooks, {[type]: [fn]});
return _.mergeWith({}, hooks, {[type]: [fn]}, mergeWithConcatArray);
}

// Default hooks
Expand All @@ -27,4 +29,34 @@ export type HookAfterObjectChangedParams = {
subscriptions: Subscription[],
caches: Record<string, Cache<any>>,
subIds?: any[],
}

export async function initEntityHeads (states: any, schemas: string[], client: DgraphClient) {
const queries = schemas.map((el, index) => `query${index} (func: eq(<unigraph.id>, "${el}")) {
uid
<~type> (first: -1) {
uid
}
}`);
const res: any[] = await client.queryDgraph(`query { ${queries.join('\n')} }`);
const newHeads = res.map((el, index) => [schemas[index], (el[0]?.['~type']?.[0].uid || "0x1")]);
states.entityHeadByType = Object.fromEntries(newHeads);
}

export async function afterObjectCreatedHooks (states: any, hooks: Record<string, any[]>, client: DgraphClient) {
const queries = Object.keys(hooks).map((el, index) => `query${index} (func: eq(<unigraph.id>, "${el}")) {
uid
<~type> (after: ${states.entityHeadByType[el]}) {
uid
}
}`);
const res: any[] = await client.queryDgraph(`query { ${queries.join('\n')} }`);
const newEntities = res.map((el) => (el[0]?.['~type'] || []).map((el: any) => el.uid));
newEntities.forEach((el, index) => { if (el.length > 0) states.entityHeadByType[Object.keys(hooks)[index]] = el[el.length - 1] });
// Call hooks based on objects created
newEntities.forEach((el, index) => {
if (el.length > 0) {
Object.values(hooks)[index].forEach(it => it({uids: el}));
}
});
}
40 changes: 28 additions & 12 deletions packages/unigraph-dev-backend/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { addUnigraphPackage, checkOrCreateDefaultDataModel, createPackageCache,
import { Cache } from './caches';
import repl from 'repl';
import { createSubscriptionLocal, MsgCallbackFn, pollSubscriptions, removeOrHibernateSubscriptionsById, Subscription } from './subscriptions';
import { callHooks, HookAfterObjectChangedParams, HookAfterSchemaUpdatedParams, HookAfterSubscriptionAddedParams, Hooks } from './hooks';
import { afterObjectCreatedHooks, callHooks, HookAfterObjectChangedParams, HookAfterSchemaUpdatedParams, HookAfterSubscriptionAddedParams, Hooks, initEntityHeads } from './hooks';
import { getAsyncLock } from './asyncManager';
import fetch from 'node-fetch';
import { uniqueId } from 'lodash';
Expand Down Expand Up @@ -76,6 +76,19 @@ export default async function startServer(client: DgraphClient) {
if (context.subIds && !Array.isArray(context.subIds)) context.subIds = [context.subIds]
pollSubscriptions(context.subscriptions, dgraphClient, pollCallback, context.subIds, serverStates);
await context.caches["executables"].updateNow();

// Call after_object_created hooks with uids cached
const objectCreatedHooks: any = {}
Object.keys(serverStates.hooks).forEach((el) => {
if (el.startsWith('after_object_created/')) {
const schemaName = el.replace('after_object_created/', '$/schema/');
objectCreatedHooks[schemaName] = serverStates.hooks[el];
}
})
lock.acquire('caches/head', async function (done: Function) {
await afterObjectCreatedHooks(serverStates, objectCreatedHooks, client);
done(false, null);
});
}],
}

Expand All @@ -86,6 +99,7 @@ export default async function startServer(client: DgraphClient) {
caches: caches,
subscriptions: _subscriptions,
hooks: hooks,
defaultHooks: hooks,
namespaceMap: namespaceMap,
localApi: {} as Unigraph,
httpCallbacks: {},
Expand Down Expand Up @@ -115,7 +129,8 @@ export default async function startServer(client: DgraphClient) {
}))
})
}, 250)
}
},
entityHeadByType: {},
})

const namespaceSub = createSubscriptionLocal(getRandomInt(), (data) => {
Expand Down Expand Up @@ -184,7 +199,7 @@ export default async function startServer(client: DgraphClient) {

"create_data_by_json": function (event: EventCreateDataByJson, ws: IWebsocket) {
dgraphClient.createData(event.data).then(_ => {
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
ws.send(makeResponse(event, true));
}).catch(e => ws.send(makeResponse(event, false, {"error": e})))
},
Expand Down Expand Up @@ -245,7 +260,7 @@ export default async function startServer(client: DgraphClient) {
const schemaAutoref = processAutorefUnigraphId(schema);
const upsert: UnigraphUpsert = insertsToUpsert([schemaAutoref], undefined, serverStates.caches['schemas'].dataAlt![0]);
dgraphClient.createUnigraphUpsert(upsert).then(async _ => {
await callHooks(hooks, "after_schema_updated", {caches: caches});
await callHooks(serverStates.hooks, "after_schema_updated", {caches: caches});
ws.send(makeResponse(event, true));
done(false, null)
}).catch(e => {ws.send(makeResponse(event, false, {"error": e})); done(true, null)});
Expand All @@ -263,7 +278,7 @@ export default async function startServer(client: DgraphClient) {
// Falls back to add package
const eventb: any = {...event, package: event.fallback};
addUnigraphPackage(dgraphClient, eventb.package, caches).then(async _ => {
await callHooks(hooks, "after_schema_updated", {caches: caches});
await callHooks(serverStates.hooks, "after_schema_updated", {caches: caches});
done(false, null)
//console.log("Hooks called")
ws.send(makeResponse(eventb, true));
Expand All @@ -275,7 +290,7 @@ export default async function startServer(client: DgraphClient) {
"add_unigraph_package": function (event: EventAddUnigraphPackage, ws: IWebsocket) {
lock.acquire('caches/schema', function(done: Function) {
addUnigraphPackage(dgraphClient, event.package, caches, event.update).then(async _ => {
await callHooks(hooks, "after_schema_updated", {caches: caches});
await callHooks(serverStates.hooks, "after_schema_updated", {caches: caches});
done(false, null)
//console.log("Hooks called")
ws.send(makeResponse(event, true));
Expand All @@ -299,14 +314,13 @@ export default async function startServer(client: DgraphClient) {
"create_unigraph_object": function (event: EventCreateUnigraphObject, ws: IWebsocket) {
if (!event.schema) { ws.send(makeResponse(event, false, {"error": "Cannot add Unigraph object without a schema!"})); return false; }
localApi.addObject(event.object, event.schema, event.padding).then((uids: any[]) => {
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
ws.send(makeResponse(event, true, {results: uids}))
}).catch((e: any) => ws.send(makeResponse(event, false, {"error": e})));
},

"update_spo": function (event: EventUpdateSPO, ws: IWebsocket) {
dgraphClient.updateSPO(event.uid, event.predicate, event.value).then(_ => {
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
ws.send(makeResponse(event, true))
}).catch(e => ws.send(makeResponse(event, false, {"error": e})));
},
Expand Down Expand Up @@ -342,7 +356,7 @@ export default async function startServer(client: DgraphClient) {
perfLogStartDbTransaction();
dgraphClient.createUnigraphUpsert(upsert).then(_ => {
perfLogAfterDbTransaction();
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches, subIds: event.subIds})
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches, subIds: event.subIds})
ws.send(makeResponse(event, true))
}).catch(e => ws.send(makeResponse(event, false, {"error": e})));
}
Expand All @@ -362,7 +376,7 @@ export default async function startServer(client: DgraphClient) {

const finalUpsert = insertsToUpsert([finalUpdater], undefined, serverStates.caches['schemas'].dataAlt![0]);
dgraphClient.createUnigraphUpsert(finalUpsert).then(_ => {
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches, subIds: event.subIds})
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches, subIds: event.subIds})
ws.send(makeResponse(event, true))
}).catch(e => ws.send(makeResponse(event, false, {"error": e})));
}
Expand Down Expand Up @@ -443,7 +457,7 @@ export default async function startServer(client: DgraphClient) {
fs.writeFileSync('imports_log.json', JSON.stringify(ref));
const upsert = insertsToUpsert(ref, undefined, serverStates.caches['schemas'].dataAlt![0]);
dgraphClient.createUnigraphUpsert(upsert).then(_ => {
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches})
ws.send(makeResponse(event, true))
}).catch(e => ws.send(makeResponse(event, false, {"error": e})));
},
Expand All @@ -457,7 +471,7 @@ export default async function startServer(client: DgraphClient) {

"add_notification": async function (event: EventAddNotification, ws: IWebsocket) {
await addNotification(event.item, caches, dgraphClient).catch(e => ws.send(makeResponse(event, false, {error: e})));
callHooks(hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches});
callHooks(serverStates.hooks, "after_object_changed", {subscriptions: serverStates.subscriptions, caches: caches});
ws.send(makeResponse(event, true));
},

Expand All @@ -476,6 +490,8 @@ export default async function startServer(client: DgraphClient) {

await Promise.all(Object.values(caches).map((el: Cache<any>) => el.updateNow()));

initEntityHeads(serverStates, Object.keys(serverStates.caches['schemas'].data).filter(el => el.startsWith('$/schema/')), client);

const server = new WebSocket.Server({
port: PORT,
perMessageDeflate: true
Expand Down
7 changes: 7 additions & 0 deletions packages/unigraph-dev-backend/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import _ from "lodash";

export function mergeWithConcatArray(objValue: any, srcValue: any) {
if (_.isArray(objValue)) {
return objValue.concat(srcValue);
}
}

0 comments on commit bc8848e

Please sign in to comment.