Skip to content

Commit

Permalink
Verifying C* repo read timeout handling.
Browse files Browse the repository at this point in the history
Fixes #29.
Fixes #26.

This marks the first actually useful scassandra integration. It allows
to simulate read timeouts during Cassandra repository queries, and
therefore allows to verify that the repository re-tries the queries
two times before giving up.
  • Loading branch information
manuelkiessling committed Feb 2, 2016
1 parent 9ae0efd commit c639c49
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 10 deletions.
33 changes: 28 additions & 5 deletions api/app/repositories/CassandraRepository.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package repositories

import com.datastax.driver.core.exceptions.ReadTimeoutException
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder._
import com.datastax.driver.core.{ResultSet, Row, Session}
Expand All @@ -11,20 +12,42 @@ abstract class CassandraRepository[M <: Model, I](session: Session, tablename: S
extends Repository[M, I] {
def rowToModel(row: Row): M

def getNBySinglePartitionKeyValue(partitionKeyValue: I, n: Int): ResultSet = {
def getNBySinglePartitionKeyValue(partitionKeyValue: I, n: Int, nthTry: Int): ResultSet = {
val selectStmt =
select()
.from(tablename)
.where(QueryBuilder.eq(partitionKeyName, partitionKeyValue))
.limit(n)
.limit(n) + " /* " + nthTry + ". try */"
// We add the ' /* N. try */' CQL comment because right now it seems to
// be the only way to make stubbed cassandra priming work if we want
// to simulate "fail 2 times with read timeout, then work"

session.execute(selectStmt)
session.execute(selectStmt)
}

override def getNById(id: I, n: Int): Try[List[M]] = {
Try {
val rows = getNBySinglePartitionKeyValue(id, n).all().toList
rows.map(row => rowToModel(row))

// There's probably a very clever functional approach to
// "Try it 3 times, then finally fail", see e.g.
// http://stackoverflow.com/questions/28506466/already-existing-functional-way-for-a-retry-until-in-scala
try {
val rows = getNBySinglePartitionKeyValue(id, n, 1).all().toList
rows.map(row => rowToModel(row))
} catch {
case e: ReadTimeoutException => {
try {
val rows = getNBySinglePartitionKeyValue(id, n, 2).all().toList
rows.map(row => rowToModel(row))
} catch {
case e: ReadTimeoutException => {
val rows = getNBySinglePartitionKeyValue(id, n, 3).all().toList
rows.map(row => rowToModel(row))
}
}
}
}

}
}

Expand Down
74 changes: 69 additions & 5 deletions api/test/IntegrationWithFailingCassandraSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import java.io.File

import com.journeymonitor.analyze.common.{CassandraClient, CassandraConnectionUri}
import org.scalatest.BeforeAndAfter
import org.scalatestplus.play._
import org.scassandra.cql.PrimitiveType._
import org.scassandra.http.client.PrimingRequest
Expand All @@ -22,9 +21,62 @@ class IntegrationWithFailingCassandraSpec extends PlaySpec with OneBrowserPerSui

val pc = new ScassandraServerRule().primingClient()

val query3readtimeouts = "SELECT * FROM statistics WHERE testcase_id='testcase3readtimeouts' LIMIT 2;"
val timeout = then().withResult(PrimingRequest.Result.read_request_timeout)

pc.prime(PrimingRequest.queryBuilder()
.withQuery(query3readtimeouts + " /* 1. try */")
.withThen(timeout)
.build()
)

pc.prime(PrimingRequest.queryBuilder()
.withQuery(query3readtimeouts + " /* 2. try */")
.withThen(timeout)
.build()
)

pc.prime(PrimingRequest.queryBuilder()
.withQuery(query3readtimeouts + " /* 3. try */")
.withThen(timeout)
.build()
)

val query2readtimeouts = "SELECT * FROM statistics WHERE testcase_id='testcase2readtimeouts' LIMIT 2;"

pc.prime(PrimingRequest.queryBuilder()
.withQuery("""SELECT * FROM statistics WHERE testcase_id=? LIMIT 2;""")
.withThen(then().withResult(PrimingRequest.Result.read_request_timeout))
.withQuery(query2readtimeouts + " /* 1. try */")
.withThen(timeout)
.build()
)

pc.prime(PrimingRequest.queryBuilder()
.withQuery(query2readtimeouts + " /* 2. try */")
.withThen(timeout)
.build()
)

import scala.collection.JavaConversions._ // required to map from Scala 'Any' to Java '? extends Object>'
val row = Map[String, Any](
"testresult_id" -> "testresultFoo",
"runtime_milliseconds" -> 42,
"number_of_200" -> 23,
"number_of_400" -> 4,
"number_of_500" -> 5
)

pc.prime(PrimingRequest.queryBuilder()
.withQuery(query2readtimeouts + " /* 3. try */")
.withThen(then()
.withColumnTypes(
column("testresult_id", TEXT),
column("runtime_milliseconds", INT),
column("number_of_200", INT),
column("number_of_400", INT),
column("number_of_500", INT)
)
.withRows(row)
)
.build()
)

Expand All @@ -51,10 +103,22 @@ class IntegrationWithFailingCassandraSpec extends PlaySpec with OneBrowserPerSui

"Integrated application with failing Cassandra" should {

"return an error upon encountering a Cassandra ReadTimeout" in {
go to "http://localhost:" + port + "/testcases/testcaseFoo/statistics/latest/?n=2"
"return an error upon encountering 3 Cassandra read timeouts in a row" in {
go to "http://localhost:" + port + "/testcases/testcase3readtimeouts/statistics/latest/?n=2"
pageSource mustBe """{"message":"An error occured"}"""
}

"return a result upon encountering only 2 Cassandra read timeouts in a row followed by a success" in {
go to "http://localhost:" + port + "/testcases/testcase2readtimeouts/statistics/latest/?n=2"
pageSource mustBe
"""
|[{"testresultId":"testresultFoo",
|"runtimeMilliseconds":42,
|"numberOf200":23,
|"numberOf400":4,
|"numberOf500":5}]
|""".stripMargin.replace("\n", "")
}

}
}

0 comments on commit c639c49

Please sign in to comment.