forked from lightbend/kafka-streams-scala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
KafkaStreamsS.scala
80 lines (56 loc) · 2.54 KB
/
KafkaStreamsS.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.lightbend.kafka.scala.streams

import java.util.Properties

import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.common.{Metric, MetricName}
import org.apache.kafka.streams.processor.{StateRestoreListener, StreamPartitioner, ThreadMetadata}
import org.apache.kafka.streams.state.{QueryableStoreType, StreamsMetadata}
import org.apache.kafka.streams.{KafkaClientSupplier, KafkaStreams, StreamsConfig, Topology}

import scala.collection.JavaConverters._
/**
 * Thin Scala-friendly wrapper around [[org.apache.kafka.streams.KafkaStreams]].
 *
 * Java collections returned by the underlying client are converted to Scala
 * collections, and the mutating `set*` listeners are exposed as fluent `with*`
 * methods that return this wrapper so configuration calls can be chained
 * before [[start]].
 *
 * @param inner the underlying Java `KafkaStreams` instance being wrapped
 */
class KafkaStreamsS(inner: KafkaStreams) {

  /** Metadata of all instances of this Kafka Streams application. */
  def allMetadata(): Iterable[StreamsMetadata] =
    inner.allMetadata().asScala

  /** Metadata of all application instances that host the given state store. */
  def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] =
    inner.allMetadataForStore(storeName).asScala

  /** Deletes this application's local state directory; returns this wrapper for chaining. */
  def cleanUp(): KafkaStreamsS = {
    inner.cleanUp()
    this
  }

  /** Shuts down the streams application, blocking until all threads have stopped. */
  def close(): Unit =
    inner.close()

  /**
   * Shuts down the streams application, waiting at most the given timeout.
   * NOTE(review): return type follows the Kafka 1.x signature, which reports
   * whether shutdown completed within the timeout — confirm against the
   * kafka-streams version on the classpath.
   */
  def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit): Boolean =
    inner.close(timeout, timeUnit)

  /** Metadata of all local stream threads, as an immutable Set. */
  def localThreadsMetadata(): Set[ThreadMetadata] =
    inner.localThreadsMetadata.asScala.toSet

  /** Metadata of the instance hosting `key` for `storeName`, using the default partitioner. */
  def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata =
    inner.metadataForKey(storeName, key, keySerializer)

  /** Metadata of the instance hosting `key` for `storeName`, using a custom partitioner. */
  def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata =
    inner.metadataForKey(storeName, key, partitioner)

  /** Current client metrics as an immutable Map. */
  def metrics(): Map[MetricName, _ <: Metric] =
    inner.metrics().asScala.toMap

  /** Registers a listener for state-restore events; returns this wrapper for chaining. */
  def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener): KafkaStreamsS = {
    inner.setGlobalStateRestoreListener(globalStateRestoreListener)
    this
  }

  /** Registers a listener for client state transitions; returns this wrapper for chaining. */
  def withStateListener(listener: KafkaStreams.StateListener): KafkaStreamsS = {
    inner.setStateListener(listener)
    this
  }

  /** Registers an uncaught-exception handler for stream threads; returns this wrapper for chaining. */
  def withUncaughtExceptionHandler(eh: java.lang.Thread.UncaughtExceptionHandler): KafkaStreamsS = {
    inner.setUncaughtExceptionHandler(eh)
    this
  }

  /** Starts the streams application; returns this wrapper for chaining. */
  def start(): KafkaStreamsS = {
    inner.start()
    this
  }

  /** Current state of the streams client. */
  def state(): KafkaStreams.State =
    inner.state()

  /** A facade of the given local state store for interactive queries. */
  def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]): T =
    inner.store(storeName, queryableStoreType)
}
/**
 * Factory methods mirroring the [[org.apache.kafka.streams.KafkaStreams]]
 * constructors, each returning the Scala wrapper.
 *
 * All overloads now carry an explicit `KafkaStreamsS` result type for
 * consistency (two of the four previously relied on inference).
 */
object KafkaStreamsS {

  /** Builds a client from a Scala `StreamsBuilderS` topology and properties. */
  def apply(s: StreamsBuilderS, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(s.build(), p))

  /** Builds a client from a Java `Topology` and properties. */
  def apply(topology: Topology, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, p))

  /** Builds a client from a `Topology` and a `StreamsConfig`. */
  def apply(topology: Topology, config: StreamsConfig): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config))

  /** Builds a client with a custom `KafkaClientSupplier` (e.g. for testing). */
  def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier))
}