/
csv_processor.js
48 lines (36 loc) · 1.17 KB
/
csv_processor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
const csv = require('csv-parser');
const fs = require('fs');
const SuperLoop = require('../index');
const stats = require('measured-core').createCollection();
async function main() {
const intv = setInterval(function () {
console.log(JSON.stringify(stats, null, 2));
}, 1000);
try {
const upstream = fs.createReadStream('./examples/csv_sample.csv').pipe(csv())
const consumer = (data) => {
stats.meter('consumerTps').mark();
// processing ...
// console.log(data)
// calling another API to process csv records
//throw new Error('processing error');
}
const loop = new SuperLoop();
const lastFor = 120_000;
loop.on('warn', (err) => {
console.error(err);
});
await loop.pipeFrom(upstream)
.consumedBy(consumer)
.concurrency(200)
.rate(1000)
//.repeat(100)
.until(Date.now() + lastFor)
.exec();
console.log('loop ends')
} catch (e) {
console.error('something went wrong', e)
}
clearInterval(intv);
}
main().catch(console.error)