-
Notifications
You must be signed in to change notification settings - Fork 63
/
emitTaskEvents.js
164 lines (129 loc) · 4.46 KB
/
emitTaskEvents.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
const CronJob = require('cron').CronJob
const Redlock = require('redlock')
const _ = require('lodash')
const apm = require('elastic-apm-node')
const { logError } = require('../../logger')
const {
getRedisClient,
getAllStelaceTasks,
didStelaceTaskExecute,
addStelaceTaskExecutionDate
} = require('../redis')
const {
getRoundedDate,
computeRecurringDates,
computeDate
} = require('../util/time')
// Module-level state shared by the functions below.
// `eventRequester` is (re)created by `start` and reset to null by `stop`.
let eventRequester
// Redis client and Redlock instance: created by `start` only if they do not exist yet
let client
let redlock
// Granularity (in minutes) passed to `getRoundedDate` and `filterTasks` to compute the reference date
// WARNING: MUST match cron job interval, adjust it if the cron interval changes
const nbMinutes = 1
// Second of the minute at which this process triggers the job (random integer from 0 to 20).
// The reference date is expected to be rounded back to the start of the minute.
const randomSecond = _.random(0, 20)
const job = new CronJob(
`${randomSecond} * * * * *`, // first field is the second: fires once per minute
emitTaskEvents
)
// Lock time-to-live: after that duration another server can claim the lock,
// even if the server that holds the lock crashed
const lockTtl = nbMinutes * 60 * 1000 // milliseconds
/**
 * Cron handler: loads all Stelace tasks, keeps the ones due at the current reference date
 * and emits one event for each of them.
 *
 * Each due task is processed while holding a Redlock lock whose key combines the task ID and
 * the reference date. The execution date is recorded before the event is sent, and a task
 * already recorded for this reference date is skipped.
 *
 * Errors raised while loading tasks or emitting an event are reported through `logError`
 * instead of being rethrown; a failure on one task does not prevent the next ones from running.
 *
 * @returns {Promise<void>}
 */
async function emitTaskEvents () {
  let fetchEventsTransaction = apm.startTransaction('Fetch task events to emit via cron')

  try {
    // use ref date because cron job cannot trigger at the specified time (with 0 millisecond)
    const refDate = getRoundedDate(new Date(), nbMinutes)

    const taskConfigs = await getAllStelaceTasks()
    const filteredTaskConfigs = filterTasks(taskConfigs, refDate, nbMinutes)

    fetchEventsTransaction.end()
    fetchEventsTransaction = null // set null to prevent stopping a second time in the finally block

    for (const { platformId, env, task } of filteredTaskConfigs) {
      const emitEventTransaction = apm.startTransaction('Emit task event via cron')
      apm.setUserContext({ id: platformId })
      apm.addLabels({ env, platformId, eventType: task.eventType })
      apm.setCustomContext({ taskId: task.id })

      try {
        // use redlock to ensure the cron process is handled only by one server at a time
        // even within a distributed system
        const lockResource = `locks:stelace_tasks:${task.id}_${refDate}`
        const lock = await redlock.lock(lockResource, lockTtl)

        try {
          const alreadyExecuted = await didStelaceTaskExecute({ taskId: task.id, executionDate: refDate })
          if (!alreadyExecuted) {
            await addStelaceTaskExecutionDate({ taskId: task.id, executionDate: refDate })
            await emitTaskEvent({ platformId, env, task })
          }
        } finally {
          // release the lock even when the task failed, instead of holding it until the TTL expires
          await releaseTaskLock(lock, { platformId, env })
        }
      } catch (err) {
        // NOTE(review): `LockError` is presumably raised when the lock cannot be acquired
        // (e.g. held by another server), hence not logged — confirm against redlock docs
        if (err.name !== 'LockError') {
          logError(err, { platformId, env, message: 'Fail to emit task event' })
        }
      } finally {
        emitEventTransaction.end()
      }
    }
  } catch (err) {
    logError(err, { message: 'Fail to load Stelace tasks' })
  } finally {
    fetchEventsTransaction && fetchEventsTransaction.end()
  }
}

