Add a typesafe-by-default implementation for kafka-streams-scala #52
Conversation
We are using this API internally in our own fork, and we did catch several missing Serde instances while developing. This work builds on #46 and makes the opinionated assumption that a type-safe (by default) version is more idiomatic. We also provide the implicit transformations in a pluggable fashion (see the derivations package), with a default SerdeDerivations object containing them all.
Thanks for the PR. It will take some time to take a look at the full implementation. In the meantime, some thoughts that spring to mind after a brief round of skimming …
Thanks again for the PR. We will take a look soon.
Thank you for the comments.
But regarding the rest, this approach assumes you don't want to use the default serdes (by default). My reasoning is that you don't gain anything by using the default serdes, because the type system should find your implicit value for the appropriate one, and if it doesn't, it should probably not compile, because otherwise you'd be using an inappropriate serde. We can discuss this point further if you want. A different approach is the one you've been implementing, using the Perhaps[T] mechanism to allow for nonexistent implicits, but such an approach creates runtime exceptions where there could be compile-time errors, and I think that's the main point of divergence in our respective points of view. Another case could be made for a heavier wrapper that could contain path-dependent types including the default serde types, and use such default serdes only if no implicits are provided (or otherwise keep the default serde instances around and inject them as the explicit serdes). I'm not sure if my wording is clear; something along the lines of this:

```scala
class AlternativeImplementationForKStreamS[K, V, DefaultK, DefaultV]
    (inner: KStream[K, V],
     defaultKSerde: Serde[DefaultK],
     defaultVSerde: Serde[DefaultV]) {
  def mapValues[VR](f: V => VR)
                   (implicit materialized: Materialized[K, VR]) = ???
  def mapValues[VR](f: V => DefaultV)
                   (implicit ev: DefaultK =:= K) =
    mapValues(f)(materializedFromSerde(defaultKSerde, defaultVSerde))
}
```

I look forward to your comments on this. I'll push a new commit on the PR soon. Also, thanks for taking a look at this!
You made me think that you can indeed still break my API if you provide an explicit Joined (or Materialized) instance without all the correct parameters. However, as I demonstrate in one of the example tests, you can always provide your own Joined/Materialized/etc. instance and it will be used instead (i.e., "implicit" parameters can still be provided explicitly if you want to). And if you do, then you can break the typesafe isolation. I'm OK with the approach for now anyway, since providing implicit parameters explicitly is something you shouldn't need to do, and if you do, you should know why you're doing so.
Also, the tests are now failing and I don't know why. They seem to pass both locally and on Travis, but for some reason sbt exits with 1.
@ssaavedra - Thanks a lot for all the effort. I like the implicit serde implementation that you have there. We will play around with your implementation, and a couple of other members will also take a look.
Sure. If you can also take a look at why the build fails, that would be appreciated.
I will take a look. |
In the test case we have:

```scala
clicksPerRegion.toStream.to(outputTopic)
```

How can we change this to:

```scala
clicksPerRegion.toStream.unsafely {
  _.to(outputTopic, Produced.`with`(stringSerde, scalaLongSerde))
}
```

but it looks like …
```scala
/** Conversions to keep the underlying abstraction from leaking. These allow
  * us to always return a TS object instead of the underlying one.
  *
```
What do you mean by leaking of the underlying abstraction? Maybe I am missing something, but can you please point out what advantage we get from these implicit classes instead of using the plain old implicit conversions, as in https://github.com/lightbend/kafka-streams-scala/blob/develop/src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala#L19 ?

In the implementation of TSKStream, you pass the unsafe Java abstraction, call the relevant method on it, and then again do a `.map(_.safe)`. Why not return the result itself and let the implicit conversion take care of converting it into the safe version?
It may be unfortunate to put that comment in the scaladoc of the implicits, since the library user can still use those to wrap their KStream instances into TSKStreams. What I mean is that we want the TSKStream definitions to always return a TSKType value and not a KStream (or others) directly. It's essentially the same you were already doing in KStreamS too.

What do you mean by plain old implicit conversions? Using an `implicit def` instead of an `implicit class` with a method?

I did that because it feels less error-prone to miss out a type when you change the types explicitly. In your example, if I couldn't do `.map(_.safe)`, I would instead have to have a `.map(identity)` with an implicit conversion on the result of the identity, which I find to be a more convoluted way of thinking.

I also experimented with implementing the library as calls to `unsafely { _.mapValues(xxx) }`, and I'm not sure which is the better idiom.
> It's essentially the same you were already doing in KStreamS too.

I was trying to think about what additional benefit TSKType brings us. KStreamS is a plain wrapper over the Java instance, and a plain old implicit in ImplicitConversions does all the necessary conversions. Do you see any potential unsafety in that approach that led you to come up with the implicit classes with `safe` methods?
For example, take this code under consideration:

```scala
val x: KStream[K, V] = ...
implicit def KStreamToTS(instance: KStream[K, V]): TSKStream[K, V] = new TSKStream(instance)
x.mapValues((t: V) => f(t))
```

In the latter expression we will be using the KStream#mapValues method, but if we want to use the TSKStream#mapValues method, there is no easy way to do that. However, by prepending `.safe` you can choose to apply the transformation selectively if you want it.

Keeping both APIs available from the same object makes it harder to debug problems which may arise from the implicit conversions not always getting triggered; by separating the APIs via an intermediate `.safe` step, it's easier to distinguish them. Alternatively, the user could ascribe their object to the TSKStream type (e.g., `(x : TSKStream[K, V]).mapValues(???)`), but that is more convoluted than adding `.safe`.
In comparison with KStreamS, this wrapper is a bit less thin, as it will try to include the appropriate Serde/Produced/etc. instances. You may therefore want to know whether you are using the TS version, because otherwise you may need to produce the serializing objects yourself; so I find it a bit more important to distinguish whether you're using the wrapper or not.
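To make the disambiguation argument concrete, here is a toy sketch (plain Scala, all names hypothetical, not the PR's actual code) of how an intermediate `.safe` step lets the caller pick an API explicitly:

```scala
// Toy sketch: both wrappers define mapValues, and an explicit .safe
// step selects the typesafe one unambiguously.
class JStream[V] { def mapValues[VR](f: V => VR): JStream[VR] = new JStream[VR] }
class SafeStream[V](val unsafe: JStream[V]) {
  def mapValues[VR](f: V => VR): SafeStream[VR] = new SafeStream(unsafe.mapValues(f))
}
implicit class SafeOps[V](private val s: JStream[V]) extends AnyVal {
  def safe: SafeStream[V] = new SafeStream(s)
}

val x = new JStream[Int]
x.mapValues(_ + 1)      // resolves to JStream#mapValues
x.safe.mapValues(_ + 1) // explicitly selects SafeStream#mapValues
```

With only an `implicit def` conversion in scope, the first call would always win the overload resolution, and selecting the safe API would require a type ascription instead.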
For simple use cases, such as implementing the API, both methods will suffice, as you are stating the return types explicitly (and thus you are guaranteed that the conversion will happen inside the function). But if your application still passes KStreams around, it's probably easier to keep track of which operations you are performing by adding them explicitly.
> In the latter expression we will be using the KStream#mapValues method, but if we want to use the TSKStream#mapValues method, there is no easy way to do that. However, by prepending `.safe` you can choose to apply the transformation selectively if you want it.

This is a relatively rare use case and, as you mentioned, type ascription can be used as well. So `(x: KStreamS).mapValues` can be a good workaround with the earlier implementation.

To be honest, I am still not convinced about the relative payoffs of the additional types, though, as you said above, there may be some complex use cases where they offer advantages. The question is whether we should make the implementation complex enough to cater to some very uncommon use cases. Also, can we qualify some of those use cases where this may be useful?
@seglo, @deanwampler - what do you think ?
I think using type ascription to hint at the implicit conversion is something any end user with basic OO knowledge will understand. If the call to `safe` is simply facilitating this conversion, then I don't see a lot of value added.
I don't know how common the use case will be to do this conversion in user application code. Both the existing kafka-streams-scala implementation, and this typesafe alternative, provide explicit ways to construct the stream so you wouldn't need to worry about the base types in most circumstances, but obviously having the option to cast to and back is a requirement for compatibility with the rest of the kafka streams ecosystem.
This is just another alternate opinion to consider of many already present in this PR, but in keeping with the original intent of this library I think the tie breaker is that we should favour a thin abstraction over a heavier abstraction when they more or less satisfy the same requirement.
I've set the change on another branch to use an `implicit def` instead of an `implicit class`, but note that in https://github.com/openshine/kafka-streams-scala/pull/1/files#diff-dfd6a81a0b5cd3214dc0e71d172fa824L36 we can no longer say `.map(_.safe)`, and in order to resolve the type application we have to call the explicit type converter (or otherwise we would have to ascribe the element to the type much more verbosely, like `.map(s => s : TSKStream[K, V])`).
```scala
    : Joined[K, V, VO] = {
    Joined.`with`(key, value, value2)
  }
}
```
Does this allow us to get a Joined by specifying two of the serdes while the third one is picked from config? We can do that with `unsafely` by explicitly specifying the Joined parameter. But can we do this through this implicit?
I am not sure I understand this question.

This implicit definition will generate a Joined[K, V, VO] instance that holds the three appropriate Serdes, if there is one for each of the K, V, VO types. Put more formally, this `implicit def` provides a proof for instantiation of a Joined[K, V, VO] instance upon witnessing an instance of each of `Serde[K]`, `Serde[V]` and `Serde[VO]`.

If you don't provide a witness for some of the Serde values, this definition will not be considered.

But you can always create your own Joined instance using the Joined constructor; this definition just provides a sane way of automatically generating such an instance. If you wish otherwise, you can always provide your own Joined explicitly to the underlying call, even when an implicit one is available, as I demonstrate in https://github.com/lightbend/kafka-streams-scala/pull/52/files/19400f78f4ae80adae946e266ea6d19c96b150f4#diff-ed482980f14897992a99a62b8bfe9310R119

You can always provide an implicit value explicitly at a function call site. But this `implicit def` only generates Joined instances that are type-checked against all their parameters. If you don't want that to happen and you wish for weaker type constraints, you can avoid importing it into your scope (as you see, the modularity in this file allows you to construct a DIY trait with the implicits you wish to provide, while forcing others to be supplied at call time or as implicit vals in the code).
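The full shape of the derivation being discussed is presumably something like the following sketch (only the body appears in the diff excerpt above; the signature and parameter names here are assumed):

```scala
// Hedged sketch: derive a Joined[K, V, VO] only when a Serde witness
// exists for each of the three type parameters. If any Serde is
// missing, this definition is simply not considered and the call
// site fails to compile.
implicit def joinedFromSerdes[K, V, VO]
    (implicit key: Serde[K],
     value: Serde[V],
     value2: Serde[VO]): Joined[K, V, VO] =
  Joined.`with`(key, value, value2)
```

This is the standard typeclass-derivation idiom: the implicit acts as an inference rule, so `Joined` evidence exists exactly when all three `Serde` witnesses do.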
```scala
  with defaultSerdes

object Default extends Default
```
Nice!
That is what I was referring to in the comment before this one. I create a `Default` object with the `Default` trait (so you can do `import Default._` instead of `extends Default` on your Kafka Streams graph builder class, and thus avoid exporting symbols on that object). But the user can choose to implement their own `object MyTraits extends serialized with produced with consumed with defaultSerdes` and provide their own ways of building `Materialized` or `Joined` instances (for example).
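As a sketch of the pluggable pattern being described (the trait names are taken from this thread, but their contents are assumed here):

```scala
// Hedged sketch of the mix-and-match derivations pattern. Each trait
// contributes one family of implicit builders; users compose only the
// ones they want in implicit scope.
trait serialized    { /* implicit Serialized builders */ }
trait produced      { /* implicit Produced builders */ }
trait consumed      { /* implicit Consumed builders */ }
trait defaultSerdes { /* implicit Serde instances */ }

trait Default extends serialized with produced with consumed with defaultSerdes
object Default extends Default

// Either import everything:
//   import Default._
// or build your own selection, overriding what you need:
object MyTraits extends serialized with produced with defaultSerdes {
  // e.g. supply a custom implicit Joined builder here instead of
  // importing the default derivation
}
```

The `object Default extends Default` companion is what makes `import Default._` possible without forcing users to mix the trait into their own classes.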
👍
Cool pattern 👍
```scala
def unsafely[K1, V1, UnsafeDst[_, _], SafeDst[_, _]]
    (transformer: KType[K, V] => UnsafeDst[K1, V1])
    (implicit wrap: ConverterToTypeSafer[UnsafeDst, SafeDst])
    : SafeDst[K1, V1] = wrap.safe(transformer(inner.unsafe))
```
Will this `unsafely` work for an underlying function like `KStream#to`, which returns `Unit`?
No. The type signature is a bit convoluted, but from `transformer: KType[K, V] => UnsafeDst[K1, V1]` you can see that this is different from the return type of `unsafely`, namely `SafeDst[K1, V1]`.

In this case, `unsafely` expects to have an instance of `ConverterToTypeSafer[UnsafeDst, SafeDst]`, which means that there must be a conversion from UnsafeDst to SafeDst available.

I tried to convey this in the Scaladoc, but I may have failed; if you have a better wording for this, it's very welcome :)
Does this mean we cannot use the default serdes for methods like `KStream#to` which return `Unit`?
For that you will have to use `unsafelyNoWrap`, since I thought of `unsafely` as a "chaining" operator which would still return an object to operate on. I like to keep `unsafely` because it already provides the wrap back to a TSKType object.
Maybe I was missing something and could not get through. Will you please change one of the `KStream#to` calls to use this feature of using the default config?
I don't understand what you mean here. Do you want to expose in this API a `TSKStream.toWithDefautSerdes(topic: String)`?
You can already do `stream.unsafelyNoWrap(_.to(topic))`.
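To summarise the distinction discussed in this thread (a usage sketch; `stream`, `f` and `topic` are placeholders): `unsafely` fits chaining operations whose result can be re-wrapped, while `unsafelyNoWrap` fits terminal, `Unit`-returning calls:

```scala
// unsafely: runs an operation on the underlying KStream and wraps the
// result back into the typesafe API, so it needs a stream-shaped result.
val next = stream.unsafely { _.mapValues(f) }

// unsafelyNoWrap: for terminal operations such as KStream#to, which
// return Unit and therefore have nothing to wrap.
stream.unsafelyNoWrap { _.to(topic) }
```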
The Travis failure is due to memory constraints. The Kafka local server hogs memory, and with the setting of …
Yes. Maybe it's not as clear from the scaladoc as it should be, but the idea is that with …
The KafkaLocalServer is quite a memory problem, but between tests the old server should get destroyed, and memory shouldn't grow as much. Maybe we should try to force a GC after the tests. Otherwise, we could also drop the actual Kafka server and rely on a mocked version such as https://github.com/jpzk/mockedstreams.
This PR provides an alternative to the existing implicit serdes implementation. I think both implementations are valid, but they embody very different opinions:

Initial library opinion: keep the library as lightweight as possible, but give the end user a way to be a little more idiomatic by accepting an implicit serde if in scope, otherwise falling back to the default serialization config.

PR opinion: be explicit about compile-time type safety and enforce that end users always have serialization types in scope. Do not allow the possibility of using the default serialization config unless they explicitly return to using base types.
At this time I don't have a strong preference over which implementation to use, but I do think we should ultimately decide to pick one of them. If we include both implementations then we effectively double the surface area of this library which makes the choice of which to use more ambiguous for the end user. It will also become more of a burden to maintain the library.
```scala
/** A base type for all base stream classes in the typesafe API.
  *
  * @author Santiago Saavedra (ssaavedra@openshine.com)
```
I know we don't have a contribution policy defined for this project, but in general we shouldn't need author attributions in the codebase because the community will inherit its maintenance.
I understand that (actually it was an automatic comment left over from the IDE), but if you want contributors to follow any policy, there should be one first :)
@@ -0,0 +1,161 @@

```scala
/**
 * Copyright (C) 2018 OpenShine SL <https://www.openshine.com>
```
I don't think the copyright attributions are necessary, unless the intention is to release a fork of this library by OpenShine, in which case they should live there.
This enables easier migration to typesafe Kafka Streams.

The scala.Long and scala.Double Serde instances were being written in uppercase for no reason. According to SIP-15 both are equivalent, and this reduces the noise in the file.

Instead of providing it implicitly, we test that we can plug in our own Joined instances. This is non-typesafe, but as the user is explicitly supplying their values, we may assume that's expected behavior at this point. However, a better abstraction could be put into place (e.g., having our own Joined wrapper that disallows the wildcard-type behaviors).

Also, when producing the other example, I realised that better "sugar" for creating the Serializer/Deserializer instances in Scala would make things easier to write for users. Do you think something like this should be included in the PR?
From a sugar point of view, this looks good to me, so 👍 on that. But it all depends on how much we want to abstract over the underlying Java library. We started with the intention of not adding too much sugar, so that the layer doesn't seem too unfamiliar to Java programmers. Now that we are talking typeclasses, this makes perfect sense to me.
```scala
package com.lightbend.kafka.scala.streams.typesafe.util

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Materialized => Base}
```
I would rather name it `JMaterialized` or `JavaMaterialized`. The issue is that I had to look at this import to understand what `Base` is.
```scala
def aggregate[VR](initializer: => VR,
                  aggregator: (K, V, VR) => VR)
                 (implicit materialized: Materialized[K, VR, wsb])
```
I don't understand which `Materialized` will be resolved here, or what the name of the state store will be. Is it good practice not to give an explicit state store name? As far as I know, Kafka Streams will generate `<application id>-<operator name>-<suffix>` if the Materialized is not given, but here it all depends on the resolved `Materialized`.
This question is answered by https://github.com/lightbend/kafka-streams-scala/pull/52/files#diff-3ff58d9ba68308f7de806461036e3449R67
Thanks
```scala
package object typesafe {
  private[typesafe] type kvs = KeyValueStore[Bytes, Array[Byte]]
  private[typesafe] type ssb = SessionStore[Bytes, Array[Byte]]
  private[typesafe] type wsb = WindowStore[Bytes, Array[Byte]]
```
Types should be Pascal case. Any special reason why it's all lowercase here?
```scala
  */
class TSKGroupedStream[K, V]
  (protected[typesafe] override val unsafe: KGroupedStream[K, V])
  extends AnyVal with TSKType[KGroupedStream, K, V] {
```
I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
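To illustrate the concern with a toy example (unrelated to the PR's actual classes): a value class avoids allocation only in monomorphic positions; the moment it is used generically, upcast to a universal trait, or stored in a collection, it boxes anyway:

```scala
// Toy illustration of value-class boxing. Meters is a value class,
// but each of the uses below forces an allocation of the wrapper.
trait Printable extends Any { def show: String }
class Meters(val value: Double) extends AnyVal with Printable {
  def show: String = s"$value m"
}

def describe[T](t: T): String = t.toString  // generic: T is erased, so Meters boxes
val p: Printable = new Meters(1.0)          // upcast to the universal trait: boxes
val xs = List(new Meters(2.0))              // stored in a collection: boxes
```

The TS wrappers in this PR mix a universal trait (`TSKType`, which extends `Any`) into `AnyVal` classes, which is exactly the pattern the linked post discusses.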
```scala
  */
class TSKGroupedTable[K, V]
  (protected[typesafe] override val unsafe: KGroupedTable[K, V])
  extends AnyVal with TSKType[KGroupedTable, K, V] {
```
Same as above: I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
```scala
 * operations and every serde is implicitly provided.
 */
class TSKStream[K, V](protected[typesafe] override val unsafe: KStream[K, V])
  extends AnyVal with TSKType[KStream, K, V] {
```
Same as above: I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
```scala
import org.apache.kafka.streams.{Consumed, StreamsBuilder}

class TSKTable[K, V](protected[typesafe] override val unsafe: KTable[K, V])
  extends AnyVal with TSKType[KTable, K, V] {
```
Same as above: I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
```scala
class TSSessionWindowedKStream[K, V]
  (protected[typesafe] override val unsafe: SessionWindowedKStream[K, V])
  extends AnyVal with TSKType[SessionWindowedKStream, K, V] {
```
Same as above: I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
```scala
class TSTimeWindowedKStream[K, V]
  (protected[typesafe] override val unsafe: TimeWindowedKStream[K, V])
  extends AnyVal with TSKType[TimeWindowedKStream, K, V] {
```
Same as above: I don't think it is a great idea to extend `AnyVal` here, since this can lead to a lot of boxing and unboxing.
See: https://failex.blogspot.co.uk/2017/04/the-high-cost-of-anyval-subclasses.html
```scala
 */
package object derivations {

  trait materialized extends Any {
```
Types should be Pascal case. Any special reason why it's all lowercase here?
Sorry, I have been on holiday, and after coming back I couldn't get to this until today. What has happened to this initiative?
@ssaavedra Have a look at the top of the README: your PR probably needs to be made on the Kafka repo directly, https://github.com/apache/kafka
Enable a type-safer API for kafka-streams.

This PR creates new abstractions (named TS- after the base objects in the underlying library) which enable a more type-safe abstraction than the provided default. It does not use the Perhaps pattern and requires some implicit value to always be present in order to call the methods, which ensures that errors surface at compile time rather than becoming runtime exceptions where avoidable.

You can still perform unsafe operations by calling the `.unsafely` handler, which will perform your operation on the underlying data type and still wrap the result into the typesafe operator when finished. So if you import the UnsafelyWrapper implicit class you can still make use of the default Serdes, but not by default, and you will therefore catch serialization errors at compile time unless you explicitly request otherwise.