diff --git a/src/main/java/com/example/analytics/AnalyticsApplication.java b/src/main/java/com/example/analytics/AnalyticsApplication.java index e7b4730..e6f9e5b 100644 --- a/src/main/java/com/example/analytics/AnalyticsApplication.java +++ b/src/main/java/com/example/analytics/AnalyticsApplication.java @@ -1,12 +1,19 @@ package com.example.analytics; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -21,7 +28,7 @@ import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry; +import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -31,10 +38,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - interface AnalyticsBinding { @@ -123,8 +126,6 @@ public static class PageCountSink { @StreamListener public void pageCount(@Input((AnalyticsBinding.PAGE_COUNT_IN)) KTable counts) { - - counts .toStream() .foreach((key, value) -> log.info(key + "=" + value)); @@ -134,17 +135,18 @@ public void pageCount(@Input((AnalyticsBinding.PAGE_COUNT_IN)) KTable counts() { Map counts = new HashMap<>(); ReadOnlyKeyValueStore queryableStoreType = - this.registry.getQueryableStoreType(AnalyticsBinding.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore()); + this.interactiveQueryService + .getQueryableStore(AnalyticsBinding.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore()); KeyValueIterator all = queryableStoreType.all(); while (all.hasNext()) { KeyValue value = all.next();