Scalish #66

This repository has been archived by the owner on May 25, 2023. It is now read-only.

Open · wants to merge 10 commits into base: develop
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.1.4
+sbt.version=1.1.4
New file (80 additions) adding the KafkaStreamsS wrapper:
@@ -0,0 +1,80 @@
package com.lightbend.kafka.scala.streams

import java.util.Properties

import org.apache.kafka.common.{Metric, MetricName}
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.streams.processor.{StateRestoreListener, StreamPartitioner, ThreadMetadata}
import org.apache.kafka.streams.state.{QueryableStoreType, StreamsMetadata}
import org.apache.kafka.streams.{KafkaClientSupplier, KafkaStreams, StreamsConfig, Topology}

import scala.collection.JavaConverters._

/**
 * Scala-friendly wrapper around KafkaStreams: Java-collection results are
 * exposed as Scala collections, and the mutating setters become fluent
 * `with*` methods that return the wrapper for chaining.
 */
class KafkaStreamsS(inner: KafkaStreams) {

  def allMetadata(): Iterable[StreamsMetadata] =
    inner.allMetadata().asScala

  def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] =
    inner.allMetadataForStore(storeName).asScala

  def cleanUp(): KafkaStreamsS = {
    inner.cleanUp()
    this
  }

  def close(): Unit =
    inner.close()

  def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit) =
    inner.close(timeout, timeUnit)

  def localThreadsMetadata(): Set[ThreadMetadata] =
    inner.localThreadsMetadata.asScala.toSet

  def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata =
    inner.metadataForKey(storeName, key, keySerializer)

  def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata =
    inner.metadataForKey(storeName, key, partitioner)

  def metrics(): Map[MetricName, _ <: Metric] =
    inner.metrics().asScala.toMap

  def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener): KafkaStreamsS = {
    inner.setGlobalStateRestoreListener(globalStateRestoreListener)
    this
  }

  def withStateListener(listener: KafkaStreams.StateListener): KafkaStreamsS = {
    inner.setStateListener(listener)
    this
  }

  def withUncaughtExceptionHandler(eh: java.lang.Thread.UncaughtExceptionHandler): KafkaStreamsS = {
    inner.setUncaughtExceptionHandler(eh)
    this
  }

  def start(): KafkaStreamsS = {
    inner.start()
    this
  }

  def state(): KafkaStreams.State =
    inner.state()

  def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]): T =
    inner.store(storeName, queryableStoreType)
}

object KafkaStreamsS {

  def apply(s: StreamsBuilderS, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(s.build(), p))

  def apply(topology: Topology, p: Properties): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, p))

  def apply(topology: Topology, config: StreamsConfig): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config))

  def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier): KafkaStreamsS =
    new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier))
}
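For context, a minimal usage sketch of the new wrapper (the application id, bootstrap address and topic names are placeholders, not part of this PR):

object FluentDemo extends App {
  import java.util.Properties
  import org.apache.kafka.streams.{StreamsConfig, Topology}

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fluent-demo")        // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder

  // A trivial pass-through topology built with the plain Kafka Streams API.
  val topology = new Topology()
  topology.addSource("source", "input-topic")        // placeholder topic
  topology.addSink("sink", "output-topic", "source") // placeholder topic

  // Every with* setter returns the wrapper, so registration and startup chain:
  val streams = KafkaStreamsS(topology, props)
    .withStateListener((newState, oldState) => println(s"state: $oldState -> $newState"))
    .start()

  sys.addShutdownHook(streams.close())
}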
@@ -31,10 +31,11 @@ class KafkaLocalServer private (kafkaProperties: Properties, zooKeeperServer: Zo
DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
false)

-  def start(): Unit = {
+  def start(): KafkaLocalServer = {

    broker = KafkaServerStartable.fromProps(kafkaProperties)
    broker.startup()
+    this
  }

//scalastyle:off null
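With start() now returning the server instead of Unit, construction and startup collapse into a single expression; the test suites below all switch to KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start(). This mirrors the fluent style of the KafkaStreamsS wrapper above.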
@@ -10,17 +10,14 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
import ImplicitConversions._
import com.typesafe.scalalogging.LazyLogging

object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData with LazyLogging {

-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()

override def tearDown(server: KafkaLocalServer): Unit =
server.stop()
@@ -55,8 +52,7 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa

wordCounts.toStream.to(outputTopic)

-    val streams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(builder, streamsConfiguration).start()

//
// Step 2: Produce some input data to the input topic.
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
import ImplicitConversions._
import com.typesafe.scalalogging.LazyLogging

@@ -69,11 +69,8 @@ object ProbabilisticCountingScalaIntegrationTest
extends TestSuite[KafkaLocalServer]
with ProbabilisticCountingScalaIntegrationTestData {

-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()

override def tearDown(server: KafkaLocalServer): Unit =
server.stop()
@@ -149,8 +146,7 @@ object ProbabilisticCountingScalaIntegrationTest
.transform(() => new ProbabilisticCounter, cmsStoreName)
.to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration).start()

// Step 2: Publish some input text lines.
val sender =
@@ -10,7 +10,7 @@ import com.typesafe.scalalogging.LazyLogging
import minitest.TestSuite
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}
-import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
+import org.apache.kafka.streams.{StreamsConfig, Topology}

/**
 * This sample demonstrates usage of punctuate, which changed significantly in version 1.0 and
@@ -20,11 +20,8 @@ import org.apache.kafka.streams.{StreamsConfig, Topology}
 */
object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData with LazyLogging {

-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()

override def tearDown(server: KafkaLocalServer): Unit =
server.stop()
@@ -49,8 +46,7 @@ object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData
topology.addSource("data", inputTopic)
// Processors
topology.addProcessor("data processor", () => new SampleProcessor(5000), "data")
-    val streams = new KafkaStreams(topology, streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(topology, streamsConfiguration).start()
    // Allow time for the streams to start up
Thread.sleep(5000L)

@@ -46,11 +46,8 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes
with StreamToTableJoinTestData
with LazyLogging {

-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()

override def tearDown(server: KafkaLocalServer): Unit =
server.stop()
@@ -106,24 +103,24 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes
// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-
-    streams.setUncaughtExceptionHandler(
-      (_: Thread, e: Throwable) =>
-        try {
-          logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
-          e.printStackTrace()
-          val closed: Unit = streams.close()
-          logger.info(s"Exiting application after streams close ($closed)")
-        } catch {
-          case x: Exception => x.printStackTrace()
-        } finally {
-          logger.debug("Exiting application ..")
-          System.exit(-1)
-        }
-    )
-
-    streams.start()
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration)
+
+    streams
+      .withUncaughtExceptionHandler(
+        (_: Thread, e: Throwable) =>
+          try {
+            logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e)
+            e.printStackTrace()
+            val closed = streams.close()
+            logger.info(s"Exiting application after streams close ($closed)")
+          } catch {
+            case x: Exception => x.printStackTrace()
+          } finally {
+            logger.debug("Exiting application ..")
+            System.exit(-1)
+          }
+      )
+      .start()

//
// Step 2: Publish user-region information.
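Design note: the underlying Java setters (setUncaughtExceptionHandler, setStateListener, setGlobalStateRestoreListener) return void, which forces the statement-per-call style of the removed lines; the wrapper's with* equivalents return the wrapper itself, so handler registration and start() fold into one chained expression.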
@@ -29,10 +29,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import ImplicitConversions._
+import com.typesafe.scalalogging.LazyLogging

object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro
extends TestSuite[KafkaLocalServer]
-    with StreamToTableJoinTestData {
+    with StreamToTableJoinTestData
+    with LazyLogging {

case class UserClicks(clicks: Long)

@@ -78,11 +80,8 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro
* must be `static` and `public`) to a workaround combination of `@Rule
* def` and a `private val`.
*/
-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(true, Some(localStateDir)).start()

override def tearDown(server: KafkaLocalServer): Unit =
server.stop()
@@ -138,27 +137,24 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro
// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-
+    val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration)
    streams
-      .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+      .withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        override def uncaughtException(t: Thread, e: Throwable): Unit =
          try {
-            println(s"Stream terminated because of uncaught exception .. Shutting " +
-              s"down app",
-              e)
-            e.printStackTrace
+            logger.error(s"Stream terminated because of uncaught exception .. Shutting " +
+              s"down app",
+              e)
            val closed = streams.close()
-            println(s"Exiting application after streams close ($closed)")
+            logger.debug(s"Exiting application after streams close ($closed)")
          } catch {
            case x: Exception => x.printStackTrace
          } finally {
-            println("Exiting application ..")
+            logger.debug("Exiting application ..")
            System.exit(-1)
          }
      })
-
-    streams.start()
+      .start()

//
// Step 2: Publish user-region information.