Skip to content

Commit

Permalink
calculate nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
d2lam committed Oct 4, 2019
1 parent 9159919 commit d916bb3
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 46 deletions.
87 changes: 60 additions & 27 deletions lib/getWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@
const filterNodeName = name =>
(/^~(pr|commit|release|tag|sd@)/.test(name) ? name : name.replace('~', ''));

/**
* Build external nodes for its downstream jobs, DFS to traverse its children
* @method buildExternalNodes
* @param {String} root Source name. ex `sd@123:main`
* @param {Array} children Array of downstream jobs. ex: [sd@456:test, sd@789:test]
* @param {Set} nodes List of graph nodes
* @param {TriggerFactory} triggerFactory triggerFactory to figure out its downstream jobs
* @return {Promise} List of graph nodes
*/
const buildExternalNodes = async (root, children, nodes, triggerFactory) => {
await Promise.all(children.map(async (dest) => {
nodes.add(dest);

const newRoot = dest;
const newChildren = await triggerFactory.getDestFromSrc(newRoot);

await buildExternalNodes(newRoot, newChildren, nodes, triggerFactory);
}));
};

/**
* Get the list of nodes for the graph
* @method calculateNodes
Expand All @@ -18,36 +38,52 @@ const filterNodeName = name =>
* @return {Array} List of nodes (jobs)
*/
const calculateNodes = async (jobs, triggerFactory, pipelineId) => {
console.log(triggerFactory);
const nodes = new Set(['~pr', '~commit']);

Object.keys(jobs).forEach((name) => {
nodes.add(name);
if (Array.isArray(jobs[name].requires)) {
jobs[name].requires.forEach(n => nodes.add(filterNodeName(n)));
await Promise.all(Object.keys(jobs).map(async (jobName) => {
nodes.add(jobName);

// upstream nodes
if (Array.isArray(jobs[jobName].requires)) {
jobs[jobName].requires.forEach(n => nodes.add(filterNodeName(n)));
}
});

// downstream nodes
const srcName = `sd@${pipelineId}:${jobName}`;

// Calculate which downstream jobs are triggered BY current job
// Only need to take care of external triggers, since internal will be taken care automatically
const externalDownstreamOr = await triggerFactory.getDestFromSrc(`~${srcName}`);
const externalDownstreamAnd = await triggerFactory.getDestFromSrc(srcName);

externalDownstreamOr.forEach((dest) => {
nodes.add(dest);
});

await buildExternalNodes(
jobName, externalDownstreamAnd, nodes, triggerFactory);
}));

return [...nodes].map(name => ({ name }));
};

/**
* Build external edges for its downstream jobs, DFS to traverse its children
* @method buildExternalEdge
* @method buildExternalEdges
* @param {String} root Source name. ex `sd@123:main`
* @param {Array} children Array of downstream jobs. ex: [sd@456:test, sd@789:test]
* @param {Object} edges List of graph edges { src, dest }
* @param {Array} edges List of graph edges { src, dest }
* @param {TriggerFactory} triggerFactory triggerFactory to figure out its downstream jobs
* @return {Promise} List of graph edges { src, dest }
*/
const buildExternalEdge = async (root, children, edges, triggerFactory) => {
const buildExternalEdges = async (root, children, edges, triggerFactory) => {
await Promise.all(children.map(async (dest) => {
edges.push({ src: root, dest });

const newRoot = dest;
const newChildren = await triggerFactory.getDestFromSrc(newRoot);

await buildExternalEdge(newRoot, newChildren, edges, triggerFactory);
await buildExternalEdges(newRoot, newChildren, edges, triggerFactory);
}));
};

Expand All @@ -62,8 +98,8 @@ const buildExternalEdge = async (root, children, edges, triggerFactory) => {
const calculateEdges = async (jobs, triggerFactory, pipelineId) => {
const edges = [];

await Promise.all(Object.keys(jobs).map(async (jobname) => {
const job = jobs[jobname];
await Promise.all(Object.keys(jobs).map(async (jobName) => {
const job = jobs[jobName];

if (Array.isArray(job.requires)) {
// Calculate which upstream jobs trigger the current job
Expand All @@ -74,10 +110,10 @@ const calculateEdges = async (jobs, triggerFactory, pipelineId) => {
const isJoin = upstreamAnd.size > 1;

upstreamOr.forEach((src) => {
edges.push({ src: filterNodeName(src), dest: jobname });
edges.push({ src: filterNodeName(src), dest: jobName });
});
upstreamAnd.forEach((src) => {
const obj = { src, dest: jobname };
const obj = { src, dest: jobName };

if (isJoin) {
obj.join = true;
Expand All @@ -86,23 +122,20 @@ const calculateEdges = async (jobs, triggerFactory, pipelineId) => {
});
}

// for backward compatibility
if (triggerFactory) {
const srcName = `sd@${pipelineId}:${jobname}`;
const srcName = `sd@${pipelineId}:${jobName}`;

// Calculate which downstream jobs are triggered BY current job
// Only need to take care of external triggers, since internal will be taken care automatically
// Calculate which downstream jobs are triggered BY current job
// Only need to take care of external triggers, since internal will be taken care automatically

const externalDownstreamOr = await triggerFactory.getDestFromSrc(`~${srcName}`);
const externalDownstreamAnd = await triggerFactory.getDestFromSrc(srcName);
const externalDownstreamOr = await triggerFactory.getDestFromSrc(`~${srcName}`);
const externalDownstreamAnd = await triggerFactory.getDestFromSrc(srcName);

externalDownstreamOr.forEach((dest) => {
edges.push({ src: jobname, dest });
});
externalDownstreamOr.forEach((dest) => {
edges.push({ src: jobName, dest });
});

await buildExternalEdge(
jobname, externalDownstreamAnd, edges, triggerFactory);
}
await buildExternalEdges(
jobName, externalDownstreamAnd, edges, triggerFactory);
}));

return edges;
Expand Down
51 changes: 32 additions & 19 deletions test/lib/getWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ describe('getWorkflow', () => {

it('should throw if it is not given correct input', async () => {
try {
await getWorkflow({ config: {} });
await getWorkflow({ config: {} }, triggerFactoryMock);
} catch (e) {
console.log(e);
assert.equal(e.message, 'No Job config provided');
}
});

it('should convert a config with job-requires workflow to directed graph', async () => {
const requires = await getWorkflow(REQUIRES_WORKFLOW);
const legacyRequires = await getWorkflow(LEGACY_AND_REQUIRES_WORKFLOW);
const external = await getWorkflow(EXTERNAL_TRIGGER);
const requires = await getWorkflow(REQUIRES_WORKFLOW, triggerFactoryMock);
const legacyRequires = await getWorkflow(LEGACY_AND_REQUIRES_WORKFLOW, triggerFactoryMock);
const external = await getWorkflow(EXTERNAL_TRIGGER, triggerFactoryMock);

assert.deepEqual(requires, EXPECTED_OUTPUT);
assert.deepEqual(legacyRequires, EXPECTED_OUTPUT);
Expand All @@ -42,7 +41,7 @@ describe('getWorkflow', () => {
foo: {},
bar: { requires: ['foo'] }
}
});
}, triggerFactoryMock);

assert.deepEqual(result, {
nodes: [
Expand All @@ -63,7 +62,7 @@ describe('getWorkflow', () => {
B: { requires: ['foo'] },
C: { requires: ['~A', '~B', '~sd@1234:foo'] }
}
});
}, triggerFactoryMock);

assert.deepEqual(result, {
nodes: [
Expand Down Expand Up @@ -96,7 +95,7 @@ describe('getWorkflow', () => {
D: {},
E: {}
}
});
}, triggerFactoryMock);

assert.deepEqual(result, {
nodes: [
Expand Down Expand Up @@ -127,7 +126,7 @@ describe('getWorkflow', () => {
foo: { requires: ['A', 'A', 'A'] },
A: {}
}
});
}, triggerFactoryMock);

assert.deepEqual(result, {
nodes: [
Expand All @@ -150,7 +149,7 @@ describe('getWorkflow', () => {
baz: { requires: ['foo'] },
bax: { requires: ['bar', 'baz'] }
}
});
}, triggerFactoryMock);

assert.deepEqual(result, {
nodes: [
Expand All @@ -171,16 +170,19 @@ describe('getWorkflow', () => {
});

it('should handle external upstream & downstream join', async () => {
/* A -> B --> C
sd@111:external-level1 -> sd@222:external-level2
sd@333:external-level1 -> sd@444:external-level2
sd@555:external-level2
/* A - B - C
\ sd@111:external-level1 -> sd@222:external-level2 /
\ sd@333:external-level1 -> sd@444:external-level2 /
sd@555:external-level2 /
\ ~sd@777:external-level1 -> ~sd@888:external-level2 /
*/

triggerFactoryMock.getDestFromSrc.withArgs('sd@123:A').resolves([
'sd@111:external-level1',
'sd@333:external-level1'
]);
triggerFactoryMock.getDestFromSrc.withArgs('~sd@123:A').resolves([
'~sd@777:external-level1'
]);
triggerFactoryMock.getDestFromSrc.withArgs('sd@111:external-level1').resolves([
'sd@222:external-level2'
]);
Expand All @@ -190,6 +192,9 @@ describe('getWorkflow', () => {
triggerFactoryMock.getDestFromSrc.withArgs('sd@555:external-level2').resolves([
'sd@666:external-level3'
]);
triggerFactoryMock.getDestFromSrc.withArgs('~sd@777:external-level1').resolves([
'~sd@888:external-level2'
]);

const result = await getWorkflow({
jobs: {
Expand All @@ -198,29 +203,37 @@ describe('getWorkflow', () => {
C: { requires: [
'B',
'sd@222:external-level2',
'sd@444:external-level2'
'sd@444:external-level2',
'sd@555:external-level2',
'~sd@888:external-level2'
] }
}
}, triggerFactoryMock, 123);

console.log(result);
assert.deepEqual(result, {
nodes: [
{ name: '~pr' },
{ name: '~commit' },
{ name: 'A' },
{ name: 'B' },
{ name: 'C' },
{ name: 'sd@111:external-level1' },
{ name: 'sd@222:external-level2' },
{ name: 'sd@444:external-level2' },
{ name: 'sd@555:external-level2' },
{ name: '~sd@888:external-level2' },
{ name: '~sd@777:external-level1' },
{ name: 'sd@111:external-level1' },
{ name: 'sd@333:external-level1' },
{ name: 'sd@444:external-level2' }
{ name: 'sd@666:external-level3' }
],
edges: [
{ src: 'A', dest: 'B' },
{ src: '~sd@888:external-level2', dest: 'C' },
{ src: 'B', dest: 'C', join: true },
{ src: 'sd@222:external-level2', dest: 'C', join: true },
{ src: 'sd@444:external-level2', dest: 'C', join: true },
{ src: 'sd@555:external-level2', dest: 'C', join: true },
{ src: 'A', dest: '~sd@777:external-level1' },
{ src: 'A', dest: 'sd@111:external-level1' },
{ src: 'A', dest: 'sd@333:external-level1' },
{ src: 'sd@111:external-level1', dest: 'sd@222:external-level2' },
Expand Down

0 comments on commit d916bb3

Please sign in to comment.