-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Producer and Consumer services as ZLayers #128
Conversation
|
||
import zio.ZIO | ||
import zio.{ Has, ZIO } | ||
import zio._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need the wildcard import if you are importing the specific types you are using.
@@ -2,6 +2,7 @@ package zio.kafka.client | |||
|
|||
import java.util.concurrent.atomic.AtomicLong | |||
|
|||
import izreflect.fundamentals.reflection.Tags.Tag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be able to go away since you already have Tagged
in scope through the wildcard import.
|
||
val testProducer: ZLayer[Kafka, Throwable, StringProducer] = | ||
(for { | ||
settings <- producerSettings.toManaged_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a feeling that this can be written in a simpler way, without converting the settings to a Managed
and then back to Layer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is indicating that we may want to express the dependencies of the producer as services as well:
object producer {
def live[R, K, B]: ZLayer[Kafka with Serializer[R, K] with Serializer[R, V], Throwable, Consumer] =
???
}
object KafkaTestUtils {
def testProducer: ZLayer[Kafka, Throwable, StringProducer] =
(ZLayer.identity[Kafka] ++ ZLayer.succeed(Serde.string)) >>> producer.live
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intuition says that we shouldn't force users that want to use ZLayer
to provide everything (incl. settings and serdes) as R
, but maybe that's misguided.
At any rate, I'm fine with this being written as-is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we had a definition of live
like the above we could then offer a method that would take the settings and series as normal parameters and lift them into services so we could provide the same user experience but have the layer be more composable internally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good solution 👍🏻
def withProducerStrings[R: Tag, A: Tag]( | ||
r: Producer.Service[Any, String, String] => RIO[R, A] | ||
): ZIO[R with StringProducer, Throwable, A] = | ||
ZLayer.fromService(r).build.use(_.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here, I think I should be able to express this without going through a ZLayer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ZIO.accessM(env => r(env.get))
?
* Flushes the producer's internal buffer. This will guarantee that all records | ||
* currently buffered will be transmitted to the broker. | ||
*/ | ||
def flush: BlockingTask[Unit] = effectBlocking(p.flush()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want this to be part of the API? If so I think you want to make it a method on Service
otherwise you are going to be in an awkward position where it is only available on Live
instead of Service
but when you access through the environment you are just going to get Service
so you are not going to be able to call flush
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, flush should definitely be part of the API :-)
ZSink.drain.contramapM(produceChunk) | ||
} | ||
|
||
class Live[R, K, V]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final case class
?
valueSerializer: Serializer[R, V] | ||
): Live[R, K, V] = new Live(p, keySerializer, valueSerializer) | ||
|
||
def producer[R, K, V]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call this make
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's keep it as make
. That's the conventional name for effectful creation of things in ZIO.
* @return Producer as a Managed resource that will be closed automatically after use. | ||
*/ | ||
def make[R, K, V]( | ||
private def createLive[R, K, V]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe inline this? You are only calling it one place in make
and it is taking significantly more space just to right the method signature than the actual implementation.
ZLayer.fromManaged( | ||
p.toManaged(p => UIO(p.close(settings.closeTimeout.asJava))) | ||
.map(producer => createLive(producer, keySerializer, valueSerializer)) | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally at this point you would write your environmental accessors. For example:
def produce[R, K, V](record: ProducerRecord[R, K, V]): RIO[R with Producer with Blocking, Task[RecordMetadata]] =
ZIO.accessM(_.get.produce(record))
This will dramatically improve the ergonomics of working with the producer in the environment.
ZLayer.fromManaged( | ||
p.toManaged(p => UIO(p.close(settings.closeTimeout.asJava))) | ||
.map(producer => createLive(producer, keySerializer, valueSerializer)) | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally at this point you would write your environmental accessors. For example:
def produce[R, K, V](record: ProducerRecord[R, K, V]): RIO[R with Producer with Blocking, Task[RecordMetadata]] =
ZIO.accessM(_.get.produce(record))
This will dramatically improve the ergonomics of working with the producer in the environment.
ZLayer.fromManaged( | ||
p.toManaged(p => UIO(p.close(settings.closeTimeout.asJava))) | ||
.map(producer => createLive(producer, keySerializer, valueSerializer)) | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally at this point you would write your environmental accessors. For example:
def produce[R, K, V](record: ProducerRecord[R, K, V]): RIO[R with Producer with Blocking, Task[RecordMetadata]] =
ZIO.accessM(_.get.produce(record))
This will dramatically improve the ergonomics of working with the producer in the environment.
@@ -139,24 +142,9 @@ object Producer { | |||
) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may not be possible given the degree of polymorphism here but are there any specialized "default" constructors that would provide sensible implementations of one or more of the parameters to make
that would make it a little easier to create the producer layer for common cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought of providing a version that takes String serializers without dependencies, like makeSimpleStrings(settings: ProducerSettings)
, + maybe another one for Array[Byte]
.
@@ -2,8 +2,9 @@ package zio.kafka.client | |||
|
|||
import java.util.UUID | |||
|
|||
import izreflect.fundamentals.reflection.Tags.Tag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the Tag
concept should be able to go away here as well in favor of Tagged
.
@@ -139,24 +142,9 @@ object Producer { | |||
) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may not be possible given the degree of polymorphism here but are there any specialized "default" constructors that would provide sensible implementations of one or more of the parameters to make
that would make it a little easier to create the producer layer for common cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kciesielski Thank you for picking this up! Looking great so far. We just need accessors and to re-enable the tests 💪🏻
And thank you @adamgfraser for the comprehensive review! |
} | ||
)( | ||
implicit ts: Tagged[Serializer[R, K]], | ||
td: Tagged[Serializer[R, V]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adamgfraser I can't seem to get rid of these implicit Tagged
requirements here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great. Amazing work @kciesielski!
This PR exposes producer and consumer interfaces as ZLayers. Top-level types are:
where:
R
is the environment required for serialization/deserializationK
is the type of keysV
is the type of valuesPublic API for creating service layers:
and from this point layers can be used as dependencies of programs.
It may be worth adding some additional aliases/methods for common use cases:
R
parameter for cases when serialization and deserialization don't have dependenciesString
orArray[Byte]