/**
 * Releases a task lock without ever throwing, so that a release failure cannot mask
 * the error raised while processing the task.
 * Release errors get the same treatment as task errors: `LockError` is ignored,
 * anything else is logged.
 *
 * @param {Object} lock - lock returned by `redlock.lock`
 * @param {Object} context
 * @param {String} context.platformId
 * @param {String} context.env
 * @returns {Promise<void>}
 */
async function releaseTaskLock (lock, { platformId, env }) {
  try {
    await lock.unlock()
  } catch (err) {
    if (err.name !== 'LockError') {
      logError(err, { platformId, env, message: 'Fail to emit task event' })
    }
  }
}
/**
 * Keeps the task configurations whose task must be triggered at `refDate`.
 *
 * A configuration is dropped when `platformId`, `env` or `task` is missing, or when the task
 * is not active. A one-off task is due when its `executionDate` strictly equals `refDate`.
 * A recurring task is due when `refDate` is one of the dates returned by `computeRecurringDates`
 * for a window of +/- `nbMinutes * 30` seconds around `refDate`.
 *
 * @param {Object[]} taskConfigs - objects shaped as `{ platformId, env, task }`
 * @param {*} refDate - reference date, compared with strict equality against task dates
 * @param {Number} nbMinutes - cron interval in minutes
 * @returns {Object[]} due task configurations, in their original order
 */
function filterTasks (taskConfigs, refDate, nbMinutes) {
  // half of the cron interval, expressed in seconds
  const halfWindowSeconds = nbMinutes * 30

  const isDue = (config) => {
    if (!config.platformId) return false
    if (!config.env) return false
    if (!config.task) return false

    const scheduled = config.task
    if (!scheduled.active) return false

    // one-off task: its date must match the ref date exactly
    if (!scheduled.recurringPattern) {
      return scheduled.executionDate === refDate
    }

    // recurring task: look for an occurrence that falls exactly on the ref date
    const occurrences = computeRecurringDates(scheduled.recurringPattern, {
      startDate: computeDate(refDate, { s: -halfWindowSeconds }),
      endDate: computeDate(refDate, { s: halfWindowSeconds }),
      timezone: scheduled.recurringTimezone
    })

    return occurrences.includes(refDate)
  }

  return taskConfigs.filter(isDue)
}
/**
 * Asks the event requester to create the event configured on a task.
 * The task is reported as the emitter (`emitter: 'task'`, `emitterId: task.id`).
 *
 * @param {Object} params
 * @param {String} params.platformId
 * @param {String} params.env
 * @param {Object} params.task - reads `id`, `eventType`, `eventObjectId` and `eventMetadata`
 * @returns {Promise<void>} resolves once the requester has answered
 */
async function emitTaskEvent ({ platformId, env, task }) {
  const {
    id: emitterId,
    eventType,
    eventObjectId: objectId,
    eventMetadata: metadata
  } = task

  const request = {
    type: 'create',
    platformId,
    env,
    emitter: 'task',
    emitterId,
    eventType,
    objectId,
    metadata
  }

  await eventRequester.send(request)
}
/**
 * Starts the task cron job.
 * Creates a new event requester on every call; the Redis client and the Redlock instance
 * are only created when they do not exist yet.
 *
 * @param {Object} params
 * @param {Object} params.communication - must expose `getRequester`
 */
function start ({ communication }) {
  const { getRequester } = communication
  const requesterOptions = {
    name: 'Emit task event cron > Event Requester',
    key: 'event'
  }

  eventRequester = getRequester(requesterOptions)

  // reuse the existing Redis client and Redlock instance when there is one
  client = client || getRedisClient()
  redlock = redlock || new Redlock([client], { retryCount: 3 })

  job.start()
}
/**
 * Stops the task cron job and closes the event requester created by `start`.
 * Can be called before `start` or several times in a row: the requester is only closed
 * when there is one, and the cron job is stopped in every case.
 */
function stop () {
  // the requester only exists between a call to `start` and the next call to `stop`
  if (eventRequester) {
    eventRequester.close()
    eventRequester = null
  }

  job.stop()
}
// Public API: `start` launches the task cron job, `stop` halts it
module.exports = {
start,
stop
}