Clarification Request for brod:fetch #251

Closed
randysecrist opened this issue Dec 12, 2017 · 8 comments
Comments

@randysecrist

See attached screen shot ...

[Screenshot, 2017-12-12 3:21:34 PM]

brod:fetch with the earliest offset seems to return only a subset of the messages.

In this case, there are 5 messages in partition 0. The earliest offset is 0 and the latest is 5. A call to fetch with offset 0 returns messages 0, 1 and 2. Calling fetch with offset 3 returns the last two messages.

Is this expected? Is there any way to get back all 5 with a single fetch call?

@zmstone
Contributor

zmstone commented Dec 12, 2017

Which brod version and which Kafka version were you testing?
On the latest version (3.3.4), :brod.fetch/3 uses 100K bytes as the default max_bytes in the fetch request.
:brod.fetch/8 allows you to specify max_bytes.

However, the messages are not that big, i.e. 3 messages are obviously less than 100K.
This seems quite odd to me. I'm not sure why Kafka would not return all 5 messages in the batch.

@zmstone
Contributor

zmstone commented Dec 12, 2017

There are 40 hours between the timestamps of the messages at offset 2 and offset 3,
so maybe they are from two segments in Kafka.

Could you check the Kafka log-data directory to see if there are two segments for that partition?

@randysecrist
Author

Yes. I had to find another partition and offset that was doing this, but it looks like there are multiple segments.

[Screenshot, 2017-12-13 9:28:23 AM]

We use Kafka 2.11 and brod 3.3.4.

@zmstone
Contributor

zmstone commented Dec 13, 2017

That should be it.

@randysecrist
Author

Ok, so it is expected. The code I have doing the fetch looks like this; does this seem correct?

  defp fetch(acc, hosts, topic, partition, start, total_count) do
    case :brod.fetch(default_hosts(), topic, partition, start) do
      {:ok, []} -> acc
      {:error, reason} -> {:error, reason}
      {_, messages} ->
        result = acc ++ messages
        case (length(result) < total_count) do
          true -> fetch(result, hosts, topic, partition, start + length(result), total_count)
          false -> result
        end
    end
  end

@zmstone
Contributor

zmstone commented Dec 13, 2017

Seems correct. But it’s probably better to connect to the leader and then use brod_util:fetch/4, to avoid reconnecting on each attempt.
See brod_cli fetch for an example.

@randysecrist
Author

Thank you for the review.

@zmstone
Contributor

zmstone commented Dec 13, 2017

It should be start+length(messages), not length(result).
Also, the next begin offset is not always start+length, because offsets may not be sequential in compacted partitions. It should be the last message’s offset+1.
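
For illustration, here is a minimal sketch of the fetch loop with both corrections applied. It is not taken from brod itself. To keep it runnable without a broker, the fetch call is passed in as a hypothetical `fetch_fun` (offset in, `{:ok, messages}` or `{:error, reason}` out), which in real code might wrap `:brod.fetch(hosts, topic, partition, offset)`. `offset_of` is likewise an assumed helper that reads the offset out of one message, since the message shape depends on the brod version. The fake partition below uses `{offset, value}` tuples purely for demonstration.

```elixir
defmodule FetchAll do
  # fetch_fun: hypothetical 1-arity function, offset -> {:ok, messages} | {:error, reason}
  # offset_of: hypothetical 1-arity function returning the offset of one message
  def fetch_all(fetch_fun, offset_of, start, total_count, acc \\ []) do
    case fetch_fun.(start) do
      {:ok, []} ->
        acc

      {:error, reason} ->
        {:error, reason}

      {:ok, messages} ->
        result = acc ++ messages

        if length(result) < total_count do
          # Resume right after the last message received in this batch.
          # Do not use start + length(...): compacted partitions can have
          # gaps between offsets.
          next = offset_of.(List.last(messages)) + 1
          fetch_all(fetch_fun, offset_of, next, total_count, result)
        else
          result
        end
    end
  end
end

# Fake partition for demonstration: two "segments", with offset 3 missing
# (as could happen after compaction). Each fetch returns messages from
# only one segment, mimicking the behaviour discussed in this thread.
segments = [[{0, "a"}, {1, "b"}, {2, "c"}], [{4, "d"}, {5, "e"}]]

fake_fetch = fn offset ->
  batch =
    segments
    |> Enum.find([], fn seg -> Enum.any?(seg, fn {o, _} -> o >= offset end) end)
    |> Enum.filter(fn {o, _} -> o >= offset end)

  {:ok, batch}
end

all = FetchAll.fetch_all(fake_fetch, fn {o, _} -> o end, 0, 5)
IO.inspect(Enum.map(all, fn {o, _} -> o end))
```

Passing the fetch call in as a function also makes it easy to swap in a version that reuses one connection to the leader, as suggested earlier in the thread, without touching the loop.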
