
Conversation

@bjhaid (Member) commented Feb 18, 2017

This replaces the existing GenEvent-based stream implementation with a purely lazy stream implementation, fixing issues described in:

#191
#185
#145
#141

@ook commented Feb 24, 2017

Hi, I switched one of my projects to that branch, and the new stream/3 implementation behaves weirdly when reaching the latest offset: it appears stream/3 stops streaming and returns, which is not the expected behavior.

@bjhaid (Member Author) commented Feb 24, 2017

@ook thanks for exercising the code, this behavior is documented here:

https://github.com/kafkaex/kafka_ex/blob/stream/lib/kafka_ex.ex#L260

We have a choice of making the stream finite or infinite; I chose the former. Maybe we should let the stream function take a flag that determines whether it is a finite stream or an infinite one and act accordingly. Then you could either have a stream that never returns, as you want, or one that returns once it's done consuming the available offsets, as was mentioned in some other issue.
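The two modes can be sketched with a plain Stream.resource/3. This is only an illustration of the finite/infinite choice, not KafkaEx code; the StreamModes module and its fetch/1 stand-in for a Kafka fetch are made up:

```elixir
# Illustration only: StreamModes and fetch/1 are hypothetical stand-ins
# for a Kafka fetch at a given offset; fetch/1 returns [] at the log end.
defmodule StreamModes do
  @log ["m1", "m2", "m3"]

  def fetch(offset), do: Enum.slice(@log, offset, 2)

  # finite mode: halt the stream when a fetch comes back empty
  def finite do
    Stream.resource(
      fn -> 0 end,
      fn offset ->
        case fetch(offset) do
          [] -> {:halt, offset}
          msgs -> {msgs, offset + length(msgs)}
        end
      end,
      fn _offset -> :ok end
    )
  end

  # infinite mode: emit nothing and try again (a real client would block
  # on the broker rather than busy-loop like this sketch does)
  def infinite do
    Stream.resource(
      fn -> 0 end,
      fn offset ->
        case fetch(offset) do
          [] -> {[], offset}
          msgs -> {msgs, offset + length(msgs)}
        end
      end,
      fn _offset -> :ok end
    )
  end
end

StreamModes.finite() |> Enum.to_list()
# => ["m1", "m2", "m3"], and then the stream returns

StreamModes.infinite() |> Enum.take(3)
# => ["m1", "m2", "m3"]; Enum.to_list/1 here would never return
```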

@ook commented Feb 24, 2017

Both aspects are interesting. May I suggest a big warning in the README.md if you keep this behavior as the default, since it breaks with the previous implementation?
I like the new implementation, which permits tidying/update operations, and eventually a sleep, when we reach the "end" of the stream.

@dantswain (Collaborator) left a comment

This is definitely a step in the right direction. I like this about 1000% better than the previous implementation.

I think there should definitely be an option for the stream to be indefinite, and quite possibly it should be the default. A Kafka partition is, in concept, an indefinite thing.

Unfortunately there's a lot of room for this to be misunderstood, and I think it's our responsibility to ensure that the documentation clarifies it well. That doesn't necessarily have to be part of this PR - I can take a stab at it; I'm usually decent at writing documentation.


defimpl Enumerable do
  def reduce(data, acc, fun) do
    next_fun = fn offset ->
Collaborator:

Can we make this an actual function and/or refactor it into a composition of smaller functions?

end
response = data.worker_name
|> GenServer.call({:fetch, %{data.fetch_request| offset: offset}})
|> hd |> Map.get(:partitions) |> hd
Collaborator:

I'd prefer not to use hd so much - it's a bit opaque. We can either pattern match [element] = thing or add helper functions to the appropriate modules, like FetchResponse.first_response. Also I think we should try to stick to one pipe per line (I think credo would tell us this as well).
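The suggestion above can be shown with a stand-in value. The nested map shape here is illustrative only, not the real KafkaEx.Protocol.Fetch structs:

```elixir
# `response` mimics the shape of a fetch response: a one-element list of
# topic maps, each holding a one-element list of partition maps
# (illustrative shape only, not the real KafkaEx structs).
response = [%{partitions: [%{messages: [:a, :b]}]}]

# with hd/1: the intent is opaque, and it raises ArgumentError on []
partition_via_hd = response |> hd |> Map.get(:partitions) |> hd

# with a pattern match: the single-element expectation is explicit,
# and a failure raises a MatchError that points at the assumption
[%{partitions: [partition]}] = response

partition == partition_via_hd
# => true
```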

@ook commented Feb 28, 2017

(FYI: I changed it into a recursive call to stream, and it works like a charm.)

@dantswain (Collaborator)

@bjhaid Any plans to revisit this?

@bjhaid (Member Author) commented Apr 3, 2017

Will do sometime this week. I have not had enough time to dedicate to finishing this, but I will try to find time this week.

@dantswain (Collaborator)

OK no worries, I just thought I'd ping you. Let me know if there's anything I can do to help.

@dantswain (Collaborator)

@bjhaid Monthly ping on this ;)

@sourcelevel-bot

Hello, @bjhaid! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.

@bjhaid (Member Author) commented May 5, 2017

@dantswain I will try to finish this today and get it behind me.

stream = KafkaEx.stream(random_string, 0, worker_name: :stream, offset: 0, auto_commit: false)
log = TestHelper.wait_for_accum(
fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) end,
fn() -> stream |> Enum.take(2) end,


Use a function call when a pipeline is only one function long

# make sure we consume at least one message before we assert that there is no offset committed
_log = TestHelper.wait_for_any(
fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
fn() -> stream |> Enum.take(2) end


Use a function call when a pipeline is only one function long

stream = KafkaEx.stream(random_string, 0, worker_name: worker_name)
log = TestHelper.wait_for_any(
fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
fn() -> stream |> Enum.take(2) end


Use a function call when a pipeline is only one function long

stream = KafkaEx.stream(random_string, 0, worker_name: :stream_auto_commit, offset: 0)
log = TestHelper.wait_for_any(
fn() -> GenEvent.call(stream.manager, KafkaEx.Handler, :messages) |> Enum.take(2) end
fn() -> stream |> Enum.take(2) end


Use a function call when a pipeline is only one function long

lib/kafka_ex.ex Outdated
Returns a stream that consumes fetched messages. The stream will halt once the max_bytes number of messages is reached; if you want to halt the stream early, supply a small max_bytes, and for the inverse supply a large max_bytes.
Optional arguments(KeywordList)
- stream_mode: the mode the stream will be in: `:infinite` for an infinite stream and `:finite` for a finite stream. A finite stream will return once there is no data left to consume from the partition, while an infinite stream will block until there is data to consume from the stream.
Collaborator:

I think it would be good if this mirrors the command line tools and uses something like :no_wait_at_logend https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-SimpleConsumerShell

@dantswain (Collaborator) commented Jul 14, 2017

I can publish 100 messages to a topic and create a stream for that topic:

iex(8)> Enum.each(1 .. 100, fn(ix) -> KafkaEx.produce("mytopic", 0, "Message #{ix}") end)
:ok

iex(9)> s = KafkaEx.stream("mytopic", 0, auto_commit: true, consumer_group: "test", max_bytes: 100)
%KafkaEx.Stream{consumer_group: "test",
 fetch_request: %KafkaEx.Protocol.Fetch.Request{auto_commit: true,
  client_id: nil, correlation_id: nil, max_bytes: 100, min_bytes: 1, offset: 0,
  partition: 0, topic: "mytopic", wait_time: 10}, no_wait_at_logend: false,
 worker_name: :kafka_ex}

Note it sets the offset to 0 because the consumer group is new. Now I take 20 messages, which should be well over 100 bytes. It actually ends up with duplicate messages because it's over the max_bytes:

iex(11)> m = Enum.take(s, 20)
[%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1226351079, key: "",
  offset: 0, value: "Message 1"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3490836573, key: "",
  offset: 1, value: "Message 2"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3490836573, key: "",
  offset: 1, value: "Message 2"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 2803286219, key: "",
  offset: 2, value: "Message 3"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 2803286219, key: "",
  offset: 2, value: "Message 3"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 963794280, key: "",
  offset: 3, value: "Message 4"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 963794280, key: "",
  offset: 3, value: "Message 4"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1316316670, key: "",
  offset: 4, value: "Message 5"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1316316670, key: "",
  offset: 4, value: "Message 5"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3615241284, key: "",
  offset: 5, value: "Message 6"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 3615241284, key: "",
  offset: 5, value: "Message 6"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 2692416722, key: "",
  offset: 6, value: "Message 7"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 2692416722, key: "",
  offset: 6, value: "Message 7"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 818157891, key: "",
  offset: 7, value: "Message 8"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 818157891, key: "",
  offset: 7, value: "Message 8"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1203972565, key: "",
  offset: 8, value: "Message 9"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1203972565, key: "",
  offset: 8, value: "Message 9"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1394529882, key: "",
  offset: 9, value: "Message 10"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1394529882, key: "",
  offset: 9, value: "Message 10"},
 %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 605677260, key: "",
  offset: 10, value: "Message 11"}]

If I take 1 again, it actually starts back at 0, because the offset value in my stream struct is 0:

iex(12)> m = Enum.take(s, 1)
[%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 1226351079, key: "",
  offset: 0, value: "Message 1"}]

This makes sense if you look at the implementation of KafkaEx.Stream because it takes s.offset as the first offset to fetch.

This is even more fun: If I create a NEW stream with the same consumer group, I get offset 2. Furthermore, if I take one from that stream I start with message at offset 2, so I've actually completely skipped offset 1:

iex(13)> s2 = KafkaEx.stream("mytopic", 0, auto_commit: true, consumer_group: "test", max_bytes: 100)
%KafkaEx.Stream{consumer_group: "test",
 fetch_request: %KafkaEx.Protocol.Fetch.Request{auto_commit: true,
  client_id: nil, correlation_id: nil, max_bytes: 100, min_bytes: 1, offset: 2,
  partition: 0, topic: "mytopic", wait_time: 10}, no_wait_at_logend: false,
 worker_name: :kafka_ex}
iex(14)> m = Enum.take(s2, 1)
[%KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 2803286219, key: "",
  offset: 2, value: "Message 3"}]

I'm pretty sure I can sort this all out. The question is, from a design standpoint, should the stream continue along the topic from the last offset that was committed, or should it start at the same offset. There are kind of conflicting ideas here - it's intuitive that subsequent calls would fetch the next messages, BUT this is a functional language and therefore there's an argument that making the same function call twice with the same arguments would return the same value. I think we just need to make a decision either way.

Regardless, I definitely agree that documenting this is really important.

@bjhaid (Member Author) commented Jul 14, 2017

Can you verify what the behavior of File.stream! is? We should try to be as consistent as possible with the read behavior of File.stream!.

@bjhaid (Member Author) commented Jul 14, 2017

nvm, I just verified, and this is consistent with the behavior of File.stream!. Returning duplicate elements is weird, though; maybe we can try not doing that :)

@dantswain (Collaborator)

File.stream! always starts at the same point in the file:

iex(20)> f = File.stream!("README.md")
%File.Stream{line_or_bytes: :line, modes: [:raw, :read_ahead, :binary],
 path: "README.md", raw: true}
iex(21)> Enum.take(f, 2)
["KafkaEx\n", "========\n"]
iex(22)> Enum.take(f, 2)
["KafkaEx\n", "========\n"]

no_wait_at_logend: false

defimpl Enumerable do
  def reduce(data = %KafkaEx.Stream{}, acc, fun) do


This file has the variable name before the pattern, while most of the files have the variable name after the pattern when naming parameter pattern matches.

assert m3.value == "message 3"
assert m4.value == "message 4"
end


There should be no trailing white-space at the end of a line.

@dantswain (Collaborator)

I fixed the double-consume issue. I think we may be committing in the wrong spot, so I want to add some tests for that. Still need to update the documentation as well.

There's a potential weirdness with auto_commit. Suppose auto_commit is true, we have 10 messages in the log, and someone does Enum.take(stream, 3). With smallish messages, it's entirely possible that the fetch will get us all 10 messages in the log, but the user is only consuming 3 of them. I think we can handle this because the acc argument to reduce will have the take argument, but I'm not sure if that's an intended usage of the API... I'll look into it.
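The fetched-more-than-consumed scenario is easy to reproduce with a plain Stream.resource/3. This is a sketch with made-up numbers, not KafkaEx code: each fake fetch returns a batch of 5, the consumer takes 3, and an auto-commit performed at fetch time would already have advanced the offset past messages the caller never saw.

```elixir
# Count fetches with an Agent while a fake fetch returns 5 messages at
# a time; Enum.take(3) consumes only 3 of the 5 fetched messages.
{:ok, fetches} = Agent.start_link(fn -> 0 end)

batches =
  Stream.resource(
    fn -> 0 end,
    fn offset ->
      Agent.update(fetches, &(&1 + 1))
      # fake fetch: 5 consecutive "messages" starting at offset
      {Enum.to_list(offset..(offset + 4)), offset + 5}
    end,
    fn _offset -> :ok end
  )

taken = Enum.take(batches, 3)
# => [0, 1, 2]

Agent.get(fetches, & &1)
# => 1 fetch happened, but only 3 of its 5 messages were consumed
```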

# this function returns a Stream.resource stream, so we need to define
# start_fun, next_fun, and after_fun callbacks



There should be no more than 1 consecutive blank lines.

@dantswain dantswain dismissed their stale review July 18, 2017 20:00

Took over the PR

@dantswain dantswain requested a review from joshuawscott July 18, 2017 20:00
@dantswain (Collaborator)

@bjhaid @joshuawscott I've handled everything I could think of, and updated the documentation. Could you guys give this another review, please?

One thing I considered doing was delegating stream creation to a function in the KafkaEx.Stream module, and moving the bulk of the KafkaEx.stream/3 documentation to the moduledocs for KafkaEx.Stream. Doing that could get a little tricky because KafkaEx.stream/3 uses some private functions inside the root module.

consumer_group: consumer_group
)

[m1, m2, m3, m4] = Enum.take(stream, 10)
Member:

Worth testing that this actually only returns 4 elements?

Collaborator:

This match would fail if I didn't get 4 elements, or am I misunderstanding you?

Member:

*goes off to drink more coffee before reviewing more*

)

assert ["Msg 1", "Msg 2"] == stream1
|> Enum.take(2)
Member:

nitpicky, but I kind of prefer assigning to a variable first so that the assert is more self-contained

messages =
  stream1
  |> Enum.take(2)
  |> Enum.map(&(&1.value))

assert ["Msg 1", "Msg 2"] == messages

Mainly because I was super confused that a list == a stream before I noticed the pipeline on the next line.

@joshuawscott (Member) left a comment

LGTM :shipit:

@bjhaid (Member Author) left a comment

LGTM minus slight var name vs string literal comment

worker_name: :stream,
offset: 0,
auto_commit: true,
consumer_group: "stream_test"
Member Author:

We should use the consumer_group var here instead of "stream_test"

@sourcelevel-bot

Ebert has finished reviewing this Pull Request and has found:

  • 8 fixed issues! 🎉

You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/193.

@dantswain dantswain merged commit 5584859 into master Jul 25, 2017
@dantswain dantswain deleted the stream branch July 25, 2017 13:10