Proprietary Serde type #37

Merged
merged 15 commits into from Oct 3, 2019

Conversation

svroonland
Collaborator

Implements #30

  • Serde[T] extends Serializer[T] with Deserializer[T]
  • Internal KafkaConsumer and KafkaProducer work with Array[Byte] serdes for keys and values
  • Instances for default Kafka serdes
  • Creation from existing Kafka serdes
  • Operators for effectful and non-effectful conversion of Serdes (mapping, contramapping and invariant mapping), which can be used to construct new Serdes from existing ones
  • either operator on Deserializer to explicitly handle deserialization failures.
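
As a rough illustration of the operators listed above, here is a minimal sketch of how such a Serde could be modelled. It is not the code in this PR: results are modelled with scala.util.Try instead of a ZIO effect so the example only needs the standard library, and the names `inmap`, `contramap`, `either` and `UserId` are assumptions chosen for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.{Success, Try}

// Simplified stand-ins: results use Try here, not a ZIO effect.
trait Serializer[T] { self =>
  def serialize(value: T): Try[Array[Byte]]

  // Contramapping: build a Serializer[U] by converting U to T first.
  def contramap[U](f: U => T): Serializer[U] =
    new Serializer[U] {
      def serialize(value: U): Try[Array[Byte]] = Try(f(value)).flatMap(self.serialize)
    }
}

trait Deserializer[T] { self =>
  def deserialize(bytes: Array[Byte]): Try[T]

  // Mapping: convert the deserialized T into a U.
  def map[U](f: T => U): Deserializer[U] =
    new Deserializer[U] {
      def deserialize(bytes: Array[Byte]): Try[U] = self.deserialize(bytes).map(f)
    }

  // Turn a failure into a Left value so the caller must handle it explicitly.
  def either: Deserializer[Either[Throwable, T]] =
    new Deserializer[Either[Throwable, T]] {
      def deserialize(bytes: Array[Byte]): Try[Either[Throwable, T]] =
        Success(self.deserialize(bytes).toEither)
    }
}

trait Serde[T] extends Serializer[T] with Deserializer[T] { self =>
  // Invariant mapping: both directions are needed to get a Serde[U].
  def inmap[U](f: T => U)(g: U => T): Serde[U] =
    new Serde[U] {
      def serialize(value: U): Try[Array[Byte]] = Try(g(value)).flatMap(self.serialize)
      def deserialize(bytes: Array[Byte]): Try[U] = self.deserialize(bytes).map(f)
    }
}

object Serde {
  val string: Serde[String] = new Serde[String] {
    def serialize(value: String): Try[Array[Byte]] = Try(value.getBytes(UTF_8))
    def deserialize(bytes: Array[Byte]): Try[String] = Try(new String(bytes, UTF_8))
  }
}

// Hypothetical domain type, used only for this example.
final case class UserId(value: Int)

object SerdeExample {
  // A Serde for a new type, derived from the string Serde as a single unit.
  val userId: Serde[UserId] = Serde.string.inmap(s => UserId(s.toInt))(_.value.toString)
}
```

With this sketch, `SerdeExample.userId.either` yields a deserializer that reports malformed input as a `Left` instead of a failed result.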

@iravid
Member

iravid commented Sep 28, 2019

Wow that was pretty quick @svroonland! :-)

Here are some thoughts:

  • Parameterize the serdes with R, and allow access to the record metadata from the environment. So a Deserializer[R, T] returns a RIO[R with RecordMetadata, T]
  • What do we gain from having the Serde trait rather than just separate Serializer/Deserializers?
  • Deserializer can go up to flatMap, definitely useful

@iravid
Member

iravid commented Sep 28, 2019

Oh and I'm really happy you got rid of K and V from the runloop 🙏🏻

@iravid
Member

iravid commented Sep 28, 2019

Another thought: we definitely want a distinction between pure and effectful serializers/deserializers. This will let us shortcut the ZIO runloop if we're just working with pure deserializers.

@svroonland
Collaborator Author

> Wow that was pretty quick @svroonland! :-)

Yeah, I have written these codec-like things before several times. It's easy when you treat them as invariant functors (in the category of endo-functors of course (that part is a joke)) :)

  • I don't think I've mastered ZIO well enough yet to see how that RecordMetadata thing would work, but I'd be happy to work on it with some pointers.
  • I think it's nice to have a Serde type so you can define the serializer and deserializer for a new datatype as a single unit, by doing an inmap on an existing data type for example.
  • What do you mean exactly, go up to flatMap?

> Oh and I'm really happy you got rid of K and V from the runloop

Indeed!

By the way, if you have a different design or implementation in mind, don't hesitate to disregard or cherry-pick from this PR! Or just tell me what to change :)

@svroonland
Collaborator Author

Hmm, about effectful serdes: I don't think we want arbitrary effects in serdes. Maybe we should just use Try[T] as the result type instead of Task[T].

@iravid
Member

iravid commented Sep 28, 2019

Effectful serdes are definitely useful! E.g., running logging effects. Don't give up on them :-)

@svroonland
Collaborator Author

I see. But perhaps it's simpler to model them as non-effectful, and if you want to do something like logging, you do that in the stream consumption?

@svroonland
Collaborator Author

To elaborate on that: you could get a stream of ConsumerRecord[Array[Byte], Array[Byte]] and do your own effectful deserialization there.

@iravid
Member

iravid commented Sep 29, 2019

Yeah, but that's just introducing another effect type where we could just use ZIO and attain simpler code.

@iravid
Member

iravid commented Sep 29, 2019

Maybe I should rephrase. Specifying serdes as Array[Byte] => ZIO[R, Throwable, A] has two main upsides:

  1. We don't artificially constrain the user - after all, there's no reason not to allow effects if we set aside our opinions of what should and shouldn't run in the serdes;
  2. We attain simpler composition of the serdes with our stream, as we don't need to convert between an intermediate representation.

As a consequence of 2, we'll be allocating less, as we're staying in ZIO and not encoding the results in an intermediate representation.
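
To make the composition argument concrete, here is a toy sketch. `Effect` below is a deliberately tiny stand-in for ZIO[R, Throwable, A] written for this example (it is not the ZIO type or its API), and `Logger` and `loggedInt` are hypothetical names. It shows a deserializer of the shape Array[Byte] => Effect[R, A] that logs through its environment and then composes with further steps via flatMap, without an intermediate result representation.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Toy stand-in for ZIO[R, Throwable, A]: needs an environment R and
// either fails with a Throwable or produces an A. Not the real ZIO type.
final case class Effect[R, A](run: R => Either[Throwable, A]) {
  def map[B](f: A => B): Effect[R, B] =
    Effect((r: R) => run(r).map(f))

  def flatMap[B](f: A => Effect[R, B]): Effect[R, B] =
    Effect((r: R) => run(r).flatMap(a => f(a).run(r)))
}

// Hypothetical environment: something that can log a line.
trait Logger {
  def log(line: String): Unit
}

object EffectfulDeserializer {
  // A deserializer is just a function from bytes to an effect.
  type Deserializer[R, A] = Array[Byte] => Effect[R, A]

  // Logs the raw text via the environment, then parses it as an Int.
  val loggedInt: Deserializer[Logger, Int] = bytes =>
    Effect[Logger, String] { logger =>
      val text = new String(bytes, UTF_8)
      logger.log(s"decoding '$text'")
      Right(text)
    }.flatMap(text => Effect[Logger, Int](_ => Try(text.trim.toInt).toEither))
}
```

Running `EffectfulDeserializer.loggedInt(bytes).run(logger)` with a concrete `Logger` gives either the parsed value or the parse failure, and the log call happens as part of the same computation.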

@svroonland
Collaborator Author

Sounds fair. One use I can think of is some sort of Avro-schema lookup serializer, which needs to connect to a schema registry or something.

@svroonland
Collaborator Author

Well, that seems to compile :)

I don't really like the Consumer.make[Any, String, String]. Since we can't infer the key and value types anyway, maybe it's better to change Consumer.make to

  def make[R, K, V](
    settings: ConsumerSettings,
    keyDeserializer: Deserializer[R, K],
    valueDeserializer: Deserializer[R, V]
  )

so we can do Consumer.make(settings, Serdes.string, Serdes.string).
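
The inference point can be checked with a small standalone sketch. Everything here is a hypothetical stand-in for the types named above: the `Consumer` class, the return type of `make`, `Serdes.int` and the `bootstrapServers` field are assumptions, and the variance on R is dropped to keep the example simple.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-ins, not the library's real definitions.
final case class ConsumerSettings(bootstrapServers: String)

trait Deserializer[R, T] {
  def deserialize(data: Array[Byte]): T
}

object Serdes {
  val string: Deserializer[Any, String] = new Deserializer[Any, String] {
    def deserialize(data: Array[Byte]): String = new String(data, UTF_8)
  }
  // Assumed for the example: a 4-byte big-endian Int deserializer.
  val int: Deserializer[Any, Int] = new Deserializer[Any, Int] {
    def deserialize(data: Array[Byte]): Int = ByteBuffer.wrap(data).getInt()
  }
}

final class Consumer[R, K, V](
  val settings: ConsumerSettings,
  val keyDeserializer: Deserializer[R, K],
  val valueDeserializer: Deserializer[R, V]
)

object Consumer {
  // R, K and V are all determined by the deserializer arguments.
  def make[R, K, V](
    settings: ConsumerSettings,
    keyDeserializer: Deserializer[R, K],
    valueDeserializer: Deserializer[R, V]
  ): Consumer[R, K, V] =
    new Consumer[R, K, V](settings, keyDeserializer, valueDeserializer)
}

object InferenceExample {
  // No explicit type arguments are needed at the call site.
  val consumer = Consumer.make(ConsumerSettings("localhost:9092"), Serdes.string, Serdes.int)

  // Compiles only if the inferred type is Consumer[Any, String, Int].
  val checked: Consumer[Any, String, Int] = consumer
}
```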

@svroonland svroonland marked this pull request as ready for review September 30, 2019 15:56
@iravid
Member

iravid commented Sep 30, 2019

@svroonland I actually suggest deferring the specification of a deserializer to the partitionedStream / plainStream methods.

@svroonland
Collaborator Author

Updated PR. If we're happy with this implementation I can start to add some docs and some tests.

*/
trait Serde[-R, T] extends Deserializer[R, T] with Serializer[R, T]

object Serde {
Member

Could you please add a constructor that wraps Kafka's Serde?

Member

Oh, that's in Serdes. Why not move them here?

Collaborator Author

That makes sense
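
For reference, a constructor of the kind requested in this review thread might look roughly like the sketch below. It assumes kafka-clients is on the classpath and uses only the standard `serializer()`, `deserializer()`, `serialize(topic, data)` and `deserialize(topic, data)` methods of Kafka's own Serde types. The simplified `Serde` trait (with Try results rather than a ZIO effect and no R parameter) and the name `fromKafkaSerde` are assumptions for the example, not the PR's actual code.

```scala
import org.apache.kafka.common.serialization.{Serde => KafkaSerde, Serdes => KafkaSerdes}
import scala.util.Try

// Simplified, hypothetical Serde: Try results instead of a ZIO effect.
trait Serde[T] {
  def serialize(topic: String, value: T): Try[Array[Byte]]
  def deserialize(topic: String, data: Array[Byte]): Try[T]
}

object Serde {
  // Wraps a Kafka Serde; exceptions thrown by Kafka become failed results.
  def fromKafkaSerde[T](underlying: KafkaSerde[T]): Serde[T] = {
    val serializer   = underlying.serializer()
    val deserializer = underlying.deserializer()
    new Serde[T] {
      def serialize(topic: String, value: T): Try[Array[Byte]] =
        Try(serializer.serialize(topic, value))
      def deserialize(topic: String, data: Array[Byte]): Try[T] =
        Try(deserializer.deserialize(topic, data))
    }
  }

  // Default instance built from Kafka's string Serde.
  val string: Serde[String] = fromKafkaSerde(KafkaSerdes.String())
}
```

Keeping the constructor and the default instances in the same companion object means `Serde.string` resolves without a separate `Serdes` import.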
