Permalink
Browse files

[split] release 5.3.11 of finagle-redis - brings finagle/redis_scan_h…

…scan to master

RB_ID=87639
TBR=true
  • Loading branch information...
1 parent 60c26ee commit 38e4092d0f8ae98a1e53d23bb0e6abf878fa798d Andy Reitz committed Sep 21, 2012
Showing with 410 additions and 191 deletions.
  1. +2 −2 finagle-example/pom.xml
  2. +1 −1 finagle-redis/pom.xml
  3. +4 −4 finagle-redis/src/main/scala/com/twitter/finagle/redis/Client.scala
  4. +14 −2 finagle-redis/src/main/scala/com/twitter/finagle/redis/HashCommands.scala
  5. +14 −1 finagle-redis/src/main/scala/com/twitter/finagle/redis/KeyCommands.scala
  6. +4 −4 finagle-redis/src/main/scala/com/twitter/finagle/redis/SortedSetCommands.scala
  7. +5 −5 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/Codec.scala
  8. +7 −1 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/Command.scala
  9. +41 −7 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/CommandArguments.scala
  10. +2 −2 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/CommandTypes.scala
  11. +63 −15 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/Hashes.scala
  12. +94 −21 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/Keys.scala
  13. +6 −6 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/Misc.scala
  14. +66 −66 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/SortedSets.scala
  15. +39 −39 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/Strings.scala
  16. +6 −6 finagle-redis/src/main/scala/com/twitter/finagle/redis/protocol/commands/Transactions.scala
  17. +2 −2 finagle-redis/src/main/scala/com/twitter/finagle/redis/util/Conversions.scala
  18. +3 −3 finagle-redis/src/test/scala/com/twitter/finagle/redis/NaggatiSpec.scala
  19. +37 −4 finagle-redis/src/test/scala/com/twitter/finagle/redis/integration/ClientSpec.scala
