Skip to content

Commit

Permalink
Add way to filter InFlightMessageCounter (#32) resolves #31
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 22, 2019
1 parent 18bdca8 commit a43c341
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -10,10 +10,11 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- split batching behaviors out into `BatchedProducer`/`BatchedConsumer` [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- mechanism to remove logging regarding polling backoff [#32](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/32) HT [@szer](https://github.com/Szer) re [#31](https://github.com/jet/Jet.ConfluentKafka.FSharp/issues/31)

### Changed

- split batching behaviors out into `BatchedProducer`/`BatchedConsumer` [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- default auto-commit interval dropped from 10s to 5s (which is the `Confluent.Kafka` default) [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- removed curried `member` Method arguments in `Start` methods

Expand Down
8 changes: 7 additions & 1 deletion README.md
Expand Up @@ -13,7 +13,7 @@ See [the Equinox QuickStart](https://github.com/jet/equinox#quickstart) for exam
The components within this repository are delivered as (presently single) multi-targeted Nuget package targeting `net461` (F# 3.1+) and `netstandard2.0` (F# 4.5+) profiles

- [![NuGet](https://img.shields.io/nuget/vpre/Jet.ConfluentKafka.FSharp.svg)](https://www.nuget.org/packages/Jet.ConfluentKafka.FSharp/) `Jet.ConfluentKafka.FSharp`: Wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations, with basic logging instrumentation.
[Depends](https://www.fuget.org/packages/Jet.ConfluentKafka.FSharp) on `Confluent.Kafka [1.0.0]`, `librdkafka [1.0.0]` (pinned to ensure we use a tested pairing enterprise wide), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Statistics for logging purposes).
[Depends](https://www.fuget.org/packages/Jet.ConfluentKafka.FSharp) on `Confluent.Kafka [1.0.0]`, `librdkafka [1.0.0]` (pinned to ensure we use a tested pairing enterprise wide), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).

## CONTRIBUTING

Expand Down Expand Up @@ -51,6 +51,12 @@ dotnet build build.proj -v n

## FAQ

### How do I get rid of all the `breaking off polling` ... `resuming polling` spam?

- The `BatchedConsumer` implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of [#32](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/32), such messages are tagged with the type `Jet.ConfluentKafka.FSharp.InFlightMessageCounter`, and as such can be silenced by including the following in one's `LoggerConfiguration()`:

`.MinimumLevel.Override(Jet.ConfluentKafka.FSharp.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)`

### What is this, why does it exist, where did it come from, is anyone using it ?

This code results from building out an end-to-end batteries-included set of libraries and templates as part of the [Equinox](https://github.com/jet/equinox) project.
Expand Down
10 changes: 7 additions & 3 deletions src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs
Expand Up @@ -158,6 +158,9 @@ type BatchedProducer private (log: ILogger, inner : IProducer<string, string>, t

type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan }

module Constants =
let messageCounterSourceContext = "Jet.ConfluentKafka.FSharp.InFlightMessageCounter"

module private ConsumerImpl =
/// guesstimate approximate message size in bytes
let approximateMessageBytes (message : ConsumeResult<string, string>) =
Expand Down Expand Up @@ -217,9 +220,9 @@ module private ConsumerImpl =

member __.AwaitThreshold() =
if inFlightBytes > maxInFlightBytes then
log.Warning("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes)
log.Information("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes)
while inFlightBytes > minInFlightBytes do Thread.Sleep 5
log.Information "Consumer resuming polling"
log.Verbose "Consumer resuming polling"

let mkBatchedMessageConsumer (log: ILogger) (buf : ConsumerBufferingConfig) (ct : CancellationToken) (consumer : IConsumer<string, string>)
(partitionedCollection: PartitionedBlockingCollection<TopicPartition, ConsumeResult<string, string>>)
Expand All @@ -230,7 +233,8 @@ module private ConsumerImpl =

use _ = consumer

let counter = new InFlightMessageCounter(log, buf.minInFlightBytes, buf.maxInFlightBytes)
let mcLog = log.ForContext(Serilog.Core.Constants.SourceContextPropertyName, Constants.messageCounterSourceContext)
let counter = new InFlightMessageCounter(mcLog, buf.minInFlightBytes, buf.maxInFlightBytes)

// starts a tail recursive loop that dequeues batches for a given partition buffer and schedules the user callback
let consumePartition (collection : BlockingCollection<ConsumeResult<string, string>>) =
Expand Down

0 comments on commit a43c341

Please sign in to comment.