Skip to content

Commit

Permalink
[split] finagle-redis: return tuples and doubles
Browse files Browse the repository at this point in the history
- hGetAll returns a seq of tuples vs map to preserve order
- zRangeByScoreWithScores is now zRangeByScore which returns a zRangeResults object
- zRevRangeByScoresWithScores is now zRevRangeByScore
- zScore now returns a double rather than Array[Byte], which makes more sense

RB_ID=64769
  • Loading branch information
Anirudh Srinivas committed May 23, 2012
1 parent a3c244e commit 248f474
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 48 deletions.
77 changes: 56 additions & 21 deletions finagle-redis/src/main/scala/com/twitter/finagle/redis/Client.scala
Expand Up @@ -2,7 +2,7 @@ package com.twitter.finagle.redis

import com.twitter.finagle.builder.{ClientBuilder, ClientConfig}
import com.twitter.finagle.redis.protocol._
import com.twitter.finagle.redis.util.BytesToString
import com.twitter.finagle.redis.util.{BytesToString, NumberFormat}
import com.twitter.finagle.Service
import com.twitter.util.Future

Expand Down Expand Up @@ -169,14 +169,18 @@ class Client(service: Service[Command, Reply]) {
/**
* Gets all field value pairs for given hash
* @param hash key
* @return Map of field/value pairs
* @return Sequence of field/value pairs
*/
def hGetAll(key: Array[Byte]): Future[Map[Array[Byte], Array[Byte]]] =
def hGetAllAsPairs(key: Array[Byte]): Future[Seq[(Array[Byte], Array[Byte])]] =
doRequest(HGetAll(key)) {
case MBulkReply(messages) => returnPairs(messages)
case EmptyMBulkReply() => Future.value(Map())
case MBulkReply(messages) => Future.value(returnPairs(messages))
case EmptyMBulkReply() => Future.value(Seq())
}

@deprecated("Use hGetAllAsPairs instead")
def hGetAll(key: Array[Byte]): Future[Map[Array[Byte], Array[Byte]]] =
hGetAllAsPairs(key) map { res => res toMap }

/**
* Gets values for given fields in hash
* @param hash key, fields
Expand Down Expand Up @@ -213,11 +217,12 @@ class Client(service: Service[Command, Reply]) {
/**
* Gets score of member in sorted set
* @param key, member
* @return Score of member as a byte array
* @return Score of member
*/
def zScore(key: Array[Byte], member: Array[Byte]): Future[Option[Array[Byte]]] =
def zScore(key: Array[Byte], member: Array[Byte]): Future[Option[Double]] =
doRequest(ZScore(key, member)) {
case BulkReply(message) => Future.value(Some(message))
case BulkReply(message) => Future.value(
Some(NumberFormat.toDouble(BytesToString(message))))
case EmptyBulkReply() => Future.value(None)
}

Expand All @@ -235,8 +240,25 @@ class Client(service: Service[Command, Reply]) {
* Gets member, score pairs from sorted set between min and max
* Results are limited by offset and count
* @param key, min, max, offset, count
* @return Map of member/score pairs
* @return ZRangeResults object containing item/score pairs
*/
def zRangeByScore(
key: Array[Byte], min: Double, max: Double, offset: Int, count: Int
): Future[ZRangeResults] =
doRequest(
ZRangeByScore(
BytesToString(key),
ZInterval(min),
ZInterval(max),
WithScores.asArg,
Some(Limit(offset, count))
)
) {
case MBulkReply(messages) => Future.value(ZRangeResults(returnPairs(messages)))
case EmptyMBulkReply() => Future.value(ZRangeResults(List()))
}

@deprecated("Use zRangeByScoreWithScoresAsPairs instead")
def zRangeByScoreWithScores(
key: Array[Byte], min: Double, max: Double, offset: Int, count: Int
): Future[Map[Array[Byte], Array[Byte]]] =
Expand All @@ -245,11 +267,11 @@ class Client(service: Service[Command, Reply]) {
BytesToString(key),
ZInterval(min),
ZInterval(max),
Some(WithScores),
WithScores.asArg,
Some(Limit(offset, count))
)
) {
case MBulkReply(messages) => returnPairs(messages)
case MBulkReply(messages) => Future.value(returnPairs(messages) toMap)
case EmptyMBulkReply() => Future.value(Map())
}

Expand Down Expand Up @@ -278,7 +300,7 @@ class Client(service: Service[Command, Reply]) {
* Returns specified range of elements in sorted set at key
* Elements are ordered from highest to lowest score
* @param key, start, stop
* @return List of element in specified range
* @return List of elements in specified range
*/
def zRevRange(key: Array[Byte], start: Int, stop: Int): Future[Seq[Array[Byte]]] =
doRequest(ZRevRange(key, start, stop)) {
Expand All @@ -291,8 +313,25 @@ class Client(service: Service[Command, Reply]) {
* Elements are ordered from highest to lowest score
* Results are limited by offset and count
* @param key, max, min, offset, count
* @return Map of element/score pairs in specified score range
* @return ZRangeResults object containing item/score pairs
*/
def zRevRangeByScore(
key: Array[Byte], max: Double, min: Double, offset: Int, count: Int
): Future[ZRangeResults] =
doRequest(
ZRevRangeByScore(
BytesToString(key),
ZInterval(max),
ZInterval(min),
WithScores.asArg,
Some(Limit(offset, count))
)
) {
case MBulkReply(messages) => Future.value(ZRangeResults(returnPairs(messages)))
case EmptyMBulkReply() => Future.value(ZRangeResults(List()))
}

@deprecated("Use zRevRangeByScoreWithScoresAsPairs instead")
def zRevRangeByScoreWithScores(
key: Array[Byte], max: Double, min: Double, offset: Int, count: Int
): Future[Map[Array[Byte], Array[Byte]]] =
Expand All @@ -301,11 +340,11 @@ class Client(service: Service[Command, Reply]) {
BytesToString(key),
ZInterval(max),
ZInterval(min),
Some(WithScores),
WithScores.asArg,
Some(Limit(offset, count))
)
) {
case MBulkReply(messages) => returnPairs(messages)
case MBulkReply(messages) => Future.value(returnPairs(messages) toMap)
case EmptyMBulkReply() => Future.value(Map())
}

Expand All @@ -328,11 +367,7 @@ class Client(service: Service[Command, Reply]) {
*/
private def returnPairs(messages: List[Array[Byte]]) = {
assert(messages.length % 2 == 0, "Odd number of items in response")
Future.value({
messages.grouped(2).toList flatMap {
case List(a, b) => Some(a, b)
} toMap
})
messages.grouped(2).toList map { case List(a, b) => (a, b) }
}

}
}
Expand Up @@ -17,6 +17,7 @@ case object WithScores extends CommandArgument {
case WITHSCORES => Some(s)
case _ => None
}
val asArg = Some(WithScores)
}

case class Limit(offset: Int, count: Int) extends CommandArgument {
Expand Down
Expand Up @@ -293,6 +293,18 @@ object ZUnionStore extends ZStoreCompanion {
* Internal Helpers
*/

case class ZRangeResults(entries: Array[Array[Byte]], scores: Array[Double]) {
def asTuples(): Seq[(Array[Byte], Double)] =
(entries, scores).zipped map { (entry, score) => (entry, score) } toSeq
}
object ZRangeResults {
def apply(tuples: List[(Array[Byte], Array[Byte])]): ZRangeResults = {
val arrays = tuples.unzip
val doubles = arrays._2 map { score => NumberFormat.toDouble(BytesToString(score)) }
ZRangeResults(arrays._1.toArray, doubles.toArray)
}
}

// Represents part of an interval, helpers in companion object
case class ZInterval(value: String) {
import ZInterval._
Expand Down
@@ -1,7 +1,9 @@
package com.twitter.finagle.redis
package util

import java.nio.charset.Charset
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.util.CharsetUtil

trait ErrorConversion {
def getException(msg: String): Throwable
Expand All @@ -21,23 +23,28 @@ trait ErrorConversion {
}

object BytesToString {
def apply(arg: Array[Byte], charset: String = "UTF-8") = new String(arg, charset)
def fromList(args: List[Array[Byte]], charset: String = "UTF-8") = args.map { arg =>
BytesToString(arg, charset)
}
def fromMap(args: Map[Array[Byte], Array[Byte]], charset: String = "UTF-8") =
args.toSeq map { arg =>
(BytesToString(arg._1, charset), BytesToString(arg._2, charset))
}
def apply(arg: Array[Byte], charset: Charset = CharsetUtil.UTF_8) = new String(arg, charset)

def fromList(args: List[Array[Byte]], charset: Charset = CharsetUtil.UTF_8) =
args.map { arg => BytesToString(arg, charset) }

def fromTuples(args: Seq[(Array[Byte], Array[Byte])], charset: Charset = CharsetUtil.UTF_8) =
args map { arg => (BytesToString(arg._1), BytesToString(arg._2)) }

def fromTuplesWithDoubles(args: Seq[(Array[Byte], Double)],
charset: Charset = CharsetUtil.UTF_8) =
args map { arg => (BytesToString(arg._1, charset), arg._2) }

}
object StringToBytes {
def apply(arg: String, charset: String = "UTF-8") = arg.getBytes(charset)
def fromList(args: List[String], charset: String = "UTF-8") = args.map { arg =>
arg.getBytes(charset)
}
def apply(arg: String, charset: Charset = CharsetUtil.UTF_8) = arg.getBytes(charset)
def fromList(args: List[String], charset: Charset = CharsetUtil.UTF_8) =
args.map { arg =>
arg.getBytes(charset)
}
}
object StringToChannelBuffer {
def apply(string: String, charset: String = "UTF-8") = {
def apply(string: String, charset: Charset = CharsetUtil.UTF_8) = {
ChannelBuffers.wrappedBuffer(string.getBytes(charset))
}
}
Expand Down
Expand Up @@ -115,11 +115,18 @@ class ClientSpec extends SpecificationWithJUnit {
client.hMGet("foo", Seq("bar", "boo"))().toList) mustEqual Seq("baz", "moo")
}

"get multiple values at once (deprecated)" in {
client.hSet(foo, bar, baz)()
client.hSet(foo, boo, moo)()
BytesToString.fromTuples(
client.hGetAll(foo)() toSeq) mustEqual Seq(("bar", "baz"), ("boo", "moo"))
}

"get multiple values at once" in {
client.hSet(foo, bar, baz)()
client.hSet(foo, boo, moo)()
BytesToString.fromMap(
client.hGetAll(foo)()) mustEqual Seq(("bar" -> "baz"), ("boo", "moo"))
BytesToString.fromTuples(
client.hGetAllAsPairs(foo)()) mustEqual Seq(("bar", "baz"), ("boo", "moo"))
}

}
Expand All @@ -128,10 +135,10 @@ class ClientSpec extends SpecificationWithJUnit {
"perform sorted set commands" in {

"add members and get scores" in {
client.zAdd(foo, 10, bar)() mustEqual 1
client.zAdd(foo, 20, baz)() mustEqual 1
BytesToString(client.zScore(foo, bar)().get) mustEqual "10"
BytesToString(client.zScore(foo, baz)().get) mustEqual "20"
client.zAdd(foo, 10.5, bar)() mustEqual 1
client.zAdd(foo, 20.1, baz)() mustEqual 1
client.zScore(foo, bar)().get mustEqual 10.5
client.zScore(foo, baz)().get mustEqual 20.1
}

"add members and get the zcount" in {
Expand All @@ -141,12 +148,20 @@ class ClientSpec extends SpecificationWithJUnit {
client.zCount(foo, 40, 50)() mustEqual 0
}

"get zRangeByScore (deprecated)" in {
client.zAdd(foo, 10, bar)() mustEqual 1
client.zAdd(foo, 20, baz)() mustEqual 1
BytesToString.fromTuples(
client.zRangeByScoreWithScores(foo, 0, 30, 0, 5)() toSeq) mustEqual Seq(("bar", 10),
("baz", 20))
}

"get the zRangeByScore" in {
client.zAdd(foo, 10, bar)() mustEqual 1
client.zAdd(foo, 20, baz)() mustEqual 1
BytesToString.fromMap(
client.zRangeByScoreWithScores(foo, 0, 30, 0, 5)()) mustEqual Seq(("bar", "10"),
("baz", "20"))
BytesToString.fromTuplesWithDoubles(
client.zRangeByScore(foo, 0, 30, 0, 5)().asTuples) mustEqual Seq(("bar", 10),
("baz", 20))
}

"get cardinality and remove members" in {
Expand All @@ -163,17 +178,24 @@ class ClientSpec extends SpecificationWithJUnit {
client.zRevRange(foo, 0, -1)().toList) mustEqual Seq("baz", "bar")
}

"get zRevRangeByScoreWithScores (deprecated)" in {
client.zAdd(foo, 10, bar)() mustEqual 1
client.zAdd(foo, 20, baz)() mustEqual 1
BytesToString.fromTuples(
client.zRevRangeByScoreWithScores(foo, 0, 10, 0, 1)() toSeq) mustEqual Seq(("bar", 10))
client.zRevRangeByScoreWithScores(foo, 0, 0, 0, 1)() mustEqual Map()
}

"get zRevRangeByScoreWithScores" in {
client.zAdd(foo, 10, bar)() mustEqual 1
client.zAdd(foo, 20, baz)() mustEqual 1
BytesToString.fromMap(
client.zRevRangeByScoreWithScores(foo, 0, 10, 0, 1)()) mustEqual Seq(("bar", "10"))
BytesToString.fromMap(
client.zRevRangeByScoreWithScores(foo, 0, 0, 0, 1)()) mustEqual Seq()
BytesToString.fromTuplesWithDoubles(
client.zRevRangeByScore(foo, 0, 10, 0, 1)().asTuples) mustEqual Seq(("bar", 10))
client.zRevRangeByScore(foo, 0, 0, 0, 1)().asTuples == Seq()
}

}

}

}
}

0 comments on commit 248f474

Please sign in to comment.