Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Fix next offset bug #222

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -796,18 +796,30 @@ module Protocol =
let numRecords = buf.ReadInt32()
if numRecords < 0 then
failwithf "invalid_record_count|num_records=%i compression=%i mb=%i first_offset=%i mc=%i" numRecords compression magicByte firstOffset mss.Count
match compression with
| CompressionCodec.None ->
MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
| compression ->
let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD
if buf.Buffer.Count < recordsLength then
buf.ShiftOffset buf.Buffer.Count
else
let compressedValue = buf.Slice recordsLength
let decompressedValue = CompressionCodec.decompress compression compressedValue
MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
buf.ShiftOffset recordsLength
let lastOffset : Offset =
match compression with
| CompressionCodec.None ->
//let c0 = mss.Count
MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
if mss.Count > 0 then
let lastMessage = mss.[mss.Count - 1]
//let count = mss.Count - c0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please remove the comments?

//if lastMessage.offset <> lastOffset then
// failwithf "unmatched_offsets|batch_last_offset=%i message_last_offset=%i count=%i num_records=%i" lastOffset lastMessage.offset count numRecords
lastMessage.offset
//lastOffset
else
lastOffset
| compression ->
let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD
if buf.Buffer.Count < recordsLength then
buf.ShiftOffset buf.Buffer.Count
else
let compressedValue = buf.Slice recordsLength
let decompressedValue = CompressionCodec.decompress compression compressedValue
MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
buf.ShiftOffset recordsLength
lastOffset
if checkCrc then
let crcCount = buf.Buffer.Count - attributesOffset
let crc = Crc.crc32C buf.Buffer.Array attributesOffset crcCount
Expand Down
15 changes: 14 additions & 1 deletion tests/kafunk.Tests/AsyncTest.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,17 @@ let go3 = async {
|> Async.parallelThrottledIgnore 1000
}

Async.RunSynchronously go2
let go4 = async {
let t : Task<unit> = Task.never
let N = 1000000
return!
Seq.init N id
|> Seq.map (fun i -> async {
let delay = Async.Sleep 100 |> Async.StartAsTask
let! t = Task.WhenAny [| t ; delay |] |> Async.AwaitTask
let r = t.Result
return () })
|> Async.parallelThrottledIgnore 1000
}

Async.RunSynchronously go4
14 changes: 13 additions & 1 deletion tests/kafunk.Tests/ConfluentConsumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
open System
open System.Text
open System.Collections.Generic
open System.Collections.Concurrent
open System.Diagnostics
open System.Threading
open Kafunk
Expand Down Expand Up @@ -69,8 +70,19 @@ let go = async {
let md = consumer.GetMetadata(true)
Log.info "metadata|%A" md.Topics

let partitionOffsets = new ConcurrentDictionary<Partition, int64> ()

let handle (m:Message) = async {
Log.info "handing message|p=%i key=%s" m.Partition (Encoding.UTF8.GetString m.Key)
//Log.info "handing message|p=%i key=%s" m.Partition (Encoding.UTF8.GetString m.Key)
let offset = m.Offset.Value
match partitionOffsets.TryGetValue (m.Partition) with
| true, lastOffset ->
if (lastOffset + 1L < offset) then
let gap = offset - (lastOffset + 1L)
failwithf "non_contig_offsets_detected|partition=%i last_offset=%i current_offset=%i gap=%i" m.Partition lastOffset offset gap
| _ -> ()
partitionOffsets.[m.Partition] <- offset

return () }

use counter = Metrics.counter Log 5000
Expand Down
42 changes: 28 additions & 14 deletions tests/kafunk.Tests/Consumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
open FSharp.Control
open Kafunk
open System
open System.Collections.Concurrent

//Log.MinLevel <- LogLevel.Trace
let Log = Log.create __SOURCE_FILE__
Expand Down Expand Up @@ -31,7 +32,7 @@ let go = async {
tcpConfig = chanConfig,
requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
version = Versions.V_0_10_1,
autoApiVersions = true,
//autoApiVersions = true,
//version = Versions.V_0_9_0,
//autoApiVersions = false,
clientId = "leo")
Expand Down Expand Up @@ -68,20 +69,33 @@ let go = async {

let! _ = Async.StartChild showProgress

let partitionOffsets = new ConcurrentDictionary<Partition, Offset> ()

let handle (s:ConsumerState) (ms:ConsumerMessageSet) = async {
//use! _cnc = Async.OnCancel (fun () -> Log.warn "cancelling_handler")
//for m in ms.messageSet.messages do
// Log.info "key=%s" (Binary.toString m.message.key)
Log.trace "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
ms.topic
ms.partition
(ms.messageSet.messages.Length)
(ConsumerMessageSet.size ms)
(ConsumerMessageSet.firstOffset ms)
(ConsumerMessageSet.lastOffset ms)
(ConsumerMessageSet.firstTimestamp ms)
(ms.highWatermarkOffset)
(ConsumerMessageSet.lag ms)

do! Async.Sleep 5000

if ms.partition = 0 then
Log.info "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
ms.topic
ms.partition
(ms.messageSet.messages.Length)
(ConsumerMessageSet.size ms)
(ConsumerMessageSet.firstOffset ms)
(ConsumerMessageSet.lastOffset ms)
(ConsumerMessageSet.firstTimestamp ms)
(ms.highWatermarkOffset)
(ConsumerMessageSet.lag ms)

for msi in ms.messageSet.messages do
match partitionOffsets.TryGetValue (ms.partition) with
| true, lastOffset ->
if (lastOffset + 1L < msi.offset) then
let gap = msi.offset - (lastOffset + 1L)
failwithf "non_contig_offsets_detected|partition=%i last_offset=%i current_offset=%i gap=%i" ms.partition lastOffset msi.offset gap
| _ -> ()
partitionOffsets.[ms.partition] <- msi.offset

return () }

use counter = Metrics.counter Log 5000
Expand Down
46 changes: 41 additions & 5 deletions tests/kafunk.Tests/ProducerConsumer.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ let batchSize = argiDefault 4 "1000" |> Int32.Parse
let consumerCount = argiDefault 5 "1" |> Int32.Parse
let producerThreads = argiDefault 6 "100" |> Int32.Parse

let contigDeltaThreshold = 200000

let testId = Guid.NewGuid().ToString("n")
let consumerGroup = "kafunk-producer-consumer-test-" + testId

Expand Down Expand Up @@ -153,7 +155,7 @@ module Reporter =

let report () =
let r = new Report(ack.Received, ack.Duplicates, ack.Sent, ack.Contig)
printReport r
//printReport r
r

let rec loop () = async {
Expand Down Expand Up @@ -196,14 +198,13 @@ let monitor = async {
do! Async.Sleep 5000
let! report = Reporter.report reporter
printReport report
if (report.received - report.contigCount) > 100000 then
if (report.received - report.contigCount) > contigDeltaThreshold then
Log.error "contig_delta_surpassed_threshold"
IVar.tryPut () completed |> ignore }

// ----------------------------------------------------------------------------------------------------------------------------------



let producer = async {

let message (messageNumber:int) =
Expand All @@ -218,7 +219,12 @@ let producer = async {

Log.info "starting_producer_process|batch_count=%i" batchCount

let connCfg = KafkaConfig.create ([KafkaUri.parse host], tcpConfig = chanConfig)
let connCfg =
KafkaConfig.create (
[KafkaUri.parse host],
tcpConfig = chanConfig,
version = Versions.V_0_10_1)

use! conn = Kafka.connAsync connCfg

let producerCfg =
Expand Down Expand Up @@ -252,8 +258,37 @@ let producer = async {

let consumer = async {

let partitionOffsets = new ConcurrentDictionary<Partition, Offset> ()

let handle (_:ConsumerState) (ms:ConsumerMessageSet) = async {

//failwithf "testing ERRORs!"
//Log.error "testing error!"

//let firstOffset' = ConsumerMessageSet.firstOffset ms

//let mutable lastOffset = 0L
//if partitionOffsets.TryGetValue (ms.partition, &lastOffset) then
// if firstOffset' > lastOffset + 1L then
// failwithf "offset_gap_detected|partition=%i last_offset=%i first_offset_next_batch=%i" ms.partition lastOffset firstOffset'

//partitionOffsets.[ms.partition] <- ConsumerMessageSet.lastOffset ms

//ms.messageSet.messages
//|> Seq.pairwise
//|> Seq.iter (fun (msi1,msi2) ->
// if msi1.offset + 1L <> msi2.offset then
// failwithf "non_contiguous_offsets_detected|offset1=%i offset2=%i" msi1.offset msi2.offset
// ())

for msi in ms.messageSet.messages do
match partitionOffsets.TryGetValue (ms.partition) with
| true, lastOffset ->
if (lastOffset + 1L <> msi.offset) then
failwithf "non_contig_offsets|partition=%i last_offset=%i current_offset=%i" ms.partition lastOffset msi.offset
| _ -> ()
partitionOffsets.[ms.partition] <- msi.offset

let values =
ms.messageSet.messages
|> Seq.choose (fun m ->
Expand All @@ -270,7 +305,8 @@ let consumer = async {
let connCfg =
KafkaConfig.create (
[KafkaUri.parse host],
tcpConfig = chanConfig)
tcpConfig = chanConfig,
version = Versions.V_0_10_1)
use! conn = Kafka.connAsync connCfg

let consumerCfg =
Expand Down