Document how to implement Kafka Streams Interactive Queries #365

Closed
codependent opened this issue Jul 13, 2017 · 2 comments · Fixed by #367

codependent commented Jul 13, 2017

As of 1.2.2, the documentation explains how to configure and start your Kafka Streams processes. It also states that the KafkaStreams instance is managed automatically by the framework.

However, implementing Interactive Queries requires access to the KafkaStreams instance, and the documentation does not say how to obtain it, so doing it properly is guesswork. It would be really helpful if the specific setup needed for Interactive Queries were documented.

My approach was this, but unfortunately it doesn't work:

@Configuration
public class KStreamsConfig {

    private static final String STREAMING_TOPIC1 = "orders";

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-service-streams");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public FactoryBean<KStreamBuilder> myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC1);

        KGroupedStream<Integer, Integer> quantityStream = stream.map((key, value) -> {
            return new KeyValue<>(value.get("productId").asInt(), value.get("quantity").asInt());
        }).groupByKey();
        quantityStream.count("ProductStock");

        stream.print();
        return stream;
    }
}

In my service class I try to inject the KStreamBuilderFactoryBean to access the underlying KafkaStreams instance and query the store configured above:

    @Autowired
    private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

    @Override
    public Long getProductStock(Integer id) {
        KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("ProductStock", QueryableStoreTypes.keyValueStore());
        return keyValueStore.get(id.toString());
    }

But the Spring Boot app doesn't even start:

***************************
APPLICATION FAILED TO START
***************************

Description:

Field kStreamBuilderFactoryBean in com.codependent.microshopping.product.service.ProductServiceImpl required a bean of type 'org.springframework.kafka.core.KStreamBuilderFactoryBean' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.kafka.core.KStreamBuilderFactoryBean' in your configuration.

artembilan self-assigned this Jul 13, 2017
artembilan added a commit to artembilan/spring-kafka that referenced this issue Jul 13, 2017
Fixes spring-projects#365
Fixes spring-projects#317

* Add `closeTimeout` option to the `KStreamBuilderFactoryBean` to avoid
infinite wait on the internal `KafkaStreams` during `stop()` phase
* Document with samples how to get access to the `KafkaStreams`
instance via `KStreamBuilderFactoryBean` injection

artembilan (Member) commented

See linked PR #367.

Thank you for the report!
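
For readers who do not follow the link, here is a minimal sketch of the kind of setup the report asks for. It rests on an assumption that is not stated in this thread: that the injection failed because the bean method was declared as `FactoryBean<KStreamBuilder>`, which hides the concrete type from by-type autowiring, so declaring `KStreamBuilderFactoryBean` as the return type is one way around it. The class names `KStreamsFactoryConfig` and `ProductStockQuery` are hypothetical, the `StreamsConfig` bean is the one from the reporter's configuration, and the `Integer` key type of the store is inferred from the `productId` mapping in the reporter's topology.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KStreamBuilderFactoryBean;
import org.springframework.stereotype.Service;

// Hypothetical configuration class; only the factory bean definition is shown.
@Configuration
class KStreamsFactoryConfig {

    // Assumption: exposing the concrete type (rather than FactoryBean<KStreamBuilder>)
    // lets Spring resolve the factory bean itself when it is injected by type.
    @Bean
    public KStreamBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        return new KStreamBuilderFactoryBean(streamsConfig);
    }
}

// Hypothetical service that queries the "ProductStock" store from the report.
@Service
class ProductStockQuery {

    @Autowired
    private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

    public Long getProductStock(Integer id) {
        // The KafkaStreams instance is only usable once the factory bean has been started.
        KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
        // Assumption: keys are Integer product ids and values are Long counts.
        ReadOnlyKeyValueStore<Integer, Long> store =
                streams.store("ProductStock", QueryableStoreTypes.<Integer, Long>keyValueStore());
        return store.get(id);
    }
}
```

Note that `store.get(id)` returns `null` for a product that has no count yet, and that the store can only be queried while the streams instance is running, so a caller would probably want to handle both cases.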

codependent (Author) commented

@artembilan thank you for clearing this up so quickly!

garyrussell pushed a commit that referenced this issue Jul 19, 2017
garyrussell pushed a commit that referenced this issue Aug 7, 2017
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019