Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring Cassandra for more modularity #1322

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package io.getquill

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import io.getquill.context.cassandra.util.FutureConversions.toScalaFuture
import io.getquill.util.{ ContextLogger, LoadConfig }
import com.typesafe.config.Config
import scala.collection.JavaConverters._
import io.getquill.context.cassandra.CassandraSessionContext
import com.datastax.driver.core.Cluster
import com.typesafe.config.Config
import io.getquill.context.cassandra.{CassandraFutureContextEffect, CassandraSessionContext}
import io.getquill.monad.ScalaFutureIOMonad
import io.getquill.util.{ContextLogger, LoadConfig}

import scala.concurrent.{ExecutionContext, Future}

class CassandraAsyncContext[N <: NamingStrategy](
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
val naming: N,
val cluster: Cluster,
val keyspace: String,
val preparedStatementCacheSize: Long
)
extends CassandraSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize)
extends CassandraSessionContext[N]
with ScalaFutureIOMonad {

def this(naming: N, config: CassandraContextConfig) = this(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize)
Expand All @@ -25,41 +23,30 @@ class CassandraAsyncContext[N <: NamingStrategy](

private val logger = ContextLogger(classOf[CassandraAsyncContext[_]])

override type RunContext = ExecutionContext
override type Completed = Unit
override type Result[T] = Future[T]
override type RunQueryResult[T] = List[T]
override type RunQuerySingleResult[T] = T
override type RunActionResult = Unit
override type RunBatchActionResult = Unit
override def complete: Unit = ()

override protected val effect = new CassandraFutureContextEffect

override def performIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] = {
if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`")
super.performIO(io)
}

def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[List[T]] =
this.prepareAsync(cql).map(prepare).flatMap {
case (params, bs) =>
logger.logQuery(cql, params)
session.executeAsync(bs)
.map(_.all.asScala.toList.map(extractor))
}
override def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[List[T]] =
super.executeQuery(cql, prepare, extractor)

def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[T] =
executeQuery(cql, prepare, extractor).map(handleSingleResult)
override def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(implicit ec: ExecutionContext): Future[T] =
super.executeQuerySingle(cql, prepare, extractor)

def executeAction[T](cql: String, prepare: Prepare = identityPrepare)(implicit ec: ExecutionContext): Future[Unit] = {
this.prepareAsync(cql).map(prepare).flatMap {
case (params, bs) =>
logger.logQuery(cql, params)
session.executeAsync(bs).map(_ => ())
}
}
override def executeAction[T](cql: String, prepare: Prepare = identityPrepare)(implicit ec: ExecutionContext): Future[Unit] =
super.executeAction(cql, prepare)

override def executeBatchAction(groups: List[BatchGroup])(implicit ec: ExecutionContext): Future[Unit] =
super.executeBatchAction(groups)

def executeBatchAction(groups: List[BatchGroup])(implicit ec: ExecutionContext): Future[Unit] =
Future.sequence {
groups.flatMap {
case BatchGroup(cql, prepare) =>
prepare.map(executeAction(cql, _))
}
}.map(_ => ())
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package io.getquill

import com.typesafe.config.Config
import io.getquill.util.{ ContextLogger, LoadConfig }
import io.getquill.context.cassandra.CassandraSessionContext
import io.getquill.util.{ContextLogger, LoadConfig}
import io.getquill.context.cassandra.{CassandraContextEffect, CassandraSessionContext}

import scala.collection.JavaConverters._
import com.datastax.driver.core.Cluster
import io.getquill.monad.SyncIOMonad

class CassandraSyncContext[N <: NamingStrategy](
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
val naming: N,
val cluster: Cluster,
val keyspace: String,
val preparedStatementCacheSize: Long
)
extends CassandraSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize)
extends CassandraSessionContext[N]
with SyncIOMonad {

def this(naming: N, config: CassandraContextConfig) = this(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize)
Expand All @@ -27,6 +28,20 @@ class CassandraSyncContext[N <: NamingStrategy](
override type RunQuerySingleResult[T] = T
override type RunActionResult = Unit
override type RunBatchActionResult = Unit
override type Completed = Unit
override type RunContext = Unit

override def complete: Unit = ()

// TODO Not using this, as a dummy for now. Maybe will to use a sync context
override protected val effect = new CassandraContextEffect[Result, RunContext] {
override val executionContext: RunContext = ()
override def withContextActions: WithContextActions = ???
override def wrap[T](t: => T): T = ???
override def push[A, B](result: A)(f: A => B): B = ???
override def flatPush[A, B](result: A)(f: A => B): B = ???
override def seq[A, B](f: List[A]): List[A] = ???
}

override def performIO[T](io: IO[T, _], transactional: Boolean = false): Result[T] = {
if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.getquill.context.cassandra

import com.google.common.util.concurrent.ListenableFuture
import io.getquill.context.ContextEffect

import scala.util.Try
import scala.language.higherKinds

trait CassandraContextEffect[F[_], EC] extends ContextEffect[F] { self =>

object ImplicitsWithContext {
implicit class ResultTypeOps[A](result: F[A]) {
def map[B](f: A => B)(implicit executionContext: EC) = withContextActions.push(result)(f)
def flatMap[B](f: A => F[B])(implicit executionContext: EC) = withContextActions.flatPush(result)(f)
}
}

val executionContext: EC
def withContextActions: WithContextActions

trait WithContextActions {
def wrapListenableFuture[T](listenableFuture: ListenableFuture[T])(implicit executionContext: EC): F[T]
def tryAndThen[T, U](t: => F[T])(pf: PartialFunction[Try[T], U])(implicit executionContext: EC): F[T]
def wrap[T](t: => T)(implicit executionContext: EC): F[T]
def push[A, B](result: F[A])(f: A => B)(implicit executionContext: EC): F[B]
def flatPush[A, B](result: F[A])(f: A => F[B])(implicit executionContext: EC): F[B]
def seq[A, B](f: List[F[A]])(implicit executionContext: EC): F[List[A]]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.getquill.context.cassandra

import com.google.common.util.concurrent.ListenableFuture

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// TODO Need to implement
class CassandraFutureContextEffect extends CassandraContextEffect[Future, ExecutionContext] {

override def wrap[T](t: => T): Future[T] = ???
override def push[A, B](result: Future[A])(f: A => B): Future[B] = ???
override def flatPush[A, B](result: Future[A])(f: A => Future[B]): Future[B] = ???
override def seq[A, B](f: List[Future[A]]): Future[List[A]] = ???

override val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def withContextActions: WithContextActions = new WithContextActions {
override def wrapListenableFuture[T](listenableFuture: ListenableFuture[T])(implicit executionContext: ExecutionContext): Future[T] = ???
override def tryAndThen[T, U](t: => Future[T])(pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[T] = ???
override def wrap[T](t: => T)(implicit executionContext: ExecutionContext): Future[T] = ???
override def push[A, B](result: Future[A])(f: A => B)(implicit executionContext: ExecutionContext): Future[B] = ???
override def flatPush[A, B](result: Future[A])(f: A => Future[B])(implicit executionContext: ExecutionContext): Future[B] = ???
override def seq[A, B](f: List[Future[A]])(implicit executionContext: ExecutionContext): Future[List[A]] = ???
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,52 @@
package io.getquill.context.cassandra

import com.datastax.driver.core.{ Cluster, _ }
import com.datastax.driver.core.{Cluster, _}
import io.getquill.NamingStrategy
import io.getquill.context.Context
import io.getquill.context.cassandra.encoding.{ CassandraTypes, Decoders, Encoders, UdtEncoding }
import io.getquill.context.cassandra.encoding.{CassandraTypes, Decoders, Encoders, UdtEncoding}
import io.getquill.util.ContextLogger
import io.getquill.util.Messages.fail
import io.getquill.context.cassandra.util.FutureConversions._

import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

abstract class CassandraSessionContext[N <: NamingStrategy](
val naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)
abstract class CassandraSessionContext[N <: NamingStrategy]
extends Context[CqlIdiom, N]
with CassandraContext[N]
with Encoders
with Decoders
with CassandraTypes
with UdtEncoding {

val naming: N
val cluster: Cluster
val keyspace: String
val preparedStatementCacheSize: Long

val idiom = CqlIdiom

private val logger = ContextLogger(classOf[CassandraSessionContext[_]])

override type PrepareRow = BoundStatement
override type ResultRow = Row

override type RunActionReturningResult[T] = Unit
override type RunBatchActionReturningResult[T] = Unit
type RunContext

// It can be assumed that all cassandra output contexts will be a singleton-type but it cannot be assumed that it will be scala.Unit.
// other frameworks (e.g. Lagom) may have their own singleton-type.
type Completed
override type RunActionReturningResult[T] = Completed
override type RunBatchActionReturningResult[T] = Completed

def complete:Completed

protected val effect: CassandraContextEffect[Result, RunContext]
val withContextActions = effect.withContextActions
import effect.ImplicitsWithContext._
import withContextActions._

private val preparedStatementCache =
new PrepareStatementCache(preparedStatementCacheSize)
new PrepareStatementCache(preparedStatementCacheSize, effect)

protected lazy val session = cluster.connect(keyspace)

Expand All @@ -55,8 +69,8 @@ abstract class CassandraSessionContext[N <: NamingStrategy](
protected def prepare(cql: String): BoundStatement =
preparedStatementCache(cql)(session.prepare)

protected def prepareAsync(cql: String)(implicit executionContext: ExecutionContext): Future[BoundStatement] =
preparedStatementCache.async(cql)(session.prepareAsync(_))
protected def prepareAsync(cql: String)(implicit executionContext: RunContext): Result[BoundStatement] =
preparedStatementCache.async(cql)(prep => wrapListenableFuture(session.prepareAsync(prep)))

def close() = {
session.close
Expand All @@ -69,9 +83,43 @@ abstract class CassandraSessionContext[N <: NamingStrategy](
()
}

def executeActionReturning[O](sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[O], returningColumn: String): Unit =
// For now leave the IO monad to specifically use scala.Future and not context effects. Maybe in a future refactor...
// override def performIO[T](io: IO[T, _], transactional: Boolean = false)(implicit ec: ExecutionContext): Result[T] = {
// if (transactional) logger.underlying.warn("Cassandra doesn't support transactions, ignoring `io.transactional`")
// super.performIO(io)
// }

def executeQuery[T](cql: String, prepare: Prepare, extractor: Extractor[T])(implicit ec: RunContext): Result[List[T]] = {
this.prepareAsync(cql).map(prepare).flatMap {
case (params, bs) =>
logger.logQuery(cql, params)
wrapListenableFuture(session.executeAsync(bs))
.map(_.all.asScala.toList.map(extractor))
}
}

def executeQuerySingle[T](cql: String, prepare: Prepare, extractor: Extractor[T])(implicit ec: RunContext): Result[T] =
executeQuery(cql, prepare, extractor).map(handleSingleResult)

def executeAction[T](cql: String, prepare: Prepare)(implicit ec: RunContext): Result[Completed] = {
this.prepareAsync(cql).map(prepare).flatMap {
case (params, bs) =>
logger.logQuery(cql, params)
wrapListenableFuture(session.executeAsync(bs)).map(_ => complete)
}
}

def executeBatchAction(groups: List[BatchGroup])(implicit ec: RunContext): Result[Completed] =
seq {
groups.flatMap {
case BatchGroup(cql, prepare) =>
prepare.map(executeAction(cql, _))
}
}.map(_ => complete)

def executeActionReturning[O](sql: String, prepare: Prepare, extractor: Extractor[O], returningColumn: String): Completed =
fail("Cassandra doesn't support `returning`.")

def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Extractor[T]): Unit =
def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Extractor[T]): Completed =
fail("Cassandra doesn't support `returning`.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package io.getquill.context.cassandra

import java.util.concurrent.Callable

import com.datastax.driver.core.{ BoundStatement, PreparedStatement }
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.google.common.base.Charsets
import com.google.common.cache.CacheBuilder
import com.google.common.hash.Hashing

import scala.concurrent.{ ExecutionContext, Future }
import scala.language.higherKinds
import scala.util.Success

class PrepareStatementCache(size: Long) {
class PrepareStatementCache[F[_], EC](size: Long, val contextEffect: CassandraContextEffect[F, EC]) {

val withContextActions = contextEffect.withContextActions
import withContextActions._
import contextEffect.ImplicitsWithContext._

private val cache =
CacheBuilder
Expand All @@ -28,14 +32,15 @@ class PrepareStatementCache(size: Long) {
}
).bind

def async(stmt: String)(prepare: String => Future[PreparedStatement])(implicit context: ExecutionContext): Future[BoundStatement] = {
def async(stmt: String)(prepare: String => F[PreparedStatement])(implicit context: EC): F[BoundStatement] = {
val key = hash(stmt)
val found = cache.getIfPresent(key)

if (found != null) Future.successful(found.bind)
else prepare(stmt).andThen {
case Success(s) => cache.put(key, s)
}.map(_.bind())
if (found != null) wrap(found.bind)
else
tryAndThen(prepare(stmt)) {
case Success(s) => cache.put(key, s)
}.map(_.bind())
}

private def hash(string: String): java.lang.Long = {
Expand Down
10 changes: 10 additions & 0 deletions quill-core/src/main/scala/io/getquill/context/ContextEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import scala.language.higherKinds
* generic manner.
*/
trait ContextEffect[F[_]] {

object Implicits {
implicit class ResultTypeOps[A](result: F[A]) {
def map[B](f: A => B) = push(result)(f)
def flatMap[B](f: A => F[B]) = flatPush(result)(f)
}
}

/**
* Lift an element or block of code in the context into the specified effect.
*/
Expand All @@ -18,6 +26,8 @@ trait ContextEffect[F[_]] {
*/
def push[A, B](result: F[A])(f: A => B): F[B]

def flatPush[A, B](result: F[A])(f: A => F[B]): F[B]

/**
* Aggregate a list of effects into a single effect element. Most effect types
* used in Quill context easily support this kind of operation e.g. Futures, monix Tasks, Observables, etc...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ trait TranslateContext extends TranslateContextBase {
override private[getquill] val translateEffect: ContextEffect[TranslateResult] = new ContextEffect[TranslateResult] {
override def wrap[T](t: => T): T = t
override def push[A, B](result: A)(f: A => B): B = f(result)
override def flatPush[A, B](result: A)(f: A => B): B = f(result)
override def seq[A, B](list: List[A]): List[A] = list
}
}
Expand Down