Finagle-Kestrel readReliably reconnection problem #130

Closed
wishoping opened this Issue Jan 4, 2013 · 4 comments


Hi, I'm Junki Kim from South Korea.

I am trying to use finagle-kestrel as a fast, reliable Kestrel client.
While adopting finagle-kestrel, I ran into a reconnection problem with readReliably.

The log below shows the problem.

com.twitter.finagle.kestrel.AllHandlesDiedException$
at com.twitter.finagle.kestrel.AllHandlesDiedException$.(MultiReader.scala)
at com.twitter.finagle.kestrel.MultiReader$.loop$1(MultiReader.scala:80)
at com.twitter.finagle.kestrel.MultiReader$$anonfun$5.apply(MultiReader.scala:99)
at com.twitter.finagle.kestrel.MultiReader$$anonfun$5.apply(MultiReader.scala:97)
at com.twitter.concurrent.Offer$$anon$1$$anonfun$prepare$1$$anon$7$$anonfun$ack$1.apply(Offer.scala:73)
at com.twitter.concurrent.Offer$$anon$1$$anonfun$prepare$1$$anon$7$$anonfun$ack$1.apply(Offer.scala:72)
at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:617)
at com.twitter.util.Try$.apply(Try.scala:13)
at com.twitter.util.Future$.apply(Future.scala:55)
at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:617)
at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:617)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:591)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:590)
at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:67)
at com.twitter.util.Promise$Transformer.k(Promise.scala:67)
at com.twitter.util.Promise$Transformer.apply(Promise.scala:76)
at com.twitter.util.Promise$Transformer.apply(Promise.scala:58)
at com.twitter.util.Promise$$anon$1.run(Promise.scala:250)
at com.twitter.concurrent.Scheduler$LocalScheduler.run(Scheduler.scala:60)
at com.twitter.concurrent.Scheduler$LocalScheduler.submit(Scheduler.scala:40)
at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:26)
at com.twitter.util.Promise.runq(Promise.scala:236)
at com.twitter.util.Promise.updateIfEmpty(Promise.scala:482)
at com.twitter.util.Promise.update(Promise.scala:455)
at com.twitter.util.Promise.setException(Promise.scala:445)
at com.twitter.concurrent.AsyncQueue$$anonfun$fail$1.apply(AsyncQueue.scala:91)
at com.twitter.concurrent.AsyncQueue$$anonfun$fail$1.apply(AsyncQueue.scala:91)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.LinearSeqLike$$anon$1.foreach(LinearSeqLike.scala:50)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at scala.collection.immutable.Queue.foreach(Queue.scala:42)
at com.twitter.concurrent.AsyncQueue.fail(AsyncQueue.scala:91)
at com.twitter.finagle.transport.ClientChannelTransport.fail(ChannelTransport.scala:115)
at com.twitter.finagle.transport.ClientChannelTransport.handleUpstream(ChannelTransport.scala:130)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:793)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:61)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:793)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:489)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:372)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:93)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:793)
at org.jboss.netty.channel.SimpleChannelHandler.channelClosed(SimpleChannelHandler.java:223)
at com.twitter.finagle.channel.ChannelStatsHandler.channelClosed(ChannelStatsHandler.scala:88)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:113)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:793)
at org.jboss.netty.channel.SimpleChannelHandler.channelClosed(SimpleChannelHandler.java:223)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:113)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:793)
at com.twitter.finagle.netty3.SimpleChannelSnooper.handleUpstream(ChannelSnooper.scala:81)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:565)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:560)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:476)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:649)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:99)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:390)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
2013. 1. 4 AM 11:44:53 com.twitter.common.zookeeper.ServerSetImpl$ServerSetWatcher notifyServerSetChange
INFO: received initial membership [ServiceInstance(serviceEndpoint:Endpoint(host:192.168.0.27, port:22133), additionalEndpoints:{thrift=Endpoint(host:192.168.0.27, port:2229), memcache=Endpoint(host:192.168.0.27, port:22133), text=Endpoint(host:192.168.0.27, port:2222)}, status:ALIVE)]
ServerSet - 192.168.0.27:22133 connected!
2013. 1. 4 AM 11:45:00 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2] OPEN
2013. 1. 4 AM 11:45:00 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client v [id: 0x2de3e7d2] CONNECT: 192.168.0.27/192.168.0.27:22133
2013. 1. 4 AM 11:45:00 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2, /192.168.0.27:56669 => 192.168.0.27/192.168.0.27:22133] BOUND: /192.168.0.27:56669
2013. 1. 4 AM 11:45:00 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2, /192.168.0.27:56669 => 192.168.0.27/192.168.0.27:22133] CONNECTED: 192.168.0.27/192.168.0.27:22133
2013. 1. 4 AM 11:45:10 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2, /192.168.0.27:56669 :> 192.168.0.27/192.168.0.27:22133] DISCONNECTED
2013. 1. 4 AM 11:45:10 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2, /192.168.0.27:56669 :> 192.168.0.27/192.168.0.27:22133] UNBOUND
2013. 1. 4 AM 11:45:10 com.twitter.finagle.builder.ClientBuilder$$anonfun$17$$anonfun$apply$7 apply
INFO: 2de3e7d2 client ^ [id: 0x2de3e7d2, /192.168.0.27:56669 :> 192.168.0.27/192.168.0.27:22133] CLOSED

After the "AllHandlesDiedException$", I try to reconnect to the Kestrel server using "client.readReliably". The Kestrel connection session is established, but the read stream is not.

Below is the Kestrel debug log:

DEB [20130104-11:44:33.641] kestrel: New session 111 from 192.168.0.27:56667
DEB [20130104-11:44:33.649] kestrel: get -> q=crawl_user t=Some(2013-01-28 23:15:57 +0000) open=true peek=false
DEB [20130104-11:44:43.653] kestrel: End of session 111

// after reconnect
DEB [20130104-11:45:00.489] kestrel: New session 112 from 192.168.0.27:56669
DEB [20130104-11:45:10.492] kestrel: End of session 112

So what is the problem?

Any advice would be much appreciated. Please help!

My test code is below:

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.jboss.netty.util.CharsetUtil;

import scala.Function0;
import scala.Function1;
import scala.Tuple2;
import scala.collection.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

import com.google.common.collect.ImmutableSet;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.ServerSetImpl;
import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.concurrent.Spool;
import com.twitter.finagle.ServiceFactory;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.builder.Cluster.Change;
import com.twitter.finagle.kestrel.MultiReader;
import com.twitter.finagle.kestrel.ReadHandle;
import com.twitter.finagle.kestrel.ReadMessage;
import com.twitter.finagle.kestrel.java.Client;
import com.twitter.finagle.kestrel.protocol.Command;
import com.twitter.finagle.kestrel.protocol.Kestrel;
import com.twitter.finagle.kestrel.protocol.Response;
import com.twitter.finagle.service.Backoff;
import com.twitter.finagle.service.RetryPolicy;
import com.twitter.finagle.service.SimpleRetryPolicy;
import com.twitter.finagle.stats.Counter;
import com.twitter.finagle.stats.Gauge;
import com.twitter.finagle.stats.GlobalStatsReceiver;
import com.twitter.finagle.stats.JavaLoggerStatsReceiver;
import com.twitter.finagle.stats.Stat;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.zookeeper.ZookeeperServerSetCluster;
import com.twitter.thrift.ServiceInstance;
import com.twitter.util.Config.Nothing;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.JavaTimer;
import com.twitter.util.Monitor;
import com.twitter.util.Try;

public class KestrelWorker2 {
public static Logger LOG = Logger.getLogger(KestrelWorker2.class);

private ReadHandle currentReadHandle = null;
private ZookeeperServerSetCluster zkCluster = null;
private Client curClient = null;
private ServerSet curServerSet = null;

private String zkHost = "localhost";
private int zkPort = 2181;
private int ZK_TIME_OUT= 30; //seconds 

private String kestrelZkReadServicePath = "/kestrel/read";

private AtomicBoolean isStartListening = new AtomicBoolean(false);
private AtomicInteger listeningServerCount = new AtomicInteger(0);

public void readQueueFromZk(final String queueName) {
    // Initialize Flag 
    isStartListening.set(false);
    listeningServerCount.set(0);

    final ZooKeeperClient zkClient = new ZooKeeperClient(Amount.of(ZK_TIME_OUT, Time.SECONDS), new InetSocketAddress(zkHost, zkPort));
    zkClient.register(new Watcher() {

        @Override
        public void process(WatchedEvent event) {
            switch ( event.getState() ) {
            case Disconnected :
            case Expired :
                // Notify, Zookeeper Connection is not valid!
                LOG.error("Zookeeper Connection is not Valid!");
                break;
            case SyncConnected :
                break;
            default:
                break;
            }

        }
    } );

    String prefixPath = kestrelZkReadServicePath;
    curServerSet = new ServerSetImpl(zkClient, prefixPath);
    try {
        curServerSet.monitor(new HostChangeMonitor<ServiceInstance>() {

            @Override
            public void onChange(ImmutableSet<ServiceInstance> instances) {
                if ( instances.isEmpty() ) {
                    // Notify, there is no kestrel servers and go to sleep
                    LOG.error("There is no more Kestrel Servers!");
                    System.err.println("There is no more KestrelServers!");
                    isStartListening.set(false);
                    return;
                } 


                if ( isStartListening.get() && instances.size() == listeningServerCount.get() ) {
                    System.out.println("Connection Already made!");
                    return;
                }

                listeningServerCount.set( instances.size());
                isStartListening.set(true);

                ArrayList<ReadHandle> handles = new ArrayList<ReadHandle>();
                final JavaTimer timer = new JavaTimer();
                final Callable<Iterator<Duration>> backoffs = Backoff.toJava(
                        Backoff
                        .exponential(
                                // 100ms initial backoff
                                Duration.fromTimeUnit(100, TimeUnit.MILLISECONDS),
                                // multiplier
                                2)
                                // fail after 10 tries
                                .take(10));

                for ( ServiceInstance instance : instances ) {
                    LOG.info(String.format("ServerSet - %s:%s connected!" , instance.serviceEndpoint.getHost() , instance.serviceEndpoint.getPort()) );
                    System.out.println(String.format("ServerSet - %s:%s connected!" , instance.serviceEndpoint.getHost() , instance.serviceEndpoint.getPort()) );


                    ServiceFactory<Command, Response> factory = 
                            ClientBuilder
                            .safeBuildFactory(
                                    ClientBuilder
                                    .get()
                                    .codec(Kestrel.get())
                                    .logger(java.util.logging.Logger.getLogger("debug"))
                                    .hosts( String.format("%s:%d", instance.serviceEndpoint.getHost(), instance.serviceEndpoint.getPort()) )
                                    .tcpConnectTimeout(Duration.fromTimeUnit(5, TimeUnit.SECONDS))
                                    .hostConnectionLimit(10));

                    curClient = Client.newInstance(factory);


                    ReadHandle readHandle = curClient.readReliably(queueName, timer, backoffs);

                    handles.add(readHandle);
                }

                if ( handles.size() > 0 ) {

                    currentReadHandle = MultiReader.apply(handles.iterator());

                    Future<Throwable> f = currentReadHandle.error().sync();
                    f.addEventListener(new FutureEventListener<Throwable>() {

                        @Override
                        public void onFailure(Throwable e) {
                            System.err.println( "Failure! : " + e.getMessage() );
                            e.printStackTrace();
                        }

                        @Override
                        public void onSuccess(Throwable e) {
                            LOG.error("Kestrel Client Connect Exception!", e);

                            LOG.error("Try to Reconnect!...");
                            System.err.println("Try to Reconnect!...");
                            e.printStackTrace();

                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }

                            readQueueFromZk(queueName);
                        }
                    });



                    LOG.info(">> Ready  for listen message!");
                    listenMessages(currentReadHandle);
                }




            }
        });
    } catch (MonitorException e1) {
        LOG.error("ServerSet Monitoring Exception!", e1);
    }


    while( true ) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOG.error("wait ... timeout interrupt!", e);
        }
    }
}

private static void listenMessages(final ReadHandle handle ) {
    final JsonParser parser = new JsonParser();
    try {
        Future<ReadMessage> m = handle.messages().sync();

        m.addEventListener(new FutureEventListener<ReadMessage>() {

            @Override
            public void onFailure(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onSuccess(ReadMessage message) {
                JsonObject msgObj = parser.parse(message.bytes().toString(CharsetUtil.UTF_8)).getAsJsonObject();

                System.out.println( ":: " + msgObj.toString());
                message.ack().sync();

                listenMessages(handle);
            }
        });

        // List<Future> futures = new ArrayList<Future>();
        // futures.add(m);
        // Future.collect(futures);
    } catch ( IllegalMonitorStateException e) {
        LOG.error("IllegalMonitorStateException!!!", e);
    }
}
}

zuercher Jan 5, 2013

Have you tried using ZookeeperServerSetCluster from finagle-serversets? (See https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala)

Then you can create the ReadHandle with something like:

 MultiReader.newBuilder(cluster, queueName)
    .clientBuilder(factory)
    .retryBackoffs(backoffs, timer)
    .build()

where queueName, factory, backoffs, and timer are all from your existing code. Note that backoffs needs to be converted into a Scala Stream; you can see an example of that here: https://github.com/twitter/finagle/blob/master/finagle-kestrel/src/main/java/com/twitter/finagle/kestrel/java/ClientBase.java#L68

I think this will eliminate most of the complexity in your code. The ZooKeeper cluster read handle should survive an empty serverset and pick up new hosts when they appear.
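
For reference, the backoff settings in the original test code (100 ms initial delay, multiplier 2, 10 tries) can be worked out by hand. The following is a minimal stdlib-only sketch, not Finagle's Backoff implementation: `exponentialDelays` is a hypothetical helper, and it assumes the schedule starts at the initial value and multiplies by the given factor on each step.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {
    // Hypothetical helper: returns `tries` delays in milliseconds, starting at
    // `initialMillis` and multiplying by `multiplier` after each attempt.
    static List<Long> exponentialDelays(long initialMillis, int multiplier, int tries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < tries; i++) {
            delays.add(delay);
            delay *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Long> delays = exponentialDelays(100, 2, 10);
        System.out.println(delays);
        // prints [100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200]

        long total = 0;
        for (long d : delays) {
            total += d;
        }
        System.out.println("total wait (ms): " + total); // prints total wait (ms): 102300
    }
}
```

Under that assumption, the ten delays add up to roughly 102 seconds before the retry budget is used up.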

wishoping Jan 6, 2013

Thank you for your help!

I tried your suggestion, but I ran into some problems with the code.
The new version is below:

ZookeeperServerSetCluster serverSetCluster = new ZookeeperServerSetCluster(curServerSet);

    final JavaTimer timer = new JavaTimer();
    final Callable<Iterator<Duration>> backoffs = Backoff.toJava(
            Backoff
            .exponential(
                    // 100ms initial backoff
                    Duration.fromTimeUnit(100, TimeUnit.MILLISECONDS),
                    // multiplier
                    2)
                    // fail after 10 tries
                    .take(10));

    Function0<Stream<Duration>> backoffsFunction = new com.twitter.util.Function0<Stream<Duration>>() {
          public Stream<Duration> apply() {
            try {
              return JavaConversions.asScalaIterator(backoffs.call()).toStream();
            } catch (Exception e) {
              return null;
            }
          }
        };

    ClientBuilder<Command, Response, Nothing$, Yes, Yes> factory =  null;
    factory = ClientBuilder
        .get()
        .codec(Kestrel.get())
        .logger(java.util.logging.Logger.getLogger("debug"))
        .cluster(serverSetCluster)
        .tcpConnectTimeout(Duration.fromTimeUnit(5, TimeUnit.SECONDS))
        .hostConnectionLimit(10);

    MultiReader.newBuilder(serverSetCluster, queueName)
        .clientBuilder(factory)
        .retryBackoffs(backoffsFunction, timer)
        .build();

But the assignment to the factory variable is not valid; I get a compile error. The expression

factory = ClientBuilder
    .get()
    .codec(Kestrel.get())
    .logger(java.util.logging.Logger.getLogger("debug"))
    .cluster(serverSetCluster)
    .tcpConnectTimeout(Duration.fromTimeUnit(5, TimeUnit.SECONDS))
    .hostConnectionLimit(10);

returns the type "ClientBuilder<Command, Response, Yes, Yes, Yes>", so I can't use that code as written.

The Stream function also has a problem:
Function0<Stream<Duration>> backoffsFunction = new com.twitter.util.Function0<Stream<Duration>>() {
    public Stream<Duration> apply() {
        try {
            return JavaConversions.asScalaIterator(backoffs.call()).toStream();
        } catch (Exception e) {
            return null;
        }
    }
};

When an exception is caught, the original reference code uses "return Stream.empty()", but my Eclipse shows a compile error ("ambiguous use"), so I temporarily return null instead.

If you don't mind, please help me once more.

Thank you in advance for your help!

zuercher Jan 7, 2013

Given these imports:

import com.twitter.util.Duration;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.builder.ClientConfig;
import com.twitter.finagle.kestrel.protocol.Command;
import com.twitter.finagle.kestrel.protocol.Kestrel;
import com.twitter.finagle.kestrel.protocol.Response;
import java.util.concurrent.TimeUnit;
import scala.runtime.Nothing$;

This compiles:

ClientBuilder<Command, Response, Nothing$, ClientConfig.Yes, ClientConfig.Yes> factory =  null;
factory = ClientBuilder
    .get()
    .codec(Kestrel.get())
    .logger(java.util.logging.Logger.getLogger("debug"))
    .tcpConnectTimeout(Duration.fromTimeUnit(5, TimeUnit.SECONDS))
    .hostConnectionLimit(1);

(Assumes finagle-kestrel, finagle-core, util-core, util-logging and scala-library are on the classpath.)

Note that the cluster argument must be removed -- you are meant to pass a partially completed ClientBuilder to the MultiReader. The MultiReader will handle using your ClientBuilder to create Client instances as necessary.
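
To illustrate why the declared type and the built expression disagreed, here is a simplified, stdlib-only sketch of a builder that records in a type parameter whether hosts have been set. It is not Finagle's ClientBuilder: `Builder`, `Yes`, `No`, and `connect` are hypothetical names, and the sketch only assumes (as the types in this thread suggest) that setting the cluster changes one type parameter from "unset" to "Yes".

```java
final class PhantomBuilderSketch {
    // Marker types standing in for "configured" and "not configured".
    static final class Yes {}
    static final class No {}

    // Hypothetical builder whose type parameter records whether hosts were set.
    static final class Builder<HasHosts> {
        final String hosts;          // null until hosts(...) is called
        final int connectionLimit;

        private Builder(String hosts, int connectionLimit) {
            this.hosts = hosts;
            this.connectionLimit = connectionLimit;
        }

        static Builder<No> create() {
            return new Builder<No>(null, 1);
        }

        // Does not touch hosts, so the HasHosts marker is carried through unchanged.
        Builder<HasHosts> connectionLimit(int limit) {
            return new Builder<HasHosts>(hosts, limit);
        }

        // Setting hosts flips the marker to Yes.
        Builder<Yes> hosts(String h) {
            return new Builder<Yes>(h, connectionLimit);
        }
    }

    // Hypothetical reader: accepts only a builder WITHOUT hosts and supplies them itself.
    static String connect(Builder<No> partial, String host) {
        Builder<Yes> complete = partial.hosts(host);
        return complete.hosts + " limit=" + complete.connectionLimit;
    }

    public static void main(String[] args) {
        Builder<No> partial = Builder.create().connectionLimit(1);
        System.out.println(connect(partial, "192.168.0.27:22133"));
        // prints 192.168.0.27:22133 limit=1

        // connect(partial.hosts("x"), "y");  // would not compile: Builder<Yes> is not Builder<No>
    }
}
```

In this sketch the consumer owns the host list, so a builder that already has hosts is rejected at compile time, which mirrors the advice to drop the cluster call before handing the builder over.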

Setting hostConnectionLimit > 1 has no effect: the MultiReader will only ever issue a single request per kestrel host.

As far as Stream.empty() vs null goes: using Stream.empty seems to compile without any issues, so perhaps that's more an issue with Eclipse. Explicitly casting it to Stream might help. Also, it's the invocation of backoffs.call() that is marked as throwing a checked exception. You could wrap that with an exception handler that produces an empty Iterator and then pass the resulting Iterator to JavaConversions.asScalaIterator, removing the need to call Stream.empty altogether.
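
A minimal sketch of that last suggestion, using only the Java standard library (Java 8 lambdas are used for brevity in the demo). `iteratorOrEmpty` is a hypothetical helper name, not part of Finagle or util:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;

final class SafeBackoffs {
    // Hypothetical helper: invokes the Callable and falls back to an empty
    // Iterator if it throws, so callers see neither the checked exception nor null.
    static <T> Iterator<T> iteratorOrEmpty(Callable<Iterator<T>> source) {
        try {
            return source.call();
        } catch (Exception e) {
            return Collections.<T>emptyIterator();
        }
    }

    public static void main(String[] args) {
        Callable<Iterator<Long>> ok = () -> Arrays.asList(100L, 200L, 400L).iterator();
        Callable<Iterator<Long>> failing = () -> {
            throw new IllegalStateException("boom");
        };

        System.out.println(iteratorOrEmpty(ok).next());         // prints 100
        System.out.println(iteratorOrEmpty(failing).hasNext()); // prints false
    }
}
```

The Iterator returned by such a helper could then be handed to JavaConversions.asScalaIterator(...).toStream() inside apply(), as in the code earlier in this thread, without a try/catch around the conversion.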

mosesn Dec 20, 2013

Contributor

@wishoping this is a very old issue, but were you able to solve your problem?

mosesn closed this Apr 21, 2014
