From 9852da0b953b6030952c803af67f1799e85fdc30 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 27 Mar 2019 17:39:55 -0400 Subject: [PATCH 1/2] Remove deprecated components --- .../com/example/analytics/AnalyticsApplication.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/example/analytics/AnalyticsApplication.java b/src/main/java/com/example/analytics/AnalyticsApplication.java index e7b4730..a648535 100644 --- a/src/main/java/com/example/analytics/AnalyticsApplication.java +++ b/src/main/java/com/example/analytics/AnalyticsApplication.java @@ -21,6 +21,7 @@ import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; @@ -123,8 +124,6 @@ public static class PageCountSink { @StreamListener public void pageCount(@Input((AnalyticsBinding.PAGE_COUNT_IN)) KTable counts) { - - counts .toStream() .foreach((key, value) -> log.info(key + "=" + value)); @@ -134,17 +133,18 @@ public void pageCount(@Input((AnalyticsBinding.PAGE_COUNT_IN)) KTable counts() { Map counts = new HashMap<>(); ReadOnlyKeyValueStore queryableStoreType = - this.registry.getQueryableStoreType(AnalyticsBinding.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore()); + this.interactiveQueryService + .getQueryableStore(AnalyticsBinding.PAGE_COUNT_MV, QueryableStoreTypes.keyValueStore()); KeyValueIterator all = queryableStoreType.all(); while (all.hasNext()) { KeyValue value = all.next(); From 2e39f228ba1476eecee1e78fe49bef64581cfce0 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 27 Mar 2019 17:43:03 -0400 Subject: [PATCH 2/2] Polishing --- .../example/analytics/AnalyticsApplication.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/example/analytics/AnalyticsApplication.java b/src/main/java/com/example/analytics/AnalyticsApplication.java index a648535..e6f9e5b 100644 --- a/src/main/java/com/example/analytics/AnalyticsApplication.java +++ b/src/main/java/com/example/analytics/AnalyticsApplication.java @@ -1,12 +1,19 @@ package com.example.analytics; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -22,7 +29,6 @@ import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; -import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -32,10 +38,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - interface AnalyticsBinding {