View
@@ -4,7 +4,7 @@
<groupId>com.twitter</groupId>
<artifactId>finagle-example</artifactId>
<packaging>jar</packaging>
- <version>5.3.11</version>
+ <version>5.3.12</version>
<parent>
<groupId>com.twitter</groupId>
<artifactId>scala-parent-292</artifactId>
@@ -103,7 +103,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-redis</artifactId>
- <version>5.3.10</version>
+ <version>5.3.11</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
View
@@ -4,7 +4,7 @@
<groupId>com.twitter</groupId>
<artifactId>finagle-redis</artifactId>
<packaging>jar</packaging>
- <version>5.3.10</version>
+ <version>5.3.11</version>
<parent>
<groupId>com.twitter</groupId>
<artifactId>scala-parent-292</artifactId>
@@ -93,9 +93,9 @@ class BaseClient(service: Service[Command, Reply]) {
/**
* Helper function to convert a Redis multi-bulk reply into a map of pairs
*/
- private[redis] def returnPairs(messages: List[ChannelBuffer]) = {
+ private[redis] def returnPairs(messages: Seq[ChannelBuffer]) = {
assert(messages.length % 2 == 0, "Odd number of items in response")
- messages.grouped(2).toList flatMap { case List(a, b) => Some(a, b); case _ => None }
+ messages.grouped(2).toSeq flatMap { case Seq(a, b) => Some(a, b); case _ => None }
}
}
@@ -120,7 +120,7 @@ trait TransactionalClient extends Client {
* @param keys to watch
*/
def watch(keys: Seq[ChannelBuffer]): Future[Unit] =
- doRequest(Watch(keys.toList)) {
+ doRequest(Watch(keys)) {
case StatusReply(message) => Future.Unit
}
@@ -166,7 +166,7 @@ private[redis] class ConnectedTransactionalClient(
def transaction(cmds: Seq[Command]): Future[Seq[Reply]] = {
serviceFactory() flatMap { svc =>
multi(svc) flatMap { _ =>
- val cmdQueue = cmds.toList map { cmd => svc(cmd) }
+ val cmdQueue = cmds map { cmd => svc(cmd) }
Future.collect(cmdQueue) flatMap { _ => exec(svc) }
} rescue { case e =>
svc(Discard) flatMap { _ =>
@@ -17,7 +17,7 @@ trait Hashes { self: BaseClient =>
* @return Number of fields deleted
*/
def hDel(key: ChannelBuffer, fields: Seq[ChannelBuffer]): Future[JLong] =
- doRequest(HDel(key, fields.toList)) {
+ doRequest(HDel(key, fields)) {
case IntegerReply(n) => Future.value(n)
}
@@ -62,12 +62,24 @@ trait Hashes { self: BaseClient =>
* @return List of values
*/
def hMGet(key: ChannelBuffer, fields: Seq[ChannelBuffer]): Future[Seq[ChannelBuffer]] =
- doRequest(HMGet(key, fields.toList)) {
+ doRequest(HMGet(key, fields)) {
case MBulkReply(messages) => Future.value(
ReplyFormat.toChannelBuffers(messages))
case EmptyMBulkReply() => Future.value(Seq())
}
+ /**
+ * Returns keys in given hash, starting at cursor
+ * @param hash key, cursor, count, pattern
+ * @return cursor followed by matching keys
+ */
+ def hScan(key: ChannelBuffer, cursor: Long, count: Option[Long], pattern: Option[ChannelBuffer]
+ ): Future[Seq[ChannelBuffer]] =
+ doRequest(HScan(key, cursor, count, pattern)) {
+ case MBulkReply(messages) => Future.value(ReplyFormat.toChannelBuffers(messages))
+ case EmptyMBulkReply() => Future.value(Seq())
+ }
+
/**
* Sets field value pair in given hash
* @param hash key, field, value
@@ -17,7 +17,7 @@ trait Keys { self: BaseClient =>
* @return Number of keys removed
*/
def del(keys: Seq[ChannelBuffer]): Future[JLong] =
- doRequest(Del(keys.toList)) {
+ doRequest(Del(keys)) {
case IntegerReply(n) => Future.value(n)
}
@@ -41,4 +41,17 @@ trait Keys { self: BaseClient =>
case MBulkReply(messages) => Future.value(ReplyFormat.toChannelBuffers(messages))
case EmptyMBulkReply() => Future.value(Seq())
}
+
+ /**
+ * Returns keys starting at cursor
+ * @param cursor, count, pattern
+ * @return cursor followed by matching keys
+ */
+ def scan(cursor: Long, count: Option[Long], pattern: Option[ChannelBuffer]
+ ): Future[Seq[ChannelBuffer]] =
+ doRequest(Scan(cursor, count, pattern)) {
+ case MBulkReply(messages) => Future.value(ReplyFormat.toChannelBuffers(messages))
+ case EmptyMBulkReply() => Future.value(Seq())
+ }
+
}
@@ -17,7 +17,7 @@ trait SortedSets { self: BaseClient =>
* @return Number of elements added to sorted set
*/
def zAdd(key: ChannelBuffer, score: Double, member: ChannelBuffer): Future[JLong] =
- doRequest(ZAdd(key, List(ZMember(score, member)))) {
+ doRequest(ZAdd(key, Seq(ZMember(score, member)))) {
case IntegerReply(n) => Future.value(n)
}
@@ -60,7 +60,7 @@ trait SortedSets { self: BaseClient =>
) {
case MBulkReply(messages) => Future.value(
ZRangeResults(returnPairs(ReplyFormat.toChannelBuffers(messages))))
- case EmptyMBulkReply() => Future.value(ZRangeResults(List()))
+ case EmptyMBulkReply() => Future.value(ZRangeResults(Seq()))
}
/**
@@ -69,7 +69,7 @@ trait SortedSets { self: BaseClient =>
* @return Number of members removed from sorted set
*/
def zRem(key: ChannelBuffer, members: Seq[ChannelBuffer]): Future[JLong] =
- doRequest(ZRem(key, members.toList)) {
+ doRequest(ZRem(key, members)) {
case IntegerReply(n) => Future.value(n)
}
@@ -104,7 +104,7 @@ trait SortedSets { self: BaseClient =>
) {
case MBulkReply(messages) => Future.value(
ZRangeResults(returnPairs(ReplyFormat.toChannelBuffers(messages))))
- case EmptyMBulkReply() => Future.value(ZRangeResults(List()))
+ case EmptyMBulkReply() => Future.value(ZRangeResults(Seq()))
}
/**
@@ -46,17 +46,17 @@ private[redis] object RedisCodec {
val NEG_INFINITY_BA = StringToChannelBuffer("-inf")
- def toUnifiedFormat(args: List[ChannelBuffer], includeHeader: Boolean = true) = {
+ def toUnifiedFormat(args: Seq[ChannelBuffer], includeHeader: Boolean = true) = {
val header = includeHeader match {
case true =>
- List(ARG_COUNT_MARKER_BA, StringToChannelBuffer(args.length.toString), EOL_DELIMITER_BA)
+ Seq(ARG_COUNT_MARKER_BA, StringToChannelBuffer(args.length.toString), EOL_DELIMITER_BA)
case false => Nil
}
val buffers = args.map({ arg =>
if (arg.readableBytes == 0) {
- List(NIL_BULK_REPLY_BA, EOL_DELIMITER_BA)
+ Seq(NIL_BULK_REPLY_BA, EOL_DELIMITER_BA)
} else {
- List(
+ Seq(
ARG_SIZE_MARKER_BA,
StringToChannelBuffer(arg.readableBytes.toString),
EOL_DELIMITER_BA,
@@ -65,7 +65,7 @@ private[redis] object RedisCodec {
)
}
}).flatten
- ChannelBuffers.wrappedBuffer((header ::: buffers).toArray:_*)
+ ChannelBuffers.wrappedBuffer((header ++ buffers).toArray:_*)
}
}
@@ -25,6 +25,7 @@ object Commands {
val RANDOMKEY = "RANDOMKEY"
val RENAME = "RENAME"
val RENAMENX = "RENAMENX"
+ val SCAN = "SCAN"
val TTL = "TTL"
val TYPE = "TYPE"
@@ -78,6 +79,7 @@ object Commands {
val HGETALL = "HGETALL"
val HKEYS = "HKEYS"
val HMGET = "HMGET"
+ val HSCAN = "HSCAN"
val HSET = "HSET"
// Transactions
@@ -98,6 +100,7 @@ object Commands {
RANDOMKEY -> {_ => Randomkey()},
RENAME -> {Rename(_)},
RENAMENX -> {RenameNx(_)},
+ SCAN -> {Scan(_)},
TTL -> {Ttl(_)},
TYPE -> {Type(_)},
@@ -151,6 +154,7 @@ object Commands {
HGETALL -> {HGetAll(_)},
HKEYS -> {HKeys(_)},
HMGET -> {HMGet(_)},
+ HSCAN -> {HScan(_)},
HSET -> {HSet(_)},
// transactions
@@ -166,7 +170,7 @@ object Commands {
_(args)
}.getOrElse(throw ClientError("Unsupported command: " + cmd))
- def trimList(list: List[Array[Byte]], count: Int, from: String = "") = {
+ def trimList(list: Seq[Array[Byte]], count: Int, from: String = "") = {
RequireClientProtocol(list != null, "%s Empty list found".format(from))
RequireClientProtocol(
list.length == count,
@@ -187,6 +191,7 @@ object CommandBytes {
val RANDOMKEY = StringToChannelBuffer("RANDOMKEY")
val RENAME = StringToChannelBuffer("RENAME")
val RENAMENX = StringToChannelBuffer("RENAMENX")
+ val SCAN = StringToChannelBuffer("SCAN")
val TTL = StringToChannelBuffer("TTL")
val TYPE = StringToChannelBuffer("TYPE")
@@ -240,6 +245,7 @@ object CommandBytes {
val HGETALL = StringToChannelBuffer("HGETALL")
val HKEYS = StringToChannelBuffer("HKEYS")
val HMGET = StringToChannelBuffer("HMGET")
+ val HSCAN = StringToChannelBuffer("HSCAN")
val HSET = StringToChannelBuffer("HSET")
// Transactions
@@ -24,13 +24,13 @@ case class Limit(offset: Long, count: Long) extends CommandArgument {
def commandBytes = Limit.LIMIT_CB
override def toString = "%s %d %d".format(Limit.LIMIT, offset, count)
def toChannelBuffer = ChannelBuffers.wrappedBuffer(toChannelBuffers.toArray:_*)
- def toChannelBuffers = List(Limit.LIMIT_CB,
+ def toChannelBuffers = Seq(Limit.LIMIT_CB,
StringToChannelBuffer(offset.toString), StringToChannelBuffer(count.toString))
}
object Limit {
val LIMIT = "LIMIT"
val LIMIT_CB = StringToChannelBuffer(LIMIT)
- def apply(args: List[String]) = {
+ def apply(args: Seq[String]) = {
RequireClientProtocol(args != null && args.length == 3, "LIMIT requires two arguments")
RequireClientProtocol(args.head == LIMIT, "LIMIT must start with LIMIT clause")
RequireClientProtocol.safe {
@@ -48,7 +48,7 @@ class Weights(underlying: Array[Double]) extends CommandArgument with IndexedSeq
override def toString = Weights.toString + " " + this.mkString(" ")
def toChannelBuffer = ChannelBuffers.wrappedBuffer(toChannelBuffers.toArray:_*)
def toChannelBuffers =
- Weights.WEIGHTS_CB :: underlying.map(w => StringToChannelBuffer(w.toString)).toList
+ Weights.WEIGHTS_CB +: underlying.map(w => StringToChannelBuffer(w.toString)).toSeq
def command = Weights.WEIGHTS
}
@@ -58,10 +58,10 @@ object Weights {
val WEIGHTS_CB = StringToChannelBuffer(WEIGHTS)
def apply(weight: Double) = new Weights(Array(weight))
- def apply(weights: Double*) = new Weights(List(weights:_*).toArray)
+ def apply(weights: Double*) = new Weights(weights.toArray)
def apply(weights: Array[Double]) = new Weights(weights)
- def apply(args: List[String]): Option[Weights] = {
+ def apply(args: Seq[String]): Option[Weights] = {
val argLength = args.length
RequireClientProtocol(
args != null && argLength > 0,
@@ -83,7 +83,7 @@ object Weights {
sealed abstract class Aggregate(val name: String) {
override def toString = Aggregate.toString + " " + name.toUpperCase
def toChannelBuffer = ChannelBuffers.wrappedBuffer(toChannelBuffers.toArray:_*)
- def toChannelBuffers = List(Aggregate.AGGREGATE_CB, StringToChannelBuffer(name.toUpperCase))
+ def toChannelBuffers = Seq(Aggregate.AGGREGATE_CB, StringToChannelBuffer(name.toUpperCase))
def equals(str: String) = str.equals(name)
}
object Aggregate {
@@ -94,7 +94,7 @@ object Aggregate {
case object Max extends Aggregate("MAX")
override def toString = AGGREGATE
- def apply(args: List[String]): Option[Aggregate] = {
+ def apply(args: Seq[String]): Option[Aggregate] = {
val argLength = args.length
RequireClientProtocol(
args != null && argLength > 0,
@@ -112,3 +112,37 @@ object Aggregate {
}
}
}
+
+
+object Count {
+ val COUNT = "COUNT"
+ val COUNT_CB = StringToChannelBuffer(COUNT)
+
+ def apply(args: Seq[String]): Option[Long] = {
+ RequireClientProtocol(
+ args != null && !args.isEmpty,
+ "COUNT can not be specified with empty list")
+ args.head.toUpperCase match {
+ case COUNT =>
+ RequireClientProtocol(args.length == 2, "COUNT requires two arguments")
+ Some(RequireClientProtocol.safe { NumberFormat.toLong(args(1)) })
+ case _ => None
+ }
+ }
+}
+
+
+object Pattern {
+ val PATTERN = "PATTERN"
+ val PATTERN_CB = StringToChannelBuffer(PATTERN)
+
+ def apply(args: Seq[String]): Option[String] = {
+ RequireClientProtocol(
+ args != null && !args.isEmpty,
+ "AGGREGATE can not be specified with empty list")
+ args.head.toUpperCase match {
+ case PATTERN => Some(args(1))
+ case _ => None
+ }
+ }
+}
@@ -13,9 +13,9 @@ trait StrictKeyCommand extends KeyCommand {
}
trait KeysCommand extends Command {
- val keys: List[ChannelBuffer]
+ val keys: Seq[ChannelBuffer]
protected def validate() {
- RequireClientProtocol(keys != null && keys.length > 0, "Empty KeySet found")
+ RequireClientProtocol(keys != null && !keys.isEmpty, "Empty KeySet found")
keys.foreach { key =>
RequireClientProtocol(key != null && key.readableBytes > 0, "Empty key found")
}
Oops, something went wrong.

0 comments on commit 38e4092

Please sign in to comment.