Skip to content

Commit

Permalink
Use fetchSize, return next page instead of the current one
Browse files Browse the repository at this point in the history
FMT
  • Loading branch information
dionysios-ntaouros committed Sep 6, 2022
1 parent 73fdfbf commit f952ec6
Showing 1 changed file with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import io.getquill.context.ExecutionInfo
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.context.monix.MonixContext
import io.getquill.util.{ ContextLogger, LoadConfig }
import io.getquill.context.cassandra.util.FutureConversions._
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.compat.java8.FutureConverters._
import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success }
Expand All @@ -19,8 +19,7 @@ class CassandraMonixContext[+N <: NamingStrategy](
naming: N,
session: CqlSession,
preparedStatementCacheSize: Long
)
extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
) extends CassandraCqlSessionContext[N](naming, session, preparedStatementCacheSize)
with MonixContext[CqlIdiom, N] {

def this(naming: N, config: CassandraContextConfig) = this(naming, config.session, config.preparedStatementCacheSize)
Expand All @@ -41,28 +40,44 @@ class CassandraMonixContext[+N <: NamingStrategy](
Task.defer {
val page = rs.currentPage().asScala
if (rs.hasMorePages)
Task.from(rs.fetchNextPage().toCompletableFuture.toScala).map(_ => page)
Task
.from(rs.fetchNextPage().toCompletableFuture.toScala)
.map(_.currentPage().asScala)
else
Task.now(page)
}

def streamQuery[T](fetchSize: Option[Int], cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Observable[T] = {
def streamQuery[T](
fetchSize: Option[Int],
cql: String,
prepare: Prepare = identityPrepare,
extractor: Extractor[T] = identityExtractor
)(info: ExecutionInfo, dc: Runner): Observable[T] = {
Observable
.fromTask(prepareRowAndLog(cql, prepare))
.fromTask(prepareRowAndLog(cql, prepare, fetchSize))
.mapEvalF(p => session.executeAsync(p).toScala)
.flatMap(Observable.fromAsyncStateAction((rs: AsyncResultSet) => page(rs).map((_, rs)))(_))
.flatMap(
Observable.fromAsyncStateAction((rs: AsyncResultSet) => page(rs).map((_, rs)))(_)
)
.takeWhile(_.nonEmpty)
.flatMap(Observable.fromIterable)
.map(row => extractor(row, this))
}

def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Task[List[T]] = {
streamQuery[T](None, cql, prepare, extractor)(info, dc)
.foldLeftL(List[T]())({ case (l, r) => r +: l }).map(_.reverse)
.foldLeftL(List[T]())({ case (l, r) => r +: l })
.map(_.reverse)
}

def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: Runner): Task[T] =
executeQuery(cql, prepare, extractor)(info, dc).map(handleSingleResult(cql, _))
def executeQuerySingle[T](
cql: String,
prepare: Prepare = identityPrepare,
extractor: Extractor[T] = identityExtractor
)(info: ExecutionInfo, dc: Runner): Task[T] =
executeQuery(cql, prepare, extractor)(info, dc).map(
handleSingleResult(cql, _)
)

def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): Task[Unit] = {
prepareRowAndLog(cql, prepare)
Expand All @@ -71,19 +86,34 @@ class CassandraMonixContext[+N <: NamingStrategy](
}

def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: Runner): Task[Unit] =
Observable.fromIterable(groups).flatMap {
case BatchGroup(cql, prepare) =>
Observable.fromIterable(prepare)
.flatMap(prep => Observable.fromTask(executeAction(cql, prep)(info, dc)))
.map(_ => ())
}.completedL
Observable
.fromIterable(groups)
.flatMap {
case BatchGroup(cql, prepare) =>
Observable
.fromIterable(prepare)
.flatMap(prep => Observable.fromTask(executeAction(cql, prep)(info, dc)))
.map(_ => ())
}
.completedL

private def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): Task[PrepareRow] = {
private def prepareRowAndLog(
cql: String,
prepare: Prepare = identityPrepare,
fetchSize: Option[Int] = None
): Task[PrepareRow] = {
Task.async0[PrepareRow] { (scheduler, callback) =>
implicit val executor: Scheduler = scheduler

super.prepareAsync(cql)
.map(row => prepare(row, this))
super
.prepareAsync(cql)
.map { row =>
val rowWithPageSize = fetchSize match {
case Some(size) => row.setPageSize(size)
case None => row
}
prepare(rowWithPageSize, this)
}
.onComplete {
case Success((params, bs)) =>
logger.logQuery(cql, params)
Expand Down

0 comments on commit f952ec6

Please sign in to comment.