initial commit of scala 2.8.0 branch into master
shorrockin committed Jul 16, 2010
2 parents daf8949 + 567690e commit 4647aad
Showing 13 changed files with 141 additions and 47 deletions.
6 changes: 3 additions & 3 deletions README.textile
@@ -3,9 +3,9 @@ h3. Cascal - Cassandra Simplified
|_. Primary Author|Chris Shorrock|
|_. Home Page|"http://wiki.github.com/shorrockin/cascal/":http://wiki.github.com/shorrockin/cascal/|
|_. API Doc|"http://shorrockin.com/cascal/scaladocs/":http://shorrockin.com/cascal/scaladocs/|
|_. Stable Version|1.2|
|_. Stable Version|1.2 (Scala 2.7.7)|
|_. Snapshot Version|1.3-SNAPSHOT|
|_. Scala Version|2.7.7|
|_. Scala Version|2.8.0|
|_. Cassandra Version|0.6.1|


@@ -30,7 +30,7 @@ h3. Maven Information
<dependency>
<groupId>com.shorrockin</groupId>
<artifactId>cascal</artifactId>
<version>1.0</version>
<version>1.2-Scala.2.8.0-SNAPSHOT</version>
</dependency>
</dependencies>

3 changes: 2 additions & 1 deletion pom.xml
@@ -19,7 +19,7 @@

<properties>
<java.compile.version>1.5</java.compile.version>
<scala.version>2.7.7</scala.version>
<scala.version>2.8.0</scala.version>
</properties>

<build>
@@ -40,6 +40,7 @@
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.14</version>
<executions>
<execution>
<id>scala-compile-first</id>
2 changes: 1 addition & 1 deletion project/build.properties
@@ -4,5 +4,5 @@ project.organization=com.shorrockin
project.name=cascal
sbt.version=0.7.3
project.version=N/A
build.scala.versions=2.7.7
build.scala.versions=2.8.0
project.initialize=false
6 changes: 4 additions & 2 deletions src/main/scala/com/shorrockin/cascal/model/Column.scala
@@ -5,6 +5,8 @@ import org.apache.cassandra.thrift.{ColumnPath, ColumnOrSuperColumn}
import org.apache.cassandra.thrift.{Column => CassColumn}
import org.apache.cassandra.thrift.{SuperColumn => CassSuperColumn}
import com.shorrockin.cascal.utils.Conversions
import com.shorrockin.cascal.utils.Utils.now


/**
* a column is the child component of a super column or a
@@ -18,8 +20,8 @@ case class Column[Owner](val name:Array[Byte],
val time:Long,
val owner:Owner) extends Gettable[Column[Owner]] {

def this(name:Array[Byte], value:Array[Byte], owner:Owner) = this(name, value, System.currentTimeMillis, owner)
def this(name:Array[Byte], owner:Owner) = this(name, null, System.currentTimeMillis, owner)
def this(name:Array[Byte], value:Array[Byte], owner:Owner) = this(name, value, now, owner)
def this(name:Array[Byte], owner:Owner) = this(name, null, now, owner)
def this(name:Array[Byte], value:Array[Byte], date:Date, owner:Owner) = this(name, value, date.getTime, owner)


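The constructor change above swaps System.currentTimeMillis for Utils.now, which returns microseconds. The switch matters because Cassandra resolves a remove followed by an insert on the same column purely by timestamp, and with millisecond precision the two operations frequently carry the same value, so the earlier tombstone can shadow the new write (the TestInsertRemoveLoop test added in this commit exercises exactly that case). A tiny sketch of how easily the millisecond clock collides; MillisCollisionSketch is illustrative, not part of Cascal:

    object MillisCollisionSketch {
      def main(args: Array[String]) {
        // back-to-back reads of the millisecond clock frequently return the
        // same value, which is why the Column constructors now default to
        // the microsecond-resolution Utils.now
        val removeTimestamp = System.currentTimeMillis
        val insertTimestamp = System.currentTimeMillis
        println("timestamps collide: " + (removeTimestamp == insertTimestamp))
      }
    }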
5 changes: 4 additions & 1 deletion src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala
@@ -1,7 +1,6 @@
package com.shorrockin.cascal.model

import org.apache.cassandra.thrift.{ColumnPath, ColumnParent, ColumnOrSuperColumn}
import scala.collection.jcl.Conversions.convertList

/**
* a super standard key is a key whose parent is a super key. It acts in much
@@ -24,6 +23,10 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable

def ::(other:SuperColumn):List[SuperColumn] = other :: this :: Nil

private def convertList[T](v:java.util.List[T]):List[T] = {
  scala.collection.JavaConversions.asBuffer(v).toList
}

/**
* given the returned object from the get request, convert
* to our return type.
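Scala 2.8.0 dropped the scala.collection.jcl package, so the removed Conversions.convertList import above no longer compiles; the commit replaces it with a private helper built on scala.collection.JavaConversions. A self-contained sketch of that idiom against the 2.8.0 API follows; the ConversionSketch object and its sample data are illustrative, and asBuffer is the 2.8.0 method name (later Scala releases renamed these conversions):

    import java.util.{ArrayList, List => JList}
    import scala.collection.JavaConversions.asBuffer

    object ConversionSketch {
      // mirrors the private helper added in this commit: wrap the Java list
      // in a Scala Buffer, then copy it into an immutable List
      def convertList[T](v: JList[T]): List[T] = asBuffer(v).toList

      def main(args: Array[String]) {
        val thriftResult = new ArrayList[String]()
        thriftResult.add("col1")
        thriftResult.add("col2")
        println(convertList(thriftResult)) // List(col1, col2)
      }
    }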
5 changes: 4 additions & 1 deletion src/main/scala/com/shorrockin/cascal/model/SuperKey.scala
@@ -1,6 +1,5 @@
package com.shorrockin.cascal.model

import scala.collection.jcl.Conversions.convertList
import org.apache.cassandra.thrift.{ColumnOrSuperColumn}

case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[SuperColumn, Seq[(SuperColumn, Seq[Column[SuperColumn]])]] {
@@ -21,5 +20,9 @@ case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[
}
}

private def convertList[T](v:java.util.List[T]):List[T] = {
  scala.collection.JavaConversions.asBuffer(v).toList
}

override def toString = "%s \\ SuperKey(value = %s)".format(family.toString, value)
}
3 changes: 2 additions & 1 deletion src/main/scala/com/shorrockin/cascal/session/Operation.scala
@@ -2,6 +2,7 @@ package com.shorrockin.cascal.session

import org.apache.cassandra.thrift.{Deletion, Mutation}
import com.shorrockin.cascal.model._
import com.shorrockin.cascal.utils.Utils.now

/**
* defines an operation that can be executed in parallel with a collection
@@ -50,7 +51,7 @@ class Delete(val container:ColumnContainer[_, _], val predicate:Predicate) exten
lazy val mutation = {
val out = new Mutation
val del = new Deletion
del.setTimestamp(System.currentTimeMillis)
del.setTimestamp(now)

predicate match {
case EmptyPredicate => /* do nothing */
61 changes: 32 additions & 29 deletions src/main/scala/com/shorrockin/cascal/session/Session.scala
@@ -2,32 +2,29 @@ package com.shorrockin.cascal.session

import org.apache.thrift.protocol.TBinaryProtocol

import collection.jcl.Buffer
import org.apache.cassandra.thrift.{Mutation, Cassandra, NotFoundException, ConsistencyLevel}
import java.util.{Map => JMap, List => JList, HashMap, ArrayList}



import collection.jcl.Conversions._
import com.shorrockin.cascal.utils.Conversions._
import com.shorrockin.cascal.utils.Utils.now

import com.shorrockin.cascal.model._
import org.apache.thrift.transport.{TFramedTransport, TSocket}
import collection.immutable.HashSet

import java.util.concurrent.atomic.AtomicLong

/**
* a cascal session is the entry point for interacting with the
* cassandra system through various path elements.
*
* @author Chris Shorrock
*/
class Session(val host: Host, val defaultConsistency: Consistency, val framedTransport: Boolean) extends SessionTemplate {
def this(host: String, port: Int, timeout: Int, defaultConsistency: Consistency, framedTransport: Boolean) = this (Host(host, port, timeout), defaultConsistency, framedTransport)

def this(host: String, port: Int, timeout: Int, defaultConsistency: Consistency) = this (host, port, timeout, defaultConsistency, false)
class Session(val host:Host, val defaultConsistency:Consistency, val framedTransport:Boolean) extends SessionTemplate {

def this(host: String, port: Int, timeout: Int) = this (host, port, timeout, Consistency.One, false)
def this(host:String, port:Int, timeout:Int, defaultConsistency:Consistency, framedTransport:Boolean) = this(Host(host, port, timeout), defaultConsistency, framedTransport)
def this(host:String, port:Int, timeout:Int, defaultConsistency:Consistency) = this(host, port, timeout, defaultConsistency, false)
def this(host:String, port:Int, timeout:Int) = this(host, port, timeout, Consistency.One, false)

private val sock = {
if (framedTransport) new TFramedTransport(new TSocket(host.address, host.port, host.timeout))
@@ -91,12 +88,11 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra
lazy val keyspaces: Seq[String] = Buffer(client.get_string_list_property("keyspaces"))

/**
* returns the
* returns the descriptors for all keyspaces
*/

lazy val keyspaceDescriptors: Set[Tuple3[String, String, String]] = {
var keyspaceDesc: Set[Tuple3[String, String, String]] = new HashSet[Tuple3[String, String, String]]
client.describe_keyspaces foreach {
convertSet(client.describe_keyspaces) foreach {
space =>
val familyMap = client.describe_keyspace(space)
familyMap.keySet foreach {
@@ -129,6 +125,7 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra
}
}


/**
* returns the column value for the specified column
*/
@@ -247,11 +244,10 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra

def locate(key: String) = (containers.find {_.key.value.equals(key)}).get

results.map {
(tuple) =>
val key = locate(tuple._1)
val value = key.convertListResult(tuple._2)
(key -> value)
convertMap(results).map { (tuple) =>
val key = locate(tuple._1)
val value = key.convertListResult(tuple._2)
(key -> value)
}.toSeq
} else {
throw new IllegalArgumentException("must provide at least 1 container for a list(keys, predicate, consistency) call")
@@ -280,10 +276,9 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra
val results = client.get_range_slices(family.keyspace.value, family.columnParent, predicate.slicePredicate, range.cassandraRange, consistency)
var map = Map[Key[ColumnType, ListType], ListType]()

results.foreach {
(keyslice) =>
val key = (family \ keyslice.key)
map = map + (key -> key.convertListResult(keyslice.columns))
convertList(results).foreach { (keyslice) =>
val key = (family \ keyslice.key)
map = map + (key -> key.convertListResult(keyslice.columns))
}
map
}
@@ -352,13 +347,6 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra
*/
private implicit def toThriftConsistency(c: Consistency): ConsistencyLevel = c.thriftValue


/**
* returns the current time in milliseconds
*/
private def now = System.currentTimeMillis


/**
* all calls which access the session should be wrapped within this method,
* it will catch any exceptions and make sure the session is then removed
@@ -370,4 +358,19 @@ class Session(val host: Host, val defaultConsistency: Consistency, val framedTra
case t: Throwable => lastError = Some(t); throw t
}

}
private def Buffer[T](v:java.util.List[T]) = {
  scala.collection.JavaConversions.asBuffer(v)
}

implicit private def convertList[T](v:java.util.List[T]):List[T] = {
  scala.collection.JavaConversions.asBuffer(v).toList
}

implicit private def convertMap[K,V](v:java.util.Map[K,V]): scala.collection.mutable.Map[K,V] = {
  scala.collection.JavaConversions.asMap(v)
}

implicit private def convertSet[T](s:java.util.Set[T]):scala.collection.mutable.Set[T] = {
  scala.collection.JavaConversions.asSet(s)
}
}
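Session.scala applies the same migration but declares the helpers as implicits as well as calling them explicitly (convertMap(results).map { ... }, convertList(results).foreach { ... }), so the java.util collections returned by the Thrift client can be passed straight to Scala's map and foreach. A minimal, self-contained sketch of that pattern against the 2.8.0 JavaConversions API; ImplicitMapSketch and its sample data are illustrative, not Cascal code:

    import java.util.{HashMap => JHashMap, Map => JMap}
    import scala.collection.JavaConversions

    object ImplicitMapSketch {
      // implicit counterpart of the session's convertMap helper: any
      // java.util.Map in scope gains Scala's collection operations
      implicit def convertMap[K, V](v: JMap[K, V]): scala.collection.mutable.Map[K, V] =
        JavaConversions.asMap(v)

      def main(args: Array[String]) {
        val results = new JHashMap[String, Int]()
        results.put("key-a", 1)
        results.put("key-b", 2)

        // java.util.Map gains map/toSeq through the implicit conversion
        val pairs = results.map { tuple => (tuple._1 -> tuple._2 * 10) }.toSeq
        println(pairs)
      }
    }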
@@ -5,8 +5,8 @@ import org.apache.cassandra.config.DatabaseDescriptor
import java.io.File
import java.net.ConnectException
import org.apache.thrift.transport.{TTransportException, TSocket}
import session._
import utils.{Utils, Logging}
import com.shorrockin.cascal.session._
import com.shorrockin.cascal.utils.{Utils, Logging}
/**
* trait which mixes in the functionality necessary to embed
* cassandra into a unit test
2 changes: 1 addition & 1 deletion src/main/scala/com/shorrockin/cascal/utils/Logging.scala
@@ -8,5 +8,5 @@ import org.apache.commons.logging.LogFactory
* @author Chris Shorrock
*/
trait Logging {
@transient @volatile lazy val log = LogFactory.getLog(this.getClass.getName)
lazy val log = LogFactory.getLog(this.getClass.getName)
}
32 changes: 31 additions & 1 deletion src/main/scala/com/shorrockin/cascal/utils/Utils.scala
@@ -2,6 +2,7 @@ package com.shorrockin.cascal.utils

import _root_.scala.io.Source
import java.io.{FileWriter, InputStream, FileOutputStream, File}
import java.util.concurrent.TimeUnit

/**
* common utility functions that don't fit elsewhere.
@@ -51,7 +52,7 @@ object Utils extends Logging {
* file in the source file.
*/
def replace(file:File, replacements:(String, String)*):File = {
val contents = Source.fromFile(file).getLines.toList.map { (line) =>
val contents = Source.fromFile(file).getLines().toList.map { (line) =>
var current = line
replacements.foreach { (r) => current = current.replace(r._1, r._2) }
current
@@ -82,4 +83,33 @@ object Utils extends Logging {
closeable.foreach { (c) => ignore(c.close()) }
}
}

private val epocBaseMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis)
private val runBaseNanos = System.nanoTime

def currentTimeMicros = epocBaseMicros + TimeUnit.NANOSECONDS.toMicros(System.nanoTime-runBaseNanos)

var COMPENSATE_FOR_LOW_PRECISION_SYSTEM_TIME = System.getProperty("com.shorrockin.cascal.COMPENSATE_FOR_LOW_PRESCISION_SYSTEM_TIME", "false") == "true"

private var previousNow = System.currentTimeMillis

/**
* returns the current time in microseconds
*/
def now = {

  var rc = currentTimeMicros

  // It's very possible the platform can issue repetitive calls to now faster than
  // the platform's timer can change.
  if( COMPENSATE_FOR_LOW_PRECISION_SYSTEM_TIME ) {
    Utils.synchronized {
      if( rc <= previousNow ) {
        rc = previousNow + 1
      }
      previousNow = rc
    }
  }
  rc
}
}
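The new clock in Utils pairs a wall-clock base captured once at startup (converted to microseconds) with the elapsed System.nanoTime delta, which yields sub-millisecond timestamps from a millisecond wall clock; the optional compensation branch then forces strict monotonicity on platforms with coarse timers. A standalone sketch of the same technique using only the JDK; MicroClockSketch is illustrative and, unlike Utils, applies the monotonic guard unconditionally rather than behind the COMPENSATE_FOR_LOW_PRECISION_SYSTEM_TIME flag:

    import java.util.concurrent.TimeUnit

    object MicroClockSketch {
      // wall-clock base in microseconds captured once, plus a nanoTime anchor
      private val baseMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis)
      private val baseNanos  = System.nanoTime

      private var previous = 0L

      // strictly increasing microsecond timestamps, even on coarse timers
      def nextMicros: Long = synchronized {
        val raw = baseMicros + TimeUnit.NANOSECONDS.toMicros(System.nanoTime - baseNanos)
        previous = if (raw > previous) raw else previous + 1
        previous
      }

      def main(args: Array[String]) {
        val first  = nextMicros
        val second = nextMicros
        println(first + " then " + second + ", strictly increasing: " + (first < second))
      }
    }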
38 changes: 38 additions & 0 deletions src/test/scala/com/shorrockin/cascal/TestInsertRemoveLoop.scala
@@ -0,0 +1,38 @@
package com.shorrockin.cascal

import testing._
import org.junit.{Assert, Test}
import com.shorrockin.cascal.utils.Utils

/**
* tests a looping insert/remove cycle. Stresses the precision of
* system time.
*/
class TestInsertRemoveLoop extends CassandraTestPool {
import com.shorrockin.cascal.utils.Conversions._
import Assert._

@Test def testInsertRemoveLoop = borrow { session =>

  def checkLowResolution = {
    var onLowPrecisionSystem = false
    for( i <- 1L to 100L ) {
      session.remove("Test" \ "Standard" \ "Test")
      session.insert("Test" \ "Standard" \ "Test" \ (i, "hello:"+i))
      if( session.get("Test" \ "Standard" \ "Test" \ i) == None ) {
        onLowPrecisionSystem = true
      }
    }
    onLowPrecisionSystem
  }

  if( checkLowResolution ) {
    println("You have low resolution timer on this system")
    Utils.COMPENSATE_FOR_LOW_PRECISION_SYSTEM_TIME = true
    assertFalse("setting Utils.COMPENSATE_FOR_LOW_PRECISION_SYSTEM_TIME = true did not work around the low resolution timer problems.", checkLowResolution);
  } else {
    println("You have high resolution timer on this system")
  }
}

}
