Skip to content

Commit

Permalink
Merge pull request #954 from ivantopo/issue-#798/instrument-4x-cassan…
Browse files Browse the repository at this point in the history
…dra-driver

bring tracing support for Cassandra 4.x driver, fixes #798
  • Loading branch information
SimunKaracic authored Mar 22, 2021
2 parents cf7aecb + 48d84b2 commit fb33a60
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 29 deletions.
11 changes: 1 addition & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,7 @@ lazy val `kamon-cassandra` = (project in file("instrumentation/kamon-cassandra")
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0" % "provided",
"org.apache.cassandra" % "cassandra-all" % "3.11.2" % "provided",
scalatest % "test",
logbackClassic % "test",
"org.cassandraunit" % "cassandra-unit" % "3.11.2.0" % "test"
)
).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test", `kamon-executors`)
.dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test", `kamon-executors`)

lazy val `kamon-elasticsearch` = (project in file("instrumentation/kamon-elasticsearch"))
.disablePlugins(AssemblyPlugin)
Expand Down
30 changes: 30 additions & 0 deletions instrumentation/kamon-cassandra/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
lazy val Cassandra3xTest = config("testCas3") extend(Test)
lazy val Cassandra4xTest = config("testCas4") extend(Test)

val cassandra3xDriverVersion = "3.10.0"
val cassandra4xDriverVersion = "4.10.0"

libraryDependencies ++= Seq(
kanelaAgent % "provided",
scalatest % "test",
logbackClassic % "test",
"org.testcontainers" % "cassandra" % "1.15.2" % "test"
)

libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % cassandra3xDriverVersion % "provided,testCas3"
)

libraryDependencies ++= Seq(
"com.datastax.oss" % "java-driver-core" % cassandra4xDriverVersion % "provided,testCas4",
"com.datastax.oss" % "java-driver-query-builder" % cassandra4xDriverVersion % "provided,testCas4"
)

configs(Cassandra3xTest, Cassandra4xTest)
inConfig(Cassandra3xTest)(Defaults.testSettings)
inConfig(Cassandra4xTest)(Defaults.testSettings)

test in Test := {
(test in Cassandra3xTest).value
(test in Cassandra4xTest).value
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ kanela {
]

within = [
"com.datastax.driver.core..*"
"com.datastax.driver.core..*",
"com.datastax.oss.driver.internal.core..*"
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice

object QueryOperations {
val QueryOperationName = "cassandra.query"
val BatchOperationName = "cassandra.batch"
val QueryPrepareOperationName: String = QueryOperationName + ".prepare"
val ExecutionOperationName: String = QueryOperationName + ".execution"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ package cassandra
package driver

import com.datastax.driver.core._
import com.datastax.oss.driver.api.core.cql
import com.datastax.oss.driver.api.core.cql.PrepareRequest
import com.datastax.oss.driver.internal.core.cql.{CqlPrepareHandler, CqlRequestHandler}
import kamon.instrumentation.cassandra.driver.DriverInstrumentation.ClusterManagerBridge
import kamon.instrumentation.cassandra.metrics.HasPoolMetrics
import kamon.instrumentation.context.HasContext.MixinWithInitializer
import kamon.instrumentation.context.HasContext
import kamon.trace.Span
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.api.instrumentation.bridge.FieldBridge
import kanela.agent.libs.net.bytebuddy.asm.Advice

import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer

class DriverInstrumentation extends InstrumentationBuilder {

Expand Down Expand Up @@ -63,10 +71,10 @@ class DriverInstrumentation extends InstrumentationBuilder {
.advise(method("onException"), OnExceptionAdvice)
.advise(method("onTimeout"), OnTimeoutAdvice)
.advise(method("onSet"), OnSetAdvice)
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])

onSubTypesOf("com.datastax.driver.core.Message$Response")
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])

onType("com.datastax.driver.core.ArrayBackedResultSet")
.advise(method("fromMessage"), OnResultSetConstruction)
Expand All @@ -76,7 +84,7 @@ class DriverInstrumentation extends InstrumentationBuilder {
* we need to carry parent-span id through result sets
*/
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.mixin(classOf[MixinWithInitializer])
.mixin(classOf[HasContext.MixinWithInitializer])
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.advise(method("queryNextPage"), OnFetchMore)

Expand All @@ -88,6 +96,17 @@ class DriverInstrumentation extends InstrumentationBuilder {
.mixin(classOf[HasPoolMetrics.Mixin])
.advise(method("setLocationInfo"), HostLocationAdvice)


/**
* Cassandra Driver 4.10 support
*/
onTypes(
"com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler",
"com.datastax.oss.driver.internal.core.cql.CqlRequestHandler")
.mixin(classOf[HasContext.Mixin])
.advise(isConstructor(), OnRequestHandlerConstructorAdvice)
.advise(method("onThrottleReady"), OnThrottleReadyAdvice)

}

object DriverInstrumentation {
Expand All @@ -96,3 +115,54 @@ object DriverInstrumentation {
def getClusterName: String
}
}

object OnRequestHandlerConstructorAdvice {

@Advice.OnMethodExit()
def exit(@Advice.This requestHandler: HasContext, @Advice.Argument(0) req: Any): Unit = {
val (operationName, statement) = req match {
case pr: PrepareRequest => (QueryOperations.QueryPrepareOperationName, pr.getQuery())
case ss: cql.SimpleStatement => (QueryOperations.QueryOperationName, ss.getQuery())
case bs: cql.BoundStatement => (QueryOperations.QueryOperationName, bs.getPreparedStatement.getQuery())
case bs: cql.BatchStatement => (QueryOperations.BatchOperationName, "")
}

// Make that every case added to the "onTypes" clause for the Cassandra 4.x support
// is also handled in this match.
val resultStage: CompletionStage[_] = requestHandler match {
case cph: CqlPrepareHandler => cph.handle()
case crh: CqlRequestHandler => crh.handle()
}

val clientSpan = Kamon.clientSpanBuilder(operationName, "cassandra.driver")
.tag("db.type", "cassandra")
.tag("db.statement", statement)
.start()

/**
* We are registering a callback on the result CompletionStage because the setFinalResult and
* setFinalError methods might be called more than once on the same request handler.
*/
resultStage.whenComplete(new BiConsumer[Any, Throwable] {
override def accept(result: Any, error: Throwable): Unit = {
if(error != null) {
clientSpan
.fail(error)
.finish()
}
else {
clientSpan.finish()
}
}
})
}
}

object OnThrottleReadyAdvice {

@Advice.OnMethodEnter()
def enter(@Advice.This requestHandler: HasContext): Unit = {
val querySpan = requestHandler.context.get(Span.Key)
querySpan.mark("cassandra.throttle.ready")
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
kamon.instrumentation.cassandra {
tracing.create-round-trip-spans = yes
metrics.track-node-connection-pools = yes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import kamon.instrumentation.cassandra.NodeConnectionPoolMetrics.NodeConnectionP
import kamon.instrumentation.executor.ExecutorMetrics
import kamon.tag.TagSet
import kamon.testkit.{InstrumentInspection, MetricInspection}
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
import org.testcontainers.containers.CassandraContainer

class CassandraClientMetricsSpec
extends WordSpec
Expand Down Expand Up @@ -111,11 +111,12 @@ class CassandraClientMetricsSpec
}

var session: Session = _
val cassandra = new CassandraContainer("cassandra:3.11.10")

override protected def beforeAll(): Unit = {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(40000L)
session = EmbeddedCassandraServerHelper.getCluster.newSession()

override protected def beforeAll(): Unit = {
cassandra.start()
session = cassandra.getCluster.newSession()
val keyspace = s"keyspaceMetricSpec"

session.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ package kamon.instrumentation.instrumentation
import com.datastax.driver.core.exceptions.DriverException
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.{QueryOperations, Session}
import kamon.module.Module.Registration
import kamon.tag.Lookups._
import kamon.testkit.{InstrumentInspection, MetricInspection, Reconfigure, TestSpanReporter}
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, OptionValues, WordSpec}
import org.testcontainers.containers.CassandraContainer

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -118,17 +117,17 @@ class CassandraClientTracingInstrumentationSpec
}
}

var registration: Registration = _
var session: Session = _
var session: Session = _
val cassandra = new CassandraContainer("cassandra:3.11.10")

override protected def beforeAll(): Unit = {
enableFastSpanFlushing()
sampleAlways()

val keyspace = s"keyspaceTracingSpec"

EmbeddedCassandraServerHelper.startEmbeddedCassandra(40000L)
session = EmbeddedCassandraServerHelper.getCluster.newSession()
cassandra.start()
session = cassandra.getCluster.newSession()

session.execute(
s"create keyspace $keyspace with replication = {'class':'SimpleStrategy', 'replication_factor':3}"
Expand All @@ -137,7 +136,7 @@ class CassandraClientTracingInstrumentationSpec
session.execute(s"USE $keyspace")

session.execute("create table users (id uuid primary key, name text )")
for (i <- 1 to 12) {
for (_ <- 1 to 12) {
session.execute("insert into users (id, name) values (uuid(), 'kamon')")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
kamon.instrumentation.cassandra {
tracing.create-round-trip-spans = yes
metrics.track-node-connection-pools = yes
}
16 changes: 16 additions & 0 deletions instrumentation/kamon-cassandra/src/testCas4/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<logger name="org.hyperic" level="OFF"/>
<logger name="org.cassandraunit" level="OFF"/>
<logger name="org.apache" level="OFF"/>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Loading

0 comments on commit fb33a60

Please sign in to comment.