Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(1710): support external join. BREAKING CHANGE: add new external join logic [4] #27

Merged
merged 14 commits into from
Nov 5, 2019
189 changes: 156 additions & 33 deletions lib/getWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,80 +9,203 @@
const filterNodeName = name =>
(/^~(pr|commit|release|tag|sd@)/.test(name) ? name : name.replace('~', ''));
d2lam marked this conversation as resolved.
Show resolved Hide resolved

/**
* Build external nodes for its downstream jobs, DFS to traverse its children
* @method buildExternalNodes
* @param {String} root Source name. ex `sd@123:main`
* @param {Array} children Array of downstream jobs. ex: [sd@456:test, sd@789:test]
* @param {Set} nodes List of graph nodes
* @param {TriggerFactory} triggerFactory triggerFactory to figure out its downstream jobs
* @return {Promise} List of graph nodes
*/
const buildExternalNodes = async (root, children, nodes, triggerFactory) => {
await Promise.all(children.map(async (dest) => {
nodes.add(dest);

const newRoot = dest;
const newChildren = await triggerFactory.getDestFromSrc(newRoot);

await buildExternalNodes(newRoot, newChildren, nodes, triggerFactory);
}));
};

/**
* Get the list of nodes for the graph
* @method calculateNodes
* @param {Object} jobs Hash of job configs
* @return {Array} List of nodes (jobs)
* @param {Object} jobs Hash of job configs
* @param {TriggerFactory} triggerFactory Trigger Factory to find external triggers
* @param {Number} pipelineId Id of the current pipeline
* @return {Array} List of nodes (jobs)
*/
const calculateNodes = (jobs) => {
const calculateNodes = async (jobs, triggerFactory, pipelineId) => {
const nodes = new Set(['~pr', '~commit']);

Object.keys(jobs).forEach((name) => {
nodes.add(name);
if (Array.isArray(jobs[name].requires)) {
jobs[name].requires.forEach(n => nodes.add(filterNodeName(n)));
// for backward compatibility. TODO: remove this block later
if (!triggerFactory) {
Object.keys(jobs).forEach((name) => {
nodes.add(name);
if (Array.isArray(jobs[name].requires)) {
jobs[name].requires.forEach(n => nodes.add(filterNodeName(n)));
}
});

return [...nodes].map(name => ({ name }));
}

// new implementation. allow external join
await Promise.all(Object.keys(jobs).map(async (jobName) => {
nodes.add(jobName);

// upstream nodes
if (Array.isArray(jobs[jobName].requires)) {
jobs[jobName].requires.forEach(n => nodes.add(filterNodeName(n)));
}
});

// downstream nodes
const srcName = `sd@${pipelineId}:${jobName}`;

// Calculate which downstream jobs are triggered BY current job
// Only need to take care of external triggers, since internal will be taken care automatically
const externalDownstreamOr = await triggerFactory.getDestFromSrc(`~${srcName}`);
tkyi marked this conversation as resolved.
Show resolved Hide resolved
const externalDownstreamAnd = await triggerFactory.getDestFromSrc(srcName);

externalDownstreamOr.forEach((dest) => {
nodes.add(dest);
});

await buildExternalNodes(
jobName, externalDownstreamAnd, nodes, triggerFactory);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
jobName, externalDownstreamAnd, nodes, triggerFactory);
srcName, externalDownstreamAnd, nodes, triggerFactory);

}));

return [...nodes].map(name => ({ name }));
};

/**
* Build external edges for its downstream jobs, DFS to traverse its children
* @method buildExternalEdges
* @param {String} root Source name. ex `sd@123:main`
* @param {Array} children Array of downstream jobs. ex: [sd@456:test, sd@789:test]
* @param {Array} edges List of graph edges { src, dest }
* @param {TriggerFactory} triggerFactory triggerFactory to figure out its downstream jobs
* @return {Promise} List of graph edges { src, dest }
*/
const buildExternalEdges = async (root, children, edges, triggerFactory) => {
await Promise.all(children.map(async (dest) => {
edges.push({ src: root, dest });

const newRoot = dest;
const newChildren = await triggerFactory.getDestFromSrc(newRoot);

await buildExternalEdges(newRoot, newChildren, edges, triggerFactory);
}));
};

