-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecord_processor.go
More file actions
127 lines (109 loc) · 3.95 KB
/
record_processor.go
File metadata and controls
127 lines (109 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package kinesis2sse
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/embano1/memlog"
kc "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
)
// NOTE(mroberts): I took this from
//
// https://github.com/vmware/vmware-go-kcl-v2/blob/main/test/worker_test.go
//
// recordProcessorFactory returns a KCL record-processor factory whose
// processors append incoming records to ml and index them by timestamp
// in t2o. logger is required (must be non-nil).
func recordProcessorFactory(ml *memlog.Log, t2o *Timestamp2Offset, logger *slog.Logger) kc.IRecordProcessorFactory {
	factory := dumpRecordProcessorFactory{
		ml:     ml,
		t2o:    t2o,
		logger: logger,
	}
	return &factory
}
// dumpRecordProcessorFactory builds dumpRecordProcessors that all share
// the same memlog and timestamp index, so every shard's records land in
// one place.
type dumpRecordProcessorFactory struct {
	// ml is the shared log that processors write record payloads into.
	ml *memlog.Log
	// t2o is the shared timestamp-to-offset index updated per record.
	t2o *Timestamp2Offset
	logger *slog.Logger // required
}
// CreateProcessor hands the KCL a new record processor that shares the
// factory's memlog, timestamp index, and logger.
func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
	processor := new(dumpRecordProcessor)
	processor.ml = d.ml
	processor.t2o = d.t2o
	processor.logger = d.logger
	return processor
}
// dumpRecordProcessor is the per-shard KCL record processor: it extracts
// CloudEvents from Kinesis records, writes them to the memlog, and indexes
// them by timestamp.
type dumpRecordProcessor struct {
	// ml receives the JSON-encoded "detail" payload of each record.
	ml *memlog.Log
	// t2o maps each event's "time" value to its memlog offset.
	t2o *Timestamp2Offset
	logger *slog.Logger // required
}
// Initialize logs which shard this processor instance is starting on and
// the checkpoint (sequence number) it resumes from.
func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
	// Use slog key/value pairs instead of fmt.Sprintf so the log stays
	// structured (and avoids the Sprintf boxing/formatting cost).
	dd.logger.Debug("Processing shard",
		"shardID", input.ShardId,
		"checkpoint", aws.ToString(input.ExtendedSequenceNumber.SequenceNumber))
}
// ProcessRecords handles one batch of Kinesis records. Each record is
// expected to be an EventBridge-style JSON envelope with an RFC3339 "time"
// key and the CloudEvent under "detail"; malformed records are logged and
// skipped. Parsed events are appended to the memlog, indexed by timestamp
// in t2o, and the batch is checkpointed with the KCL at the end.
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
	// Don't process an empty batch (also guards the Records[len-1] access below).
	if len(input.Records) == 0 {
		return
	}

	// Hold the t2o lock across the whole batch so memlog offsets and the
	// timestamp index stay in sync. The closure + defer guarantees the lock
	// is released even on the deliberate panic below (the original left the
	// mutex locked on that path).
	func() {
		dd.t2o.Lock()
		defer dd.t2o.Unlock()
		for _, v := range input.Records {
			var awsEvent map[string]any
			if err := json.Unmarshal(v.Data, &awsEvent); err != nil {
				dd.logger.Warn("Skipping an event due to un-parseable JSON", "err", err)
				continue
			}
			timestampString, ok := awsEvent["time"].(string)
			if !ok {
				dd.logger.Warn(`Skipping an event due to missing "time" key`)
				continue
			}
			timestamp, err := time.Parse(time.RFC3339, timestampString)
			if err != nil {
				dd.logger.Warn(`Skipping an event due to un-parseable "time" key`, "err", err)
				continue
			}
			cloudEvent, ok := awsEvent["detail"]
			if !ok {
				dd.logger.Warn(`Skipping an event due to missing "detail" key`)
				continue
			}
			bytes, err := json.Marshal(cloudEvent)
			if err != nil {
				dd.logger.Error(`Skipping an event because we were unable to marshal it to JSON`, "err", err)
				continue
			}
			off, err := dd.ml.Write(context.Background(), bytes)
			if err != nil {
				dd.logger.Error(`Skipping an event because we were unable to write it to the memlog`, "err", err)
				continue
			}
			if err := dd.t2o.Add(int(off), timestamp); err != nil {
				// NOTE(mroberts): If we get an error here, it's really a programming error.
				dd.logger.Error("Incorrect usage of Timestamp2Offset. Programming error or memory corruption? Exiting!", "err", err)
				panic(err)
			}
		}
	}()

	// checkpoint it after processing this batch.
	// Especially, for processing de-aggregated KPL records, checkpointing has to happen at the end of batch
	// because de-aggregated records share the same sequence number.
	lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber

	// Calculate the time taken from polling records and delivering to record
	// processor for a batch. BUG FIX: also guard CacheExitTime — both cache
	// times are pointers and the original dereferenced CacheExitTime unchecked.
	if input.CacheEntryTime != nil && input.CacheExitTime != nil {
		diff := input.CacheExitTime.Sub(*input.CacheEntryTime)
		dd.logger.Debug("Checkpoint progress",
			"sequenceNumber", lastRecordSequenceNumber,
			"millisBehindLatest", input.MillisBehindLatest,
			"kclProcessTime", diff)
	}
	if input.Checkpointer != nil {
		// BUG FIX: surface checkpoint failures instead of discarding them;
		// the KCL will retry from the previous checkpoint on next lease.
		if err := input.Checkpointer.Checkpoint(lastRecordSequenceNumber); err != nil {
			dd.logger.Warn("Unable to checkpoint batch", "err", err)
		}
	}
}
// Shutdown logs why the KCL is shutting this processor down and, when the
// reason is TERMINATE, checkpoints so the KCL can keep making progress.
func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
	dd.logger.Info("Shutdown", "reason", aws.ToString(kc.ShutdownReasonMessage(input.ShutdownReason)))
	// When the value of {@link ShutdownInput#getShutdownReason()} is
	// {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
	// checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
	if input.ShutdownReason == kc.TERMINATE {
		// BUG FIX: guard against a nil Checkpointer (ProcessRecords already
		// does) and log checkpoint errors instead of discarding them.
		if input.Checkpointer == nil {
			dd.logger.Warn("No checkpointer available on TERMINATE; skipping required checkpoint")
			return
		}
		if err := input.Checkpointer.Checkpoint(nil); err != nil {
			dd.logger.Error("Unable to checkpoint on TERMINATE", "err", err)
		}
	}
}