Skip to content

Commit

Permalink
Merge pull request #168 from neume-network/11/lifecycle
Browse files Browse the repository at this point in the history
upgrade lifecycle handler
  • Loading branch information
il3ven committed Jul 16, 2022
2 parents 40fcf59 + b941378 commit fdac308
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 205 deletions.
287 changes: 179 additions & 108 deletions src/lifecycle.mjs
@@ -1,5 +1,5 @@
//@format
import path from "path";
import path, { resolve } from "path";
import { fileURLToPath } from "url";
import { createInterface } from "readline";
import { createReadStream } from "fs";
Expand All @@ -8,14 +8,9 @@ import EventEmitter from "events";
import { env, exit } from "process";

import Ajv from "ajv";
import partition from "lodash.partition";
import { lifecycleMessage } from "@neume-network/message-schema";

import {
NotImplementedError,
NotFoundError,
ValidationError,
} from "./errors.mjs";
import { NotFoundError, ValidationError } from "./errors.mjs";
import { loadStrategies, write } from "./disc.mjs";
import logger from "./logger.mjs";

Expand All @@ -29,9 +24,6 @@ const fileNames = {
extractor: "extractor.mjs",
};
const timeout = 3000;
const ajv = new Ajv();
const validate = ajv.compile(lifecycleMessage);
class LifeCycleHandler extends EventEmitter {}

function fill(buffer, write, messages) {
if (write) {
Expand Down Expand Up @@ -60,7 +52,6 @@ export async function lineReader(path, strategy) {
});

await once(rl, "close");
log(`Ending transformer strategy with name "${strategy.name}"`);
const { write, messages } = strategy.onClose();
buffer = fill(buffer, write, messages);
return buffer;
Expand All @@ -87,72 +78,27 @@ export async function setupFinder() {
};
}

export function check(message) {
const valid = validate(message);
if (!valid) {
const sMessage = JSON.stringify(message);
log(JSON.stringify(validate.errors));
throw new ValidationError(
`Found 1 or more validation error when checking lifecycle message: "${sMessage}"`
);
}
}

export function filterLifeCycle(messages) {
return messages.type === "extraction" || messages.type === "transformation";
}

export function generatePath(name, type) {
return path.resolve(dataDir, `${name}-${type}`);
}

async function transform(strategy, name, type) {
const filePath = generatePath(name, type);
return await lineReader(filePath, strategy);
}
const result = await lineReader(filePath, strategy);

