Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

MessageTooBigException when consuming a Snappy compressed topic. #160

Closed
eulerfx opened this issue Aug 3, 2017 · 0 comments
Closed

MessageTooBigException when consuming a Snappy compressed topic. #160

eulerfx opened this issue Aug 3, 2017 · 0 comments

Comments

@eulerfx
Copy link
Contributor

eulerfx commented Aug 3, 2017

The following is raised:

2017-08-03 12:54:17:5250|ERROR|Kafunk.Consumer|fetch_process_errored|group_id=kafunk_test_0802a generation_id=1 member_id=-b795691e-7684-482c-a1ce-b01bd5b62512 topic=XXXX partition_count=256 error=System.AggregateException: One or more errors occurred. ---> Kafunk.Protocol+MessageTooBigException: partition=0 offset=14053 message_set_size=32768 message_size=44331
   at Kafunk.Protocol.MessageSet.Read(Int16 messageVer, Int32 partition, Int16 ec, Int32 messageSetSize, BinaryZipper buf) in C:\code\kafunk\src\kafunk\Protocol.fs:line 448
   at Kafunk.CompressionModule.SnappyModule.decompress(Int16 messageVer, Message m) in C:\code\kafunk\src\kafunk\Compression.fs:line 154
   at Kafunk.CompressionModule.decompress@167-1.Invoke(MessageSetItem msi) in C:\code\kafunk\src\kafunk\Compression.fs:line 175
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect@706-1.Invoke(Int32 obj)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass11.<ExecuteSelfReplicating>b__10(Object param0)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.For(Int32 fromInclusive, Int32 toExclusive, Action`1 body)
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect[T,TResult](FSharpFunc`2 mapping, T[] array)
   at Kafunk.CompressionModule.decompress(Int16 messageVer, MessageSet ms) in C:\code\kafunk\src\kafunk\Compression.fs:line 166
   at Kafunk.ConsumerModule.tryFetch@616-5.Invoke(Tuple`5 tupledArg) in C:\code\kafunk\src\kafunk\Consumer.fs:line 619
   at Microsoft.FSharp.Collections.IEnumerator.map@107.DoMoveNext(b& )
   at Microsoft.FSharp.Collections.IEnumerator.MapEnumerator`1.System-Collections-IEnumerator-MoveNext()
   at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.takeInner@658[T,TResult](ConcatEnumerator`2 x, Unit unitVar0)
   at Kafunk.Prelude.SeqModule.partitionChoices4[a,b,c,d](IEnumerable`1 s) in C:\code\kafunk\src\kafunk\Utility\Prelude.fs:line 255
   at Kafunk.ConsumerModule.tryFetch@609-2.Invoke(FSharpChoice`2 _arg1) in C:\code\kafunk\src\kafunk\Consumer.fs:line 612
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@787-1.Invoke(a a)
---> (Inner Exception #0) Kafunk.Protocol+MessageTooBigException: partition=0 offset=14053 message_set_size=32768 message_size=44331
   at Kafunk.Protocol.MessageSet.Read(Int16 messageVer, Int32 partition, Int16 ec, Int32 messageSetSize, BinaryZipper buf) in C:\code\kafunk\src\kafunk\Protocol.fs:line 448
   at Kafunk.CompressionModule.SnappyModule.decompress(Int16 messageVer, Message m) in C:\code\kafunk\src\kafunk\Compression.fs:line 154
   at Kafunk.CompressionModule.decompress@167-1.Invoke(MessageSetItem msi) in C:\code\kafunk\src\kafunk\Compression.fs:line 175
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect@706-1.Invoke(Int32 obj)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass11.<ExecuteSelfReplicating>b__10(Object param0)<---

This occurs after the outer message set has been decoded into a single message, which is decompressed and then further decoded into a message set. If the outer message set had partial messages, those would only be skipped in the outer decoding phase, not after decompression.

eulerfx added a commit to eulerfx/kafunk that referenced this issue Aug 3, 2017
@eulerfx eulerfx mentioned this issue Aug 3, 2017
eulerfx added a commit that referenced this issue Aug 3, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant