Skip to content

Conversation

@adamosloizou
Copy link
Collaborator

Description

Hello,

I have been using your library recently and I really like it 😃!
I am currently working with Kafka's new library, Kafka Streams and I couldn't find an easy abstraction for testing.

This PR introduces a ScalaTest-based trait that should make testing Kafka Streams testing easier.

Features/Changes

  • Codebase is now multimodule. Moved existing code base to embedded-kafka and introduced kafka-streams submodule. This allows for people to not have to depend on Kafka Streams for using the original code base.
  • The new module now offers EmbeddedKafkaStreams and EmbeddedKafkaStreamsAllInOne traits. They use and expand on the EmbeddedKafkatrait.
  • The design uses the loaner-pattern for safe resource management for creating and destroying Kafka streams as well as test consumers
  • It adds a few useful tools to the existing code base, particularly Consumers and ConsumerExtensions for easily consuming arbitrary messages as Scala Streams.

Testing

  • Tests on kafka-streams pass
  • Tests on existing code base embedded-kafka get [Address already in use errors] which I also see in master. Is that expected?

Notes

I have not changed the README.md as I would like to see how you like it before discussing what the user manual should look like.

Thanks!
Adamos

@adamosloizou adamosloizou force-pushed the kafka-streams branch 2 times, most recently from 92e615f to 30bc5ea Compare October 17, 2016 21:38
@manub
Copy link
Owner

manub commented Oct 17, 2016

Hi @adamosloizou and thanks for your PR! I'll go through it in the next days - however I think your changes are incompatible with Scala 2.10 (there's a transitive dependency missing in Travis build for 2.10). I'm not sure whether we need to keep cross versions compatibility, but if you have an easy way to fix that it would be appreciated.

Will keep you posted on the rest of the PR (I haven't used Kafka Streams myself).

@adamosloizou
Copy link
Collaborator Author

Hey @manub! Yes, it was Typesafe's scalalogging which I have removed so hopefully it should be ok for Scala 2.10 as well.

As for Kafka Streams, interesting concept but its API is still young. You can definitely spot commonalities between that and Spark and Akka Streams API for functional-style transformations (maps, flatmaps etc). It's a nice way forward it seems.

Let me know what you think!

@manub
Copy link
Owner

manub commented Oct 18, 2016

Try to merge master in your branch and run again the build please.

@adamosloizou
Copy link
Collaborator Author

@manub I think I fixed it. There was a clash with the two sub-modules' tests using the same ports for their Kafka/Zookeeper servers.

@adamosloizou
Copy link
Collaborator Author

This is really weird, it works fine locally on my machine. I am running with JVM 8 instead of 7 as the build. Could that have something to do?

@manub
Copy link
Owner

manub commented Oct 23, 2016

I will have a look at it - had a failure with my Mac and I'm in the middle of reinstalling all the software. Sorry for the delay!

@adamosloizou
Copy link
Collaborator Author

I have managed to replicate the issue on my machine by using a VM with limited resources. It was a race-condition that exposed the fact that the Stream consumers could start consuming after the 1st set of published messages.
I fixed it by configuring the streams' consumers to start at the earliest message.

However, now I am seeing a failure on the embedded-kafka side. Any ideas?
Thanks.

@manub
Copy link
Owner

manub commented Oct 24, 2016

Rerunning your build passes it - it's quite unstable on CI and I'm aware of that. Will look at the code as soon as I have an opportunity. As I mentioned before my personal laptop failed and I'm still in the process of restoring it. Thanks for your patience!

Copy link
Owner

@manub manub left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @adamosloizou - thanks for the PR! It looks overall good although there are some coding style things that I'd like to get fixed before merging. Also - would it be possible for you to extend the README.md?

*/
def consumeLazily(topic: String): Stream[(K, V)] = {
val attempts = 1 to MaximumAttempts
attempts.toStream.flatMap(attempt => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be rewritten as .flatMap { attempt => ... }?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in badc3b2

*
* @return the unique ID
*/
def newUuid(): String = UUID.randomUUID().toString
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather have this method still returning a UUID and let the user convert it to string.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in badc3b2

def withConsumer[K: Deserializer, V: Deserializer, T](block: KafkaConsumer[K, V] => T)
(implicit config: EmbeddedKafkaConfig): T = {
val consumer = newConsumer[K, V]()
val result = block(consumer)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can block() ever throw an exception? as probably we don't know in advance it would be better to wrap block(consumer) into a try / finally block in order to close the consumer regardless of what happens.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!
Fixed in badc3b2

@@ -1,7 +1,8 @@
log4j.rootLogger=info, stdout
log4j.rootLogger=off
#log4j.rootLogger=info, stdout
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line can be removed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2cace27

(block: => Any)
(implicit config: EmbeddedKafkaConfig): Any =
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my opinion it would be more idiomatic .foreach(createCustomTopic)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I tried that originally but unfortunately it does not work:

Error:(45, 30) type mismatch;
 found   : (String, Map[String,String], Int, Int) => Unit
 required: String => ?
      topicsToCreate.foreach(createCustomTopic)

An alternative is foreach(createCustomTopic(_)). In my opinion that's not particularly pretty and I prefer being explicit as per above. I think it reads better.
What do you think?

(implicit config: EmbeddedKafkaConfig): Any =
withRunningKafka {
topicsToCreate.foreach(topic => createCustomTopic(topic))
val streamId: String = UUIDs.newUuid()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this type annotation it's redundand

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in badc3b2

@@ -0,0 +1,9 @@
log4j.rootLogger=off
#log4j.logger.net.manub.embeddedkafka=debug, stdout
#log4j.logger.org.apache.kafka.streams=debug, stdout
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those 2 lines can be removed as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2cace27

import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}
import org.scalatest.{FlatSpec, Matchers}

class ExampleKafkaStreamsSpec extends FlatSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to use WordSpec here for consistency reasons?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course!
Fixed in badc3b2

@adamosloizou
Copy link
Collaborator Author

adamosloizou commented Oct 25, 2016

README updated 😄!

adamosloizou and others added 3 commits October 26, 2016 00:56
…ggregation project in preparation for introducing kafka-streams subproject.
…ded Kafka.

Introduced EmbeddedKafkaStreams and EmbeddedKafkaStreamsAllInOne traits for easy testing.
Introduced Consumers trait for easy creation and consumption of test messages.
@adamosloizou
Copy link
Collaborator Author

@manub Please re-run the build. I think the failures are the unstable part that you mentioned.

@manub manub merged commit c30918a into manub:master Oct 26, 2016
@manub
Copy link
Owner

manub commented Oct 26, 2016

Thanks @adamosloizou !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants