Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package com.lightbend.kafka.scala.streams.typesafe
import com.lightbend.kafka.scala.streams.typesafe.unsafe.ConverterToTypeSafer
import org.apache.kafka.streams.kstream._

import scala.language.implicitConversions

/** Conversions to keep the underlying abstraction from leaking. These allow
* us to always return a TS object instead of the underlying one.
*
Expand All @@ -28,81 +30,61 @@ import org.apache.kafka.streams.kstream._
*/
object ImplicitConverters {

implicit class TSKStreamAuto[K, V]
(val inner: KStream[K, V])
extends AnyVal {
def safe: TSKStream[K, V] =
implicit def wrapKStream[K, V](inner: KStream[K, V]): TSKStream[K, V] =
new TSKStream[K, V](inner)
}

implicit class TSKTableAuto[K, V]
(val inner: KTable[K, V])
extends AnyVal {
def safe: TSKTable[K, V] =
implicit def wrapKTable[K, V](inner: KTable[K, V]): TSKTable[K, V] =
new TSKTable[K, V](inner)
}

implicit class TSKGroupedStreamAuto[K, V]
(val inner: KGroupedStream[K, V])
extends AnyVal {
def safe: TSKGroupedStream[K, V] =
implicit def wrapKGroupedStream[K, V](inner: KGroupedStream[K, V])
: TSKGroupedStream[K, V] =
new TSKGroupedStream[K, V](inner)
}

implicit class TSKGroupedTableAuto[K, V]
(val inner: KGroupedTable[K, V])
extends AnyVal {
def safe: TSKGroupedTable[K, V] =
implicit def wrapKGroupedTable[K, V](inner: KGroupedTable[K, V])
: TSKGroupedTable[K, V] =
new TSKGroupedTable[K, V](inner)
}

implicit class TSSessionWindowedKStreamAuto[K, V]
(val inner: SessionWindowedKStream[K, V])
extends AnyVal {
def safe: TSSessionWindowedKStream[K, V] =
implicit def wrapSessionWindowedKStream[K, V]
(inner: SessionWindowedKStream[K, V]): TSSessionWindowedKStream[K, V] =
new TSSessionWindowedKStream[K, V](inner)
}

implicit class TSTimeWindowedKStreamAuto[K, V]
(val inner: TimeWindowedKStream[K, V])
extends AnyVal {
def safe: TSTimeWindowedKStream[K, V] =
implicit def wrapTSTimeWindowedKStream[K, V]
(inner: TimeWindowedKStream[K, V]): TSTimeWindowedKStream[K, V] =
new TSTimeWindowedKStream[K, V](inner)
}

implicit object TSKGroupedStreamAuto
extends ConverterToTypeSafer[KGroupedStream, TSKGroupedStream] {
override def safe[K, V](src: KGroupedStream[K, V]): TSKGroupedStream[K, V] =
src.safe
src
}

implicit object TSKStreamAuto
extends ConverterToTypeSafer[KStream, TSKStream] {
override def safe[K, V](src: KStream[K, V]): TSKStream[K, V] = src.safe
override def safe[K, V](src: KStream[K, V]): TSKStream[K, V] = src
}

implicit object TSKTableAuto
extends ConverterToTypeSafer[KTable, TSKTable] {
override def safe[K, V](src: KTable[K, V]): TSKTable[K, V] = src.safe
override def safe[K, V](src: KTable[K, V]): TSKTable[K, V] = src
}

implicit object TSKGroupedTableAuto
extends ConverterToTypeSafer[KGroupedTable, TSKGroupedTable] {
override def safe[K, V](src: KGroupedTable[K, V])
: TSKGroupedTable[K, V] = src.safe
: TSKGroupedTable[K, V] = src
}

implicit object TSSessionWindowedKStreamAuto
extends ConverterToTypeSafer[SessionWindowedKStream,
TSSessionWindowedKStream] {
override def safe[K, V](src: SessionWindowedKStream[K, V])
: TSSessionWindowedKStream[K, V] = src.safe
: TSSessionWindowedKStream[K, V] = src
}

implicit object TSTimeWindowedKStreamAuto
extends ConverterToTypeSafer[TimeWindowedKStream, TSTimeWindowedKStream] {
override def safe[K, V](src: TimeWindowedKStream[K, V])
: TSTimeWindowedKStream[K, V] = src.safe
: TSTimeWindowedKStream[K, V] = src
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,27 @@ class TSKGroupedStream[K, V]
unsafe
.aggregate(initializer.asInitializer, aggregator.asAggregator,
materialized)
.safe

def count(implicit materialized: Materialized[K, java.lang.Long, kvs])
: TSKTable[K, Long] =
unsafe
.count(materialized)
.mapValues[scala.Long](
{ l: java.lang.Long => Long2long(l) }.asValueMapper)
.safe

def reduce(reducer: (V, V) => V)
(implicit materialized: Materialized[K, V, kvs])
: TSKTable[K, V] =
unsafe
.reduce(reducer.asReducer, materialized)
.safe

def windowedBy(windows: SessionWindows)
: TSSessionWindowedKStream[K, V] =
unsafe
.windowedBy(windows)
.safe

def windowedBy[W <: Window](windows: Windows[W])
: TSTimeWindowedKStream[K, V] =
unsafe
.windowedBy(windows)
.safe
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ class TSKGroupedTable[K, V]
.count(materialized)
.mapValues[scala.Long](
{ l: java.lang.Long => Long2long(l) }.asValueMapper)
.safe

def reduce(adder: (V, V) => V,
subtractor: (V, V) => V)
(implicit materialized: Materialized[K, V, kvs]): TSKTable[K, V] =
unsafe
.reduce(adder.asReducer, subtractor.asReducer, materialized)
.safe

def aggregate[VR](initializer: => VR,
adder: (K, V, VR) => VR,
Expand All @@ -51,5 +49,4 @@ class TSKGroupedTable[K, V]
adder.asAggregator,
subtractor.asAggregator,
materialized)
.safe
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,39 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
extends AnyVal with TSKType[KStream, K, V] {

def branch(predicates: ((K, V) => Boolean)*): Array[TSKStream[K, V]] = {
unsafe.branch(predicates.map(_.asPredicate): _*).map(_.safe)
unsafe
.branch(predicates.map(_.asPredicate): _*)
.map(wrapKStream)
}

def groupBy[KR](selector: (K, V) => KR)
(implicit serialized: Serialized[KR, V])
: TSKGroupedStream[KR, V] =
unsafe
.groupBy(selector.asKeyValueMapper, serialized)
.safe

def groupByKey(implicit serialized: Serialized[K, V])
: TSKGroupedStream[K, V] =
unsafe
.groupByKey(serialized)
.safe

def join[VO, VR](otherStream: TSKStream[K, VO],
joiner: (V, VO) => VR,
windows: JoinWindows)
(implicit joined: Joined[K, V, VO]): TSKStream[K, VR] =
unsafe.join[VO, VR](otherStream.unsafe, joiner.asValueJoiner, windows,
joined).safe
joined)

def join[VT, VR](table: TSKTable[K, VT],
joiner: (V, VT) => VR)
(implicit joined: Joined[K, V, VT]): TSKStream[K, VR] =
unsafe.join[VT, VR](table.unsafe, joiner.asValueJoiner, joined).safe
unsafe.join[VT, VR](table.unsafe, joiner.asValueJoiner, joined)

def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
keyValueMapper: (K, V) => GK,
joiner: (V, GV) => RV): TSKStream[K, RV] =
unsafe
.join[GK, GV, RV](globalKTable, keyValueMapper(_, _), joiner(_, _))
.safe

def leftJoin[VO, VR](otherStream: TSKStream[K, VO],
joiner: (V, VO) => VR,
Expand All @@ -78,14 +77,12 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
joiner.asValueJoiner,
windows,
joined)
.safe

def leftJoin[VT, VR](table: TSKTable[K, VT],
joiner: (V, VT) => VR)
(implicit joined: Joined[K, V, VT]): TSKStream[K, VR]
= unsafe
.leftJoin[VT, VR](table.unsafe, joiner.asValueJoiner, joined)
.safe

def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
keyValueMapper: (K, V) => GK,
Expand All @@ -96,7 +93,6 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
globalKTable,
keyValueMapper.asKeyValueMapper,
joiner.asValueJoiner)
.safe

