Lookup not working with multi-partitioned topics #186

Closed
OumArEtH opened this issue May 20, 2019 · 4 comments

@OumArEtH

Hello community,
I am developing a streaming app and my case is the following:

  • I created a topic named <t_lookup> with 3 partitions and log compaction enabled; it is used for lookups.
  • I added the table to my graph using <goka.Lookup(lookupTable, new(codec.String))>
  • I have an input stream of events named <input_stream>, and my whole group graph declaration is: <g := goka.DefineGroup(lookupGroup, goka.Input(lookupSourceTopic, new(codec.String), perfromJoin), goka.Lookup(lookupTable, new(codec.String)), )>
  • When I call <ctx.Lookup(lookupTable, ctx.Key())>, I don't find any match, although there are matching keys in <t_lookup>.
  • But when I create the topic <t_lookup> with only one partition, matches are found.

So my question is: does a topic used for lookups need to have just one partition, or am I missing something here?
I have attached my whole source code.

Many thanks in advance.
lookup.txt

@db7
Collaborator

db7 commented May 21, 2019

I haven't tried running your code, but it looks good to me. As you initially thought, lookup tables can have any number of partitions.

How was your t_lookup created? Was it perhaps created with a Java program? The Java clients use a different hash function than the Go code. If that is the problem, you can override the Go hash function with murmur2.

@OumArEtH
Author

Many thanks for your reply. I created t_lookup using the kafka-topics console client. How/where can I override the Go hash function? Thanks

@db7
Collaborator

db7 commented May 21, 2019

To set the hasher, you have to pass the goka.WithHasher() option to the processor. It sounds like you'll need a murmur2 hasher; I think this one should work:
https://github.com/burdiyan/kafkautil/blob/master/partitioner.go#L27

You can create the processor like this:

p, err := goka.NewProcessor(brokers, g, goka.WithHasher(kafkautil.MurmurHasher))

Also take a look at the tips page; you may find something useful there: https://github.com/lovoo/goka/wiki/Tips

Let me know if that was helpful.

@OumArEtH
Author

Thanks very much. The MurmurHasher works like a charm with multiple partitions :) Thanks again.
