From b4a9e24ec48537e054d8b68945e21e70bfc7a501 Mon Sep 17 00:00:00 2001 From: deusaquilus Date: Thu, 7 Feb 2019 01:48:38 -0500 Subject: [PATCH] Refactoring Cassandra for more modularity --- .../io/getquill/CassandraAsyncContext.scala | 61 ++++++-------- .../io/getquill/CassandraSyncContext.scala | 29 +++++-- .../cassandra/CassandraContextEffect.scala | 29 +++++++ .../CassandraFutureContextEffect.scala | 26 ++++++ .../cassandra/CassandraSessionContext.scala | 82 +++++++++++++++---- .../cassandra/PrepareStatementCache.scala | 21 +++-- .../io/getquill/context/ContextEffect.scala | 10 +++ .../getquill/context/TranslateContext.scala | 1 + 8 files changed, 190 insertions(+), 69 deletions(-) create mode 100644 quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraContextEffect.scala create mode 100644 quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraFutureContextEffect.scala diff --git a/quill-cassandra/src/main/scala/io/getquill/CassandraAsyncContext.scala b/quill-cassandra/src/main/scala/io/getquill/CassandraAsyncContext.scala index 58c106950b..f5aa95a273 100644 --- a/quill-cassandra/src/main/scala/io/getquill/CassandraAsyncContext.scala +++ b/quill-cassandra/src/main/scala/io/getquill/CassandraAsyncContext.scala @@ -1,22 +1,20 @@ package io.getquill -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import io.getquill.context.cassandra.util.FutureConversions.toScalaFuture -import io.getquill.util.{ ContextLogger, LoadConfig } -import com.typesafe.config.Config -import scala.collection.JavaConverters._ -import io.getquill.context.cassandra.CassandraSessionContext import com.datastax.driver.core.Cluster +import com.typesafe.config.Config +import io.getquill.context.cassandra.{CassandraFutureContextEffect, CassandraSessionContext} import io.getquill.monad.ScalaFutureIOMonad +import io.getquill.util.{ContextLogger, LoadConfig} + +import scala.concurrent.{ExecutionContext, Future} class CassandraAsyncContext[N <: NamingStrategy]( - naming: N, - cluster: Cluster, - keyspace: String, - preparedStatementCacheSize: Long + val naming: N, + val cluster: Cluster, + val keyspace: String, + val preparedStatementCacheSize: Long ) - extends CassandraSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize) + extends CassandraSessionContext[N] with ScalaFutureIOMonad { def this(naming: N, config: CassandraContextConfig) = this(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize) @@ -25,41 +23,30 @@ class CassandraAsyncContext[N <: NamingStrategy]( private val logger = ContextLogger(classOf[CassandraAsyncContext[_]]) + override type RunContext = ExecutionContext + override type Completed = Unit override type Result[T] = Future[T] override type RunQueryResult[T] = List[T] override type RunQuerySingleResult[T] = T - override type RunActionResult = Unit - override type RunBatchActionResult = Unit + override def complete: Unit = () + + override protected val effect = new CassandraFutureContextEffect override def performIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] = { if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`") super.performIO(io) } - def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[List[T]] = - this.prepareAsync(cql).map(prepare).flatMap { - case (params, bs) => - logger.logQuery(cql, params) - session.executeAsync(bs) - .map(_.all.asScala.toList.map(extractor)) - } + override def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[List[T]] = + super.executeQuery(cql, prepare, extractor) - def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[T] = - executeQuery(cql, prepare, extractor).map(handleSingleResult) + override def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[T] = + super.executeQuerySingle(cql, prepare, extractor) - def executeAction[T](cql: String, prepare: Prepare = identityPrepare)(implicit ec: ExecutionContext): Future[Unit] = { - this.prepareAsync(cql).map(prepare).flatMap { - case (params, bs) => - logger.logQuery(cql, params) - session.executeAsync(bs).map(_ => ()) - } - } + override def executeAction[T](cql: String, prepare: Prepare = identityPrepare)(implicit ec: ExecutionContext): Future[Unit] = + super.executeAction(cql, prepare) + + override def executeBatchAction(groups: List[BatchGroup])(implicit ec: ExecutionContext): Future[Unit] = + super.executeBatchAction(groups) - def executeBatchAction(groups: List[BatchGroup])(implicit ec: ExecutionContext): Future[Unit] = - Future.sequence { - groups.flatMap { - case BatchGroup(cql, prepare) => - prepare.map(executeAction(cql, _)) - } - }.map(_ => ()) } diff --git a/quill-cassandra/src/main/scala/io/getquill/CassandraSyncContext.scala b/quill-cassandra/src/main/scala/io/getquill/CassandraSyncContext.scala index f9d9b0708b..b541393bce 100644 --- a/quill-cassandra/src/main/scala/io/getquill/CassandraSyncContext.scala +++ b/quill-cassandra/src/main/scala/io/getquill/CassandraSyncContext.scala @@ -1,19 +1,20 @@ package io.getquill import com.typesafe.config.Config -import io.getquill.util.{ ContextLogger, LoadConfig } -import io.getquill.context.cassandra.CassandraSessionContext +import io.getquill.util.{ContextLogger, LoadConfig} +import io.getquill.context.cassandra.{CassandraContextEffect, CassandraSessionContext} + import scala.collection.JavaConverters._ import com.datastax.driver.core.Cluster import io.getquill.monad.SyncIOMonad class CassandraSyncContext[N <: NamingStrategy]( - naming: N, - cluster: Cluster, - keyspace: String, - preparedStatementCacheSize: Long + val naming: N, + val cluster: Cluster, + val keyspace: String, + val preparedStatementCacheSize: Long ) - extends CassandraSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize) + extends CassandraSessionContext[N] with SyncIOMonad { def this(naming: N, config: CassandraContextConfig) = this(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize) @@ -27,6 +28,20 @@ class CassandraSyncContext[N <: NamingStrategy]( override type RunQuerySingleResult[T] = T override type RunActionResult = Unit override type RunBatchActionResult = Unit + override type Completed = Unit + override type RunContext = Unit + + override def complete: Unit = () + + // TODO Not using this, as a dummy for now. Maybe will to use a sync context + override protected val effect = new CassandraContextEffect[Result, RunContext] { + override val executionContext: RunContext = () + override def withContextActions: WithContextActions = ??? + override def wrap[T](t: => T): T = ??? + override def push[A, B](result: A)(f: A => B): B = ??? + override def flatPush[A, B](result: A)(f: A => B): B = ??? + override def seq[A, B](f: List[A]): List[A] = ??? + } override def performIO[T](io: IO[T, _], transactional: Boolean = false): Result[T] = { if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`") diff --git a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraContextEffect.scala b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraContextEffect.scala new file mode 100644 index 0000000000..67a61c6062 --- /dev/null +++ b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraContextEffect.scala @@ -0,0 +1,29 @@ +package io.getquill.context.cassandra + +import com.google.common.util.concurrent.ListenableFuture +import io.getquill.context.ContextEffect + +import scala.util.Try +import scala.language.higherKinds + +trait CassandraContextEffect[F[_], EC] extends ContextEffect[F] { self => + + object ImplicitsWithContext { + implicit class ResultTypeOps[A](result: F[A]) { + def map[B](f: A => B)(implicit executionContext: EC) = withContextActions.push(result)(f) + def flatMap[B](f: A => F[B])(implicit executionContext: EC) = withContextActions.flatPush(result)(f) + } + } + + val executionContext: EC + def withContextActions: WithContextActions + + trait WithContextActions { + def wrapListenableFuture[T](listenableFuture: ListenableFuture[T])(implicit executionContext: EC): F[T] + def tryAndThen[T, U](t: => F[T])(pf: PartialFunction[Try[T], U])(implicit executionContext: EC): F[T] + def wrap[T](t: => T)(implicit executionContext: EC): F[T] + def push[A, B](result: F[A])(f: A => B)(implicit executionContext: EC): F[B] + def flatPush[A, B](result: F[A])(f: A => F[B])(implicit executionContext: EC): F[B] + def seq[A, B](f: List[F[A]])(implicit executionContext: EC): F[List[A]] + } +} diff --git a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraFutureContextEffect.scala b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraFutureContextEffect.scala new file mode 100644 index 0000000000..9b6756ee59 --- /dev/null +++ b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraFutureContextEffect.scala @@ -0,0 +1,26 @@ +package io.getquill.context.cassandra + +import com.google.common.util.concurrent.ListenableFuture + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try + +// TODO Need to implement +class CassandraFutureContextEffect extends CassandraContextEffect[Future, ExecutionContext] { + + override def wrap[T](t: => T): Future[T] = ??? + override def push[A, B](result: Future[A])(f: A => B): Future[B] = ??? + override def flatPush[A, B](result: Future[A])(f: A => Future[B]): Future[B] = ??? + override def seq[A, B](f: List[Future[A]]): Future[List[A]] = ??? + + override val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global + + override def withContextActions: WithContextActions = new WithContextActions { + override def wrapListenableFuture[T](listenableFuture: ListenableFuture[T])(implicit executionContext: ExecutionContext): Future[T] = ??? + override def tryAndThen[T, U](t: => Future[T])(pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[T] = ??? + override def wrap[T](t: => T)(implicit executionContext: ExecutionContext): Future[T] = ??? + override def push[A, B](result: Future[A])(f: A => B)(implicit executionContext: ExecutionContext): Future[B] = ??? + override def flatPush[A, B](result: Future[A])(f: A => Future[B])(implicit executionContext: ExecutionContext): Future[B] = ??? + override def seq[A, B](f: List[Future[A]])(implicit executionContext: ExecutionContext): Future[List[A]] = ??? + } +} diff --git a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraSessionContext.scala b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraSessionContext.scala index 21a72be1c5..6978995196 100644 --- a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraSessionContext.scala +++ b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/CassandraSessionContext.scala @@ -1,21 +1,16 @@ package io.getquill.context.cassandra -import com.datastax.driver.core.{ Cluster, _ } +import com.datastax.driver.core.{Cluster, _} import io.getquill.NamingStrategy import io.getquill.context.Context -import io.getquill.context.cassandra.encoding.{ CassandraTypes, Decoders, Encoders, UdtEncoding } +import io.getquill.context.cassandra.encoding.{CassandraTypes, Decoders, Encoders, UdtEncoding} +import io.getquill.util.ContextLogger import io.getquill.util.Messages.fail -import io.getquill.context.cassandra.util.FutureConversions._ + import scala.collection.JavaConverters._ -import scala.concurrent.{ ExecutionContext, Future } import scala.util.Try -abstract class CassandraSessionContext[N <: NamingStrategy]( - val naming: N, - cluster: Cluster, - keyspace: String, - preparedStatementCacheSize: Long -) +abstract class CassandraSessionContext[N <: NamingStrategy] extends Context[CqlIdiom, N] with CassandraContext[N] with Encoders @@ -23,16 +18,35 @@ abstract class CassandraSessionContext[N <: NamingStrategy]( with CassandraTypes with UdtEncoding { + val naming: N + val cluster: Cluster + val keyspace: String + val preparedStatementCacheSize: Long + val idiom = CqlIdiom + private val logger = ContextLogger(classOf[CassandraSessionContext[_]]) + override type PrepareRow = BoundStatement override type ResultRow = Row - override type RunActionReturningResult[T] = Unit - override type RunBatchActionReturningResult[T] = Unit + type RunContext + + // It can be assumed that all cassandra output contexts will be a singleton-type but it cannot be assumed that it will be scala.Unit. + // other frameworks (e.g. Lagom) may have their own singleton-type. + type Completed + override type RunActionReturningResult[T] = Completed + override type RunBatchActionReturningResult[T] = Completed + + def complete:Completed + + protected val effect: CassandraContextEffect[Result, RunContext] + val withContextActions = effect.withContextActions + import effect.ImplicitsWithContext._ + import withContextActions._ private val preparedStatementCache = - new PrepareStatementCache(preparedStatementCacheSize) + new PrepareStatementCache(preparedStatementCacheSize, effect) protected lazy val session = cluster.connect(keyspace) @@ -55,8 +69,8 @@ abstract class CassandraSessionContext[N <: NamingStrategy]( protected def prepare(cql: String): BoundStatement = preparedStatementCache(cql)(session.prepare) - protected def prepareAsync(cql: String)(implicit executionContext: ExecutionContext): Future[BoundStatement] = - preparedStatementCache.async(cql)(session.prepareAsync(_)) + protected def prepareAsync(cql: String)(implicit executionContext: RunContext): Result[BoundStatement] = + preparedStatementCache.async(cql)(prep => wrapListenableFuture(session.prepareAsync(prep))) def close() = { session.close @@ -69,9 +83,43 @@ abstract class CassandraSessionContext[N <: NamingStrategy]( () } - def executeActionReturning[O](sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[O], returningColumn: String): Unit = + // For now leave the IO monad to specifically use scala.Future and not context effects. Maybe in a future refactor... + // override def performIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] = { + // if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`") + // super.performIO(io) + // } + + def executeQuery[T](cql: String, prepare: Prepare, extractor: Extractor[T])(implicit ec: RunContext): Result[List[T]] = { + this.prepareAsync(cql).map(prepare).flatMap { + case (params, bs) => + logger.logQuery(cql, params) + wrapListenableFuture(session.executeAsync(bs)) + .map(_.all.asScala.toList.map(extractor)) + } + } + + def executeQuerySingle[T](cql: String, prepare: Prepare, extractor: Extractor[T])(implicit ec: RunContext): Result[T] = + executeQuery(cql, prepare, extractor).map(handleSingleResult) + + def executeAction[T](cql: String, prepare: Prepare)(implicit ec: RunContext): Result[Completed] = { + this.prepareAsync(cql).map(prepare).flatMap { + case (params, bs) => + logger.logQuery(cql, params) + wrapListenableFuture(session.executeAsync(bs)).map(_ => complete) + } + } + + def executeBatchAction(groups: List[BatchGroup])(implicit ec: RunContext): Result[Completed] = + seq { + groups.flatMap { + case BatchGroup(cql, prepare) => + prepare.map(executeAction(cql, _)) + } + }.map(_ => complete) + + def executeActionReturning[O](sql: String, prepare: Prepare, extractor: Extractor[O], returningColumn: String): Completed = fail("Cassandra doesn't support `returning`.") - def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Extractor[T]): Unit = + def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Extractor[T]): Completed = fail("Cassandra doesn't support `returning`.") } diff --git a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/PrepareStatementCache.scala b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/PrepareStatementCache.scala index 3590be4aa9..498a36ec2d 100644 --- a/quill-cassandra/src/main/scala/io/getquill/context/cassandra/PrepareStatementCache.scala +++ b/quill-cassandra/src/main/scala/io/getquill/context/cassandra/PrepareStatementCache.scala @@ -2,15 +2,19 @@ package io.getquill.context.cassandra import java.util.concurrent.Callable -import com.datastax.driver.core.{ BoundStatement, PreparedStatement } +import com.datastax.driver.core.{BoundStatement, PreparedStatement} import com.google.common.base.Charsets import com.google.common.cache.CacheBuilder import com.google.common.hash.Hashing -import scala.concurrent.{ ExecutionContext, Future } +import scala.language.higherKinds import scala.util.Success -class PrepareStatementCache(size: Long) { +class PrepareStatementCache[F[_], EC](size: Long, val contextEffect: CassandraContextEffect[F, EC]) { + + val withContextActions = contextEffect.withContextActions + import withContextActions._ + import contextEffect.ImplicitsWithContext._ private val cache = CacheBuilder @@ -28,14 +32,15 @@ class PrepareStatementCache(size: Long) { } ).bind - def async(stmt: String)(prepare: String => Future[PreparedStatement])(implicit context: ExecutionContext): Future[BoundStatement] = { + def async(stmt: String)(prepare: String => F[PreparedStatement])(implicit context: EC): F[BoundStatement] = { val key = hash(stmt) val found = cache.getIfPresent(key) - if (found != null) Future.successful(found.bind) - else prepare(stmt).andThen { - case Success(s) => cache.put(key, s) - }.map(_.bind()) + if (found != null) wrap(found.bind) + else + tryAndThen(prepare(stmt)) { + case Success(s) => cache.put(key, s) + }.map(_.bind()) } private def hash(string: String): java.lang.Long = { diff --git a/quill-core/src/main/scala/io/getquill/context/ContextEffect.scala b/quill-core/src/main/scala/io/getquill/context/ContextEffect.scala index 0e2e0255e5..631d007d8f 100644 --- a/quill-core/src/main/scala/io/getquill/context/ContextEffect.scala +++ b/quill-core/src/main/scala/io/getquill/context/ContextEffect.scala @@ -8,6 +8,14 @@ import scala.language.higherKinds * generic manner. */ trait ContextEffect[F[_]] { + + object Implicits { + implicit class ResultTypeOps[A](result: F[A]) { + def map[B](f: A => B) = push(result)(f) + def flatMap[B](f: A => F[B]) = flatPush(result)(f) + } + } + /** * Lift an element or block of code in the context into the specified effect. */ @@ -18,6 +26,8 @@ trait ContextEffect[F[_]] { */ def push[A, B](result: F[A])(f: A => B): F[B] + def flatPush[A, B](result: F[A])(f: A => F[B]): F[B] + /** * Aggregate a list of effects into a single effect element. Most effect types * used in Quill context easily support this kind of operation e.g. Futures, monix Tasks, Observables, etc... diff --git a/quill-core/src/main/scala/io/getquill/context/TranslateContext.scala b/quill-core/src/main/scala/io/getquill/context/TranslateContext.scala index 34d7fbe525..a0475f8310 100644 --- a/quill-core/src/main/scala/io/getquill/context/TranslateContext.scala +++ b/quill-core/src/main/scala/io/getquill/context/TranslateContext.scala @@ -14,6 +14,7 @@ trait TranslateContext extends TranslateContextBase { override private[getquill] val translateEffect: ContextEffect[TranslateResult] = new ContextEffect[TranslateResult] { override def wrap[T](t: => T): T = t override def push[A, B](result: A)(f: A => B): B = f(result) + override def flatPush[A, B](result: A)(f: A => B): B = f(result) override def seq[A, B](list: List[A]): List[A] = list } }