Skip to content

Commit

Permalink
Workflow fixes (#9)
Browse files Browse the repository at this point in the history
* Docs

* remove unused

* improve code legibility

* add missing deps

* DEBUG default to err level
  • Loading branch information
facugon committed Mar 4, 2022
1 parent abcc832 commit 416844c
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 173 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ Components Control. Can be configured to do one o more things (or nothing)
| Variable Name | Usage |
| ----- | ----- |
| COMMANDER_DISABLED | disable internal commander api |
| MONITORING_DISABLED | disable monitoring system. system monitors will not be checked anymore. will only change when bots and agents send updates |
| MONITORING_DISABLED | disable monitoring system. The core will stop checking monitors status. Events for monitor status will be triggered on API updates |
| API_DISABLED | disable rest api |
| SCHEDULER_JOBS_DISABLED | disable internal scheduler execution. scheduler-jobs will be created using the rest api but task will never be executed. theeye-jobs execution timeout will be never checked. |
| SCHEDULER_JOBS_DISABLED | disable internal scheduler execution. scheduled jobs can be created but jobs will not be created. |

### Start development sample

Expand Down
3 changes: 0 additions & 3 deletions core/controllers/workflow/crud.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ const logger = require('../../lib/logger')('controller:workflow')
const audit = require('../../lib/audit')
const router = require('../../router')
// const audit = require('../../lib/audit')
const Workflow = require('../../entity/workflow').Workflow
const Task = require('../../entity/task').Entity
const Tag = require('../../entity/tag').Entity

const ACL = require('../../lib/acl');
const dbFilter = require('../../lib/db-filter');
Expand Down
278 changes: 131 additions & 147 deletions core/controllers/workflow/crudv2.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,188 +3,172 @@ const ObjectId = require('mongoose').Types.ObjectId
const isMongoId = require('validator/lib/isMongoId')
const graphlib = require('graphlib')
const App = require('../../app')
const TopicsConstants = require('../../constants/topics')
const logger = require('../../lib/logger')('controller:workflow')
const audit = require('../../lib/audit')
const router = require('../../router')
// const audit = require('../../lib/audit')
const Workflow = require('../../entity/workflow').Workflow
const Task = require('../../entity/task').Entity
const Tag = require('../../entity/tag').Entity

const ACL = require('../../lib/acl')
const dbFilter = require('../../lib/db-filter')
//const logger = require('../../lib/logger')('controller:workflow:crudv2')
const AsyncController = require('../../lib/async-controller')
const { ClientError, ServerError } = require('../../lib/error-handler')

/**
*
* @method POST
*
*/
const create = async (req, res, next) => {
const { customer, user, body } = req

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

const workflow = new Workflow(
Object.assign({}, body, {
_type: 'Workflow',
customer: customer.id,
customer_id: customer.id,
user: user.id,
user_id: user.id,
version: 2
})
)

const payload = { workflow, customer, user, body }
const { tasks, graph } = await createTasks(payload)

// updated grph with created tasks ids
workflow.graph = graph
await workflow.save()
module.exports = {
/**
*
* @method POST
*
*/
create: AsyncController(async (req, res, next) => {
const { customer, user, body } = req

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

//createTags(req)
req.workflow = workflow
return workflow
}
const workflow = new App.Models.Workflow.Workflow(
Object.assign({}, body, {
_type: 'Workflow',
customer: customer.id,
customer_id: customer.id,
user: user.id,
user_id: user.id,
version: 2
})
)

/**
*
* @method PUT
*
*/
const replace = async (req, res, next) => {
const { workflow, customer, user, body } = req
const { graph, tasks, start_task_id } = req.body

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}
const payload = { workflow, customer, user, body }
const { tasks, graph } = await createTasks(payload)

// updated grph with created tasks ids
workflow.graph = graph
await workflow.save()

//createTags(req)
req.workflow = workflow
return workflow
}),
/**
*
* @method DELETE
*
*/
remove: AsyncController(async (req) => {
const { workflow, body } = req

await Promise.all([
workflow.remove(),
App.Models.Job.Workflow.deleteMany({ workflow_id: workflow._id.toString() }),
App.scheduler.unscheduleWorkflow(workflow),
removeWorkflowTasks(workflow, body?.keepTasks)
])

return
}),
/**
*
* @method PUT
*
*/
replace: AsyncController(async (req, res, next) => {
const { workflow, customer, user, body } = req
const { graph, tasks, start_task_id } = req.body

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

const oldgraphlib = graphlib.json.read(workflow.graph)

const checkRemovedNodes = async () => {
const newgraph = graphlib.json.read(graph)
// update removed nodes
const oldNodes = oldgraphlib.nodes()
for (let id of oldNodes) {
const node = newgraph.node(id)
if (!node) { // it is not in the workflow no more
const oldNode = oldgraphlib.node(id)
if (workflow.version === 2) {
await App.task.destroy(oldNode.id)
} else {
await App.task.unlinkTaskFromWorkflow(oldNode.id)
const oldgraphlib = graphlib.json.read(workflow.graph)

const checkRemovedNodes = async () => {
const newgraph = graphlib.json.read(graph)
// update removed nodes
const oldNodes = oldgraphlib.nodes()
for (let id of oldNodes) {
const node = newgraph.node(id)
if (!node) { // it is not in the workflow no more
const oldNode = oldgraphlib.node(id)
if (workflow.version === 2) {
await App.task.destroy(oldNode.id)
} else {
await App.task.unlinkTaskFromWorkflow(oldNode.id)
}
}
}
}
}

const checkCreatedNodes = async (graph) => {
// add new nodes
for (let node of graph.nodes) {
if (!oldgraphlib.node(node.v)) { // is not in the workflow

const props = tasks.find(task => {
return (task.id === node.value.id)
})
const checkCreatedNodes = async (graph) => {
// add new nodes
for (let node of graph.nodes) {
if (!oldgraphlib.node(node.v)) { // is not in the workflow

const model = await App.task.factory(
Object.assign({}, props, {
customer_id: customer._id,
customer: customer,
user: user,
user_id: user.id,
workflow_id: workflow._id,
workflow: workflow,
id: undefined
const props = tasks.find(task => {
return (task.id === node.value.id)
})
)

if (props.id === start_task_id) {
workflow.start_task = model._id
workflow.start_task_id = model._id
}

// update node and edges
const newid = model._id.toString()

// first: update the edges.
for (let edge of graph.edges) {
const eventName = edge.value

if (edge.v === node.v) {
edge.v = newid

await createTaskEvent({
const model = await App.task.factory(
Object.assign({}, props, {
customer_id: customer._id,
name: eventName,
task_id: model._id
customer: customer,
user: user,
user_id: user.id,
workflow_id: workflow._id,
workflow: workflow,
id: undefined
})
)

if (props.id === start_task_id) {
workflow.start_task = model._id
workflow.start_task_id = model._id
}

if (edge.w === node.v) {
edge.w = newid
// update node and edges
const newid = model._id.toString()

// first: update the edges.
for (let edge of graph.edges) {
const eventName = edge.value

if (edge.v === node.v) {
edge.v = newid

if (isMongoId(edge.v)) {
const task_id = ObjectId(edge.v)
await createTaskEvent({
customer_id: customer._id,
name: eventName,
task_id
task_id: model._id
})
}

if (edge.w === node.v) {
edge.w = newid

if (isMongoId(edge.v)) {
const task_id = ObjectId(edge.v)
await createTaskEvent({
customer_id: customer._id,
name: eventName,
task_id
})
}
}
}
}

// second: update the node.
node.value.id = node.v = newid
// second: update the node.
node.value.id = node.v = newid
}
}
}
}

await checkRemovedNodes()
await checkCreatedNodes(graph)
await checkRemovedNodes()
await checkCreatedNodes(graph)

workflow.set(body)
await workflow.save()

createTags(req)
return workflow
}
workflow.set(body)
await workflow.save()

/**
*
* @method DELETE
*
*/
const remove = async (req) => {
const { workflow, body } = req

await Promise.all([
workflow.remove(),
App.Models.Job.Workflow.deleteMany({ workflow_id: workflow._id.toString() }),
App.scheduler.unscheduleWorkflow(workflow),
removeWorkflowTasks(workflow, body?.keepTasks)
])

return
}

module.exports = {
create: AsyncController(create),
remove: AsyncController(remove),
replace: AsyncController(replace)
createTags(req)
return workflow
})
}

const createTags = ({ body, customer }) => {
const tags = body.tags
if (tags && Array.isArray(tags)) {
Tag.create(tags, customer)
App.Models.Tag.Tag.create(tags, customer)
}
}

Expand Down
21 changes: 1 addition & 20 deletions core/lib/async-controller.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const App = require('../app')
const { ServerError, ClientError } = require('./error-handler')

module.exports = (fn, options = {}) => {

const controller = async (req, res, next) => {
try {
const result = await fn(req, res)
Expand All @@ -19,24 +19,5 @@ module.exports = (fn, options = {}) => {
}
}

//const transactionalController = async (req, res, next) => {
// const db = App.db
// const session = await db.startSession()
// req.db_session = session
// try {
// session.startTransaction()
// const result = await fn(req, res)
// await session.commitTransaction()
// const body = (result||'ok')
// res.send(body)
// next()
// } catch (err) {
// await session.abortTransaction()
// res.sendError(err)
// }
// session.endSession()
//}

//return (options.withTransaction === true) ? transactionalController : controller
return controller
}
2 changes: 1 addition & 1 deletion local.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

if [ -z "${DEBUG}" ]; then
export DEBUG="*eye*"
export DEBUG="*eye*err*"
fi

if [ -z "${NODE_ENV}" ]; then
Expand Down

0 comments on commit 416844c

Please sign in to comment.