New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding Avro support capability #199
base: master
Are you sure you want to change the base?
Conversation
@samizzy It looks the build is failing. could you please check it? and it might be worth to check the DCO section that required the commits to be signed off. |
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
Signed-off-by: Samrat Saha <samrat.saha@zalando.de>
build.sbt
Outdated
"ch.qos.logback" % "logback-classic" % "1.2.7", | ||
"org.apache.avro" % "avro" % "1.11.0", | ||
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-avro" % "2.13.3", | ||
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-avro" % "2.13.3", "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.2",
hm. given that the project is a kind of tied to circe, wondering if it worth to have new libraries for encoding decoding bundled-in.
As far as I know lots of users of this library was considering bundling circe
as questionable. and here we increase coupling even more.
@@ -108,6 +111,8 @@ libraryDependencies ++= { | |||
Seq.empty | |||
}) | |||
} | |||
// avro compiler features | |||
avroStringType := "String" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avroStringType := "String"
should it be always a string? Are there any docs to follow?
build.sbt
Outdated
"com.iheart" %% "ficus" % "1.5.1", | ||
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.4", | ||
"ch.qos.logback" % "logback-classic" % "1.2.7", | ||
"org.apache.avro" % "avro" % "1.11.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"org.apache.avro" % "avro" % "1.11.0",
given that in the documentation stated that there are plans to introduce more shema formats and protocols (e.g. protobuf) wondering if it is worth to split the project into several modules so that avro is not bundled-in?
Otherwise the project might become too coupled with dozens of other libraries.
@@ -0,0 +1,49 @@ | |||
{ | |||
"name": "ConsumptionBatch", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumptionBatch
wondering, should it be SubscriptionEvent
?
@@ -0,0 +1,143 @@ | |||
{ | |||
"name": "PublishingBatch", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PublishingBatch
again wondering if it causes confusion with the batch name we had before avro-change?
kanadiHttpConfig: HttpConfig, | ||
http: HttpExt, | ||
materializer: Materializer) | ||
extends SubscriptionsInterface { | ||
protected val logger: LoggerTakingImplicit[FlowId] = Logger.takingImplicit[FlowId](classOf[Subscriptions]) | ||
private val baseUri_ = Uri(baseUri.toString) | ||
protected val baseUri_ = Uri(baseUri.toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected
why do we need this change?
oAuth2TokenProvider, | ||
AvroSchema(eventTypeName, etSchema.schema.asString.get, etSchema.version.get)))) | ||
|
||
Await.result(result, Duration.apply(5, TimeUnit.SECONDS)).getOrElse(throw SchemaNotFoundError(consumerSchema)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Await.result...
same comment as for the publishing case. it might be worth the user of the library to decide how to react, without introducing 5 sec parameter that might be not relevant for some of the clients at all.
@@ -12,6 +12,10 @@ final case class OtherError(error: BasicServerError) extends Exception { | |||
override def getMessage: String = s"Error from server, response is $error" | |||
} | |||
|
|||
final case class SchemaNotFoundError(schemaString: String) extends Exception { | |||
override def getMessage: String = s"Schema with value $schemaString not found on server" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of overriding getMessage
, can we just provide a parameter to Exception
?
@@ -12,6 +12,10 @@ final case class OtherError(error: BasicServerError) extends Exception { | |||
override def getMessage: String = s"Error from server, response is $error" | |||
} | |||
|
|||
final case class SchemaNotFoundError(schemaString: String) extends Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and wondering what is the reason to use Exception
here? Should it be RuntimeException
or some other common error class, related to the nakadi library?
@@ -0,0 +1,133 @@ | |||
package org.zalando.kanadi.api |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given the complexity of the added code, it might be worth to add more comprehensive tests, given that the library might be used in critical projects.
@samizzy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! I have left some comments as well but mostly I +1 whan @gchudnov already found. I want to highlight three main points from my side:
- Code requires decomposition of avro code from non avro. Having everything in one place complicates the code.
- Using
Await
in production code is unacceptable. This could lead to many problems and such implementation is not suitable for critical services. - Integration test should cover different failure scenarios. Not only successful publish and consumption.
-
@@ -6,3 +6,6 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.2 | |||
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.1") | |||
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.13.0") | |||
addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") | |||
addSbtPlugin("com.github.sbt" % "sbt-avro" % "3.4.0") | |||
|
|||
libraryDependencies += "org.apache.avro" % "avro-compiler" % "1.11.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use the same avro version here and in build.sbt
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been resolved in 443366a
@@ -0,0 +1,49 @@ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate on file naming schema? What is 1
in the name? Is it required?
"null", | ||
"string" | ||
], | ||
"default": null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this field?
response <- http.singleRequest(request) | ||
result <- { | ||
if (response.status == StatusCodes.NotFound) { | ||
response.discardEntityBytes() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely worth it :)
@@ -98,6 +107,8 @@ object Event { | |||
} | |||
} | |||
|
|||
final case class AvroEvent[T](override val data: T, metadata: Metadata) extends Event[T](data) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is here because of sealed
above.
"The http server closed the connection unexpectedly before delivering responses for") => | ||
retryUnexpectedFailure(events, count, e, currentDuration) | ||
case httpServiceError: HttpServiceError | ||
if httpServiceError.httpResponse.status.intValue().toString.startsWith("5") => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or event make comparison with ServerError
class.
|
||
val envelopes = events.map { event => | ||
new Envelope(toNakadiMetadata(event.metadata, publisherSchema.schemaVersion), | ||
ByteBuffer.wrap(writer.writeValueAsBytes(event.data))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen it there will be an exception thrown?
} | ||
.via(parseUserPayloadFlow) | ||
.map { | ||
case Left(error) => throw error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can not just throw an error from a stream and return Future.successful
below. This will not work and cause many unexpected side effects.
case StatusCodes.NotFound => | ||
unmarshalStringOrProblem(response.entity.withContentType(ContentTypes.`application/json`)).map { | ||
stringOrProblem => | ||
throw Subscriptions.Errors.SubscriptionNotFound(request, response, stringOrProblem) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+100
override def beforeAll = Await.result(eventsTypesClient.create(eventType), 10 seconds) | ||
|
||
override def afterAll = | ||
Await.result(eventsTypesClient.delete(eventTypeName), 10 seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such tests are not sustainable and will lead to false positives depending on the environment where they run.
btw, it seems that failing tests are related to this issue: adyach/nakadi-docker#5 |
@samizzy Instead propose to add this either to update the existing library for ZIO or create a new one and implement avro there. but the main issue with Nakadi testing remains: we need to resolve this issue: adyach/nakadi-docker#5 |
@gchudnov @vadeg I have just added a merge commit onto @samizzy PR which brings the latest changes from kanadi master (including migration from Akka to Pekko) so its in a state where it can be worked on. One additional thing that I had to add to the merge commit is having to manually convert between |
If this gets merged it should be noted that unlike the JSON format, the Avro parsing isn't streamed (should probably mention this in |
8cd2e05
to
0acd01e
Compare
So CI failed
I guess this is due to the nakadi docker image used in CI is an outdated one which doesn't support avro |
1c37273
to
d59147c
Compare
@mdedetrich |
ca8f9ea
to
700de7d
Compare
Signed-off-by: Matthew de Detrich <mdedetrich@gmail.com>
Signed-off-by: Matthew de Detrich <mdedetrich@gmail.com>
ccd908c
to
a02d1d1
Compare
This PR is part of the effort to introduce Avro capabilities to Nakadi Clients.
This PR contains modifications that introduces Avro publishing and consumption capability, kept in mind to introduce no changes to existing users of the client.