Skip to content

Commit

Permalink
Retry sessions
Browse files (browse the repository at this point in the history)
Based on the suggestion in IBM/sarama#1685.
  • Loading branch information
alok87 committed Mar 4, 2021
1 parent bbadc3e commit 5cc9052
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 52 deletions.
3 changes: 2 additions & 1 deletion redshiftsink/pkg/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,14 @@ func (c *Manager) Consume(ctx context.Context, wg *sync.WaitGroup) {
// Consume ultimately calls ConsumeClaim for every topic partition
err := c.consumerGroup.Consume(ctx, topics)
if err != nil {
klog.Fatalf("Error from consumer: %v", err)
klog.Errorf("Error from consumer handler: %v", err)
}
// If the context was cancelled, the consumer should stop.
if ctx.Err() != nil {
klog.V(2).Infof("Manager: %s, Context cancelled", c.consumerGroupID)
return
}

klog.V(2).Infof(
"Completed ConsumeClaim for (%s), I will rerun\n",
c.consumerGroupID,
Expand Down
37 changes: 19 additions & 18 deletions redshiftsink/pkg/redshiftbatcher/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,26 +177,22 @@ func removeEmptyNullValues(value map[string]*string) map[string]*string {
return value
}

func (b *batchProcessor) ctxCancelled(ctx context.Context) bool {
func (b *batchProcessor) ctxCancelled(ctx context.Context) error {
select {
case <-ctx.Done():
err := ctx.Err()
klog.Warningf("Processing stopped! main ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: main ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
return fmt.Errorf("Processing stopped! main ctx done (recreate), ctxErr: %v", ctx.Err())
case <-b.session.Context().Done():
err := ctx.Err()
klog.Warningf("Processing stopped! ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: session ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
return fmt.Errorf("Processing stopped! session ctx done (recreate), ctxErr: %v", b.session.Context().Err())
default:
return false
return nil
}
}

Expand Down Expand Up @@ -403,47 +399,50 @@ func (b *batchProcessor) processMessage(message *serializer.Message, id int) {
func (b *batchProcessor) processBatch(
ctx context.Context,
msgBuf []*serializer.Message,
) bool {
) error {

b.s3Key = ""
for id, message := range msgBuf {
select {
case <-ctx.Done():
return false
return fmt.Errorf("Main context done, recreate, err: %v", ctx.Err())
case <-b.session.Context().Done():
return fmt.Errorf("Session context done, recreate, err: %v", b.session.Context().Err())
default:
b.processMessage(message, id)
}
}

return true
return nil
}

// Process implements serializer.MessageBatch
func (b *batchProcessor) Process(ctx context.Context, msgBuf []*serializer.Message) {
func (b *batchProcessor) Process(ctx context.Context, msgBuf []*serializer.Message) error {
now := time.Now()

b.setBatchId()
b.batchSchemaId = -1
b.skipMerge = true

if b.ctxCancelled(ctx) {
return
err := b.ctxCancelled(ctx)
if err != nil {
return err
}

klog.Infof("topic:%s, batchId:%d, size:%d: Processing...\n",
b.topic, b.batchId, len(msgBuf),
)

done := b.processBatch(ctx, msgBuf)
if !done {
err = b.processBatch(ctx, msgBuf)
if err != nil {
b.handleShutdown()
return
return err
}

klog.Infof("topic:%s, batchId:%d, size:%d: Uploading...\n",
b.topic, b.batchId, len(msgBuf),
)
err := b.s3sink.Upload(b.s3Key, b.bodyBuf)
err = b.s3sink.Upload(b.s3Key, b.bodyBuf)
if err != nil {
klog.Fatalf("Error writing to s3, err=%v\n", err)
}
Expand All @@ -469,4 +468,6 @@ func (b *batchProcessor) Process(ctx context.Context, msgBuf []*serializer.Messa
)

setBatchProcessingSeconds(now, b.topic)

return nil
}
21 changes: 17 additions & 4 deletions redshiftsink/pkg/redshiftbatcher/batcher_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (h *batcherHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
)

var lastSchemaId *int
var err error
processor := newBatchProcessor(
session,
claim.Topic(),
Expand Down Expand Up @@ -149,7 +150,10 @@ func (h *batcherHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
"topic:%s: ConsumeClaim ending, hit",
claim.Topic(),
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
klog.V(2).Infof(
"ConsumeClaim ended for topic: %s, partition: %d (would rerun by manager)\n",
claim.Topic(),
Expand Down Expand Up @@ -193,19 +197,28 @@ func (h *batcherHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
*lastSchemaId,
msg.SchemaId,
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
} else {
}
// Process the batch by size or insert in batch
msgBatch.Insert(h.ctx, msg)
err = msgBatch.Insert(h.ctx, msg)
if err != nil {
return err
}
*lastSchemaId = msg.SchemaId
case <-maxWaitTicker.C:
// Process the batch by time
klog.V(2).Infof(
"topic:%s: maxWaitSeconds hit",
claim.Topic(),
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
}
}
}
41 changes: 21 additions & 20 deletions redshiftsink/pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,22 @@ func newLoadProcessor(
}
}

func (b *loadProcessor) ctxCancelled(ctx context.Context) bool {
func (b *loadProcessor) ctxCancelled(ctx context.Context) error {
select {
case <-ctx.Done():
err := ctx.Err()
klog.Warningf("Processing stopped! main ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: main ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
return fmt.Errorf("Processing stopped! main ctx done (recreate), ctxErr: %v", ctx.Err())
case <-b.session.Context().Done():
err := ctx.Err()
klog.Warningf("Processing stopped! ctx done, ctxErr: %v", err)
klog.Warningf(
"%s, batchId:%d, lastCommitted:%d: session ctx done. Cancelled.\n",
b.topic, b.batchId, b.lastCommittedOffset,
)
return true
return fmt.Errorf("Processing stopped! session ctx done (recreate), ctxErr: %v", b.session.Context().Err())
default:
return false
return nil
}
}

Expand Down Expand Up @@ -181,8 +177,8 @@ func (b *loadProcessor) markOffset(msgBuf []*serializer.Message) {
}
}

// handleShutdown is mostly used to log the messages before going down
func (b *loadProcessor) handleShutdown() {
// printCurrentState is mostly used to log the messages before going down
func (b *loadProcessor) printCurrentState() {
klog.Infof(
"%s, batchId:%d: Batch processing gracefully shutdown.\n",
b.topic,
Expand Down Expand Up @@ -582,7 +578,7 @@ func (b *loadProcessor) migrateSchema(schemaId int, inputTable redshift.Table) {
func (b *loadProcessor) processBatch(
ctx context.Context,
msgBuf []*serializer.Message,
) bool {
) error {

if b.redshiftStats {
klog.V(2).Infof("dbstats: %+v\n", b.redshifter.Stats())
Expand All @@ -598,7 +594,9 @@ func (b *loadProcessor) processBatch(
for id, message := range msgBuf {
select {
case <-ctx.Done():
return false
return fmt.Errorf("Main context done, recreate, err: %v", ctx.Err())
case <-b.session.Context().Done():
return fmt.Errorf("Session context done, recreate, err: %v", b.session.Context().Err())
default:
job := StringMapToJob(message.Value.(map[string]interface{}))
schemaId = job.SchemaId
Expand Down Expand Up @@ -665,25 +663,26 @@ func (b *loadProcessor) processBatch(
klog.V(2).Infof("endbatch dbstats: %+v\n", b.redshifter.Stats())
}

return true
return nil
}

// Process implements serializer.MessageBatch
func (b *loadProcessor) Process(ctx context.Context, msgBuf []*serializer.Message) {
func (b *loadProcessor) Process(ctx context.Context, msgBuf []*serializer.Message) error {
start := time.Now()
b.setBatchId()
if b.ctxCancelled(ctx) {
return
err := b.ctxCancelled(ctx)
if err != nil {
return err
}

klog.Infof("%s, batchId:%d, size:%d: Processing...\n",
b.topic, b.batchId, len(msgBuf),
)

done := b.processBatch(ctx, msgBuf)
if !done {
b.handleShutdown()
return
err = b.processBatch(ctx, msgBuf)
if err != nil {
b.printCurrentState()
return err
}
b.markOffset(msgBuf)

Expand All @@ -699,4 +698,6 @@ func (b *loadProcessor) Process(ctx context.Context, msgBuf []*serializer.Messag
"%s, batchId:%d, size:%d, end:%d:, Processed in %s",
b.topic, b.batchId, len(msgBuf), b.batchEndOffset, timeTaken,
)

return nil
}
21 changes: 17 additions & 4 deletions redshiftsink/pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
)

var lastSchemaId *int
var err error
processor := newLoadProcessor(
session,
claim.Topic(),
Expand Down Expand Up @@ -126,7 +127,10 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
"topic:%s: ConsumeClaim ending, hit",
claim.Topic(),
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
klog.V(2).Infof(
"ConsumeClaim ended for topic: %s, partition: %d (would rerun by manager)\n",
claim.Topic(),
Expand Down Expand Up @@ -167,18 +171,27 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
*lastSchemaId,
upstreamJobSchemaId,
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
}
// Process the batch by size or insert in batch
msgBatch.Insert(h.ctx, msg)
err = msgBatch.Insert(h.ctx, msg)
if err != nil {
return err
}
*lastSchemaId = upstreamJobSchemaId
case <-maxWaitTicker.C:
// Process the batch by time
klog.V(2).Infof(
"topic:%s: maxWaitSeconds hit",
claim.Topic(),
)
msgBatch.Process(h.ctx)
err = msgBatch.Process(h.ctx)
if err != nil {
return err
}
}
}
}
17 changes: 12 additions & 5 deletions redshiftsink/pkg/serializer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Message struct {
}

type MessageBatchProcessor interface {
Process(ctx context.Context, msgBuf []*Message)
Process(ctx context.Context, msgBuf []*Message) error
}

type MessageBatch struct {
Expand All @@ -40,33 +40,40 @@ func NewMessageBatch(topic string, partition int32, maxSize int, processor Messa
}

// Process calls the processor to process the batch, if the batch has any messages.
func (b *MessageBatch) Process(ctx context.Context) {
func (b *MessageBatch) Process(ctx context.Context) error {
if len(b.msgBuf) > 0 {
klog.V(2).Infof(
"topic:%s: calling processor...",
b.topic,
)
b.processor.Process(ctx, b.msgBuf)
err := b.processor.Process(ctx, b.msgBuf)
if err != nil {
return err
}
b.msgBuf = make([]*Message, 0, b.maxSize)
} else {
klog.V(2).Infof(
"topic:%s: no msgs",
b.topic,
)
}

return nil
}

// Insert appends the message to the batch and calls the processor once the batch size reaches maxSize.
func (b *MessageBatch) Insert(
ctx context.Context,
msg *Message,
) {
) error {
b.msgBuf = append(b.msgBuf, msg)
if len(b.msgBuf) >= b.maxSize {
klog.V(2).Infof(
"topic:%s: maxSize hit",
msg.Topic,
)
b.Process(ctx)
return b.Process(ctx)
}

return nil
}

0 comments on commit 5cc9052

Please sign in to comment.