/**
* Calculate edges of directed graph based on "requires" property of jobs
* @method calculateEdges
* @param {Object} jobs Hash of job configurations
* @return {Array} List of graph edges { src, dest }
* @param {Object} jobs Hash of job configurations
* @param {TriggerFactory} triggerFactory Trigger Factory to find external triggers
* @param {Number} pipelineId Id of the current pipeline
* @return {Array} List of graph edges { src, dest }
*/
const calculateEdges = (jobs) => {
const calculateEdges = async (jobs, triggerFactory, pipelineId) => {
const edges = [];

Object.keys(jobs).forEach((j) => {
const job = jobs[j];
const dest = j;
// for backward compatibility. TODO: remove this block later
if (!triggerFactory) {
Object.keys(jobs).forEach((j) => {
const job = jobs[j];
const dest = j;

if (Array.isArray(job.requires)) {
const specialTriggers = new Set(job.requires.filter(name => name.charAt(0) === '~'));
const normalTriggers = new Set(job.requires.filter(name => name.charAt(0) !== '~'));
const isJoin = normalTriggers.size > 1;
if (Array.isArray(job.requires)) {
const specialTriggers = new Set(
job.requires.filter(name => name.charAt(0) === '~'));
const normalTriggers = new Set(
job.requires.filter(name => name.charAt(0) !== '~'));
const isJoin = normalTriggers.size > 1;

specialTriggers.forEach((src) => {
edges.push({ src: filterNodeName(src), dest });
});
specialTriggers.forEach((src) => {
edges.push({ src: filterNodeName(src), dest });
});

normalTriggers.forEach((src) => {
const obj = { src, dest };

if (isJoin) {
obj.join = true;
}

edges.push(obj);
});
}
});

return edges;
}

// new implementation. allow external join
await Promise.all(Object.keys(jobs).map(async (jobName) => {
const job = jobs[jobName];

normalTriggers.forEach((src) => {
const obj = { src, dest };
if (Array.isArray(job.requires)) {
// Calculate which upstream jobs trigger the current job
const upstreamOr = new Set(
job.requires.filter(name => name.charAt(0) === '~'));
const upstreamAnd = new Set(
job.requires.filter(name => name.charAt(0) !== '~'));
const isJoin = upstreamAnd.size > 1;

upstreamOr.forEach((src) => {
edges.push({ src: filterNodeName(src), dest: jobName });
tkyi marked this conversation as resolved.
Show resolved Hide resolved
});
upstreamAnd.forEach((src) => {
const obj = { src, dest: jobName };
tkyi marked this conversation as resolved.
Show resolved Hide resolved

if (isJoin) {
obj.join = true;
}

edges.push(obj);
});
}
});

const srcName = `sd@${pipelineId}:${jobName}`;

// Calculate which downstream jobs are triggered BY current job
// Only need to take care of external triggers, since internal will be taken care automatically

const externalDownstreamOr = await triggerFactory.getDestFromSrc(`~${srcName}`);
const externalDownstreamAnd = await triggerFactory.getDestFromSrc(srcName);

externalDownstreamOr.forEach((dest) => {
edges.push({ src: jobName, dest });
});

await buildExternalEdges(
jobName, externalDownstreamAnd, edges, triggerFactory);
tkyi marked this conversation as resolved.
Show resolved Hide resolved
}));

return edges;
};

/**
* Given a pipeline config, return a directed graph configuration that describes the workflow
* @method getWorkflow
* @param {Object} pipelineConfig A Pipeline Config
* @param {Object} pipelineConfig.jobs Hash of job configs
* @return {Object} List of nodes and edges { nodes, edges }
* @param {Object} pipelineConfig A Pipeline Config
* @param {Object} pipelineConfig.jobs Hash of job configs
* @param {TriggerFactory} triggerFactory Trigger Factory to find external triggers
* @param {Number} pipelineId Id of the current pipeline
* @return {Object} List of nodes and edges {nodes, edges}
*/
const getWorkflow = (pipelineConfig) => {
const getWorkflow = async (pipelineConfig, triggerFactory, pipelineId) => {
const jobConfig = pipelineConfig.jobs;
let edges = [];

if (!jobConfig) {
throw new Error('No Job config provided');
}
const nodes = await calculateNodes(jobConfig, triggerFactory, pipelineId);
const edges = await calculateEdges(jobConfig, triggerFactory, pipelineId);

edges = calculateEdges(jobConfig);

return { nodes: calculateNodes(jobConfig), edges };
return { nodes, edges };
};

module.exports = getWorkflow;
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "screwdriver-workflow-parser",
"version": "0.0.1",
"version": "2.0.0",
"description": "Parses and converts pipeline configuration into a workflow",
"main": "index.js",
"scripts": {
Expand Down Expand Up @@ -38,7 +38,8 @@
"eslint": "^4.19.1",
"eslint-config-screwdriver": "^3.0.0",
"jenkins-mocha": "^7.0.0",
"rewire": "^4.0.1"
"rewire": "^4.0.1",
"sinon": "^7.5.0"
},
"dependencies": {
"screwdriver-data-schema": "^18.39.2"
Expand Down
32 changes: 32 additions & 0 deletions test/data/expected-external-complex.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"nodes": [
{ "name": "~pr" },
{ "name": "~commit" },
{ "name": "A" },
{ "name": "B" },
{ "name": "C" },
{ "name": "sd@222:external-level2" },
{ "name": "sd@444:external-level2" },
{ "name": "sd@555:external-level2" },
{ "name": "~sd@888:external-level2" },
{ "name": "~sd@777:external-level1" },
{ "name": "sd@111:external-level1" },
{ "name": "sd@333:external-level1" },
{ "name": "sd@666:external-level3" }
],
"edges": [
{ "src": "A", "dest": "B" },
{ "src": "~sd@888:external-level2", "dest": "C" },
{ "src": "B", "dest": "C", "join": true },
{ "src": "sd@222:external-level2", "dest": "C", "join": true },
{ "src": "sd@444:external-level2", "dest": "C", "join": true },
{ "src": "sd@555:external-level2", "dest": "C", "join": true },
{ "src": "A", "dest": "~sd@777:external-level1" },
{ "src": "A", "dest": "sd@111:external-level1" },
{ "src": "A", "dest": "sd@333:external-level1" },
{ "src": "sd@111:external-level1", "dest": "sd@222:external-level2" },
{ "src": "sd@333:external-level1", "dest": "sd@444:external-level2" },
{ "src": "sd@333:external-level1", "dest": "sd@555:external-level2" },
{ "src": "sd@555:external-level2", "dest": "sd@666:external-level3" }
]
}
4 changes: 3 additions & 1 deletion test/data/expected-external.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
{ "name": "main" },
{ "name": "foo" },
{ "name": "bar" },
{ "name": "sd@111:baz" },
{ "name": "~sd@1234:foo" }
],
"edges": [
{ "src": "~pr", "dest": "main" },
{ "src": "~commit", "dest": "main" },
{ "src": "main", "dest": "foo" },
{ "src": "~sd@1234:foo", "dest": "bar" },
{ "src": "foo", "dest": "bar" }
{ "src": "foo", "dest": "bar", "join": true },
{ "src": "sd@111:baz", "dest": "bar", "join": true }
]
}
2 changes: 1 addition & 1 deletion test/data/requires-workflow-exttrigger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"jobs": {
"main": { "requires": ["~pr", "~commit"] },
"foo": { "requires": ["main"] },
"bar": { "requires": ["foo", "~sd@1234:foo"] }
"bar": { "requires": ["foo", "sd@111:baz", "~sd@1234:foo"] }
}
}
19 changes: 19 additions & 0 deletions test/lib/getNextJobs.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const assert = require('chai').assert;
const getNextJobs = require('../../lib/getNextJobs');
const WORKFLOW = require('../data/expected-output');
const EXTERNAL_WORKFLOW = require('../data/expected-external');
const EXTERNAL_COMPLEX_WORKFLOW = require('../data/expected-external-complex');

describe('getNextJobs', () => {
it('should throw if trigger not provided', () => {
Expand Down Expand Up @@ -59,6 +61,10 @@ describe('getNextJobs', () => {
}), []);
});

it('should figure out what jobs start next with parallel workflow with external', () => {
assert.deepEqual(getNextJobs(EXTERNAL_WORKFLOW, { trigger: 'sd@111:baz' }), ['bar']);
});

it('should figure out what jobs start next with parallel workflow', () => {
const parallelWorkflow = {
edges: [
Expand Down Expand Up @@ -109,4 +115,17 @@ describe('getNextJobs', () => {
assert.deepEqual(getNextJobs(specificBranchWorkflow, { trigger: '~pr:bar-foo-prod',
prNum: '123' }), ['PR-123:f', 'PR-123:g']);
});

it('should figure out what jobs start next with external workflow', () => {
/* A - B - C
\ sd@111:external-level1 -> sd@222:external-level2 /
\ sd@333:external-level1 -> sd@444:external-level2 /
sd@555:external-level2 /
\ ~sd@777:external-level1 -> ~sd@888:external-level2 /
*/
assert.deepEqual(getNextJobs(EXTERNAL_COMPLEX_WORKFLOW, { trigger: 'A' }),
['B', '~sd@777:external-level1', 'sd@111:external-level1', 'sd@333:external-level1']);
assert.deepEqual(getNextJobs(EXTERNAL_COMPLEX_WORKFLOW,
{ trigger: 'B' }), ['C']);
});
});
Loading