Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KStreams support #30

Closed
sabbyanandan opened this issue Aug 31, 2016 · 4 comments
Closed

Add KStreams support #30

sabbyanandan opened this issue Aug 31, 2016 · 4 comments

Comments

@sabbyanandan
Copy link
Contributor

sabbyanandan commented Aug 31, 2016

No description provided.

@sabbyanandan
Copy link
Contributor Author

Depends on spring-cloud/spring-cloud-stream#519

@sabbyanandan sabbyanandan added this to the FUTURE milestone Oct 27, 2016
@mbogoevici
Copy link
Contributor

Duplicated by #144

@codependent
Copy link

codependent commented Jul 15, 2017

@mbogoevici It's great to have KStreams available directly in Spring Cloud Streams. I only miss one thing:

With Spring Kafka I wanted to use interative queries so I configured KStreams like this to generate a KTable:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

stream.map( (key, value) -> {
    return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
}).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");

Then in my business service I injected KStreamBuilderFactoryBean to have access to KafkaStreams and thus be able to implement the query against the previously defined store:

@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

@Override
public Integer getProductStock(Integer id) {
        KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
        return keyValueStore.get(id);
}

Do we have to do the same to use the interactive queries when using Spring Cloud Streams?

@artembilan
Copy link
Contributor

@codependent ,

Yes, according to the code in the PR #159 from @sobychacko , we have:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean defaultKStreamBuilder(

So, that is really feasible what we have discussed with you in the spring-projects/spring-kafka#365

Why do you think that might be something different?

Thanks for your feedback though!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants