Skip to content

Commit

Permalink
Java friendliness
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen committed Dec 27, 2010
2 parents 3854acd + e609f81 commit 3e76114
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 64 deletions.
4 changes: 2 additions & 2 deletions project/build.properties
@@ -1,8 +1,8 @@
#Project properties
#Wed Dec 22 10:28:16 PST 2010
#Thu Dec 23 15:54:15 PST 2010
project.organization=com.twitter
project.name=finagle
sbt.version=0.7.4
project.version=1.0.9-SNAPSHOT
project.version=1.0.10-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
6 changes: 4 additions & 2 deletions src/main/java/com/twitter/finagle/javaapi/HttpClientTest.java
Expand Up @@ -5,6 +5,7 @@
import com.twitter.finagle.service.Service;
import com.twitter.finagle.builder.*;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.*;

public class HttpClientTest {
Expand All @@ -16,8 +17,9 @@ public static void main(String args[]) {
.codec(Codec4J.Http)
.buildService();

Future<HttpResponse> response =
client.apply(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");

Future<HttpResponse> response = client.apply(request);

response.addEventListener(
new FutureEventListener<HttpResponse>() {
Expand Down
Expand Up @@ -19,8 +19,8 @@ public static void main(String args[]) {

Service<ThriftCall<Silly.bleep_args, Silly.bleep_result>, Silly.bleep_result> client =
ClientBuilder.get()
.hosts("localhost:10000")
.codec(Codec4J.Thrift)
.hosts("localhost:10000")
.buildService();

Future<Silly.bleep_result> response =
Expand Down
Expand Up @@ -34,7 +34,7 @@ public Future<ThriftReply> apply(ThriftCall call) {
return future;
}

return future;
throw new IllegalArgumentException("Method: " + call.getMethod() + " is unsupported!");
}
};

Expand Down
Expand Up @@ -80,69 +80,69 @@ case class ClientBuilder(
None // proactivelyConnect
)

def hosts(hostnamePortCombinations: String) =
def hosts(hostnamePortCombinations: String): ClientBuilder =
copy(_hosts = Some(parseHosts(hostnamePortCombinations)))

def hosts(addresses: Collection[SocketAddress]) =
def hosts(addresses: Collection[SocketAddress]): ClientBuilder =
copy(_hosts = Some(addresses toSeq))

def hosts(addresses: Iterable[SocketAddress]) =
def hosts(addresses: Iterable[SocketAddress]): ClientBuilder =
copy(_hosts = Some(addresses toSeq))

def codec(codec: Codec) =
def codec(codec: Codec): ClientBuilder =
copy(_codec = Some(codec))

def connectionTimeout(value: Long, unit: TimeUnit) =
def connectionTimeout(value: Long, unit: TimeUnit): ClientBuilder =
copy(_connectionTimeout = Timeout(value, unit))

def connectionTimeout(duration: Duration) =
def connectionTimeout(duration: Duration): ClientBuilder =
copy(_connectionTimeout = Timeout(duration.inMillis, TimeUnit.MICROSECONDS))

def requestTimeout(value: Long, unit: TimeUnit) =
def requestTimeout(value: Long, unit: TimeUnit): ClientBuilder =
copy(_requestTimeout = Timeout(value, unit))

def requestTimeout(duration: Duration) =
def requestTimeout(duration: Duration): ClientBuilder =
copy(_requestTimeout = Timeout(duration.inMillis, TimeUnit.MICROSECONDS))

def reportTo(receiver: StatsReceiver) =
def reportTo(receiver: StatsReceiver): ClientBuilder =
copy(_statsReceiver = Some(receiver))

def sampleWindow(value: Long, unit: TimeUnit) =
def sampleWindow(value: Long, unit: TimeUnit): ClientBuilder =
copy(_sampleWindow = Timeout(value, unit))

def sampleGranularity(value: Long, unit: TimeUnit) =
def sampleGranularity(value: Long, unit: TimeUnit): ClientBuilder =
copy(_sampleGranularity = Timeout(value, unit))

def name(value: String) = copy(_name = Some(value))
def name(value: String): ClientBuilder = copy(_name = Some(value))

def hostConnectionLimit(value: Int) =
def hostConnectionLimit(value: Int): ClientBuilder =
copy(_hostConnectionLimit = Some(value))

def retries(value: Int) =
def retries(value: Int): ClientBuilder =
copy(_retries = Some(value))

def initialBackoff(value: Duration) =
def initialBackoff(value: Duration): ClientBuilder =
copy(_initialBackoff = Some(value))

def backoffMultiplier(value: Int) =
def backoffMultiplier(value: Int): ClientBuilder =
copy(_backoffMultiplier = Some(value))

def sendBufferSize(value: Int) = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int) = copy(_recvBufferSize = Some(value))
def sendBufferSize(value: Int): ClientBuilder = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int): ClientBuilder = copy(_recvBufferSize = Some(value))

def exportLoadsToOstrich() = copy(_exportLoadsToOstrich = true)
def exportLoadsToOstrich(): ClientBuilder = copy(_exportLoadsToOstrich = true)

def failureAccrualWindow(value: Long, unit: TimeUnit) =
def failureAccrualWindow(value: Long, unit: TimeUnit): ClientBuilder =
copy(_failureAccrualWindow = Timeout(value, unit))

def channelFactory(cf: ChannelFactory) =
def channelFactory(cf: ChannelFactory): ClientBuilder =
copy(_channelFactory = Some(cf))

def proactivelyConnect(duration: Duration) =
def proactivelyConnect(duration: Duration): ClientBuilder =
copy(_proactivelyConnect = Some(duration))

// ** BUILDING
def logger(logger: Logger) = copy(_logger = Some(logger))
def logger(logger: Logger): ClientBuilder = copy(_logger = Some(logger))

private def bootstrap(codec: Codec)(host: SocketAddress) = {
val bs = new BrokerClientBootstrap(_channelFactory getOrElse defaultChannelFactory)
Expand Down
Expand Up @@ -47,7 +47,7 @@ class SampleHandler(samples: SampleRepository[AddableSample[_]])
ctx.getAttachment match {
case Timing(requestedAt: Time) =>
samples("exception", e.getCause.getClass.getName).add(
requestedAt.ago.inMilliseconds.toInt)
requestedAt.inMilliseconds.toInt)
case _ => ()
}
super.exceptionCaught(ctx, e)
Expand All @@ -69,7 +69,7 @@ class SampleHandler(samples: SampleRepository[AddableSample[_]])
case (_, p: PartialUpstreamMessageEvent) =>
()
case (Timing(requestedAt: Time), r: HttpResponse) =>
latencySample.add(requestedAt.ago.inMilliseconds.toInt)
latencySample.add(requestedAt.inMilliseconds.toInt)
case (_, _) =>
() // WTF?
}
Expand Down Expand Up @@ -125,62 +125,62 @@ case class ServerBuilder(
None // maxQueueDepth
)

def codec(codec: Codec) =
def codec(codec: Codec): ServerBuilder =
copy(_codec = Some(codec))

def connectionTimeout(value: Long, unit: TimeUnit) =
def connectionTimeout(value: Long, unit: TimeUnit): ServerBuilder =
copy(_connectionTimeout = Timeout(value, unit))

def connectionTimeout(duration: Duration) =
def connectionTimeout(duration: Duration): ServerBuilder =
copy(_connectionTimeout = Timeout(duration.inMillis, TimeUnit.MICROSECONDS))

def requestTimeout(value: Long, unit: TimeUnit) =
def requestTimeout(value: Long, unit: TimeUnit): ServerBuilder =
copy(_requestTimeout = Timeout(value, unit))

def requestTimeout(duration: Duration) =
def requestTimeout(duration: Duration): ServerBuilder =
copy(_requestTimeout = Timeout(duration.inMillis, TimeUnit.MICROSECONDS))

def reportTo(receiver: StatsReceiver) =
def reportTo(receiver: StatsReceiver): ServerBuilder =
copy(_statsReceiver = Some(receiver))

def sampleWindow(value: Long, unit: TimeUnit) =
def sampleWindow(value: Long, unit: TimeUnit): ServerBuilder =
copy(_sampleWindow = Timeout(value, unit))

def sampleGranularity(value: Long, unit: TimeUnit) =
def sampleGranularity(value: Long, unit: TimeUnit): ServerBuilder =
copy(_sampleGranularity = Timeout(value, unit))

def sampleGranularity(duration: Duration) =
def sampleGranularity(duration: Duration): ServerBuilder =
copy(_sampleGranularity = Timeout(duration.inMillis, TimeUnit.MICROSECONDS))

def name(value: String) = copy(_name = Some(value))
def name(value: String): ServerBuilder = copy(_name = Some(value))

def sendBufferSize(value: Int) = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int) = copy(_recvBufferSize = Some(value))
def sendBufferSize(value: Int): ServerBuilder = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int): ServerBuilder = copy(_recvBufferSize = Some(value))

def pipelineFactory(value: ChannelPipelineFactory) =
def pipelineFactory(value: ChannelPipelineFactory): ServerBuilder =
copy(_pipelineFactory = Some(value))

def service[Req <: AnyRef, Rep <: AnyRef](service: Service[Req, Rep]) =
def service[Req <: AnyRef, Rep <: AnyRef](service: Service[Req, Rep]): ServerBuilder =
copy(_pipelineFactory = Some(ServicePipelineFactory(service)))

def bindTo(address: SocketAddress) =
def bindTo(address: SocketAddress): ServerBuilder =
copy(_bindTo = Some(address))

def channelFactory(cf: ChannelFactory) =
def channelFactory(cf: ChannelFactory): ServerBuilder =
copy(_channelFactory = Some(cf))

def logger(logger: Logger) = copy(_logger = Some(logger))
def logger(logger: Logger): ServerBuilder = copy(_logger = Some(logger))

def tls(path: String, password: String) =
def tls(path: String, password: String): ServerBuilder =
copy(_tls = Some(Ssl(path, password)))

def startTls(value: Boolean) =
def startTls(value: Boolean): ServerBuilder =
copy(_startTls = true)

def maxConcurrentRequests(max: Int) =
def maxConcurrentRequests(max: Int): ServerBuilder =
copy(_maxConcurrentRequests = Some(max))

