Skip to content

Commit

Permalink
private[this] all the things
Browse files Browse the repository at this point in the history
  • Loading branch information
knutwalker committed Apr 9, 2015
1 parent 92d28aa commit 31cc10e
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package rx.redis.api
import rx.lang.scala.Observable

import rx.redis.clients.RawClient
import rx.redis.util.{DefaultRedisHost, DefaultRedisPort}
import rx.redis.util.{ DefaultRedisHost, DefaultRedisPort }

object RxRedis {
def apply(host: String = DefaultRedisHost, port: Int = DefaultRedisPort): Client = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import java.util.function.Consumer

final class SharedNioEventLoopGroup(threadCount: Int, threadFactory: ThreadFactory) extends NioEventLoopGroup(threadCount, threadFactory) with ReferenceCounted {

private val refs = new AtomicInteger(1)
private def newFuture = new DefaultPromise[Void](GlobalEventExecutor.INSTANCE)
private[this] val refs = new AtomicInteger(1)
private[this] def newFuture = new DefaultPromise[Void](GlobalEventExecutor.INSTANCE)

sys.addShutdownHook {
refs.set(0)
Expand Down
10 changes: 5 additions & 5 deletions modules/client/src/main/scala/rx/redis/clients/RawClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ abstract class RawClient {
// Closing
// =========

private val isClosed = new AtomicBoolean(false)
private val alreadyClosed: Observable[Unit] =
private[this] val isClosed = new AtomicBoolean(false)
private[this] val alreadyClosed: Observable[Unit] =
Observable.error(new IllegalStateException("Client has already shutdown.") with NoStackTrace)

protected def closeClient(): Observable[Unit]
Expand All @@ -110,7 +110,7 @@ abstract class RawClient {
def command[A](cmd: A)(implicit A: Writes[A]): Observable[RespType] =
command(A.write(cmd))

private def withError[A](o: Observable[A]): Observable[A] = {
private[this] def withError[A](o: Observable[A]): Observable[A] = {
o.onErrorResumeNext(new Func1[Throwable, Observable[A]] {
def call(t1: Throwable): Observable[A] = {
Observable.error(
Expand All @@ -119,13 +119,13 @@ abstract class RawClient {
})
}

private def single[A](cmd: A)(implicit A: Writes[A], R: Reads[A, Id]): Observable[R.R] = withError {
private[this] def single[A](cmd: A)(implicit A: Writes[A], R: Reads[A, Id]): Observable[R.R] = withError {
command(A.write(cmd)).map[R.R](new Func1[RespType, R.R] {
def call(t1: RespType): R.R = R.read(t1)
})
}

private def multiple[A](cmd: A)(implicit A: Writes[A], R: Reads[A, List]): Observable[R.R] = withError {
private[this] def multiple[A](cmd: A)(implicit A: Writes[A], R: Reads[A, List]): Observable[R.R] = withError {
command(A.write(cmd)).flatMap[R.R](new Func1[RespType, Observable[R.R]] {
def call(t1: RespType): Observable[R.R] = Observable.from(R.read(t1).asJava)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import io.netty.util.Recycler.Handle
import rx.redis.resp.RespType

class AdapterAction private (private val handle: Handle) {
private var _cmd: RespType = _
private var _sender: Observer[RespType] = _
private[this] var _cmd: RespType = _
private[this] var _sender: Observer[RespType] = _

def cmd = _cmd
def sender = _sender

def update(cmd: RespType, sender: Observer[RespType]): Unit = {
_cmd = cmd
_sender = sender
}

def recycle(): Unit = {
_cmd = null
_sender = null
Expand All @@ -43,8 +48,7 @@ object AdapterAction {

def apply(cmd: RespType, sender: Observer[RespType]): AdapterAction = {
val adapterAction = InstanceRecycler.get()
adapterAction._cmd = cmd
adapterAction._sender = sender
adapterAction.update(cmd, sender)
adapterAction
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import rx.redis.serialization.ByteBufDeserializer

private[redis] trait RespDecoder { this: ChannelInboundHandler

private final var buffered: ByteBuf = null
private[this] final var buffered: ByteBuf = null

final override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = msg match {
case in: ByteBuf
Expand All @@ -48,12 +48,12 @@ private[redis] trait RespDecoder { this: ChannelInboundHandler ⇒
ctx.fireChannelInactive()
}

private final def decode(ctx: ChannelHandlerContext, data: ByteBuf): Unit = {
private[this] final def decode(ctx: ChannelHandlerContext, data: ByteBuf): Unit = {
val completeData = mergeFrames(ctx.alloc(), data)
decode0(ctx, completeData)
}

private final def decode0(ctx: ChannelHandlerContext, completeData: ByteBuf): Unit = {
private[this] final def decode0(ctx: ChannelHandlerContext, completeData: ByteBuf): Unit = {
val needMore = ByteBufDeserializer.foreach(completeData) { resp
ctx.fireChannelRead(resp)
}
Expand All @@ -64,7 +64,7 @@ private[redis] trait RespDecoder { this: ChannelInboundHandler ⇒
}
}

private final def mergeFrames(alloc: ByteBufAllocator, frame: ByteBuf): ByteBuf = {
private[this] final def mergeFrames(alloc: ByteBufAllocator, frame: ByteBuf): ByteBuf = {
if (buffered eq null) {
frame
} else {
Expand All @@ -75,7 +75,7 @@ private[redis] trait RespDecoder { this: ChannelInboundHandler ⇒
}
}

private final def ensureSize(alloc: ByteBufAllocator, size: Int): ByteBuf = {
private[this] final def ensureSize(alloc: ByteBufAllocator, size: Int): ByteBuf = {
var newBuf = buffered
if (newBuf.writerIndex > newBuf.maxCapacity - size) {
val buf = alloc.buffer(newBuf.readableBytes + size)
Expand Down
10 changes: 5 additions & 5 deletions modules/client/src/main/scala/rx/redis/pipeline/RxCloser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import scala.util.control.NoStackTrace

private[redis] class RxCloser(queue: util.Queue[Observer[RespType]]) extends ChannelDuplexHandler {

private final var canWrite = true
private final var closePromise: ChannelPromise = _
private final val ChannelClosedException = new IllegalStateException("Channel already closed") with NoStackTrace
private[this] final var canWrite = true
private[this] final var closePromise: ChannelPromise = _
private[this] final val ChannelClosedException = new IllegalStateException("Channel already closed") with NoStackTrace

private def canClose = !canWrite && (closePromise ne null)
private[this] def canClose = !canWrite && (closePromise ne null)

private def closeChannel(ctx: ChannelHandlerContext): Unit = {
private[this] def closeChannel(ctx: ChannelHandlerContext): Unit = {
val promise = closePromise
closePromise = null
ctx.close(promise)
Expand Down
27 changes: 15 additions & 12 deletions modules/client/src/main/scala/rx/redis/pipeline/RxNettyClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import rx.redis.resp.RespType
import scala.language.implicitConversions

object RxNettyClient {
private final val threadFactory = new DefaultThreadFactory("rx-redis", true)
private final val eventLoopGroup = new SharedNioEventLoopGroup(0, threadFactory)
private[this] final val threadFactory = new DefaultThreadFactory("rx-redis", true)
private[this] final val eventLoopGroup = new SharedNioEventLoopGroup(0, threadFactory)

def apply(host: String, port: Int): NettyClient = {
val channelInitializer = new RxChannelInitializer(optimizeForThroughput = true)
Expand All @@ -51,27 +51,30 @@ object RxNettyClient {
new RxNettyClient(channel)
}

private final class ChannelCloseSubscribe(channel: Channel) extends OnSubscribe[Unit] {
private def channelClose(channel: Channel): OnSubscribe[Unit] =
new ChannelCloseSubscribe(channel)

private[this] final class ChannelCloseSubscribe(channel: Channel) extends OnSubscribe[Unit] {
def call(subscriber: Subscriber[_ >: Unit]): Unit =
channel.close().addListener(
new ChannelCloseListener(subscriber, channel.eventLoop.parent))
}

private final class ChannelCloseListener[S <: Subscriber[_ >: Unit]](subscriber: S, eventLoopGroup: EventLoopGroup) extends ChannelFutureListener {
private[this] final class ChannelCloseListener[S <: Subscriber[_ >: Unit]](subscriber: S, eventLoopGroup: EventLoopGroup) extends ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit =
futureSubscription(
future, subscriber,
eventLoopGroup.shutdownGracefully().addListener(new ShutdownListener(subscriber)))
}

private final class ShutdownListener[F <: Future[_], S <: Subscriber[_ >: Unit]](subscriber: S) extends GenericFutureListener[F] {
private[this] final class ShutdownListener[F <: Future[_], S <: Subscriber[_ >: Unit]](subscriber: S) extends GenericFutureListener[F] {
def operationComplete(future: F): Unit =
futureSubscription(
future, subscriber,
subscriber.onCompleted())
}

private final def futureSubscription[F <: Future[_], S <: Subscriber[_ >: Unit]](future: F, subscriber: S, onNext: Unit): Unit =
private[this] final def futureSubscription[F <: Future[_], S <: Subscriber[_ >: Unit]](future: F, subscriber: S, onNext: Unit): Unit =
if (subscriber.isUnsubscribed) {
future.cancel(true)
} else if (future.isCancelled) {
Expand All @@ -86,15 +89,15 @@ object RxNettyClient {

private[redis] class RxNettyClient(channel: Channel) extends NettyClient {
@inline
private final implicit def writeToRunnable(f: ChannelFuture): Runnable = new Runnable {
private[this] final implicit def writeToRunnable(f: ChannelFuture): Runnable = new Runnable {
def run(): Unit = f
}

private final val eventLoop = channel.eventLoop()
private final val pipeline = channel.pipeline()
private[this] final val eventLoop = channel.eventLoop()
private[this] final val pipeline = channel.pipeline()

private final val emptyPromise = channel.voidPromise()
private final val flushTask = new Runnable {
private[this] final val emptyPromise = channel.voidPromise()
private[this] final val flushTask = new Runnable {
def run(): Unit = pipeline.flush()
}

Expand All @@ -113,6 +116,6 @@ private[redis] class RxNettyClient(channel: Channel) extends NettyClient {
}

def close(): Observable[Unit] = {
Observable.create(new RxNettyClient.ChannelCloseSubscribe(channel)).cache()
Observable.create(RxNettyClient.channelClose(channel)).cache()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ object ByteBufDeserializer {

case class ParseAllResult(data: immutable.Seq[RespType], hasRemainder: Boolean)

private final val INSTANCE = new Deserializer[ByteBuf]()(ByteBufAccess)
private[this] final val INSTANCE = new Deserializer[ByteBuf]()(ByteBufAccess)

private def releaseAfterUse[A](bb: ByteBuf)(f: A): A =
private[this] def releaseAfterUse[A](bb: ByteBuf)(f: A): A =
try f finally bb.release()

def apply(bb: ByteBuf): RespType = INSTANCE(bb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,50 +40,50 @@ object Deserializer {
final class Deserializer[A](implicit A: BytesAccess[A]) {
import rx.redis.resp.Protocol._

private def notEnoughData(bytes: A) =
private[this] def notEnoughData(bytes: A) =
throw Deserializer.NotEnoughData

private def unknownType(bytes: A) =
private[this] def unknownType(bytes: A) =
expected(bytes, typeChars: _*)

private def expected(bytes: A, expected: Byte*) = {
private[this] def expected(bytes: A, expected: Byte*) = {
val pos = A.readerIndex(bytes)
val found = A.getByteAt(bytes, pos).toChar
throw Deserializer.ProtocolError(pos, found, expected.toList)
}

private def peek(bytes: A) =
private[this] def peek(bytes: A) =
A.getByteAt(bytes, A.readerIndex(bytes))

private def read(bytes: A) =
private[this] def read(bytes: A) =
A.readNextByte(bytes)

private def skip(bytes: A) =
private[this] def skip(bytes: A) =
A.skipBytes(bytes, 1)

private def requireLen(bytes: A, len: Int) =
private[this] def requireLen(bytes: A, len: Int) =
A.isReadable(bytes, len)

private def read(bytes: A, b: Byte): Unit = {
private[this] def read(bytes: A, b: Byte): Unit = {
if (!A.isReadable(bytes)) notEnoughData(bytes)
else if (peek(bytes) != b) expected(bytes, b)
else skip(bytes)
}

private def andRequireCrLf(bytes: A, value: RespType) = {
private[this] def andRequireCrLf(bytes: A, value: RespType) = {
read(bytes, Cr)
read(bytes, Lf)
value
}

private def parseLen(bytes: A) =
private[this] def parseLen(bytes: A) =
parseNumInt(bytes)

private def parseInteger(bytes: A) =
private[this] def parseInteger(bytes: A) =
RespInteger(parseNumLong(bytes))

@tailrec
private def parseNumInt(bytes: A, n: Int, neg: Int): Int = {
private[this] def parseNumInt(bytes: A, n: Int, neg: Int): Int = {
if (!A.isReadable(bytes)) {
notEnoughData(bytes)
} else {
Expand All @@ -99,7 +99,7 @@ final class Deserializer[A](implicit A: BytesAccess[A]) {
}

@tailrec
private def parseNumLong(bytes: A, n: Long, neg: Long): Long = {
private[this] def parseNumLong(bytes: A, n: Long, neg: Long): Long = {
if (!A.isReadable(bytes)) {
notEnoughData(bytes)
} else {
Expand All @@ -114,36 +114,36 @@ final class Deserializer[A](implicit A: BytesAccess[A]) {
}
}

private def parseNumInt(bytes: A): Int =
private[this] def parseNumInt(bytes: A): Int =
parseNumInt(bytes, 0, 1)

private def parseNumLong(bytes: A): Long =
private[this] def parseNumLong(bytes: A): Long =
parseNumLong(bytes, 0L, 1L)

private def readStringOfLen(bytes: A, len: Int)(ct: A RespType) = {
private[this] def readStringOfLen(bytes: A, len: Int)(ct: A RespType) = {
if (!requireLen(bytes, len)) notEnoughData(bytes)
else andRequireCrLf(bytes, ct(A.readBytes(bytes, len)))
}

private def parseSimpleString(bytes: A) = {
private[this] def parseSimpleString(bytes: A) = {
val len = A.bytesBefore(bytes, Cr)
if (len == -1) notEnoughData(bytes)
else readStringOfLen(bytes, len)(b RespString(A.toString(b, Utf8)))
}

private def parseError(bytes: A) = {
private[this] def parseError(bytes: A) = {
val len = A.bytesBefore(bytes, Cr)
if (len == -1) notEnoughData(bytes)
else readStringOfLen(bytes, len)(b RespError(A.toString(b, Utf8)))
}

private def parseBulkString(bytes: A) = {
private[this] def parseBulkString(bytes: A) = {
val len = parseLen(bytes)
if (len == -1) NullString
else readStringOfLen(bytes, len)(b RespBytes(A.toByteArray(b)))
}

private def parseArray(bytes: A) = {
private[this] def parseArray(bytes: A) = {
val size = parseLen(bytes)
if (size == -1) NullArray
else {
Expand All @@ -159,7 +159,7 @@ final class Deserializer[A](implicit A: BytesAccess[A]) {
}
}

private def quickApply(bytes: A): RespType = {
private[this] def quickApply(bytes: A): RespType = {
if (!A.isReadable(bytes)) notEnoughData(bytes)
else {
val firstByte = peek(bytes)
Expand Down

0 comments on commit 31cc10e

Please sign in to comment.