Skip to content

Commit

Permalink
Remove type-level difference of single/multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
knutwalker committed Apr 19, 2015
1 parent 30c4138 commit 92a3610
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 207 deletions.
72 changes: 26 additions & 46 deletions modules/client/src/main/scala/rx/redis/clients/RawClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ package rx.redis.clients

import rx.redis.RedisCommand
import rx.redis.commands._
import rx.redis.pipeline.{ NettyClient, RxNettyClient }
import rx.redis.pipeline.{ OperatorDecode, NettyClient, RxNettyClient }
import rx.redis.resp.RespType
import rx.redis.serialization.{ writeStringAsRedisCommand, ByteBufFormat, ByteBufReader, ByteBufWriter, Id, Reads, Writes }
import rx.redis.serialization.{ writeStringAsRedisCommand, ByteBufFormat, ByteBufReader, ByteBufWriter, Reads, Writes }

import rx.Observable.OnSubscribe
import rx.functions.Func1
import rx.subjects.AsyncSubject
import rx.{ Observable, Subscriber }

import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelFuture, ChannelFutureListener }

import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import util.control.{ NoStackTrace, NonFatal }

Expand Down Expand Up @@ -128,106 +126,88 @@ abstract class RawClient {
}
}

private[this] def withError[A](o: Observable[A]): Observable[A] = {
o.onErrorResumeNext(new Func1[Throwable, Observable[A]] {
def call(t1: Throwable): Observable[A] = {
Observable.error(
new IllegalArgumentException(s"Cannot interpret value", t1))
}
})
}

private[this] def single[A](cmd: A)(implicit A: Writes[A], R: Reads[A, Id]): Observable[R.R] = withError {
command(cmd)(A).map[R.R](new Func1[RespType, R.R] {
def call(t1: RespType): R.R = R.read(t1)
})
}

private[this] def multiple[A](cmd: A)(implicit A: Writes[A], R: Reads[A, List]): Observable[R.R] = withError {
command(cmd)(A).flatMap[R.R](new Func1[RespType, Observable[R.R]] {
def call(t1: RespType): Observable[R.R] = Observable.from(R.read(t1).asJava)
})
}
private[this] def run[A](cmd: A)(implicit A: Writes[A], R: Reads[A]): Observable[R.R] =
command(cmd)(A).lift(new OperatorDecode[A, R.R](R))

// ==============
// Key Commands
// ==============

final def del(keys: String*): Observable[Long] =
single(Del(keys: _*))
run(Del(keys: _*))

final def exists(key: String): Observable[Boolean] =
single(Exists(key))
run(Exists(key))

final def expire(key: String, expires: FiniteDuration): Observable[Boolean] =
single(Expire(key, expires))
run(Expire(key, expires))

final def expireAt(key: String, deadline: Deadline): Observable[Boolean] =
single(ExpireAt(key, deadline))
run(ExpireAt(key, deadline))

final def keys(pattern: String): Observable[String] =
multiple(Keys(pattern))
run(Keys(pattern))

final def randomKey(): Observable[Option[String]] =
single(RandomKey)
run(RandomKey)

final def ttl(key: String): Observable[Long] =
single(Ttl(key))
run(Ttl(key))

// =================
// String Commands
// =================

final def decr(key: String): Observable[Long] =
single(Decr(key))
run(Decr(key))

final def decrBy(key: String, amount: Long): Observable[Long] =
single(DecrBy(key, amount))
run(DecrBy(key, amount))

final def get[A: ByteBufReader](key: String): Observable[Option[A]] =
single(Get(key))
run(Get(key))

final def incr(key: String): Observable[Long] =
single(Incr(key))
run(Incr(key))

final def incrBy(key: String, amount: Long): Observable[Long] =
single(IncrBy(key, amount))
run(IncrBy(key, amount))

final def mget[A: ByteBufReader](keys: String*): Observable[Option[A]] =
multiple(MGet(keys: _*))
run(MGet(keys: _*))

final def mset[A: ByteBufWriter](items: (String, A)*): Observable[Boolean] =
single(MSet(items: _*))
run(MSet(items: _*))

final def set[A: ByteBufWriter](key: String, value: A): Observable[Boolean] =
single(Set(key, value))
run(Set(key, value))

final def setEx[A: ByteBufWriter](key: String, value: A, expires: FiniteDuration): Observable[Boolean] =
single(SetEx(key, expires, value))
run(SetEx(key, expires, value))

final def setNx[A: ByteBufWriter](key: String, value: A): Observable[Boolean] =
single(SetNx(key, value))
run(SetNx(key, value))

final def strLen(key: String): Observable[Long] =
single(StrLen(key))
run(StrLen(key))

// =====================
// Connection Commands
// =====================

final def echo[A: ByteBufFormat](msg: A): Observable[A] =
single(Echo(msg))
run(Echo(msg))

final def ping(): Observable[String] =
single(Ping)
run(Ping)

// ===============
// Hash Commands
// ===============

final def hget[A: ByteBufReader](key: String, field: String): Observable[Option[A]] =
single(HGet(key, field))
run(HGet(key, field))

final def hgetAll[A: ByteBufReader](key: String): Observable[(String, A)] =
multiple(HGetAll(key))
run(HGetAll(key))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2014 – 2015 Paul Horn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.redis.pipeline

import rx.redis.pipeline.OperatorDecode.SubscriberDecode
import rx.redis.resp.RespType
import rx.redis.serialization.{ DecodeException, Reads }

