Skip to content

Conversation

@scarbo87
Copy link
Contributor

There is no break in forever loop, so we will never receive error “ UnknownTopicOrPartition” on line 181. My code fixes case when there is no partitions found.

scarbo87 and others added 2 commits January 30, 2019 14:29
* Test against multiple Kafka versions

* Message writer for record batches (#163)

* Add branching for record batch writer

* Make key and value from headers public

* Add writeVarInt function

* Implement writeRecord method

* Write record batch method

* Use zigzag encoding for varint

* Drop logging

* Add unit test for v2 record batch

* Remove commented out code

* Use int constants from math package

* Use exact record size instead of extra buffer

* Give a simpler name to calcVarIntLen

* Move error check out of if else block

* Extract message set size in a function

* Give calcRecordSize a shorter name

* Protect recordSize from confusing order of params

* Use recordSize intead of estimatedRecordSize

* Extract record batch size

* Use record size from calculation

* Assign offsets to messages in record batch

* Write message set straight to output writer

* Write record batch straight to output writer

* Get rid of remainder buffer

* Remove commented out code

* Use bytes.Equal for comparing slices of bytes

* Compress messages before calculating size

* Run tests for kafka 0.10 (#187)

Added a utility method to check broker version so that tests for
newer functionality can be skipped.

* fix
Copy link
Contributor

@achille-roussel achille-roussel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for submitting this patch!

Would you be able to write a test to demonstrate that this change addresses the problem? (clearly we're missing this today).

@scarbo87
Copy link
Contributor Author

Of course

@achille-roussel achille-roussel self-assigned this Feb 1, 2019
@scarbo87
Copy link
Contributor Author

scarbo87 commented Feb 1, 2019

I was wrong, if you use context.WithTimeout, then will exit this function LookupPartition by timeout, but the internal gorutine will not be completed and there will be a memory leak.

If you set up a Kafka with an option auto.create.topics.enable=false, then the error t.TopicErrorCode=3 (UNKNOWN_TOPIC_OR_PARTITION) on this line will not be processed.
On this line partitions and err will equlas nil, this is wrong.

@achille-roussel
Copy link
Contributor

@scarbo87 should we always return UnknownTopicOrPartition if len(partitions) == 0 ?

@scarbo87
Copy link
Contributor Author

scarbo87 commented Feb 6, 2019

No, this is not a good solution.

Currently I found two problems in the code.

  1. We need to process sleep boolean result on line, otherwise we ignore context and have memory leak.
  2. If we connect to kafka and topic not exists, then on line currently we will not return error, because c.topic will be always empty string.

I will take care of this. Let me know if you agree.

@scarbo87
Copy link
Contributor Author

@achille-roussel, this fix is more correct

Copy link
Contributor

@achille-roussel achille-roussel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, thanks for the fix! 👍

@achille-roussel achille-roussel merged commit a3bd32e into segmentio:master Feb 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants