From f8eb5f06e02836a5cbc417d54589a4eb37f987f5 Mon Sep 17 00:00:00 2001
From: nischit
Date: Thu, 4 Sep 2025 18:37:14 +0545
Subject: [PATCH 1/4] log publish failures

---
 docker-compose.yml                  | 2 ++
 internal/publisher/publisher.go     | 2 +-
 internal/storage/kafka_publisher.go | 8 ++++++--
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index f5112ce0..67adc076 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -87,6 +87,8 @@ services:
       KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
       KAFKA_LOG_CLEANUP_POLICY: compact
       KAFKA_LOG_SEGMENT_MS: 10000
+      KAFKA_MESSAGE_MAX_BYTES: 104857600
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600
       CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
     profiles:
       - streaming
diff --git a/internal/publisher/publisher.go b/internal/publisher/publisher.go
index 68b97ce6..0df01c1f 100644
--- a/internal/publisher/publisher.go
+++ b/internal/publisher/publisher.go
@@ -65,7 +65,7 @@ func (p *Publisher) initialize() error {
 		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
 		kgo.ClientID(fmt.Sprintf("insight-indexer-%s", config.Cfg.RPC.ChainID)),
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
 		kgo.MetadataMaxAge(60 * time.Second),
 		kgo.DialTimeout(10 * time.Second),
diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index 72dc96fd..c64c1f26 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
 		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(16_000_000),
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),
@@ -163,7 +163,11 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 
 	// Produce all messages in the transaction
 	for _, msg := range messages {
-		p.client.Produce(ctx, msg, nil)
+		p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+			if err != nil {
+				log.Error().Err(err).Any("headers", r.Headers).Msg(">>>>>>>>>>>>>>>>>>>>>>>BLOCK WATCH:: KAFKA PUBLISHER::publishMessages::err")
+			}
+		})
 	}
 
 	// Flush all messages

From 63d706152164d712124e05fa9f4ed34c6301053a Mon Sep 17 00:00:00 2001
From: nischit
Date: Thu, 4 Sep 2025 18:41:29 +0545
Subject: [PATCH 2/4] fail on any publish failure

---
 internal/storage/kafka_publisher.go | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index c64c1f26..147bdacc 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -161,11 +161,21 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		return fmt.Errorf("failed to begin transaction: %v", err)
 	}
 
+	// Track if any produce errors occur
+	var produceErrors []error
+	var produceErrorsMu sync.Mutex
+	var wg sync.WaitGroup
+
 	// Produce all messages in the transaction
 	for _, msg := range messages {
+		wg.Add(1)
 		p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+			defer wg.Done()
 			if err != nil {
 				log.Error().Err(err).Any("headers", r.Headers).Msg(">>>>>>>>>>>>>>>>>>>>>>>BLOCK WATCH:: KAFKA PUBLISHER::publishMessages::err")
+				produceErrorsMu.Lock()
+				produceErrors = append(produceErrors, err)
+				produceErrorsMu.Unlock()
 			}
 		})
 	}
@@ -176,6 +186,18 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		return fmt.Errorf("failed to flush messages: %v", err)
 	}
 
+	// Wait for all callbacks to complete
+	wg.Wait()
+
+	// Check if any produce errors occurred
+	hasErrors := len(produceErrors) > 0
+
+	if hasErrors {
+		// Abort the transaction if any produce errors occurred
+		p.client.EndTransaction(ctx, kgo.TryAbort)
+		return fmt.Errorf("transaction aborted due to produce errors: %v", produceErrors)
+	}
+
 	// Commit the transaction
 	if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
 		return fmt.Errorf("failed to commit transaction: %v", err)

From 9f849192ef3b1d1b0aabfbfa3ece195060c6e551 Mon Sep 17 00:00:00 2001
From: nischit
Date: Thu, 4 Sep 2025 18:44:03 +0545
Subject: [PATCH 3/4] reduce max batch bytes for test

---
 internal/storage/kafka_publisher.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index 147bdacc..c8c6b6c3 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
 		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
+		kgo.ProducerBatchMaxBytes(10 * 1024 * 1024), // 10MB
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),

From 965cb46f3d96c95bf2e5c85b3d85622121f5a482 Mon Sep 17 00:00:00 2001
From: nischit
Date: Thu, 4 Sep 2025 19:11:27 +0545
Subject: [PATCH 4/4] 100mb max batch size

---
 internal/storage/kafka_publisher.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index c8c6b6c3..9a053669 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -69,7 +69,7 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
 		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
-		kgo.ProducerBatchMaxBytes(10 * 1024 * 1024), // 10MB
+		kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB
 		kgo.RecordPartitioner(kgo.ManualPartitioner()),
 		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),
@@ -172,7 +172,7 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		p.client.Produce(ctx, msg, func(r *kgo.Record, err error) {
 			defer wg.Done()
 			if err != nil {
-				log.Error().Err(err).Any("headers", r.Headers).Msg(">>>>>>>>>>>>>>>>>>>>>>>BLOCK WATCH:: KAFKA PUBLISHER::publishMessages::err")
+				log.Error().Err(err).Any("headers", r.Headers).Msg("KAFKA PUBLISHER::publishMessages::err")
 				produceErrorsMu.Lock()
 				produceErrors = append(produceErrors, err)
 				produceErrorsMu.Unlock()
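
Net effect of the four patches, as a self-contained franz-go sketch: produce callbacks collect per-record errors under a mutex, Flush plus a WaitGroup guarantees every callback has fired, and the transaction is aborted when anything failed. The broker address, topic, and transactional ID below are placeholders, and zerolog is swapped for the standard library log so the snippet compiles on its own; this illustrates the pattern, it is not code from the repo.

// Sketch of the transactional publish flow assembled from patches 1-4.
// Broker, topic, and transactional ID are placeholders, not repo values.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func publishMessages(ctx context.Context, client *kgo.Client, messages []*kgo.Record) error {
	if err := client.BeginTransaction(); err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}

	// Collect per-record errors from the asynchronous produce callbacks.
	var (
		mu          sync.Mutex
		produceErrs []error
		wg          sync.WaitGroup
	)

	for _, msg := range messages {
		wg.Add(1)
		client.Produce(ctx, msg, func(r *kgo.Record, err error) {
			defer wg.Done()
			if err != nil {
				log.Printf("produce failed: topic=%s err=%v", r.Topic, err)
				mu.Lock()
				produceErrs = append(produceErrs, err)
				mu.Unlock()
			}
		})
	}

	// Flush drains all buffered records; once Flush and wg.Wait both
	// return, every callback has fired and produceErrs is complete.
	flushErr := client.Flush(ctx)
	wg.Wait()

	if flushErr != nil || len(produceErrs) > 0 {
		// Abort so no partial batch becomes visible to read-committed consumers.
		if abortErr := client.EndTransaction(ctx, kgo.TryAbort); abortErr != nil {
			return fmt.Errorf("abort after produce errors also failed: %v", abortErr)
		}
		return fmt.Errorf("transaction aborted: %v", errors.Join(append(produceErrs, flushErr)...))
	}

	if err := client.EndTransaction(ctx, kgo.TryCommit); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),        // placeholder broker
		kgo.TransactionalID("example-producer"),  // placeholder transactional ID
		kgo.ProducerBatchMaxBytes(100*1024*1024), // 100MB, mirroring the final patch
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	records := []*kgo.Record{{Topic: "example-topic", Value: []byte("hello")}}
	if err := publishMessages(context.Background(), client, records); err != nil {
		log.Fatal(err)
	}
}

Note that the 100MB client batch ceiling only holds because the docker-compose change raises the broker's message.max.bytes and replica.fetch.max.bytes to the same 104857600; without that, the broker rejects oversized batches regardless of the producer setting.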