Skip to content

Commit

Permalink
Merge pull request #451 from outworkers/release/states
Browse files Browse the repository at this point in the history
Adding ability to use paging states and modify statements
  • Loading branch information
alexflav23 committed Mar 15, 2016
2 parents b589c1f + c27f62c commit 6cb1030
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.websudos.phantom.builder.clauses.DeleteClause
import com.websudos.phantom.builder.query._
import com.websudos.phantom.column.AbstractColumn
import com.websudos.phantom.connectors.KeySpace
import com.websudos.phantom.exceptions.InvalidPrimaryKeyException
import com.websudos.phantom.exceptions.{InvalidClusteringKeyException, InvalidPrimaryKeyException}
import org.slf4j.LoggerFactory

import scala.collection.mutable.{ArrayBuffer => MutableArrayBuffer}
Expand Down Expand Up @@ -153,7 +153,7 @@ abstract class CassandraTable[T <: CassandraTable[T, R], R] extends SelectTable[

val operand = partitions.lengthCompare(1)
val key = if (operand < 0) {
throw InvalidPrimaryKeyException()
throw InvalidPrimaryKeyException(tableName)
} else if (operand == 0) {

val partitionKey = partitions.headOption.map(_.name).orNull
Expand Down Expand Up @@ -184,8 +184,9 @@ abstract class CassandraTable[T <: CassandraTable[T, R], R] extends SelectTable[
*/
private[this] def preconditions(): Unit = {
if (clustered && primaryKeys.diff(clusteringColumns).nonEmpty) {
logger.error("When using CLUSTERING ORDER all PrimaryKey definitions must become a ClusteringKey definition and specify order.")
throw new InvalidPrimaryKeyException("When using CLUSTERING ORDER all PrimaryKey definitions must become a ClusteringKey definition and specify order.")
logger.error("When using CLUSTERING ORDER all PrimaryKey definitions" +
" must become a ClusteringKey definition and specify order.")
throw InvalidClusteringKeyException(tableName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,25 @@ trait ExecutableStatement extends CassandraOperations {
def execute()(implicit session: Session, keySpace: KeySpace): TwitterFuture[ResultSet] = {
twitterQueryStringExecuteToFuture(statement)
}

/**
* This will convert the underlying call to Cassandra done with Google Guava ListenableFuture to a consumable
* Scala Future that will be completed once the operation is completed on the
* database end.
*
* The execution context of the transformation is provided by phantom via
* [[com.websudos.phantom.Manager.scalaExecutor]] and it is recommended to
* use [[com.websudos.phantom.dsl.context]] for operations that chain
* database calls.
*
* @param modifyStatement The function allowing to modify underlying [[Statement]]
* @param session The implicit session provided by a [[com.websudos.phantom.connectors.Connector]].
* @param keySpace The implicit keySpace definition provided by a [[com.websudos.phantom.connectors.Connector]].
* @return
*/
def execute(modifyStatement : Statement => Statement)(implicit session: Session, keySpace: KeySpace): TwitterFuture[ResultSet] = {
twitterQueryStringExecuteToFuture(modifyStatement(statement))
}
}

private[phantom] class ExecutableStatementList(val list: Seq[CQLQuery]) extends CassandraOperations {
Expand Down Expand Up @@ -225,6 +244,32 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound] extends
future() map { resultSet => { directMapper(resultSet.all) } }
}

/**
* Returns a parsed sequence of [R]ows
* This is not suitable for big results set
* @param session The Cassandra session in use.
* @param ec The Execution Context.
* @return A Scala future wrapping a list of mapped results.
*/
def fetch(state: PagingState)(implicit session: Session, ec: ExecutionContext, keySpace: KeySpace): ScalaFuture[List[R]] = {
future(st => st.setPagingState(state)) map {
resultSet => { directMapper(resultSet.all) }
}
}

/**
* Returns a parsed sequence of [R]ows
* This is not suitable for big results set
* @param session The Cassandra session in use.
* @param ec The Execution Context.
* @return A Scala future wrapping a list of mapped results.
*/
def fetch(modifyStatement : Statement => Statement)(implicit session: Session, ec: ExecutionContext, keySpace: KeySpace): ScalaFuture[List[R]] = {
future(modifyStatement) map {
resultSet => { directMapper(resultSet.all) }
}
}

/**
* Returns a parsed iterator of [R]ows
* @param session The Cassandra session in use.
Expand All @@ -244,4 +289,24 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound] extends
def collect()(implicit session: Session, keySpace: KeySpace): TwitterFuture[List[R]] = {
execute() map { resultSet => { directMapper(resultSet.all) } }
}

/**
* Returns a parsed sequence of [R]ows
* This is not suitable for big results set
* @param session The Cassandra session in use.
* @return A Twitter future wrapping a list of mapped results.
*/
def collect(modifyStatement : Statement => Statement)(implicit session: Session, keySpace: KeySpace): TwitterFuture[List[R]] = {
execute(modifyStatement) map { resultSet => { directMapper(resultSet.all) } }
}

/**
* Returns a parsed sequence of [R]ows
* This is not suitable for big results set
* @param session The Cassandra session in use.
* @return A Twitter future wrapping a list of mapped results.
*/
def collect(pagingState: PagingState)(implicit session: Session, keySpace: KeySpace): TwitterFuture[List[R]] = {
execute(st => st.setPagingState(pagingState)) map { resultSet => { directMapper(resultSet.all) } }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,43 @@
/*
* Copyright 2013-2015 Websudos, Limited.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* - Explicit consent must be obtained from the copyright owner, Websudos Limited before any redistribution is made.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.websudos.phantom.exceptions

case class InvalidPrimaryKeyException(msg: String = "You need to define at least one PartitionKey for the schema") extends RuntimeException(msg)
import scala.util.control.NoStackTrace

case class InvalidTableException(msg: String) extends RuntimeException(msg)
case class InvalidClusteringKeyException(table: String) extends
RuntimeException(s"Table $table: When using CLUSTERING ORDER all PrimaryKey" +
s" definitions must become a ClusteringKey definition and specify order."
) with NoStackTrace

case class InvalidPrimaryKeyException(
table: String
) extends RuntimeException(s"You need to define at least one PartitionKey for the table $table") with NoStackTrace

case class InvalidTableException(msg: String) extends RuntimeException(msg) with NoStackTrace
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ object Build extends Build {

val sharedSettings: Seq[Def.Setting[_]] = Defaults.coreDefaultSettings ++ Seq(
organization := "com.websudos",
version := "1.22.1",
version := "1.23.1",
scalaVersion := "2.11.7",
credentials ++= defaultCredentials,
crossScalaVersions := Seq("2.10.5", "2.11.7"),
Expand Down

0 comments on commit 6cb1030

Please sign in to comment.