ZIO Telemetry and ZIO grpc #544

Closed
omidb opened this issue Aug 17, 2023 · 5 comments

omidb commented Aug 17, 2023

Hi,

I can easily run this example from ZIO Telemetry: https://zio.dev/zio-telemetry/opentelemetry-instrumentation-example
It uses ZIO HTTP and OpenTelemetry. I built the same thing with ZIO gRPC, but without success:
I don't see any spans or my services in Jaeger, although I do see them with the ZIO HTTP example.
What am I missing?

dependencies: "dev.zio" % "zio-opentelemetry_2.13" % "3.0.0-RC17"
package zio_grpc.examples.helloworld

import io.grpc.StatusException
import io.grpc.protobuf.services.ProtoReflectionService
import scalapb.zio_grpc.Server
import scalapb.zio_grpc.ServerMain
import scalapb.zio_grpc.ServiceList
import zio._
import zio.Console._
import io.grpc.examples.helloworld.helloworld.ZioHelloworld.{Greeter, Welcomer}
import io.grpc.examples.helloworld.helloworld.{HelloReply, HelloRequest}
import zio_grpc.examples.helloworld.userdatabase.UserDatabase
import scalapb.zio_grpc.ServerLayer

import io.grpc.Status

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Tracer
import zio.telemetry.opentelemetry.context.ContextStorage
import zio.telemetry.opentelemetry.tracing.Tracing
import scalapb.zio_grpc.{RequestContext, ZTransform}
import zio.stream.ZStream
object LoggingTransform {
  def live = ZLayer.fromFunction(LoggingTransform.apply _)
}
case class LoggingTransform(tracing: Tracing)
    extends ZTransform[Any, RequestContext] {

  def logCause(rc: RequestContext, cause: Cause[StatusException]): UIO[Unit] =
    printLine(rc.toString()).orDie

  def accessLog(rc: RequestContext): UIO[Unit] = {
    printLine(rc.toString()).orDie
  }

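  override def effect[A](
      io: Any => ZIO[Any, StatusException, A]
  ): RequestContext => ZIO[Any, StatusException, A] = { rc =>
    // A try/catch here would only see exceptions thrown while building the
    // effect; ZIO.suspend captures those, and tapError also logs failures
    // raised while the effect runs.
    ZIO
      .suspend(io(rc))
      .tapError(t => ZIO.logError(t.getMessage))
      .mapError {
        case s: StatusException => s
        case _                  => new StatusException(Status.INTERNAL)
      }
  }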

  override def stream[A](
      io: Any => ZStream[Any, StatusException, A]
  ): RequestContext => ZStream[Any, StatusException, A] = { rc =>
    (io(rc) ++ ZStream.fromZIO(accessLog(rc)).drain).onError(logCause(rc, _))
  }
}

case class WelcomerWithDatabase(database: UserDatabase, tracing: Tracing)
    extends Welcomer {
  import tracing.aspects._
  def welcome(
      request: HelloRequest
  ): ZIO[Any, StatusException, HelloReply] =
    printLine(request).mapError(e =>
      new StatusException(io.grpc.Status.INVALID_ARGUMENT)
    ) *> database.fetchUser(request.name).map { user =>
      HelloReply(s"Welcome ${user.name}")
    } @@ span("welcome")
}

object WelcomerWithDatabase {
  private val live = ZLayer.fromFunction(WelcomerWithDatabase.apply _)
  val layer =
    LoggingTransform.live >>> live
}

object MultipleServices extends zio.ZIOAppDefault {
  private val globalTracerLayer: TaskLayer[Tracer] =
    ZLayer.fromZIO(
      ZIO
        .attempt(
          GlobalOpenTelemetry.getTracer(
            "zio_grpc.examples.helloworld.MultipleServices"
          )
        )
        .logError("error getting tracer: ")
    )

  val serverLayer = ServerLayer.fromServiceList(
    io.grpc.ServerBuilder
      .forPort(9090)
      .addService(ProtoReflectionService.newInstance()),
    ServiceList
      .addFromEnvironment[Welcomer]
  )

  val ourApp = ZLayer.make[Server](
    serverLayer,
    UserDatabase.layer,
    WelcomerWithDatabase.layer,
    Tracing.live,
    globalTracerLayer,
    ContextStorage.openTelemetryContext
  )

  def run =
    (ourApp.build *> ZIO.never).exitCode
}

And here is the client:

package zio_grpc.examples.helloworld

import io.grpc.examples.helloworld.helloworld.ZioHelloworld.{
  GreeterClient,
  WelcomerClient
}
import io.grpc.examples.helloworld.helloworld.HelloRequest
import io.grpc.ManagedChannelBuilder
import zio.Console._
import scalapb.zio_grpc.ZManagedChannel
import zio._
import io.grpc.examples.helloworld.helloworld.ZioHelloworld

object HelloWorldClient extends zio.ZIOAppDefault {
  val clientLayer: Layer[Throwable, GreeterClient] =
    GreeterClient.live(
      ZManagedChannel(
        ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
      )
    )

  def myAppLogic =
    for {
      r <- GreeterClient.sayHello(HelloRequest("test")).logError("got error")
      _ <- printLine(r.message)
    } yield ()

  final def run =
    myAppLogic.provideLayer(clientLayer).exitCode
}

object WelcomerDemoClient extends zio.ZIOAppDefault {
  val clientLayer: Layer[Throwable, WelcomerClient] =
    WelcomerClient.live(
      ZManagedChannel(
        ManagedChannelBuilder.forAddress("localhost", 9090).usePlaintext()
      )
    )

  def myAppLogic =
    for {
      r <- WelcomerClient.welcome(HelloRequest("test")).logError("got error")
      _ <- printLine(r.message)
    } yield ()

  final def run =
    myAppLogic.provideLayer(clientLayer).exitCode
}

To launch them, first start Jaeger:

docker run --rm -it \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 14250:14250 \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:1.47
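
The container above publishes OTLP over gRPC on port 4317. The agent's default exporter protocol and endpoint can differ between agent versions, so when no spans show up it may help to pin them explicitly before launching the services. This is a minimal sketch, assuming the agent honors the standard OpenTelemetry exporter environment variables; these two settings are an addition for illustration and are not part of the original report.

```shell
# Assumption: the agent reads the standard OTLP exporter variables.
# Port 4317 is the OTLP gRPC port published by the docker command above.
export OTEL_EXPORTER_OTLP_PROTOCOL=grpc
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
```

Exporting these in the same shell that later starts sbt makes them visible to the JVM the agent attaches to.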

Then launch the services. The server:

export OTEL_AGENT_PATH=$(cs fetch --classpath "io.opentelemetry.javaagent:opentelemetry-javaagent:latest.release")
sbt -J-javaagent:$OTEL_AGENT_PATH \
    -J-Dotel.service.name=grpserver \
    -J-Dotel.traces.sampler=always_on \
    -J-Dotel.traces.exporter=otlp \
    -J-Dotel.metrics.exporter=none \
  "runMain zio_grpc.examples.helloworld.MultipleServices"

And the client:

export OTEL_AGENT_PATH=$(cs fetch --classpath "io.opentelemetry.javaagent:opentelemetry-javaagent:latest.release")
sbt -J-javaagent:$OTEL_AGENT_PATH \
    -J-Dotel.service.name=grpclient \
    -J-Dotel.traces.sampler=always_on \
    -J-Dotel.traces.exporter=otlp \
    -J-Dotel.metrics.exporter=none \
  "runMain zio_grpc.examples.helloworld.WelcomerDemoClient"
@thesamet
Contributor

Hi @omidb, I am not familiar with zio-telemetry. It sounds like zio-telemetry doesn't work in conjunction with zio-grpc (and not the other way around). I don't expect to have the resources to debug zio-telemetry itself and understand why it doesn't emit metrics, so I suggest investigating this with the zio-telemetry project.

@thesamet
Contributor

(That's not to say it isn't caused by something within zio-grpc. I'd be happy to accept a bug report/PR that describes the unexpected behavior in terms of zio-grpc.)

@omidb
Author

omidb commented Aug 17, 2023

@thesamet I will work with the telemetry team to figure this out. I have a question, though. It seems that I need to do something like this: https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/grpc-1.6/library
Where would be the right place to do this interception? Is there a hook in the context transformer that can help?

// For client-side, attach the interceptor to your channel builder.
void configureClientInterceptor(OpenTelemetry openTelemetry, NettyChannelBuilder nettyChannelBuilder) {
  GrpcTelemetry grpcTelemetry = GrpcTelemetry.create(openTelemetry);
  nettyChannelBuilder.intercept(grpcTelemetry.newClientInterceptor());
}

// For server-side, attach the interceptor to your service.
ServerServiceDefinition configureServerInterceptor(OpenTelemetry openTelemetry, ServerServiceDefinition serviceDefinition) {
  GrpcTelemetry grpcTelemetry = GrpcTelemetry.create(openTelemetry);
  return ServerInterceptors.intercept(serviceDefinition, grpcTelemetry.newServerInterceptor());
}

@thesamet
Contributor

These are grpc-java interceptors, so fortunately you can use them directly without zio-grpc getting in the way. On the client side, ZManagedChannel is a thin wrapper around the ManagedChannelBuilder from the example code you pasted, so the interceptor can be attached to the builder used here:

ManagedChannelBuilder.forAddress("localhost", 9090).usePlaintext()

The same applies on the server side.

@omidb
Author

omidb commented Aug 18, 2023

I now have a full working example, for future users.
On the server side:

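  // GrpcTelemetry comes from the opentelemetry-grpc-1.6 instrumentation library.
  import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry

  val openTelemetry = GlobalOpenTelemetry.get()
  val grpcTelemetry = GrpcTelemetry.create(openTelemetry)
  // Reuse the instance above instead of creating a second GrpcTelemetry.
  val intercept = grpcTelemetry.newServerInterceptor()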
  val serverLayer = ServerLayer.fromServiceList(
    io.grpc.ServerBuilder
      .forPort(9090)
      .intercept(intercept)
      .addService(ProtoReflectionService.newInstance()),
    ServiceList
      .addFromEnvironment[Welcomer]
  )

On the client side:

  val openTelemetry = GlobalOpenTelemetry.get()
  val grpcTelemetry = GrpcTelemetry.create(openTelemetry)
  val clientLayer: Layer[Throwable, GreeterClient] =
    GreeterClient.live(
      ZManagedChannel(
        ManagedChannelBuilder
          .forAddress("localhost", 9090)
          .intercept(grpcTelemetry.newClientInterceptor())
          .usePlaintext()
      )
    )
