Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow fixes #9

Merged
merged 5 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ Components Control. Can be configured to do one o more things (or nothing)
| Variable Name | Usage |
| ----- | ----- |
| COMMANDER_DISABLED | disable internal commander api |
| MONITORING_DISABLED | disable monitoring system. system monitors will not be checked anymore. will only change when bots and agents send updates |
| MONITORING_DISABLED | disable monitoring system. The core will stop checking monitors status. Events for monitor status will be triggered on API updates |
| API_DISABLED | disable rest api |
| SCHEDULER_JOBS_DISABLED | disable internal scheduler execution. scheduler-jobs will be created using the rest api but task will never be executed. theeye-jobs execution timeout will be never checked. |
| SCHEDULER_JOBS_DISABLED | disable internal scheduler execution. scheduled jobs can be created but jobs will not be created. |

### Start development sample

Expand Down
3 changes: 0 additions & 3 deletions core/controllers/workflow/crud.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ const logger = require('../../lib/logger')('controller:workflow')
const audit = require('../../lib/audit')
const router = require('../../router')
// const audit = require('../../lib/audit')
const Workflow = require('../../entity/workflow').Workflow
const Task = require('../../entity/task').Entity
const Tag = require('../../entity/tag').Entity

const ACL = require('../../lib/acl');
const dbFilter = require('../../lib/db-filter');
Expand Down
278 changes: 131 additions & 147 deletions core/controllers/workflow/crudv2.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,188 +3,172 @@ const ObjectId = require('mongoose').Types.ObjectId
const isMongoId = require('validator/lib/isMongoId')
const graphlib = require('graphlib')
const App = require('../../app')
const TopicsConstants = require('../../constants/topics')
const logger = require('../../lib/logger')('controller:workflow')
const audit = require('../../lib/audit')
const router = require('../../router')
// const audit = require('../../lib/audit')
const Workflow = require('../../entity/workflow').Workflow
const Task = require('../../entity/task').Entity
const Tag = require('../../entity/tag').Entity

const ACL = require('../../lib/acl')
const dbFilter = require('../../lib/db-filter')
//const logger = require('../../lib/logger')('controller:workflow:crudv2')
const AsyncController = require('../../lib/async-controller')
const { ClientError, ServerError } = require('../../lib/error-handler')

