-
Notifications
You must be signed in to change notification settings - Fork 10
/
run.ts
116 lines (100 loc) · 3.78 KB
/
run.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import createDebugger from 'debug';
import { Job } from '.';
import { JobError } from '../utils';
// Debug logger for this module, created under the 'pulse:job' namespace.
const debug = createDebugger('pulse:job');
/** Signature of the job `run` method: takes no arguments and resolves with a `Job`. */
export type RunMethod = () => Promise<Job>;
/**
 * Internal method (RUN)
 *
 * Executes the job once: stamps `lastRunAt`, computes and persists the next
 * run time, emits the `start` events, invokes the processor registered in
 * `pulse._definitions` for this job name, then records the outcome, clears
 * `lockedAt`, saves the job and emits `success`/`fail` and `complete` events.
 *
 * The returned promise resolves with the job even when the processor failed
 * (the failure is recorded through `job.fail()` and the `fail` events). It
 * rejects when persisting the job fails or when completion handling itself
 * throws, so callers are never left with a promise that does not settle.
 *
 * @name Job#run
 * @function
 * @returns A promise resolving with this job once the run has been finalized.
 */
export const run: RunMethod = async function (this: Job) {
  const { pulse } = this;
  const definition = pulse._definitions[this.attrs.name];

  // The Promise constructor is needed because callback-style processors may
  // invoke `done` long after the processor function itself has returned.
  // The executor is deliberately NOT async: every asynchronous step is routed
  // to `reject` explicitly so that no failure can leave the promise pending.
  return new Promise<Job>((resolve, reject) => {
    let finished = false;
    let resumeOnRestartSkipped = false;

    /**
     * Finalizes the run. Passed to callback-style processors as `done` and
     * called directly for promise-style processors.
     */
    const jobCallback = async (error?: Error, result?: unknown): Promise<void> => {
      // We don't want to complete the job multiple times
      if (finished) {
        return;
      }
      finished = true;

      try {
        if (error) {
          this.fail(error);
        } else if (!resumeOnRestartSkipped) {
          this.attrs.lastFinishedAt = new Date();
          this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;
          // Compare against null/undefined instead of truthiness so that
          // legitimate falsy results (0, false, '') are still stored.
          if (this.attrs.shouldSaveResult && result !== undefined && result !== null) {
            this.attrs.result = result;
          }
        }

        this.attrs.lockedAt = null;

        try {
          await this.save();
          debug('[%s:%s] was saved successfully to MongoDB', this.attrs.name, this.attrs._id);
        } catch (saveError) {
          debug('[%s:%s] failed to be saved to MongoDB', this.attrs.name, this.attrs._id);
          // Reject, but keep going: listeners are still notified of the outcome,
          // and the later resolve() becomes a no-op on the settled promise.
          reject(saveError);
        }

        if (error) {
          pulse.emit('fail', error, this);
          pulse.emit('fail:' + this.attrs.name, error, this);
          debug('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, error.message);
        } else {
          pulse.emit('success', this);
          pulse.emit('success:' + this.attrs.name, this);
          debug('[%s:%s] has succeeded', this.attrs.name, this.attrs._id);
        }

        pulse.emit('complete', this);
        pulse.emit('complete:' + this.attrs.name, this);
        debug(
          '[%s:%s] job finished at [%s] and was unlocked',
          this.attrs.name,
          this.attrs._id,
          this.attrs.lastFinishedAt
        );

        // Curiously, we still resolve successfully if the job processor failed.
        // Pulse is not equipped to handle errors originating in user code, so, we leave them to inspect the side-effects of job.fail()
        resolve(this);
      } catch (unexpectedError) {
        // `finished` is already true, so re-entering jobCallback would be a
        // no-op; reject here so the caller is not left waiting forever.
        reject(unexpectedError);
      }
    };

    /** Prepares the job, then dispatches to the processor. */
    const execute = async (): Promise<void> => {
      this.attrs.lastRunAt = new Date();
      const previousRunAt = this.attrs.nextRunAt;
      debug('[%s:%s] setting lastRunAt to: %s', this.attrs.name, this.attrs._id, this.attrs.lastRunAt.toISOString());
      this.computeNextRunAt();
      await this.save();

      try {
        pulse.emit('start', this);
        pulse.emit('start:' + this.attrs.name, this);
        debug('[%s:%s] starting job', this.attrs.name, this.attrs._id);

        if (!definition) {
          debug('[%s:%s] has no definition, can not run', this.attrs.name, this.attrs._id);
          throw new JobError('Undefined job');
        }

        // on restart, skip the job if it's not time to run
        if (!pulse._resumeOnRestart && previousRunAt && pulse._readyAt >= previousRunAt && this.attrs.nextRunAt) {
          debug('[%s:%s] job resumeOnRestart skipped', this.attrs.name, this.attrs._id);
          resumeOnRestartSkipped = true;
          await jobCallback(undefined, 'skipped');
          return;
        }

        this.attrs.runCount = (this.attrs.runCount || 0) + 1;

        debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
        if (definition.fn.length === 2) {
          // Callback-style processor: it is responsible for calling `done`.
          await definition.fn(this, jobCallback);
        } else {
          // Promise-style processor: its resolved value is the job result.
          const result = await definition.fn(this);
          await jobCallback(undefined, result);
        }
      } catch (error) {
        debug('[%s:%s] unknown error occurred', this.attrs.name, this.attrs._id);
        await jobCallback(error as Error);
      }
    };

    // Failures before the processor starts (e.g. the initial save) reject the
    // returned promise instead of vanishing as an unhandled rejection.
    execute().catch(reject);
  });
};