-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.js
51 lines (42 loc) · 1.61 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
var kcl = require('kinesis-client-library')
var newlineBuffer = new Buffer('\n')
kcl.AbstractConsumer.extend({
// create places to hold some data about the consumer
initialize: function (done) {
this.cachedRecords = []
this.cachedRecordsSize = 0
// This MUST be called or processing will never start
// That is really really really bad
done()
},
processRecords: function (records, done) {
// Put each record into our list of cached records (separated by newlines) and update the size
records.forEach(function (record) {
// this.cachedRecords.push(record.Data)
// this.cachedRecords.push(newlineBuffer)
// this.cachedRecordsSize += record.Data.length + newlineBuffer.length
console.log(record.Data.toString('utf8'))
})
done(null, true) // tells the database that these records are processed
// done(null, false) to say that the message wasn't handled and someone else needs to
// // not very good for performance
// var shouldCheckpoint = this.cachedRecordsSize > 50000000
// // Get more records, but not save a checkpoint
// if (!shouldCheckpoint) return done()
// // Upload the records to S3
// s3.putObject(
// {
// Bucket: 'my-bucket-name',
// Key: 'path/to/records/' + Date.now(),
// Body: Buffer.concat(this.cachedRecords)
// },
// function (err) {
// if (err) return done(err)
// this.cachedRecords = []
// this.cachedRecordsSize = 0
// // Pass `true` to checkpoint the latest record we've received
// done(null, true)
// }.bind(this)
// )
}
})