Skip to content

Commit

Permalink
Merge f6f178d into 54aee84
Browse files Browse the repository at this point in the history
  • Loading branch information
alexflav23 committed Jul 16, 2018
2 parents 54aee84 + f6f178d commit 0343109
Show file tree
Hide file tree
Showing 22 changed files with 233 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.outworkers.phantom.builder.query._
trait SelectTable[T <: CassandraTable[T, R], R] {
self: CassandraTable[T, R] =>

def select: RootSelectBlock[T, R] = RootSelectBlock[T, R](this.asInstanceOf[T], Nil, fromRow)
def table: T = this.asInstanceOf[T]

def select: RootSelectBlock[T, R] = RootSelectBlock[T, R](table, Nil, fromRow)

def select[A](f1: T => SelectColumn[A]): RootSelectBlock[T, A] = {
val t = this.asInstanceOf[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,33 @@ object OperatorClause extends Clause {
}

object TypedClause extends Clause {
class Condition[RR](override val qb: CQLQuery, val extractor: Row => RR) extends QueryCondition(qb, Nil)
class Condition[RR](override val qb: CQLQuery, val extractor: Row => RR) extends QueryCondition(qb, Nil) { outer =>

def ~[BB](other: Condition[BB]): TypedProjection[BB :: RR :: HNil] = new TypedProjection[BB :: RR :: HNil](List(qb, other.qb)) {
override def extractor: Row => BB :: RR :: HNil = r => {
other.extractor(r) :: outer.extractor(r) :: HNil
}
}
}

abstract class TypedProjection[HL <: HList](queries: List[CQLQuery]) extends QueryCondition[HNil](
QueryBuilder.Utils.join(queries.reverse: _*),
Nil
) { outer =>
def extractor: Row => HL

def ~[RR](other: Condition[RR]): TypedProjection[RR :: HL] = new TypedProjection[RR :: HL](other.qb :: queries) {
override def extractor: Row => RR :: HL = r => other.extractor(r) :: outer.extractor(r)
}
}

object TypedProjection {
implicit def condition1[A1](source: Condition[A1]): TypedProjection[A1 :: HNil] = {
new TypedProjection[A1 :: HNil](List(source.qb)) {
override def extractor: Row => A1 :: HNil = r => source.extractor(r) :: HNil
}
}
}
}

object DeleteClause extends Clause {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ case class PartitionQueryColumn[RR](name: String)(
)(fn: (String, String) => CQLQuery)(implicit pp: Primitive[R]): WhereClause.PartitionCondition = {
new WhereClause.PartitionCondition(
fn(name, pp.asCql(value)), {
session: Session => pp.serialize(value, session.protocolVersion)
session: Session => RoutingKeyValue(
cql = pp.asCql(value),
bytes = pp.serialize(value, session.protocolVersion)
)
}
)
}
Expand Down Expand Up @@ -106,7 +109,10 @@ case class PartitionQueryColumn[RR](name: String)(
): WhereClause.PartitionCondition = {
new WhereClause.PartitionCondition(
QueryBuilder.Where.in(name, values.map(p.asCql)), {
session: Session => ev.serialize(ListValue(values), session.protocolVersion)
session: Session => RoutingKeyValue(
s"List(${QueryBuilder.Utils.join(values.map(p.asCql)).queryString}",
ev.serialize(ListValue(values), session.protocolVersion)
)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@
*/
package com.outworkers.phantom.builder.ops

import java.nio.ByteBuffer

import com.outworkers.phantom.builder.primitives.Primitive
import com.outworkers.phantom.builder.query.prepared.PrepareMark
import com.outworkers.phantom.column.AbstractColumn
import shapeless._

/**
*
* @param cql
* @param bytes
*/
case class RoutingKeyValue(
cql: String,
bytes: ByteBuffer
)

object TokenTypes {
sealed trait Root
trait ValueToken extends Root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ import java.nio.ByteBuffer
import com.datastax.driver.core.Session

package object ops {
type TokenizerKey = (Session => ByteBuffer)


type TokenizerKey = (Session => RoutingKeyValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ object Primitive {
*/
implicit def materializer[T]: Primitive[T] = macro PrimitiveMacro.materializer[T]


def iso[A, B : Primitive](r: B => A)(w: A => B): Primitive[A] = derive[A, B](w)(r)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,13 @@
*/
package com.outworkers.phantom.builder.primitives

import java.math.BigInteger
import java.net.{InetAddress, UnknownHostException}
import java.nio.charset.Charset
import java.nio.{BufferUnderflowException, ByteBuffer}
import java.sql.{ Timestamp => JTimestamp }
import java.time.Instant
import java.util.{Date, UUID}

import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.{DriverInternalError, InvalidTypeException}
import com.datastax.driver.core.utils.Bytes
import com.outworkers.phantom.builder.QueryBuilder
import com.outworkers.phantom.builder.query.engine.CQLQuery
import com.outworkers.phantom.builder.syntax.CQLSyntax
import org.joda.time.{DateTime, DateTimeZone, LocalDate => JodaLocalDate}

import scala.collection.generic.CanBuildFrom
import scala.util.Try

object Utils {
private[phantom] def unsupported(version: ProtocolVersion): DriverInternalError = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.outworkers.phantom.builder.query

import com.datastax.driver.core._
import com.datastax.driver.core.policies.TokenAwarePolicy
import com.outworkers.phantom.Manager
import com.outworkers.phantom.builder.ops.TokenizerKey

trait Modifier extends (Statement => Statement)
Expand All @@ -31,8 +32,13 @@ case class RoutingKeyModifier(
val policy = session.getCluster.getConfiguration.getPolicies.getLoadBalancingPolicy

if (policy.isInstanceOf[TokenAwarePolicy] && tokens.nonEmpty) {

val routingKeys = tokens.map(_.apply(session))

Manager.logger.debug(s"Routing key tokens found. Settings routing key to ${routingKeys.map(_.cql).mkString("(", ",", ")")}")

st
.setRoutingKey(tokens.map(_.apply(session)): _*)
.setRoutingKey(routingKeys.map(_.bytes):_*)
.setKeyspace(session.getLoggedKeyspace)
} else {
st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.outworkers.phantom.builder.query

import java.nio.ByteBuffer

import com.datastax.driver.core.{ConsistencyLevel, Session}
import com.outworkers.phantom.builder.clauses._
import com.outworkers.phantom.builder.ops.TokenizerKey
Expand All @@ -28,7 +26,7 @@ import com.outworkers.phantom.builder.syntax.CQLSyntax
import com.outworkers.phantom.builder.{ConsistencyBound, LimitBound, OrderBound, WhereBound, _}
import com.outworkers.phantom.connectors.KeySpace
import com.outworkers.phantom.{CassandraTable, Row}
import shapeless.ops.hlist.{Prepend, Reverse}
import shapeless.ops.hlist.{Prepend, Reverse, Tupler}
import shapeless.{::, =:!=, HList, HNil}

import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -235,13 +233,19 @@ private[phantom] class RootSelectBlock[
}
}

def function[RR](f1: TypedClause.Condition[RR])(
implicit keySpace: KeySpace
): SelectQuery.Default[T, RR] = {
def function[
HL <: HList,
Rev <: HList,
TP
](projection: T => TypedClause.TypedProjection[HL])(
implicit keySpace: KeySpace,
rev: Reverse.Aux[HL, Rev],
ev: Tupler.Aux[Rev, TP]
): SelectQuery.Default[T, TP] = {
new SelectQuery(
table,
f1.extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1.qb),
row => ev.apply(rev.apply(projection(table).extractor(row))),
QueryBuilder.Select.select(table.tableName, keySpace.name, projection(table).qb),
Nil,
WherePart.empty,
OrderPart.empty,
Expand All @@ -252,13 +256,13 @@ private[phantom] class RootSelectBlock[
)
}

def function[RR](f1: T => TypedClause.Condition[RR])(
def function[RR](f1: TypedClause.Condition[RR])(
implicit keySpace: KeySpace
): SelectQuery.Default[T, RR] = {
new SelectQuery(
table,
f1(table).extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1(table).qb),
f1.extractor,
QueryBuilder.Select.select(table.tableName, keySpace.name, f1.qb),
Nil,
WherePart.empty,
OrderPart.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ abstract class ResultQueryInterface[

protected[this] def flattenedOption[Inner](
row: Option[Row]
)(implicit ev: R <:< Option[Inner]): Option[Inner] = {
row flatMap fromRow
)(implicit ev: R <:< Tuple1[Option[Inner]]): Option[Inner] = {
row match {
case Some(r) => fromRow(r)._1
case None => None
}
}

protected[this] def directMapper(
Expand All @@ -59,7 +62,7 @@ abstract class ResultQueryInterface[

private[phantom] def optionalFetch[Inner](source: F[ResultSet])(
implicit ec: ExecutionContextExecutor,
ev: R <:< Option[Inner]
ev: R <:< Tuple1[Option[Inner]]
): F[Option[Inner]] = {
source map { res => flattenedOption(res.value()) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013 - 2017 Outworkers Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.outworkers.phantom.ops

import shapeless._
import shapeless.ops.tuple.Prepend

/*
trait AggregateSequence[L <: HList] extends DepFn1[L]
object AggregateSequence {
type Aux[L <: HList, Out0] = AggregateSequence[L] { type Out = Out0 }
implicit def hnilAggregateSequence: Aux[HNil, Option[Unit]] =
new AggregateSequence[HNil] {
type Out = Option[Unit]
def apply(l: HNil): Option[Unit] = Some(())
}
implicit def hconsAggregateSequence[H, T <: HList, OutT](
implicit fst: Aux[T, Option[OutT]],
pre: Prepend[Tuple1[H], OutT]
): Aux[Option[H] :: T, Option[pre.Out]] = new AggregateSequence[Option[H] :: T] {
type Out = Option[pre.Out]
def apply(l: Option[H] :: T): Option[pre.Out] = {
l.head.flatMap(fst(l.tail)).map {
case (h, t) => pre(Tuple1(h), t)
}
}
}
}*/
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,31 @@ class SelectQueryOps[
* @return A Scala future guaranteed to contain a single result wrapped as an Option.
*/
@implicitNotFound("You have already defined limit on this Query. You cannot specify multiple limits on the same builder.")
def aggregate[Inner]()(
def agg[T]()(
implicit session: Session,
ev: Limit =:= Unlimited,
ec: ExecutionContextExecutor,
unwrap: Record <:< Tuple1[T]
): F[Option[T]] = {
val enforceLimit = if (query.count) LimitedPart.empty else query.limitedPart append QueryBuilder.limit(1.toString)
singleFetch(adapter.fromGuava(query.copy(limitedPart = enforceLimit).executableQuery.statement())).map(optRec => optRec.map(_._1))
}

/**
* Returns the first row from the select ignoring everything else
* @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
* @param ev The implicit limit for the query.
* @param ec The implicit Scala execution context.
* @return A Scala future guaranteed to contain a single result wrapped as an Option.
*/
@implicitNotFound("You have already defined limit on this Query. You cannot specify multiple limits on the same builder.")
def aggregate()(
implicit session: Session,
ev: Limit =:= Unlimited,
opt: Record <:< Option[Inner],
ec: ExecutionContextExecutor
): F[Option[Inner]] = {
): F[Option[Record]] = {
val enforceLimit = if (query.count) LimitedPart.empty else query.limitedPart append QueryBuilder.limit(1.toString)
optionalFetch(adapter.fromGuava(query.copy(limitedPart = enforceLimit).executableQuery.statement()))
singleFetch(adapter.fromGuava(query.copy(limitedPart = enforceLimit).executableQuery.statement()))
}

override def fromRow(r: Row): Record = query.fromRow(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit

import com.datastax.driver.core.VersionNumber
import com.outworkers.phantom.database.DatabaseProvider
import com.outworkers.phantom.dsl.{DateTime, UUID}
import com.outworkers.phantom.dsl.UUID
import com.outworkers.phantom.tables.TestDatabase
import com.outworkers.util.samplers._
import io.circe.{Encoder, Json}
Expand Down Expand Up @@ -60,7 +60,7 @@ trait PhantomBaseSuite extends Suite with Matchers
override def sample: LocalDate = LocalDate.now(DateTimeZone.UTC)
}

override implicit val patienceConfig = PatienceConfig(
override implicit val patienceConfig: PatienceConfig = PatienceConfig(
timeout = defaultTimeoutSpan,
interval = Span(defaultScalaInterval, Millis)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class JodaDateTimeColumnTest extends PhantomSuite {

override def beforeAll(): Unit = {
super.beforeAll()
database.primitivesJoda.createSchema()
val _ = database.primitivesJoda.createSchema()
}

it should "correctly insert and extract a JodaTime date" in {
Expand Down

0 comments on commit 0343109

Please sign in to comment.