Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bring tracing support for Cassandra 4.x driver, fixes #798 #954

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,7 @@ lazy val `kamon-cassandra` = (project in file("instrumentation/kamon-cassandra")
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0" % "provided",
"org.apache.cassandra" % "cassandra-all" % "3.11.2" % "provided",
scalatest % "test",
logbackClassic % "test",
"org.cassandraunit" % "cassandra-unit" % "3.11.2.0" % "test"
)
).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test", `kamon-executors`)
.dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test", `kamon-executors`)

lazy val `kamon-elasticsearch` = (project in file("instrumentation/kamon-elasticsearch"))
.disablePlugins(AssemblyPlugin)
Expand Down
30 changes: 30 additions & 0 deletions instrumentation/kamon-cassandra/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
lazy val Cassandra3xTest = config("testCas3") extend(Test)
lazy val Cassandra4xTest = config("testCas4") extend(Test)

val cassandra3xDriverVersion = "3.10.0"
val cassandra4xDriverVersion = "4.10.0"

libraryDependencies ++= Seq(
kanelaAgent % "provided",
scalatest % "test",
logbackClassic % "test",
"org.testcontainers" % "cassandra" % "1.15.2" % "test"
)

libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % cassandra3xDriverVersion % "provided,testCas3"
)

libraryDependencies ++= Seq(
"com.datastax.oss" % "java-driver-core" % cassandra4xDriverVersion % "provided,testCas4",
"com.datastax.oss" % "java-driver-query-builder" % cassandra4xDriverVersion % "provided,testCas4"
)

configs(Cassandra3xTest, Cassandra4xTest)
inConfig(Cassandra3xTest)(Defaults.testSettings)
inConfig(Cassandra4xTest)(Defaults.testSettings)

