-
Notifications
You must be signed in to change notification settings - Fork 0
/
KinesisRecordReader.js
38 lines (33 loc) · 2.43 KB
/
KinesisRecordReader.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import { v4 as uuidv4 } from 'uuid';
import { RecordParseHint } from '../model/RecordParseHint.js';
import { RecordHintType } from '../model/RecordHintType.js';
import { ParseDocumentResult } from '../model/ParseDocumentResult.js';
import { ParseDocumentResultStatus } from '../model/ParseDocumentResultStatus.js';
import { Document } from '../../documents/Document.js';
import { DocumentType } from '../../documents/DocumentType.js';
import { ErrorDoc } from '../../documents/ErrorDoc.js';
import { logger } from '../../../letsdata_utils/logging_utils.js';
/**
 * Builds the ERROR result returned when a Kinesis record cannot be turned into a document.
 *
 * The sequence number is passed to ErrorDoc twice (6th and 7th arguments), exactly as the
 * reader has always done. NOTE(review): presumably these are the start/end record offsets -
 * confirm against the ErrorDoc constructor.
 *
 * @param {string} partitionKey - partition key of the failing record
 * @param {string} sequenceNumber - sequence number of the failing record
 * @param {string} errorMessage - human readable reason stored on the error document
 * @returns {ParseDocumentResult} a result with status ParseDocumentResultStatus.ERROR
 */
function buildKinesisErrorResult(partitionKey, sequenceNumber, errorMessage) {
  const errorDoc = new ErrorDoc(uuidv4(), "KINESIS_ERROR", partitionKey, {}, {}, { "sequenceNumber": sequenceNumber }, { "sequenceNumber": sequenceNumber }, errorMessage);
  return new ParseDocumentResult(null, errorDoc, ParseDocumentResultStatus.ERROR);
}

/**
 * Reader that converts a single Kinesis record into a ParseDocumentResult.
 */
export class KinesisRecordReader {
  /**
   * Decodes the record payload as UTF-8 JSON and echoes it back as a Document.
   * You could add custom logic as needed.
   *
   * An ERROR result (wrapping an ErrorDoc) is returned instead of throwing when:
   *  - `data` is missing or has zero length,
   *  - the payload cannot be decoded / parsed as JSON,
   *  - the payload is valid JSON but not a JSON object (null, number, string, boolean, array).
   *
   * @param {string} streamArn - ARN of the stream the record came from (used for logging only)
   * @param {string} shardId - shard the record came from (used for logging only)
   * @param {string} partitionKey - record partition key, copied onto the resulting document
   * @param {string} sequenceNumber - record sequence number, recorded on error documents
   * @param {*} approximateArrivalTimestamp - record arrival timestamp (used for logging only)
   * @param {Uint8Array} data - record payload bytes; anything TextDecoder.decode accepts
   * @returns {ParseDocumentResult} SUCCESS with a Document, or ERROR with an ErrorDoc
   */
  parseMessage(streamArn, shardId, partitionKey, sequenceNumber, approximateArrivalTimestamp, data) {
    if (!data || data.length <= 0) {
      logger.error(`record data is null or empty, returning error - streamArn: ${streamArn}, shardId: ${shardId}, partitionKey: ${partitionKey}, sequenceNumber: ${sequenceNumber}, approximateArrivalTimestamp: ${approximateArrivalTimestamp}, data: ${data}`);
      return buildKinesisErrorResult(partitionKey, sequenceNumber, "empty message body");
    }

    try {
      logger.debug(`processing record - sequenceNumber: ${sequenceNumber}`);
      const keyValuesMap = JSON.parse(new TextDecoder().decode(data));

      // JSON.parse also succeeds for null, primitives and arrays. None of those can carry a
      // documentId or act as the document's key/value map, so reject them explicitly rather
      // than emitting a SUCCESS document with an undefined id (or tripping a TypeError on null).
      if (keyValuesMap === null || typeof keyValuesMap !== 'object' || Array.isArray(keyValuesMap)) {
        logger.error(`record data is not a JSON object, returning error - streamArn: ${streamArn}, shardId: ${shardId}, partitionKey: ${partitionKey}, sequenceNumber: ${sequenceNumber}, approximateArrivalTimestamp: ${approximateArrivalTimestamp}`);
        return buildKinesisErrorResult(partitionKey, sequenceNumber, "message body is not a JSON object");
      }

      logger.debug(`returning success - docId: ${keyValuesMap.documentId}`);
      const document = new Document(DocumentType.Document, keyValuesMap.documentId, "DOCUMENT", partitionKey, {}, keyValuesMap);
      return new ParseDocumentResult(null, document, ParseDocumentResultStatus.SUCCESS);
    } catch (ex) {
      // Decode / parse failures are reported as error documents so one bad record is
      // surfaced to the caller as data instead of an exception.
      logger.error(`Exception in reading the document - streamArn: ${streamArn}, shardId: ${shardId}, partitionKey: ${partitionKey}, sequenceNumber: ${sequenceNumber}, approximateArrivalTimestamp: ${approximateArrivalTimestamp}, data: ${data}, ex: ${ex}`);
      return buildKinesisErrorResult(partitionKey, sequenceNumber, `Exception - ${ex}`);
    }
  }
}