Subscriber using grpc client fails on decode #541

Closed
martintupy opened this issue Jun 16, 2024 · 2 comments · Fixed by #542

martintupy commented Jun 16, 2024

When a Subscriber is constructed using the gRPC client:

PubSubSubscriber
  .grpc[IO]
  .projectId(ProjectId("..."))
  .subscription(Subscription("..."))
  .defaultUri
  .httpClient(client)
  .defaultRetry
  .noErrorHandling
  .withDefaults
  .decodeTo[String]
  .subscribe
  .evalMap { message => IO.print(message.value) *> message.ack }
  .compile
  .drain

it fails with the following error. The same constructor with the HTTP client works fine.

java.lang.IllegalArgumentException: Illegal base64 character 7b
        at java.base/java.util.Base64$Decoder.decode0(Base64.java:852)
        at java.base/java.util.Base64$Decoder.decode(Base64.java:570)
        at fs2.pubsub.grpc.GrpcConstructors$.fs2$pubsub$grpc$GrpcConstructors$Client$$anon$1$$_$$anonfun$2$$anonfun$2(GrpcConstructors.scala:143)
        at scala.Option.map(Option.scala:242)
        at fs2.pubsub.grpc.GrpcConstructors$Client$$anon$1.$anonfun$2(GrpcConstructors.scala:143)
        at scala.collection.immutable.Vector2.map(Vector.scala:2140)
        at scala.collection.immutable.Vector2.map(Vector.scala:443)
        at fs2.pubsub.grpc.GrpcConstructors$.fs2$pubsub$grpc$GrpcConstructors$Client$$anon$1$$_$read$$anonfun$1(GrpcConstructors.scala:155)
        at modify @ fs2.internal.Scope.close(Scope.scala:262)
        at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
        at uncancelable @ fs2.Compiler$Target.uncancelable(Compiler.scala:165)
        at run$ @ Main$.run(Subscribe.scala:10)
        at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:370)
        at run$ @ Main$.run(Subscribe.scala:10)
        at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:227)
        at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:370)
        at run$ @ Main$.run(Subscribe.scala:10)
        at void @ org.typelevel.keypool.KeyPool$.reap(KeyPool.scala:187)
        at main$ @ Main$.main(Subscribe.scala:10)

I suspect that the PubsubMessage data in the gRPC protocol isn't base64 encoded, since the RPC API docs don't mention any such encoding: https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage. The REST response, by contrast, is base64 encoded: https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
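The `7b` in the error message is the hex code for `{`, which is consistent with the payload arriving as raw JSON bytes rather than base64 text. The following is a minimal sketch that uses only `java.util.Base64` and a hypothetical JSON payload (it is not the library's code) to show how decoding raw bytes as base64 produces this kind of failure:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.util.Try

object Base64Repro {
  // Hypothetical payload: raw JSON bytes, as the gRPC `data` field would carry them
  // if it is not base64 encoded (the assumption made in this issue).
  val rawPayload: Array[Byte] = """{"hello":"world"}""".getBytes(UTF_8)

  // Treating the raw bytes as base64 fails on the first byte, '{' (0x7b),
  // because '{' is not part of the base64 alphabet.
  val wrong: Try[Array[Byte]] = Try(Base64.getDecoder.decode(rawPayload))

  // Using the bytes directly recovers the original text.
  val right: String = new String(rawPayload, UTF_8)

  def main(args: Array[String]): Unit = {
    println(wrong.failed.map(_.getMessage).getOrElse("decoded without error"))
    println(right)
  }
}
```

If the assumption holds, the gRPC path should hand the received bytes to the decoder unchanged, while the HTTP path keeps its base64 decoding step.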

The following line probably shouldn't perform base64 decoding: https://github.com/permutive-engineering/fs2-pubsub/blob/main/modules/fs2-pubsub/src/main/scala-2.13%2B/fs2/pubsub/grpc/GrpcConstructors.scala#L143

alejandrohdezma (Contributor) commented

Hey @martintupy, great catch! It seems we missed this in testing because both the subscriber and the publisher were applying base64 encoding/decoding. It would have failed if the data had been published from another source.
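To illustrate why a symmetric round trip can hide this kind of bug, here is a small sketch. The `publishSide` and `subscribeSide` functions are hypothetical stand-ins for the two halves of the gRPC path, not the library's actual code:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.util.Try

object RoundTripMasking {
  // Hypothetical publisher: base64-encodes the payload before sending it.
  def publishSide(data: Array[Byte]): Array[Byte] =
    Base64.getEncoder.encode(data)

  // Hypothetical subscriber: base64-decodes whatever arrives on the wire.
  def subscribeSide(wire: Array[Byte]): Try[String] =
    Try(new String(Base64.getDecoder.decode(wire), UTF_8))

  val message: Array[Byte] = """{"id":1}""".getBytes(UTF_8)

  // Both sides share the same extra encoding step, so the round trip succeeds.
  val sameLibrary: Try[String] = subscribeSide(publishSide(message))

  // A publisher that sends raw bytes exposes the bug on the subscriber side.
  val otherPublisher: Try[String] = subscribeSide(message)
}
```

A test that feeds the subscriber with bytes produced by an independent publisher would distinguish the two cases.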

alejandrohdezma (Contributor) commented

Hey @martintupy, this has been released in v1.1.0. Thanks for raising it!
