Skip to content

Commit

Permalink
Merge pull request #120 from getquill/cassandra-stream-tests
Browse files Browse the repository at this point in the history
quill-cassandra: test stream source
  • Loading branch information
fwbrasil committed Jan 25, 2016
2 parents 159a0d9 + 62c0869 commit d029bef
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 54 deletions.
11 changes: 0 additions & 11 deletions README.md
Expand Up @@ -546,17 +546,6 @@ val db = io.getquill.source.cassandra.mirror.mirrorSource

Supported operations:

**consistencyLevel**
```scala
import com.datastax.driver.core.ConsistencyLevel

val q = quote {
query[Person].insert(_.age -> 10, _.name -> "John")
}
db.withConsistencyLevel(ConsistencyLevel.QUORUM).run(q)
// INSERT INTO Person (age,name) VALUES (10, 'John')
```

**allowFiltering**

```scala
Expand Down
Expand Up @@ -19,6 +19,7 @@ class CassandraAsyncSource[N <: NamingStrategy]
override type QueryResult[T] = Future[List[T]]
override type ActionResult[T] = Future[ResultSet]
override type BatchedActionResult[T] = Future[List[ResultSet]]
override type Params[T] = List[T]

def query[T](cql: String, bind: BoundStatement => BoundStatement, extractor: Row => T)(implicit ec: ExecutionContext): Future[List[T]] = {
logger.info(cql)
Expand Down
Expand Up @@ -18,6 +18,7 @@ trait CassandraSource[N <: NamingStrategy, R, S]
type QueryResult[T]
type ActionResult[T]
type BatchedActionResult[T]
type Params[T]

def run[T](
quoted: Quoted[Query[T]]): QueryResult[T] = macro CassandraSourceMacro.run[R, S]
Expand Down Expand Up @@ -45,25 +46,25 @@ trait CassandraSource[N <: NamingStrategy, R, S]
def run[T](
quoted: Quoted[Action[T]]): ActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, T](
quoted: Quoted[P1 => Action[T]]): List[P1] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[P1 => Action[T]]): Params[P1] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, T](
quoted: Quoted[(P1, P2) => Action[T]]): List[(P1, P2)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2) => Action[T]]): Params[(P1, P2)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, T](
quoted: Quoted[(P1, P2, P3) => Action[T]]): List[(P1, P2, P3)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3) => Action[T]]): Params[(P1, P2, P3)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, T](
quoted: Quoted[(P1, P2, P3, P4) => Action[T]]): List[(P1, P2, P3, P4)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4) => Action[T]]): Params[(P1, P2, P3, P4)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, T](
quoted: Quoted[(P1, P2, P3, P4, P5) => Action[T]]): List[(P1, P2, P3, P4, P5)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5) => Action[T]]): Params[(P1, P2, P3, P4, P5)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, P6, T](
quoted: Quoted[(P1, P2, P3, P4, P5, P6) => Action[T]]): List[(P1, P2, P3, P4, P5, P6)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5, P6) => Action[T]]): Params[(P1, P2, P3, P4, P5, P6)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, P6, P7, T](
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7) => Action[T]]): List[(P1, P2, P3, P4, P5, P6, P7)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7) => Action[T]]): Params[(P1, P2, P3, P4, P5, P6, P7)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, P6, P7, P8, T](
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8) => Action[T]]): List[(P1, P2, P3, P4, P5, P6, P7, P8)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8) => Action[T]]): Params[(P1, P2, P3, P4, P5, P6, P7, P8)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, P6, P7, P8, P9, T](
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8, P9) => Action[T]]): List[(P1, P2, P3, P4, P5, P6, P7, P8, P9)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8, P9) => Action[T]]): Params[(P1, P2, P3, P4, P5, P6, P7, P8, P9)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
def run[P1, P2, P3, P4, P5, P6, P7, P8, P9, P10, T](
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8, P9, P10) => Action[T]]): List[(P1, P2, P3, P4, P5, P6, P7, P8, P9, P10)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]
quoted: Quoted[(P1, P2, P3, P4, P5, P6, P7, P8, P9, P10) => Action[T]]): Params[(P1, P2, P3, P4, P5, P6, P7, P8, P9, P10)] => BatchedActionResult[T] = macro CassandraSourceMacro.run[R, S]

def run[T](
quoted: Quoted[T]): QueryResult[T] = macro CassandraSourceMacro.run[R, S]
Expand Down
Expand Up @@ -19,36 +19,36 @@ class CassandraStreamSource[N <: NamingStrategy]

override type QueryResult[T] = Observable[T]
override type ActionResult[T] = Observable[ResultSet]
override type BatchedActionResult[T] = Observable[List[ResultSet]]

def query[T](cql: String, bind: BoundStatement => BoundStatement, extractor: Row => T)(implicit ec: ExecutionContext): Observable[T] = {
logger.info(cql)
Observable
.fromFuture(session.executeAsync(prepare(cql, bind)))
.map(_.iterator)
.flatMap(Observable.fromIterator(_))
.map(extractor)
}

def execute(cql: String)(implicit ec: ExecutionContext): Observable[ResultSet] = {
logger.info(cql)
Observable.fromFuture(session.executeAsync(prepare(cql)))
}

def execute(cql: String, bindList: List[BoundStatement => BoundStatement])(implicit ec: ExecutionContext): Observable[List[ResultSet]] = {
logger.info(cql)
bindList match {
case Nil => Future.successful(List())
case head :: tail =>
Observable.fromFuture(session.executeAsync(prepare(cql, head))).flatMap { result =>
execute(cql, tail).map(result +: _)
}
override type BatchedActionResult[T] = Observable[ResultSet]
override type Params[T] = Observable[T]

private def logged[T](cql: String)(f: => Observable[T]) =
for {
_ <- Observable.apply(logger.info(cql))
r <- f
} yield {
r
}

def query[T](cql: String, bind: BoundStatement => BoundStatement, extractor: Row => T)(implicit ec: ExecutionContext): Observable[T] =
logged(cql) {
Observable
.fromFuture(session.executeAsync(prepare(cql, bind)))
.map(_.iterator)
.flatMap(Observable.fromIterator(_))
.map(extractor)
}

def execute(cql: String)(implicit ec: ExecutionContext): Observable[ResultSet] =
logged(cql) {
Observable.fromFuture(session.executeAsync(prepare(cql)))
}
}

def execute[T](cql: String, bindParams: T => BoundStatement => BoundStatement)(implicit ec: ExecutionContext): Observable[T] => Observable[ResultSet] =
(values: Observable[T]) =>
values.flatMap { value =>
Observable.fromFuture(session.executeAsync(prepare(cql, bindParams(value))))
logged(cql) {
Observable.fromFuture(session.executeAsync(prepare(cql, bindParams(value))))
}
}
}
Expand Up @@ -16,6 +16,7 @@ class CassandraSyncSource[N <: NamingStrategy]
override type QueryResult[T] = List[T]
override type ActionResult[T] = ResultSet
override type BatchedActionResult[T] = List[ResultSet]
override type Params[T] = List[T]

def query[T](cql: String, bind: BoundStatement => BoundStatement, extractor: Row => T): List[T] = {
logger.info(cql)
Expand Down
Expand Up @@ -15,7 +15,10 @@ object mirrorSource
with MirrorEncoders
with MirrorDecoders {

def withConsistencyLevel(level: ConsistencyLevel) = this
override type QueryResult[T] = QueryMirror[T]
override type ActionResult[T] = ActionMirror
override type BatchedActionResult[T] = BatchActionMirror
override type Params[T] = List[T]

override def close = ()

Expand All @@ -25,10 +28,6 @@ object mirrorSource
else
Success(())

type QueryResult[T] = QueryMirror[T]
type ActionResult[T] = ActionMirror
type BatchedActionResult[T] = BatchActionMirror

case class ActionMirror(cql: String)

def execute(cql: String) =
Expand Down
7 changes: 6 additions & 1 deletion quill-cassandra/src/test/resources/application.properties
Expand Up @@ -12,4 +12,9 @@ testSyncDB.session.addressTranslater=com.datastax.driver.core.policies.IdentityT
testAsyncDB.keyspace=quill_test
testAsyncDB.preparedStatementCacheSize=1000
testAsyncDB.session.contactPoint=127.0.0.1
testAsyncDB.session.queryOptions.consistencyLevel=LOCAL_QUORUM
testAsyncDB.session.queryOptions.consistencyLevel=LOCAL_QUORUM

testStreamDB.keyspace=quill_test
testStreamDB.preparedStatementCacheSize=1000
testStreamDB.session.contactPoint=127.0.0.1
testStreamDB.session.queryOptions.consistencyLevel=LOCAL_QUORUM
Expand Up @@ -2,8 +2,10 @@ package io.getquill.source.cassandra

import io.getquill._
import java.util.Date
import scala.concurrent.ExecutionContext.Implicits.global
import com.datastax.driver.core.ConsistencyLevel
import monifu.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await

class EncodingSpec extends Spec {

Expand All @@ -16,6 +18,7 @@ class EncodingSpec extends Spec {
}

"async" in {
import scala.concurrent.ExecutionContext.Implicits.global
await {
for {
_ <- testAsyncDB.run(query[EncodingTestEntity].delete)
Expand All @@ -26,6 +29,21 @@ class EncodingSpec extends Spec {
}
}
}

"stream" in {
import monifu.concurrent.Implicits.globalScheduler
val result =
for {
_ <- testStreamDB.run(query[EncodingTestEntity].delete)
inserts = Observable.from(insertValues: _*)
_ <- testStreamDB.run(query[EncodingTestEntity].insert)(inserts).count
result <- testStreamDB.run(query[EncodingTestEntity])
} yield {
result
}
val f = result.foldLeft(List.empty[EncodingTestEntity])(_ :+ _).asFuture
verify(await(f).getOrElse(List()))
}
}

private def verify(result: List[EncodingTestEntity]): Unit =
Expand Down
Expand Up @@ -5,3 +5,5 @@ import io.getquill.naming.Literal
object testSyncDB extends CassandraSyncSource[Literal]

object testAsyncDB extends CassandraAsyncSource[Literal]

object testStreamDB extends CassandraStreamSource[Literal]

0 comments on commit d029bef

Please sign in to comment.