Skip to content

Commit

Permalink
Merge 1645969 into 54aee84
Browse files Browse the repository at this point in the history
  • Loading branch information
alexflav23 committed Jul 19, 2018
2 parents 54aee84 + 1645969 commit e86aaec
Show file tree
Hide file tree
Showing 27 changed files with 222 additions and 91 deletions.
10 changes: 3 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sudo: required
dist: trusty
scala:
- 2.10.6
- 2.11.11
- 2.11.12
python: 2.7.13
cache:
directories:
Expand All @@ -15,7 +15,7 @@ before_cache:
- find $HOME/.sbt -name "*.lock" -delete
env:
global:
- TARGET_SCALA_VERSION: 2.12.5
- TARGET_SCALA_VERSION: 2.12.6
- GH_REF: github.com/outworkers/phantom.git
- secure: V5iziDRj988+kcpW6PHOjZZYoayDi2+Fjx2Y6F9dL2mYw3kcjrwyyQgpWoMPMrXHdR61xoollyytgZPfavNViocNxYZMVRfQBLeTCd+mvuLQEvra6aRWl7XaYlpGi5+uHEh5k84MsRNsEZKiiuabxMRZvglZSC8QHYqYgDx3rho=
- secure: nslC+pNpj8XnEnolwAhfVMP0j/mNnlMm9MCqD3IWiRlh5RRgt6t5s1XCSF6y9y/kOB4p0ny3ly7qR4uZxtKvVnJzjrrpf5UAlSpFjA+s7jMgumQWuUsDm6u3uP5DykTWNwa8xpRT7J2vcCM/MoP1DSwuHQ7ptO8yFfVlel3LFtY=
Expand All @@ -35,12 +35,8 @@ jdk:
- oraclejdk8
matrix:
include:
- scala: 2.12.5
- scala: 2.12.6
jdk: oraclejdk8
addons:
apt:
packages:
- oracle-java8-installer
before_install: unset SBT_OPTS JVM_OPTS
install:
- ./build/install_cassandra.sh
Expand Down
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ lazy val Versions = new {
val circe = "0.8.0"

val scala210 = "2.10.6"
val scala211 = "2.11.11"
val scala212 = "2.12.5"
val scala211 = "2.11.12"
val scala212 = "2.12.6"
val scalaAll = Seq(scala210, scala211, scala212)

val scala = new {
Expand Down Expand Up @@ -169,7 +169,8 @@ val sharedSettings: Seq[Def.Setting[_]] = Defaults.coreDefaultSettings ++ Seq(
Resolver.jcenterRepo
),

logLevel in ThisBuild := { if (Publishing.runningUnderCi) Level.Error else Level.Info },
logLevel in Compile := { if (Publishing.runningUnderCi) Level.Error else Level.Info },
logLevel in Test := Level.Info,
libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % Versions.logback % Test,
"org.slf4j" % "log4j-over-slf4j" % Versions.slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.outworkers.phantom.builder.query._
trait SelectTable[T <: CassandraTable[T, R], R] {
self: CassandraTable[T, R] =>

def select: RootSelectBlock[T, R] = RootSelectBlock[T, R](this.asInstanceOf[T], Nil, fromRow)
def table: T = this.asInstanceOf[T]

def select: RootSelectBlock[T, R] = RootSelectBlock[T, R](table, Nil, fromRow)

def select[A](f1: T => SelectColumn[A]): RootSelectBlock[T, A] = {
val t = this.asInstanceOf[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,33 @@ object OperatorClause extends Clause {
}

object TypedClause extends Clause {
class Condition[RR](override val qb: CQLQuery, val extractor: Row => RR) extends QueryCondition(qb, Nil)
class Condition[RR](override val qb: CQLQuery, val extractor: Row => RR) extends QueryCondition(qb, Nil) { outer =>

def ~[BB](other: Condition[BB]): TypedProjection[BB :: RR :: HNil] = new TypedProjection[BB :: RR :: HNil](List(qb, other.qb)) {
override def extractor: Row => BB :: RR :: HNil = r => {
other.extractor(r) :: outer.extractor(r) :: HNil
}
}
}

abstract class TypedProjection[HL <: HList](queries: List[CQLQuery]) extends QueryCondition[HNil](
QueryBuilder.Utils.join(queries.reverse: _*),
Nil
) { outer =>
def extractor: Row => HL

def ~[RR](other: Condition[RR]): TypedProjection[RR :: HL] = new TypedProjection[RR :: HL](other.qb :: queries) {
override def extractor: Row => RR :: HL = r => other.extractor(r) :: outer.extractor(r)
}
}

object TypedProjection {
implicit def condition1[A1](source: Condition[A1]): TypedProjection[A1 :: HNil] = {
new TypedProjection[A1 :: HNil](List(source.qb)) {
override def extractor: Row => A1 :: HNil = r => source.extractor(r) :: HNil
}
}
}
}

object DeleteClause extends Clause {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ case class PartitionQueryColumn[RR](name: String)(
)(fn: (String, String) => CQLQuery)(implicit pp: Primitive[R]): WhereClause.PartitionCondition = {
new WhereClause.PartitionCondition(
fn(name, pp.asCql(value)), {
session: Session => pp.serialize(value, session.protocolVersion)
session: Session => RoutingKeyValue(
cql = pp.asCql(value),
bytes = pp.serialize(value, session.protocolVersion)
)
}
)
}
Expand Down Expand Up @@ -106,7 +109,10 @@ case class PartitionQueryColumn[RR](name: String)(
): WhereClause.PartitionCondition = {
new WhereClause.PartitionCondition(
QueryBuilder.Where.in(name, values.map(p.asCql)), {
session: Session => ev.serialize(ListValue(values), session.protocolVersion)
session: Session => RoutingKeyValue(
s"List(${QueryBuilder.Utils.join(values.map(p.asCql)).queryString}",
ev.serialize(ListValue(values), session.protocolVersion)
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@
*/
package com.outworkers.phantom.builder.ops

import java.nio.ByteBuffer

import com.outworkers.phantom.builder.primitives.Primitive
import com.outworkers.phantom.builder.query.prepared.PrepareMark
import com.outworkers.phantom.column.AbstractColumn
import shapeless._

/**
*
* @param cql
* @param bytes
*/
case class RoutingKeyValue(
cql: String,
bytes: ByteBuffer
)

object TokenTypes {
sealed trait Root
trait ValueToken extends Root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ import java.nio.ByteBuffer
import com.datastax.driver.core.Session

package object ops {
type TokenizerKey = (Session => ByteBuffer)


type TokenizerKey = (Session => RoutingKeyValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ object Primitive {
*/
implicit def materializer[T]: Primitive[T] = macro PrimitiveMacro.materializer[T]


def iso[A, B : Primitive](r: B => A)(w: A => B): Primitive[A] = derive[A, B](w)(r)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,13 @@
*/
package com.outworkers.phantom.builder.primitives

import java.math.BigInteger
import java.net.{InetAddress, UnknownHostException}
import java.nio.charset.Charset
import java.nio.{BufferUnderflowException, ByteBuffer}
import java.sql.{ Timestamp => JTimestamp }
import java.time.Instant
import java.util.{Date, UUID}

import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.{DriverInternalError, InvalidTypeException}
import com.datastax.driver.core.utils.Bytes
import com.outworkers.phantom.builder.QueryBuilder
import com.outworkers.phantom.builder.query.engine.CQLQuery
import com.outworkers.phantom.builder.syntax.CQLSyntax
import org.joda.time.{DateTime, DateTimeZone, LocalDate => JodaLocalDate}

import scala.collection.generic.CanBuildFrom
import scala.util.Try

object Utils {
private[phantom] def unsupported(version: ProtocolVersion): DriverInternalError = {
Expand Down Expand Up @@ -93,7 +82,7 @@ object Utils {
}

/**
* Utility method that "packs" together a list of {@link ByteBuffer}s containing
* Utility method that "packs" together a list of {{java.nio.ByteBuffer}}s containing
* serialized collection elements.
* Mainly intended for use with collection codecs when serializing collections.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.outworkers.phantom.builder.query

import com.datastax.driver.core._
import com.datastax.driver.core.policies.TokenAwarePolicy
import com.outworkers.phantom.Manager
import com.outworkers.phantom.builder.ops.TokenizerKey

trait Modifier extends (Statement => Statement)
Expand All @@ -31,8 +32,13 @@ case class RoutingKeyModifier(
val policy = session.getCluster.getConfiguration.getPolicies.getLoadBalancingPolicy

if (policy.isInstanceOf[TokenAwarePolicy] && tokens.nonEmpty) {

val routingKeys = tokens.map(_.apply(session))

Manager.logger.debug(s"Routing key tokens found. Settings routing key to ${routingKeys.map(_.cql).mkString("(", ",", ")")}")

st
.setRoutingKey(tokens.map(_.apply(session)): _*)
.setRoutingKey(routingKeys.map(_.bytes):_*)
.setKeyspace(session.getLoggedKeyspace)
} else {
st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.outworkers.phantom.builder.query

import java.nio.ByteBuffer

import com.datastax.driver.core.{ConsistencyLevel, Session}
import com.outworkers.phantom.builder.clauses._
import com.outworkers.phantom.builder.ops.TokenizerKey
Expand All @@ -28,7 +26,7 @@ import com.outworkers.phantom.builder.syntax.CQLSyntax
import com.outworkers.phantom.builder.{ConsistencyBound, LimitBound, OrderBound, WhereBound, _}
import com.outworkers.phantom.connectors.KeySpace
import com.outworkers.phantom.{CassandraTable, Row}
import shapeless.ops.hlist.{Prepend, Reverse}
import shapeless.ops.hlist.{Prepend, Reverse, Tupler}
import shapeless.{::, =:!=, HList, HNil}

import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -235,13 +233,19 @@ private[phantom] class RootSelectBlock[
}
}

def function[RR](f1: TypedClause.Condition[RR])(
implicit keySpace: KeySpace
): SelectQuery.Default[T, RR] = {
def function[
HL <: HList,
Rev <: HList,
TP
](projection: T => TypedClause.TypedProjection[HL])(
implicit keySpace: KeySpace,
rev: Reverse.Aux[HL, Rev],
ev: Tupler.Aux[Rev, TP]
): SelectQuery.Default[T, TP] = {
new SelectQuery(
table,
f1.extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1.qb),
row => ev.apply(rev.apply(projection(table).extractor(row))),
QueryBuilder.Select.select(table.tableName, keySpace.name, projection(table).qb),
Nil,
WherePart.empty,
OrderPart.empty,
Expand All @@ -252,13 +256,13 @@ private[phantom] class RootSelectBlock[
)
}

def function[RR](f1: T => TypedClause.Condition[RR])(
def function[RR](f1: TypedClause.Condition[RR])(
implicit keySpace: KeySpace
): SelectQuery.Default[T, RR] = {
new SelectQuery(
table,
f1(table).extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1(table).qb),
f1.extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1.qb),
Nil,
WherePart.empty,
OrderPart.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ abstract class ResultQueryInterface[
row map fromRow
}

protected[this] def flattenedOption[Inner](
row: Option[Row]
)(implicit ev: R <:< Option[Inner]): Option[Inner] = {
row flatMap fromRow
}

protected[this] def directMapper(
results: Iterator[Row]
): List[R] = results.map(fromRow).toList
Expand All @@ -57,13 +51,6 @@ abstract class ResultQueryInterface[
f map { r => IteratorResult(r.iterate().map(fromRow), r) }
}

private[phantom] def optionalFetch[Inner](source: F[ResultSet])(
implicit ec: ExecutionContextExecutor,
ev: R <:< Option[Inner]
): F[Option[Inner]] = {
source map { res => flattenedOption(res.value()) }
}

private[phantom] def singleFetch(source: F[ResultSet])(
implicit session: Session,
ec: ExecutionContextExecutor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package com.outworkers.phantom.ops

import com.datastax.driver.core.Session
import com.outworkers.phantom.ResultSet
import com.outworkers.phantom.builder.query.CreateQuery.DelegatedCreateQuery
import com.outworkers.phantom.builder.query.execution._
import com.outworkers.phantom.database.Database

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ class SelectQueryOps[
singleFetch(adapter.fromGuava(query.copy(limitedPart = enforceLimit).executableQuery.statement()))
}

/**
* Returns the result of an aggregate function call, provided a single aggregate function was invoked.
* This is used to circumvent some compiler limitations around HLists being tupled. Phantom relies on HLists
* to compute a multiple aggregate return function extractor, and if a single aggregate is selected,
* a Tuple1(value) is returned. This function will extract the content of the Tuple1 to have a more presentable type.
* @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
* @param ev The implicit limit for the query.
* @param ec The implicit Scala execution context.
* @return A Scala future guaranteed to contain a single result wrapped as an Option.
*/
def aggregate[T]()(
implicit session: Session,
ev: Limit =:= Unlimited,
ec: ExecutionContextExecutor,
unwrap: Record <:< Tuple1[T]
): F[Option[T]] = {
singleFetch(adapter.fromGuava(query.executableQuery.statement())).map(_.map { case Tuple1(vd: T) => vd })
}

/**
* Returns the first row from the select ignoring everything else
* @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
Expand All @@ -70,14 +89,12 @@ class SelectQueryOps[
* @return A Scala future guaranteed to contain a single result wrapped as an Option.
*/
@implicitNotFound("You have already defined limit on this Query. You cannot specify multiple limits on the same builder.")
def aggregate[Inner]()(
def multiAggregate()(
implicit session: Session,
ev: Limit =:= Unlimited,
opt: Record <:< Option[Inner],
ec: ExecutionContextExecutor
): F[Option[Inner]] = {
val enforceLimit = if (query.count) LimitedPart.empty else query.limitedPart append QueryBuilder.limit(1.toString)
optionalFetch(adapter.fromGuava(query.copy(limitedPart = enforceLimit).executableQuery.statement()))
): F[Option[Record]] = {
singleFetch(adapter.fromGuava(query.executableQuery.statement()))
}

override def fromRow(r: Row): Record = query.fromRow(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit

import com.datastax.driver.core.VersionNumber
import com.outworkers.phantom.database.DatabaseProvider
import com.outworkers.phantom.dsl.{DateTime, UUID}
import com.outworkers.phantom.dsl.UUID
import com.outworkers.phantom.tables.TestDatabase
import com.outworkers.util.samplers._
import io.circe.{Encoder, Json}
Expand Down Expand Up @@ -60,7 +60,7 @@ trait PhantomBaseSuite extends Suite with Matchers
override def sample: LocalDate = LocalDate.now(DateTimeZone.UTC)
}

override implicit val patienceConfig = PatienceConfig(
override implicit val patienceConfig: PatienceConfig = PatienceConfig(
timeout = defaultTimeoutSpan,
interval = Span(defaultScalaInterval, Millis)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class BatchQueryTest extends PhantomSuite {
}
}

it should "prioritise batch updates in a last first order" in {
ignore should "prioritise batch updates in a last first order" in {
val row = gen[JodaRow]

val statement1 = database.primitivesJoda.insert
Expand Down

0 comments on commit e86aaec

Please sign in to comment.