export async function run(strategy, type, fun, params) {
let result;
if (fun === "init") {
log(
`Starting strategy with type "${type}" and name "${
strategy.module.name
}" with fn "init" and params "${JSON.stringify(params)}"`
);
}
if (type === "extraction") {
if (params) {
result = await strategy.module[fun](...params);
} else {
result = await strategy.module[fun]();
}
} else if (type === "transformation") {
result = await transform(
strategy.module,
strategy.module.name,
"extraction"
if (result && result.write) {
const filePath = generatePath(
transformStrategy.module.name,
"transformation"
);
}

const filePath = generatePath(strategy.module.name, type);
if (result) {
if (result.write) {
await write(filePath, `${result.write}\n`);
}
await write(filePath, `${result.write}\n`);
} else {
throw new Error(
`Strategy "${
strategy.module.name
}" and call "${fun}" didn't return a valid result: "${JSON.stringify(
result
)}"`
}-tranformation" didn't return a valid result: "${JSON.stringify(result)}`
);
}
const [lifecycle, worker] = partition(result.messages, filterLifeCycle);
return {
lifecycle,
worker,
};
}

function applyTimeout(message) {
Expand All @@ -162,60 +108,185 @@ function applyTimeout(message) {
return message;
}

export function extract(strategy, worker, messageRouter, args = []) {
return new Promise(async (resolve, reject) => {
let numberOfMessages = 0;
const type = "extraction";
const checkResult = (result) => {
if (!result) {
reject(
`Strategy "${
strategy.module.name
}-extraction" didn't return a valid result: "${JSON.stringify(
result
)}`
);
return;
}
return result;
};

const result = checkResult(await strategy.module.init(...args));

if (result.write) {
const filePath = generatePath(strategy.module.name, type);
await write(filePath, `${result.write}\n`);
}

const callback = async (message) => {
numberOfMessages--;

const result = checkResult(strategy.module.update(message));

if (!result)
reject(
`Strategy "${
strategy.module.name
}" and call init didn't return a valid result: "${JSON.stringify(
result
)}`
);

result.messages?.forEach((message) => {
numberOfMessages++;
worker.postMessage(applyTimeout(message));
});

if (result.write) {
const filePath = generatePath(strategy.module.name, type);
await write(filePath, `${result.write}\n`);
}

if (numberOfMessages === 0) {
messageRouter.off(`${strategy.module.name}-${type}`, callback);
resolve();
}
};

messageRouter.on(`${strategy.module.name}-${type}`, callback);

if (result.messages.length !== 0) {
result.messages.forEach((message) => {
numberOfMessages++;
worker.postMessage(applyTimeout(message));
});
} else {
messageRouter.off(`${strategy.module.name}-${type}`, callback);
resolve();
}
});
}

export async function init(worker) {
const lch = new LifeCycleHandler();
const finder = await setupFinder();
const messageRouter = new EventEmitter();

worker.on("message", async (message) => {
if (message.error) {
throw new Error(message.error);
throw new Error(message.commissioner + ":" + message.error);
}

const lifeCycleType = "extraction";
const strategy = finder(lifeCycleType, message.commissioner);
const messages = await run(strategy, lifeCycleType, "update", [message]);
messageRouter.emit(`${message.commissioner}-extraction`, message);
});

if (
messages.lifecycle &&
messages.lifecycle.length >= 1 &&
messages.lifecycle[0].type === "transformation"
) {
log(`Ending extractor strategy with name "${strategy.module.name}"`);
}
// crawlPath[i] and crawlPath[i+1] are executed in sequence
// crawlPath[i][j] and crawlPath[i][j+1] are executed in parallel
// TODO: Define and check for valid message schema. Current lifecycle message schema
// doesn't work. https://github.com/neume-network/message-schema/issues/19
const crawlPath = [
[{ name: "web3subgraph", extractor: {}, transform: {} }],
[
{
name: "soundxyz-call-tokenuri",
extractor: {
args: [resolve(env.DATA_DIR, "web3subgraph-transformation")],
},
transformer: {},
},
{
name: "zora-call-tokenuri",
extractor: {
args: [resolve(env.DATA_DIR, "web3subgraph-transformation")],
},
transformer: {},
},
{
name: "zora-call-tokenmetadatauri",
extractor: {
args: [resolve(env.DATA_DIR, "web3subgraph-transformation")],
},
transformer: {},
},
{
name: "soundxyz-metadata",
extractor: {
args: [resolve(env.DATA_DIR, "web3subgraph-transformation")],
},
transformer: {},
},
],
[
{
name: "soundxyz-get-tokenuri",
extractor: {
args: [
resolve(env.DATA_DIR, "soundxyz-call-tokenuri-transformation"),
],
},
transformer: {},
},
{
name: "zora-get-tokenuri",
extractor: {
args: [
resolve(env.DATA_DIR, "zora-call-tokenmetadatauri-transformation"),
],
},
transformer: {},
},
],
[
{
name: "music-os-accumulator",
extractor: { args: [] },
transformer: {},
},
],
];

messages.worker.forEach((message) => {
worker.postMessage(applyTimeout(message));
});
messages.lifecycle.forEach((message) => {
lch.emit("message", message);
});
});
for await (const path of crawlPath) {
await Promise.all(
path.map(async (strategy) => {
if (strategy.extractor) {
const extractStrategy = finder("extraction", strategy.name);
log(
`Starting extractor strategy with name "${
extractStrategy.module.name
}" with params "${JSON.stringify(strategy.extractor.args)}"`
);
await extract(
extractStrategy,
worker,
messageRouter,
strategy.extractor.args
);
}

lch.on("message", async (message) => {
check(message);
log(`Received new lifecycle message: "${JSON.stringify(message)}"`);
if (message.type === "exit") {
exit();
}
const lifeCycleType = message.type;
const strategy = finder(lifeCycleType, message.name);
const messages = await run(strategy, lifeCycleType, "init", message.args);
messages.worker.forEach((message) =>
worker.postMessage(applyTimeout(message))
if (strategy.transformer) {
const transformStrategy = finder("transformation", strategy.name);
log(
`Starting transformer strategy with name "${transformStrategy.module.name}"`
);
await transform(
transformStrategy.module,
transformStrategy.module.name,
"extraction"
);
log(
`Ending transformer strategy with name "${transformStrategy.module.name}"`
);
}
})
);
messages.lifecycle.forEach((message) => lch.emit("message", message));
});

lch.emit("message", {
type: "extraction",
version: "0.0.1",
name: "web3subgraph",
args: [],
});
//lch.emit("message", {
// type: "transformation",
// version: "0.0.1",
// name: "web3subgraph",
// args: null,
//});
}
}
15 changes: 1 addition & 14 deletions src/strategies/soundxyz-get-tokenuri/extractor.mjs
Expand Up @@ -22,7 +22,6 @@ export async function init(filePath) {

messages.push(makeRequest(tokenURI));
}
messages[messages.length - 1].last = true;
return {
write: null,
messages,
Expand All @@ -44,20 +43,8 @@ export function makeRequest(tokenURI) {
}

export function update(message) {
let messages = [];
if (message.last) {
messages = [
{
type: "transformation",
version,
name,
args: null,
},
];
}

return {
messages,
messages: [],
write: JSON.stringify({
metadata: {
tokenURI: message.options.url,
Expand Down
11 changes: 1 addition & 10 deletions src/strategies/soundxyz-metadata/extractor.mjs
Expand Up @@ -111,16 +111,7 @@ export function update(message) {

if (message.results.length === LENGTH_OF_EDITIONS_RESPONSE) {
return {
messages: [
{
type: "transformation",
version,
name,
args: null,
results: null,
error: null,
},
],
messages: [],
write: JSON.stringify({
metadata: message.metadata,
results: message.results,
Expand Down

0 comments on commit fdac308

Please sign in to comment.