def outerJoin[VO, VR](otherStream: TSKStream[K, VO],
joiner: (V, VO) => VR,
Expand All @@ -106,50 +102,48 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
unsafe
.outerJoin[VO, VR](otherStream.unsafe, joiner.asValueJoiner, windows,
joined)
.safe

def merge(stream: TSKStream[K, V]): TSKStream[K, V] =
unsafe.merge(stream.unsafe).safe
unsafe.merge(stream.unsafe)

def peek(action: (K, V) => Unit): TSKStream[K, V] =
unsafe.peek(action(_, _)).safe
unsafe.peek(action(_, _))

def split(predicate: (K, V) => Boolean)
: (TSKStream[K, V], TSKStream[K, V]) =
(filter(predicate), filterNot(predicate))

def filter(predicate: (K, V) => Boolean): TSKStream[K, V] =
unsafe.filter(predicate.asPredicate).safe
unsafe.filter(predicate.asPredicate)

def filterNot(predicate: (K, V) => Boolean): TSKStream[K, V] =
unsafe.filterNot(predicate.asPredicate).safe
unsafe.filterNot(predicate.asPredicate)

def selectKey[KR](mapper: (K, V) => KR): TSKStream[KR, V] = {
unsafe.selectKey[KR]((k: K, v: V) => mapper(k, v)).safe
unsafe.selectKey[KR]((k: K, v: V) => mapper(k, v))
}

def map[KR, VR](mapper: (K, V) => (KR, VR)): TSKStream[KR, VR] =
unsafe.map[KR, VR](mapper.asKeyValueMapper).safe
unsafe.map[KR, VR](mapper.asKeyValueMapper)

def mapValues[VR](mapper: V => VR): TSKStream[K, VR] =
unsafe.mapValues[VR](mapper.asValueMapper).safe
unsafe.mapValues[VR](mapper.asValueMapper)

def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)])
: TSKStream[KR, VR] = {
val kvMapper = mapper.tupled andThen (iter => iter.map(Tuple2ToKeyValue)
.asJava)

unsafe.flatMap[KR, VR]((k, v) => kvMapper(k, v)).safe
unsafe.flatMap[KR, VR]((k, v) => kvMapper(k, v))
}