import rx.Observable.Operator
import rx.Subscriber

import scala.util.control.NonFatal

final class OperatorDecode[A, B](A: Reads.Aux[A, B]) extends Operator[B, RespType] {
def call(downstream: Subscriber[_ >: B]): Subscriber[RespType] =
new SubscriberDecode[A, B](downstream, A)
}
object OperatorDecode {
final class SubscriberDecode[A, B](downstream: Subscriber[_ >: B], A: Reads.Aux[A, B]) extends Subscriber[RespType] {
def onNext(x: RespType): Unit = try {
A.read
.andThen(_.foreach(downstream.onNext))
.applyOrElse(x, (x: RespType) downstream.onError(DecodeException[A](x)))
} catch {
case NonFatal(ex) downstream.onError(DecodeException[A](x, ex))
}

def onError(e: Throwable): Unit =
downstream.onError(e)

def onCompleted(): Unit =
downstream.onCompleted()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package rx.redis.commands

import rx.redis.serialization.{ ByteBufFormat, ByteBufReader, ByteBufWriter, Id, Reads, Writes }
import rx.redis.serialization.{ ByteBufFormat, ByteBufReader, ByteBufWriter, Reads, Writes }

case object Ping {
implicit val writes: Writes[Ping.type] =
Writes.writes[Ping.type]

implicit val readsFormat: Reads[Ping.type, Id] { type R = String } =
implicit val readsFormat: Reads.Aux[Ping.type, String] =
Reads.value[Ping.type, String]
}

Expand All @@ -31,6 +31,6 @@ object Echo {
implicit def writes[A: ByteBufWriter]: Writes[Echo[A]] =
Writes.writes[Echo[A]]

implicit def readsFormat[A: ByteBufReader]: Reads[Echo[A], Id] { type R = A } =
implicit def readsFormat[A: ByteBufReader]: Reads.Aux[Echo[A], A] =
Reads.value[Echo[A], A]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package rx.redis.commands

import rx.redis.serialization.{ ByteBufReader, Id, Reads, Writes }
import rx.redis.serialization.{ ByteBufReader, Reads, Writes }

case class HGet(key: String, field: String)
object HGet {
implicit val writes: Writes[HGet] =
Writes.writes[HGet]

implicit def readsFormat[A: ByteBufReader]: Reads[HGet, Id] { type R = Option[A] } =
implicit def readsFormat[A: ByteBufReader]: Reads.Aux[HGet, Option[A]] =
Reads.opt[HGet, A]
}

Expand All @@ -32,6 +32,6 @@ object HGetAll {
implicit val writes: Writes[HGetAll] =
Writes.writes[HGetAll]

implicit def readsFormat[A: ByteBufReader]: Reads[HGetAll, List] { type R = (String, A) } =
implicit def readsFormat[A: ByteBufReader]: Reads.Aux[HGetAll, (String, A)] =
Reads.listPair[HGetAll, String, A]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package rx.redis.commands

import scala.concurrent.duration.{ Deadline, FiniteDuration }

import rx.redis.serialization.{ Id, Reads, Writes }
import rx.redis.serialization.{ Reads, Writes }

case class Del(keys: String*)
object Del {
implicit val writes: Writes[Del] =
Writes.writes[Del]

implicit val readsFormat: Reads[Del, Id] { type R = Long } =
implicit val readsFormat: Reads.Aux[Del, Long] =
Reads.int[Del]
}

Expand All @@ -34,7 +34,7 @@ object Exists {
implicit val writes: Writes[Exists] =
Writes.writes[Exists]

implicit val readsFormat: Reads[Exists, Id] { type R = Boolean } =
implicit val readsFormat: Reads.Aux[Exists, Boolean] =
Reads.bool[Exists]
}

Expand All @@ -43,7 +43,7 @@ object Expire {
implicit val writes: Writes[Expire] =
Writes.writes[Expire]

implicit val readsFormat: Reads[Expire, Id] { type R = Boolean } =
implicit val readsFormat: Reads.Aux[Expire, Boolean] =
Reads.bool[Expire]
}

Expand All @@ -53,7 +53,7 @@ object ExpireAt {
implicit val writes: Writes[ExpireAt] =
Writes.writes[ExpireAt]

implicit val readsFormat: Reads[ExpireAt, Id] { type R = Boolean } =
implicit val readsFormat: Reads.Aux[ExpireAt, Boolean] =
Reads.bool[ExpireAt]
}

Expand All @@ -62,15 +62,15 @@ object Keys {
implicit val writes: Writes[Keys] =
Writes.writes[Keys]

implicit val readsFormat: Reads[Keys, List] { type R = String } =
implicit val readsFormat: Reads.Aux[Keys, String] =
Reads.list[Keys, String]
}

case object RandomKey {
implicit val writes: Writes[RandomKey.type] =
Writes.writes[RandomKey.type]

implicit val readsFormat: Reads[RandomKey.type, Id] { type R = Option[String] } =
implicit val readsFormat: Reads.Aux[RandomKey.type, Option[String]] =
Reads.opt[RandomKey.type, String]
}

Expand All @@ -79,6 +79,6 @@ object Ttl {
implicit val writes: Writes[Ttl] =
Writes.writes[Ttl]

implicit val readsFormat: Reads[Ttl, Id] { type R = Long } =
implicit val readsFormat: Reads.Aux[Ttl, Long] =
Reads.int[Ttl]
}

0 comments on commit 92a3610

Please sign in to comment.