/**
*
* @method POST
*
*/
const create = async (req, res, next) => {
const { customer, user, body } = req

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

const workflow = new Workflow(
Object.assign({}, body, {
_type: 'Workflow',
customer: customer.id,
customer_id: customer.id,
user: user.id,
user_id: user.id,
version: 2
})
)

const payload = { workflow, customer, user, body }
const { tasks, graph } = await createTasks(payload)

// updated grph with created tasks ids
workflow.graph = graph
await workflow.save()
module.exports = {
/**
*
* @method POST
*
*/
create: AsyncController(async (req, res, next) => {
const { customer, user, body } = req

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

//createTags(req)
req.workflow = workflow
return workflow
}
const workflow = new App.Models.Workflow.Workflow(
Object.assign({}, body, {
_type: 'Workflow',
customer: customer.id,
customer_id: customer.id,
user: user.id,
user_id: user.id,
version: 2
})
)

/**
*
* @method PUT
*
*/
const replace = async (req, res, next) => {
const { workflow, customer, user, body } = req
const { graph, tasks, start_task_id } = req.body

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}
const payload = { workflow, customer, user, body }
const { tasks, graph } = await createTasks(payload)

// updated grph with created tasks ids
workflow.graph = graph
await workflow.save()

//createTags(req)
req.workflow = workflow
return workflow
}),
/**
*
* @method DELETE
*
*/
remove: AsyncController(async (req) => {
const { workflow, body } = req

await Promise.all([
workflow.remove(),
App.Models.Job.Workflow.deleteMany({ workflow_id: workflow._id.toString() }),
App.scheduler.unscheduleWorkflow(workflow),
removeWorkflowTasks(workflow, body?.keepTasks)
])

return
}),
/**
*
* @method PUT
*
*/
replace: AsyncController(async (req, res, next) => {
const { workflow, customer, user, body } = req
const { graph, tasks, start_task_id } = req.body

if (body.graph.nodes.length === 0) {
throw new ClientError('invalid graph definition')
}

const oldgraphlib = graphlib.json.read(workflow.graph)

const checkRemovedNodes = async () => {
const newgraph = graphlib.json.read(graph)
// update removed nodes
const oldNodes = oldgraphlib.nodes()
for (let id of oldNodes) {
const node = newgraph.node(id)
if (!node) { // it is not in the workflow no more
const oldNode = oldgraphlib.node(id)
if (workflow.version === 2) {
await App.task.destroy(oldNode.id)
} else {
await App.task.unlinkTaskFromWorkflow(oldNode.id)
const oldgraphlib = graphlib.json.read(workflow.graph)

const checkRemovedNodes = async () => {
const newgraph = graphlib.json.read(graph)
// update removed nodes
const oldNodes = oldgraphlib.nodes()
for (let id of oldNodes) {
const node = newgraph.node(id)
if (!node) { // it is not in the workflow no more
const oldNode = oldgraphlib.node(id)
if (workflow.version === 2) {
await App.task.destroy(oldNode.id)
} else {
await App.task.unlinkTaskFromWorkflow(oldNode.id)
}
}
}
}
}

const checkCreatedNodes = async (graph) => {
// add new nodes
for (let node of graph.nodes) {
if (!oldgraphlib.node(node.v)) { // is not in the workflow

const props = tasks.find(task => {
return (task.id === node.value.id)
})
const checkCreatedNodes = async (graph) => {
// add new nodes
for (let node of graph.nodes) {
if (!oldgraphlib.node(node.v)) { // is not in the workflow

const model = await App.task.factory(
Object.assign({}, props, {
customer_id: customer._id,
customer: customer,
user: user,
user_id: user.id,
workflow_id: workflow._id,
workflow: workflow,
id: undefined
const props = tasks.find(task => {
return (task.id === node.value.id)
})
)

if (props.id === start_task_id) {
workflow.start_task = model._id
workflow.start_task_id = model._id
}

// update node and edges
const newid = model._id.toString()

// first: update the edges.
for (let edge of graph.edges) {
const eventName = edge.value

if (edge.v === node.v) {
edge.v = newid

await createTaskEvent({
const model = await App.task.factory(
Object.assign({}, props, {
customer_id: customer._id,
name: eventName,
task_id: model._id
customer: customer,
user: user,
user_id: user.id,
workflow_id: workflow._id,
workflow: workflow,
id: undefined
})
)

if (props.id === start_task_id) {
workflow.start_task = model._id
workflow.start_task_id = model._id
}

if (edge.w === node.v) {
edge.w = newid
// update node and edges
const newid = model._id.toString()

// first: update the edges.
for (let edge of graph.edges) {
const eventName = edge.value

if (edge.v === node.v) {
edge.v = newid

if (isMongoId(edge.v)) {
const task_id = ObjectId(edge.v)
await createTaskEvent({
customer_id: customer._id,
name: eventName,
task_id
task_id: model._id
})
}

if (edge.w === node.v) {
edge.w = newid

if (isMongoId(edge.v)) {
const task_id = ObjectId(edge.v)
await createTaskEvent({
customer_id: customer._id,
name: eventName,
task_id
})
}
}
}
}

// second: update the node.
node.value.id = node.v = newid
// second: update the node.
node.value.id = node.v = newid
}
}
}
}

await checkRemovedNodes()
await checkCreatedNodes(graph)
await checkRemovedNodes()
await checkCreatedNodes(graph)

workflow.set(body)
await workflow.save()

createTags(req)
return workflow
}
workflow.set(body)
await workflow.save()

/**
*
* @method DELETE
*
*/
const remove = async (req) => {
const { workflow, body } = req

await Promise.all([
workflow.remove(),
App.Models.Job.Workflow.deleteMany({ workflow_id: workflow._id.toString() }),
App.scheduler.unscheduleWorkflow(workflow),
removeWorkflowTasks(workflow, body?.keepTasks)
])

return
}

module.exports = {
create: AsyncController(create),
remove: AsyncController(remove),
replace: AsyncController(replace)
createTags(req)
return workflow
})
}

const createTags = ({ body, customer }) => {
const tags = body.tags
if (tags && Array.isArray(tags)) {
Tag.create(tags, customer)
App.Models.Tag.Tag.create(tags, customer)
}
}

Expand Down
21 changes: 1 addition & 20 deletions core/lib/async-controller.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const App = require('../app')
const { ServerError, ClientError } = require('./error-handler')

module.exports = (fn, options = {}) => {

const controller = async (req, res, next) => {
try {
const result = await fn(req, res)
Expand All @@ -19,24 +19,5 @@ module.exports = (fn, options = {}) => {
}
}

//const transactionalController = async (req, res, next) => {
// const db = App.db
// const session = await db.startSession()
// req.db_session = session
// try {
// session.startTransaction()
// const result = await fn(req, res)
// await session.commitTransaction()
// const body = (result||'ok')
// res.send(body)
// next()
// } catch (err) {
// await session.abortTransaction()
// res.sendError(err)
// }
// session.endSession()
//}

//return (options.withTransaction === true) ? transactionalController : controller
return controller
}
2 changes: 1 addition & 1 deletion local.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

if [ -z "${DEBUG}" ]; then
export DEBUG="*eye*"
export DEBUG="*eye*err*"
fi

if [ -z "${NODE_ENV}" ]; then
Expand Down