def maxQueueDepth(max: Int) =
def maxQueueDepth(max: Int): ServerBuilder =
copy(_maxQueueDepth = Some(max))

private def statsRepository(
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/twitter/finagle/channel/ChannelPool.scala
Expand Up @@ -19,7 +19,7 @@ class ChannelPool(
extends Serialized
{
@volatile private[this] var _isAvailable = false
@volatile private[this] var lastConnectAttempt = Time.never
@volatile private[this] var lastConnectAttempt = Time.epoch

private[this] val channelQueue = new ConcurrentLinkedQueue[Channel]

Expand All @@ -36,7 +36,7 @@ class ChannelPool(
// as an application would decidedly be unhealthy on connection
// failure.
def tryToConnect(period: Duration) {
val timeSinceLastConnectAttempt = lastConnectAttempt.ago
val timeSinceLastConnectAttempt = Duration.since(lastConnectAttempt)

if (timeSinceLastConnectAttempt < period) {
Broker.timer(period - timeSinceLastConnectAttempt) {
Expand All @@ -50,7 +50,7 @@ class ChannelPool(
_isAvailable = true
case Cancelled =>
tryToConnect(period)
case Error(_) =>
case Error(_) =>
tryToConnect(period)
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala
Expand Up @@ -45,12 +45,12 @@ class StatsLoadedBroker(
underlying.dispatch(e) whenDone0 { future =>
future {
case Ok(_) =>
latencySample.add(begin.ago.inMilliseconds.toInt)
latencySample.add(begin.inMilliseconds.toInt)
case Error(e) =>
// TODO: exception hierarchy here to differentiate between
// application, connection & other (internal?) exceptions.
samples("exception", e.getClass.getName)
.add(begin.ago.inMilliseconds.toInt)
.add(begin.inMilliseconds.toInt)
case Cancelled => /*ignore*/ ()
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/twitter/finagle/service/Service.scala
Expand Up @@ -15,6 +15,8 @@ abstract class Service[-Req <: AnyRef, +Rep <: AnyRef] extends (Req => Future[Re
def map[Req1 <: AnyRef](f: (Req1) => (Req)) = new Service[Req1, Rep] {
def apply(req1: Req1) = Service.this.apply(f(req1))
}

def apply(request: Req): Future[Rep]
}

// A filter is a service transform [Req -> (Req1 -> Rep1) -> Rep].
Expand Down
Expand Up @@ -37,9 +37,9 @@ object ServerBuilderSpec extends Specification {
client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "go/slow"))(aReallyLongTime) must throwA[ChannelClosedException]
}

duration mustEqual 1.second
duration.moreOrLessEquals(1.second, 1.second) mustBe true

server.close().awaitUninterruptibly()
}
}
}
}
5 changes: 0 additions & 5 deletions src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala
Expand Up @@ -30,11 +30,6 @@ import com.twitter.finagle.RandomSocket
import com.twitter.finagle.util.Conversions._

object EndToEndSpec extends Specification {
class SillyImpl extends Silly.Iface {
def bleep(bloop: String): String =
bloop.reverse
}

// TODO: test with a traditional thrift stack over local loopback
// TCP

Expand Down

4 comments on commit 3e76114

@jaked
Copy link

@jaked jaked commented on 3e76114 Jan 3, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in LoadedBroker and ServerBuilder some calls to since (a.k.a. ago) were removed, but I think we're trying to compute deltas here, so they should remain. does that sound right?

@jjmmcc
Copy link
Contributor

@jjmmcc jjmmcc commented on 3e76114 Jan 3, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be the cause of the timeout exceptions I'm now seeing with Finagle

@mariusae
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, those should be converted to Duration.since().

@jjmmcc
Copy link
Contributor

@jjmmcc jjmmcc commented on 3e76114 Jan 3, 2011

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or preferably time.untilNow. (Duration.since should be deprecated/deleted IMHO because it is redundant and the name is potentially ambiguous).

Please sign in to comment.