def flatMapValues[VR](mapper: V => Iterable[VR]): TSKStream[K, VR] =
unsafe.flatMapValues({
v: V => mapper(v).asJava
}.asValueMapper)
.safe

def filterValues(predicate: V => Boolean): TSKStream[K, V] =
unsafe.filter((k, v) => predicate(v)).safe
unsafe.filter((k, v) => predicate(v))

def print(printed: Printed[K, V]): Unit = unsafe.print(printed)

Expand Down Expand Up @@ -182,7 +176,6 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
}
unsafe
.transform(transformerSupplierJ, stateStoreNames: _*)
.safe
}

def transformValues[VR](valueTransformerSupplier: () => ValueTransformer[V,
Expand All @@ -192,7 +185,6 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
valueTransformerSupplier()
unsafe
.transformValues[VR](valueTransformerSupplierJ, stateStoreNames: _*)
.safe
}

def process(processorSupplier: () => Processor[K, V],
Expand All @@ -206,7 +198,6 @@ class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
(implicit produced: Produced[K, V]): TSKStream[K, V] =
unsafe
.through(topic, produced)
.safe

def to(topic: String)(implicit produced: Produced[K, V]): Unit = {
unsafe.to(topic, produced)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ class TSKTable[K, V](protected[typesafe] override val unsafe: KTable[K, V])
: TSKGroupedTable[KR, VR] =
unsafe
.groupBy(selector.asKeyValueMapper, serialized)
.safe

def mapValues[VR](mapper: V => VR)
(implicit materialized: Materialized[K, VR, kvs])
: TSKTable[K, VR] =
unsafe
.mapValues[VR](mapper.asValueMapper, materialized)
.safe

def filterValues(predicate: V => Boolean)
(implicit materialized: Materialized[K, V, kvs])
Expand All @@ -46,52 +44,45 @@ class TSKTable[K, V](protected[typesafe] override val unsafe: KTable[K, V])
: TSKTable[K, V] =
unsafe
.filter(predicate.asPredicate, materialized)
.safe

def filterNot(predicate: (K, V) => Boolean)
(implicit materialized: Materialized[K, V, kvs])
: TSKTable[K, V] =
unsafe
.filterNot(predicate.asPredicate, materialized)
.safe

def toStream: TSKStream[K, V] = unsafe.toStream.safe
def toStream: TSKStream[K, V] = unsafe.toStream

def toStream[KR](keyMapper: (K, V) => KR): TSKStream[KR, V] =
unsafe
.toStream[KR](keyMapper.asKeyValueMapper)
.safe

def groupBy[KR, VR](selector: (K, V) => (KR, VR))
(implicit serialized: Serialized[KR, VR])
: TSKGroupedTable[KR, VR]
= unsafe
.groupBy(selector.asKeyValueMapper, serialized)
.safe

def join[VO, VR](other: TSKTable[K, VO],
joiner: (V, VO) => VR)
(implicit materialized: Materialized[K, VR, kvs])
: TSKTable[K, VR] =
unsafe
.join[VO, VR](other.unsafe, joiner.asValueJoiner, materialized)
.safe

def leftJoin[VO, VR](other: TSKTable[K, VO],
joiner: (V, VO) => VR)
(implicit materialized: Materialized[K, VR, kvs])
: TSKTable[K, VR] =
unsafe
.leftJoin[VO, VR](other.unsafe, joiner.asValueJoiner, materialized)
.safe

def outerJoin[VO, VR](other: TSKTable[K, VO],
joiner: (V, VO) => VR)
(implicit materialized: Materialized[K, VR, kvs])
: TSKTable[K, VR] =
unsafe
.outerJoin[VO, VR](other.unsafe, joiner.asValueJoiner, materialized)
.safe

def queryableStoreName: String = unsafe.queryableStoreName()

Expand Down
Loading