test in Test := {
(test in Cassandra3xTest).value
(test in Cassandra4xTest).value
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ kanela {
]

within = [
"com.datastax.driver.core..*"
"com.datastax.driver.core..*",
"com.datastax.oss.driver.internal.core..*"
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice

object QueryOperations {
val QueryOperationName = "cassandra.query"
val BatchOperationName = "cassandra.batch"
val QueryPrepareOperationName: String = QueryOperationName + ".prepare"
val ExecutionOperationName: String = QueryOperationName + ".execution"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ package cassandra
package driver

import com.datastax.driver.core._
import com.datastax.oss.driver.api.core.cql
import com.datastax.oss.driver.api.core.cql.PrepareRequest
import com.datastax.oss.driver.internal.core.cql.{CqlPrepareHandler, CqlRequestHandler}
import kamon.instrumentation.cassandra.driver.DriverInstrumentation.ClusterManagerBridge
import kamon.instrumentation.cassandra.metrics.HasPoolMetrics
import kamon.instrumentation.context.HasContext.MixinWithInitializer
import kamon.instrumentation.context.HasContext
import kamon.trace.Span
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.api.instrumentation.bridge.FieldBridge
import kanela.agent.libs.net.bytebuddy.asm.Advice

import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer

class DriverInstrumentation extends InstrumentationBuilder {

Expand Down Expand Up @@ -63,10 +71,10 @@ class DriverInstrumentation extends InstrumentationBuilder {
.advise(method("onException"), OnExceptionAdvice)
.advise(method("onTimeout"), OnTimeoutAdvice)
.advise(method("onSet"), OnSetAdvice)
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])

onSubTypesOf("com.datastax.driver.core.Message$Response")
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])

onType("com.datastax.driver.core.ArrayBackedResultSet")
.advise(method("fromMessage"), OnResultSetConstruction)
Expand All @@ -76,7 +84,7 @@ class DriverInstrumentation extends InstrumentationBuilder {
* we need to carry parent-span id through result sets
*/
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.advise(method("queryNextPage"), OnFetchMore)

Expand All @@ -88,6 +96,17 @@ class DriverInstrumentation extends InstrumentationBuilder {
.mixin(classOf[HasPoolMetrics.Mixin])
.advise(method("setLocationInfo"), HostLocationAdvice)


/**
* Cassandra Driver 4.10 support
*/
onTypes(
"com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler",
"com.datastax.oss.driver.internal.core.cql.CqlRequestHandler")
.mixin(classOf[HasContext.Mixin])
.advise(isConstructor(), OnRequestHandlerConstructorAdvice)
.advise(method("onThrottleReady"), OnThrottleReadyAdvice)

}

object DriverInstrumentation {
Expand All @@ -96,3 +115,54 @@ object DriverInstrumentation {
def getClusterName: String
}
}

object OnRequestHandlerConstructorAdvice {

@Advice.OnMethodExit()
def exit(@Advice.This requestHandler: HasContext, @Advice.Argument(0) req: Any): Unit = {
val (operationName, statement) = req match {
case pr: PrepareRequest => (QueryOperations.QueryPrepareOperationName, pr.getQuery())
case ss: cql.SimpleStatement => (QueryOperations.QueryOperationName, ss.getQuery())
case bs: cql.BoundStatement => (QueryOperations.QueryOperationName, bs.getPreparedStatement.getQuery())
case bs: cql.BatchStatement => (QueryOperations.BatchOperationName, "")
}

// Make that every case added to the "onTypes" clause for the Cassandra 4.x support
// is also handled in this match.
val resultStage: CompletionStage[_] = requestHandler match {
case cph: CqlPrepareHandler => cph.handle()
case crh: CqlRequestHandler => crh.handle()
}

val clientSpan = Kamon.clientSpanBuilder(operationName, "cassandra.driver")
.tag("db.type", "cassandra")
.tag("db.statement", statement)
.start()

/**
* We are registering a callback on the result CompletionStage because the setFinalResult and
* setFinalError methods might be called more than once on the same request handler.
*/
resultStage.whenComplete(new BiConsumer[Any, Throwable] {
override def accept(result: Any, error: Throwable): Unit = {
if(error != null) {
clientSpan
.fail(error)
.finish()
}
else {
clientSpan.finish()
}
}
})
}
}

object OnThrottleReadyAdvice {

@Advice.OnMethodEnter()
def enter(@Advice.This requestHandler: HasContext): Unit = {
val querySpan = requestHandler.context.get(Span.Key)
querySpan.mark("cassandra.throttle.ready")
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
kamon.instrumentation.cassandra {
tracing.create-round-trip-spans = yes
metrics.track-node-connection-pools = yes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import kamon.instrumentation.cassandra.NodeConnectionPoolMetrics.NodeConnectionP
import kamon.instrumentation.executor.ExecutorMetrics
import kamon.tag.TagSet
import kamon.testkit.{InstrumentInspection, MetricInspection}
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
import org.testcontainers.containers.CassandraContainer

class CassandraClientMetricsSpec
extends WordSpec
Expand Down Expand Up @@ -111,11 +111,12 @@ class CassandraClientMetricsSpec
}

var session: Session = _
val cassandra = new CassandraContainer("cassandra:3.11.10")

override protected def beforeAll(): Unit = {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(40000L)
session = EmbeddedCassandraServerHelper.getCluster.newSession()

override protected def beforeAll(): Unit = {
cassandra.start()
session = cassandra.getCluster.newSession()
val keyspace = s"keyspaceMetricSpec"

session.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package kamon.instrumentation.instrumentation
import com.datastax.driver.core.exceptions.DriverException
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.{QueryOperations, Session}
import kamon.module.Module.Registration
import kamon.tag.Lookups._
import kamon.testkit.{InstrumentInspection, MetricInspection, Reconfigure, TestSpanReporter}
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, OptionValues, WordSpec}
import org.testcontainers.containers.CassandraContainer

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -118,17 +117,17 @@ class CassandraClientTracingInstrumentationSpec
}
}

var registration: Registration = _
var session: Session = _
var session: Session = _
val cassandra = new CassandraContainer("cassandra:3.11.10")

override protected def beforeAll(): Unit = {
enableFastSpanFlushing()
sampleAlways()

val keyspace = s"keyspaceTracingSpec"

EmbeddedCassandraServerHelper.startEmbeddedCassandra(40000L)
session = EmbeddedCassandraServerHelper.getCluster.newSession()
cassandra.start()
session = cassandra.getCluster.newSession()

session.execute(
s"create keyspace $keyspace with replication = {'class':'SimpleStrategy', 'replication_factor':3}"
Expand All @@ -137,7 +136,7 @@ class CassandraClientTracingInstrumentationSpec
session.execute(s"USE $keyspace")

session.execute("create table users (id uuid primary key, name text )")
for (i <- 1 to 12) {
for (_ <- 1 to 12) {
session.execute("insert into users (id, name) values (uuid(), 'kamon')")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
kamon.instrumentation.cassandra {
tracing.create-round-trip-spans = yes
metrics.track-node-connection-pools = yes
}
16 changes: 16 additions & 0 deletions instrumentation/kamon-cassandra/src/testCas4/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<logger name="org.hyperic" level="OFF"/>
<logger name="org.cassandraunit" level="OFF"/>
<logger name="org.apache" level="OFF"/>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Loading