Permalink
Browse files

[split] minor memory optimizations in cassie

  • Loading branch information...
1 parent 0b8b509 commit 79c104ee76ee31125bdad94010ceff0811b022dd Ryan King committed Mar 5, 2012
@@ -0,0 +1,43 @@
+// Copyright 2012 Twitter, Inc.
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.cassie
+
+import java.nio.ByteBuffer
+import java.util.{ List => JList, Map => JMap, Set => JSet, ArrayList => JArrayList,HashMap => JHashMap}
+import org.apache.cassandra.finagle.thrift
+
+
+trait BatchMutation {
+
+ private[cassie] val mutations: JMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]] =
+ new JHashMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]]()
+
+ // modifies the supplied JHashMap
+ protected def putMutation(encodedKey: ByteBuffer, cfName: String, mutation: thrift.Mutation) = {
+ var h = mutations.get(encodedKey)
+ if (h == null){
+ h = new JHashMap[String, JList[thrift.Mutation]]
+ mutations.put(encodedKey, h)
+ }
+
+ var l = h.get(cfName)
+ if (l == null) {
+ l = new JArrayList[thrift.Mutation]
+ h.put(cfName, l)
+ }
+ l.add(mutation)
+ }
+
+}
@@ -21,10 +21,6 @@ import java.util.{ List => JList, Map => JMap, Set => JSet, ArrayList => JArrayL
import org.apache.cassandra.finagle.thrift
import scala.collection.mutable.ListBuffer
-trait BatchMutation {
- private[cassie] def mutations: JMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]]
-}
-
/**
* A ColumnFamily-alike which batches mutations into a single API call.
*
@@ -34,13 +30,10 @@ class BatchMutationBuilder[Key, Name, Value](private[cassie] val cf: ColumnFamil
type This = BatchMutationBuilder[Key, Name, Value]
- private[cassie] case class Insert(key: Key, column: Column[Name, Value])
- private[cassie] case class Deletions(key: Key, columnNames: JSet[Name], timestamp: Long)
-
- private val ops = new ListBuffer[Either[Insert, Deletions]]
-
def insert(key: Key, column: Column[Name, Value]): This = synchronized {
- ops.append(Left(Insert(key, column)))
+ val mutation = insertMutation(key, column)
+ val encodedKey = cf.keyCodec.encode(key)
+ putMutation(encodedKey, cf.name, mutation)
this
}
@@ -53,8 +46,11 @@ class BatchMutationBuilder[Key, Name, Value](private[cassie] val cf: ColumnFamil
def removeColumns(key: Key, columns: JSet[Name]): This =
removeColumns(key, columns, cf.clock.timestamp)
- def removeColumns(key: Key, columns: JSet[Name], timestamp: Long): This = synchronized {
- ops.append(Right(Deletions(key, columns, timestamp)))
+ def removeColumns(key: Key, columnNames: JSet[Name], timestamp: Long): This = synchronized {
+ val mutation = deleteMutation(key, columnNames, timestamp)
+ val encodedKey = cf.keyCodec.encode(key)
+
+ putMutation(encodedKey, cf.name, mutation)
this
}
@@ -67,48 +63,29 @@ class BatchMutationBuilder[Key, Name, Value](private[cassie] val cf: ColumnFamil
}.flatten
}
- private[cassie] override def mutations: JMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]] = synchronized {
- val mutations = new JHashMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]]()
-
- ops.map {
- case Left(insert) => {
- val cosc = new thrift.ColumnOrSuperColumn
- cosc.setColumn(
- Column.convert(
- cf.nameCodec,
- cf.valueCodec,
- cf.clock,
- insert.column
- )
- )
- val mutation = new thrift.Mutation
- mutation.setColumn_or_supercolumn(cosc)
-
- val encodedKey = cf.keyCodec.encode(insert.key)
-
- val h = Option(mutations.get(encodedKey)).getOrElse { val x = new JHashMap[String, JList[thrift.Mutation]]; mutations.put(encodedKey, x); x }
- val l = Option(h.get(cf.name)).getOrElse { val y = new JArrayList[thrift.Mutation]; h.put(cf.name, y); y }
- l.add(mutation)
- }
- case Right(deletions) => {
- val timestamp = deletions.timestamp
- val pred = new thrift.SlicePredicate
- pred.setColumn_names(cf.nameCodec.encodeSet(deletions.columnNames))
-
- val deletion = new thrift.Deletion()
- deletion.setTimestamp(timestamp)
- deletion.setPredicate(pred)
-
- val mutation = new thrift.Mutation
- mutation.setDeletion(deletion)
-
- val encodedKey = cf.keyCodec.encode(deletions.key)
-
- val h = Option(mutations.get(encodedKey)).getOrElse { val x = new JHashMap[String, JList[thrift.Mutation]]; mutations.put(encodedKey, x); x }
- val l = Option(h.get(cf.name)).getOrElse { val y = new JArrayList[thrift.Mutation]; h.put(cf.name, y); y }
- l.add(mutation)
- }
- }
- mutations
+ private[this] def insertMutation(key: Key, column: Column[Name, Value]): thrift.Mutation = {
+ val cosc = new thrift.ColumnOrSuperColumn
+ cosc.setColumn(
+ Column.convert(
+ cf.nameCodec,
+ cf.valueCodec,
+ cf.clock,
+ column
+ )
+ )
+ val mutation = new thrift.Mutation
+ mutation.setColumn_or_supercolumn(cosc)
+ }
+
+ private[this] def deleteMutation(key: Key, columnNames: JSet[Name], timestamp: Long): thrift.Mutation = {
+ val pred = new thrift.SlicePredicate
+ pred.setColumn_names(cf.nameCodec.encodeSet(columnNames))
+
+ val deletion = new thrift.Deletion()
+ deletion.setTimestamp(timestamp)
+ deletion.setPredicate(pred)
+
+ val mutation = new thrift.Mutation
+ mutation.setDeletion(deletion)
}
}
@@ -16,12 +16,9 @@ package com.twitter.cassie
import com.twitter.cassie.codecs.Codec
import com.twitter.util.Future
-import java.nio.ByteBuffer
import java.util.Collections.{ singleton => singletonJSet }
-import java.util.{List => JList,Map => JMap, Set => JSet, ArrayList => JArrayList, HashMap => JHashMap}
+import java.util.{Set => JSet}
import org.apache.cassandra.finagle.thrift
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
/**
* A ColumnFamily-alike which batches mutations into a single API call for counters.
@@ -31,21 +28,16 @@ class CounterBatchMutationBuilder[Key, Name](cf: CounterColumnFamily[Key, Name])
type This = CounterBatchMutationBuilder[Key, Name]
- case class Insert(key: Key, column: CounterColumn[Name])
- case class Deletions(key: Key, columnNames: JSet[Name])
-
- private val ops = new ListBuffer[Either[Insert, Deletions]]
-
def insert(key: Key, column: CounterColumn[Name]): This = synchronized {
- ops.append(Left(Insert(key, column)))
+ putMutation(cf.keyCodec.encode(key), cf.name, insertMutation(key, column))
this
}
def removeColumn(key: Key, columnName: Name): This =
removeColumns(key, singletonJSet(columnName))
def removeColumns(key: Key, columnNames: JSet[Name]): This = synchronized {
- ops.append(Right(Deletions(key, columnNames)))
+ putMutation(cf.keyCodec.encode(key), cf.name, deleteMutation(key, columnNames))
this
}
@@ -58,40 +50,22 @@ class CounterBatchMutationBuilder[Key, Name](cf: CounterColumnFamily[Key, Name])
}.flatten
}
- private[cassie] def mutations: JMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]] = synchronized {
- val mutations = new JHashMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]]()
-
- ops.map {
- case Left(insert) => {
- val cosc = new thrift.ColumnOrSuperColumn()
- val counterColumn = new thrift.CounterColumn(cf.nameCodec.encode(insert.column.name), insert.column.value)
- cosc.setCounter_column(counterColumn)
- val mutation = new thrift.Mutation
- mutation.setColumn_or_supercolumn(cosc)
-
- val encodedKey = cf.keyCodec.encode(insert.key)
-
- val h = Option(mutations.get(encodedKey)).getOrElse { val x = new JHashMap[String, JList[thrift.Mutation]]; mutations.put(encodedKey, x); x }
- val l = Option(h.get(cf.name)).getOrElse { val y = new JArrayList[thrift.Mutation]; h.put(cf.name, y); y }
- l.add(mutation)
- }
- case Right(deletions) => {
- val pred = new thrift.SlicePredicate
- pred.setColumn_names(cf.nameCodec.encodeSet(deletions.columnNames))
-
- val deletion = new thrift.Deletion
- deletion.setPredicate(pred)
+ private[this] def insertMutation(key: Key, column: CounterColumn[Name]): thrift.Mutation = {
+ val cosc = new thrift.ColumnOrSuperColumn()
+ val counterColumn = new thrift.CounterColumn(cf.nameCodec.encode(column.name), column.value)
+ cosc.setCounter_column(counterColumn)
+ val mutation = new thrift.Mutation
+ mutation.setColumn_or_supercolumn(cosc)
+ }
- val mutation = new thrift.Mutation
- mutation.setDeletion(deletion)
+ private[this] def deleteMutation(key: Key, columnNames: JSet[Name]): thrift.Mutation = {
+ val pred = new thrift.SlicePredicate
+ pred.setColumn_names(cf.nameCodec.encodeSet(columnNames))
- val encodedKey = cf.keyCodec.encode(deletions.key)
+ val deletion = new thrift.Deletion
+ deletion.setPredicate(pred)
- val h = Option(mutations.get(encodedKey)).getOrElse { val x = new JHashMap[String, JList[thrift.Mutation]]; mutations.put(encodedKey, x); x }
- val l = Option(h.get(cf.name)).getOrElse { val y = new JArrayList[thrift.Mutation]; h.put(cf.name, y); y }
- l.add(mutation)
- }
- }
- mutations
+ val mutation = new thrift.Mutation
+ mutation.setDeletion(deletion)
}
}
@@ -15,19 +15,13 @@
package com.twitter.cassie
import com.twitter.util.Future
-import java.nio.ByteBuffer
-import java.util.{ArrayList => JArrayList,HashMap => JHashMap,Iterator => JIterator,List => JList,Map => JMap,Set => JSet}
+import java.util.{ArrayList => JArrayList}
import org.apache.cassandra.finagle.thrift
-import scala.collection.mutable.ListBuffer
class SuperCounterBatchMutationBuilder[Key, Name, SubName](cf: SuperCounterColumnFamily[Key, Name, SubName]) extends BatchMutation {
- case class Insert(key: Key, name: Name, column: CounterColumn[SubName])
-
- private val ops = new ListBuffer[Insert]
-
def insert(key: Key, name: Name, column: CounterColumn[SubName]) = synchronized {
- ops.append(Insert(key, name, column))
+ putMutation(cf.keyCodec.encode(key), cf.name, insertMutation(key, name, column))
this
}
@@ -40,25 +34,14 @@ class SuperCounterBatchMutationBuilder[Key, Name, SubName](cf: SuperCounterColum
}.flatten
}
- private[cassie] def mutations: JMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]] = synchronized {
- val mutations = new JHashMap[ByteBuffer, JMap[String, JList[thrift.Mutation]]]()
-
- ops.map { insert =>
- val cosc = new thrift.ColumnOrSuperColumn()
- val counterColumn = new thrift.CounterColumn(cf.subNameCodec.encode(insert.column.name), insert.column.value)
- val columns = new JArrayList[thrift.CounterColumn]()
- columns.add(counterColumn)
- val sc = new thrift.CounterSuperColumn(cf.nameCodec.encode(insert.name), columns)
- cosc.setCounter_super_column(sc)
- val mutation = new thrift.Mutation
- mutation.setColumn_or_supercolumn(cosc)
-
- val encodedKey = cf.keyCodec.encode(insert.key)
-
- val h = Option(mutations.get(encodedKey)).getOrElse { val x = new JHashMap[String, JList[thrift.Mutation]]; mutations.put(encodedKey, x); x }
- val l = Option(h.get(cf.name)).getOrElse { val y = new JArrayList[thrift.Mutation]; h.put(cf.name, y); y }
- l.add(mutation)
- }
- mutations
+ private[this] def insertMutation(key: Key, name: Name, column: CounterColumn[SubName]): thrift.Mutation = {
+ val cosc = new thrift.ColumnOrSuperColumn()
+ val counterColumn = new thrift.CounterColumn(cf.subNameCodec.encode(column.name), column.value)
+ val columns = new JArrayList[thrift.CounterColumn]()
+ columns.add(counterColumn)
+ val sc = new thrift.CounterSuperColumn(cf.nameCodec.encode(name), columns)
+ cosc.setCounter_super_column(sc)
+ val mutation = new thrift.Mutation
+ mutation.setColumn_or_supercolumn(cosc)
}
}
@@ -0,0 +1,51 @@
+// Copyright 2012 Twitter, Inc.
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.cassie.stress
+
+import com.twitter.cassie.codecs.{LongCodec, Utf8Codec}
+import com.twitter.cassie._
+import com.twitter.conversions.time._
+import com.twitter.finagle.stats.OstrichStatsReceiver
+import com.twitter.finagle.stress.Stats
+import com.twitter.util.{Time, ScheduledThreadPoolTimer}
+
+class BatchMutationBuilderStresser extends Stresser {
+
+ private[this] val cluster = new Cluster("localhost", new OstrichStatsReceiver())
+ private[this] val keyspace = cluster.keyspace("test").connect()
+ private[this] val cf = keyspace.columnFamily("standard", Utf8Codec, LongCodec, LongCodec)
+
+ private[this] val beginTime = Time.now
+ private[this] val timer = new ScheduledThreadPoolTimer()
+
+ timer.schedule(10.seconds) {
+ println("@@ %ds".format(beginTime.untilNow.inSeconds))
+ Stats.prettyPrintStats()
+ }
+
+ private[this] val row = "foo"
+ private[this] val data = 0 until 100 toSeq
+
+ def dispatchLoop() {
+ val batch = cf.batch
+ data foreach { col =>
+ batch.insert(row, Column(col, col))
+ }
+
+ batch.execute ensure {
+ dispatchLoop()
+ }
+ }
+
+}
Oops, something went wrong.

0 comments on commit 79c104e

Please sign in to comment.