Load Testing randomly throws UnsupportedOperationException #717

Closed
pckeyan opened this issue Nov 13, 2019 · 15 comments

@pckeyan

pckeyan commented Nov 13, 2019

I am load testing a LoadBalancer with two server nodes and randomly see the exception below. Please let me know if I am doing anything wrong here.

2019-11-13 12:06:50.802 ERROR 16280 --- [actor-tcp-nio-4] io.rsocket.FrameLogger                   : Error receiving frame:

java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@2f5598ce(failure: java.lang.UnsupportedOperationException)
	at io.netty.channel.AbstractChannelHandlerContext.isNotValidPromise(AbstractChannelHandlerContext.java:891) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:773) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:701) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.finishWrap(SslHandler.java:899) ~[netty-handler-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:885) ~[netty-handler-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-handler-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-handler-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727) [netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:607) ~[reactor-netty-0.8.11.RELEASE.jar!/:0.8.11.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) ~[netty-common-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) ~[netty-transport-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[netty-common-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.39.Final.jar!/:4.1.39.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.39.Final.jar!/:4.1.39.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_221]

Sample Client Code:

package com.rsocket.examples;

import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.client.LoadBalancedRSocketMono;
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import org.apache.commons.lang3.SerializationUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

public class LoadTestClient {

    public static void main(String[] args) {
        LoadTestClient loadTestClient = new LoadTestClient();
        loadTestClient.callLB();

    }

    private void callLB() {

        int[] server1 = new int[] {9000, 9001};

        Set<RSocketSupplier> suppliers = new HashSet<>();

        suppliers.addAll(Arrays.stream(server1)
                .mapToObj(port -> new RSocketSupplier(() -> Mono.just(RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", port))
                        .start().doOnSubscribe(s -> System.out.println("RSocket connection established." + s))
                        .block())))
                .collect(
                        Collectors.toSet()));

        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono
                .create(Flux.just(suppliers));

        long start = new Date().getTime();
        Flux.range(1, 2)
                .flatMap(i -> thrGrp(balancer))
                .subscribe();
        System.out.println("Total Time taken for 10K records ---> (ms)" + (new Date().getTime() - start));

    }

    private Mono<Void> thrGrp(LoadBalancedRSocketMono balancedRSocketMono) {
        Flux.range(1, 10000)
                .flatMap(i -> callServer(i, balancedRSocketMono))
                .subscribe();
        return Mono.empty();
    }

    private Flux<Void> callServer(int i, LoadBalancedRSocketMono balancedRSocketMono) {
        balancedRSocketMono.block().requestResponse(DefaultPayload
                .create(SerializationUtils.serialize(getEmpData(i))))
                .map(Payload::getDataUtf8)
                .map(s -> "Response Received for " + i + " in Client -->" + s)
                .doOnNext(System.out::println)
                .block();
        return Flux.empty();
    }

    private EmployeeData getEmpData(int i) {
        return new EmployeeData("John" + i, "Doe", 22);
    }
}

class EmployeeData implements Serializable {

    private static final long serialVersionUID = 2892015151528050314L;

    String firstName;

    String lastName;

    int age;

    public EmployeeData(String firstName, String lastName, int age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        EmployeeData that = (EmployeeData) o;
        return age == that.age &&
                Objects.equals(firstName, that.firstName) &&
                Objects.equals(lastName, that.lastName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, age);
    }
}

Server Code below:

package com.rsocket.examples;

import io.netty.buffer.ByteBuf;
import io.rsocket.*;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Date;

public class LoadTestServers {

    static final String HOST = "localhost";

    //    static Logger log = Logger.getLogger(LoadTestServers.class.getName());

    static final int[] PORTS = new int[] {9000, 9001};

    public static void main(String[] args) throws InterruptedException {

        Arrays.stream(PORTS)
                .forEach(port -> RSocketFactory.receive()
                        .acceptor(new SimpleSocketAcceptor("SERVER-" + port))
                        .transport(TcpServerTransport.create(HOST, port))
                        .start()
                        .subscribe());

        System.out.println("Servers running");

        Thread.currentThread().join();
    }

    static class SimpleSocketAcceptor implements SocketAcceptor {

        private String serverName;

        SimpleSocketAcceptor(String serverName) {
            this.serverName = serverName;
        }

        @Override
        public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
            System.out.println("Received setup connection on acceptor: [" + serverName + "]");
            return Mono.just(new AbstractRSocket() {
                @Override
                public Mono<Payload> requestResponse(Payload payload) {
                    ByteBuf dataByteBuf = payload.sliceData();
                    byte[] dataBytes = new byte[dataByteBuf.readableBytes()];
                    dataByteBuf.getBytes(dataByteBuf.readerIndex(), dataBytes);
                    EmployeeData employeeData = SerializationUtils.deserialize(dataBytes);
                    String response = StringUtils.join(employeeData.getFirstName(), "response at ->",
                            FastDateFormat.getInstance("MM-dd-yyyy").format(new Date()));
                    return Mono.just(DefaultPayload.create(response));
                }
            });
        }
    }
}

I run the client using the GNU Parallel utility:

START=$(date +%s);
parallel java LoadTestClient ::: {1..10} > client.log
END=$(date +%s);
echo Total Time taken - $((END-START)) Secs
echo  Process completed!
@OlegDokuka
Member

Hello, @pckeyan!

Thanks for opening the issue!
Can you please provide the RSocket version, as well as a demo repository/project/gist with a full version of the project that reproduces your issue?

So far, the given stack trace is not useful because it does not show the root cause, and the source you have provided does not explain how you configure the RSocket server/client.

Regards,
Oleh

@pckeyan
Author

pckeyan commented Nov 19, 2019

Hello @OlegDokuka,

I am using RSocket 1.0.0-RC5. I have updated the question with sample client and server code, along with a bash script for the load test. Please let me know if you need anything else.

Just FYI, after I added the code below to all my RSocket client connections, I no longer see this issue. I ran a few tests, up to 1 million.

RSocketFactory.connect().frameDecoder(PayloadDecoder.ZERO_COPY)
                .keepAlive(Duration.ofSeconds(300), Duration.ofSeconds(600), 10)

But if I change the client call to the server from block() to subscribe(), I get the same issue. Please advise how I should handle it:

private Flux<Void> callServer(int i, LoadBalancedRSocketMono balancedRSocketMono) {
        balancedRSocketMono.block().requestResponse(DefaultPayload
                .create(SerializationUtils.serialize(getEmpData(i))))
                .map(Payload::getDataUtf8)
                .map(s -> "Response Received for " + i + " in Client -->" + s)
                .doOnNext(System.out::println)
                .subscribe();
                //.block();
        return Flux.empty();
    }

Regards
Karthik

@pckeyan
Author

pckeyan commented Nov 20, 2019

All I can achieve is blocking calls. Can you please advise how I can make non-blocking calls to achieve high throughput?
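
The general shape of a non-blocking client can be sketched as follows. This is only an illustration under stated assumptions: it uses the JDK's `CompletableFuture` as a stand-in for Reactor's `Mono`, and `callServer` and `callAll` are hypothetical names, not RSocket APIs. The idea it shows is to return the asynchronous result from each call instead of calling `block()` or `subscribe()` inside it, and to wait only once at the outermost edge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

final class NonBlockingCallsSketch {

    // Hypothetical stand-in for an asynchronous request/response call.
    static CompletableFuture<String> callServer(int i) {
        return CompletableFuture.supplyAsync(() -> "John" + i)
                .thenApply(s -> "Response received for " + i + " in client --> " + s);
    }

    // Starts every call and returns a single future for all results; nothing blocks here.
    static CompletableFuture<List<String>> callAll(int count, IntFunction<CompletableFuture<String>> call) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            inFlight.add(call.apply(i));
        }
        return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]))
                // All futures are complete at this point, so join() returns immediately.
                .thenApply(done -> inFlight.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // The only blocking point is this single join() at the outermost edge.
        List<String> responses = callAll(1000, NonBlockingCallsSketch::callServer).join();
        System.out.println("Received " + responses.size() + " responses");
    }
}
```

In Reactor terms this roughly corresponds to returning the request publisher from `flatMap` rather than blocking or subscribing inside it; the exact operators to use depend on the client setup.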

@OlegDokuka
Member

@pckeyan looking into your case

@OlegDokuka
Member

OlegDokuka commented Nov 26, 2019

@pckeyan, I was not able to reproduce the issue locally. Here is the project under test: https://github.com/OlegDokuka/rsocket-issue-717

Feel free to share more insights.

Also, a note on the load-balanced RSocket behavior:

When you select an RSocket for a call, there is a chance that the slowest RSocket will be closed and recreated (see https://github.com/rsocket/rsocket-java/blob/develop/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java#L108 and https://github.com/rsocket/rsocket-java/blob/develop/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java#L252) in order to keep the latency stable. This means there is a high chance that a call in progress on that particular RSocket ends with an error. To avoid that, I added retry logic, which ensures the call goes through.
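
The retry idea can be sketched roughly as below. This is a minimal illustration, not the code from the linked sample project: it uses the JDK's `CompletableFuture` in place of Reactor types, and `withRetry` and `flakyCall` are hypothetical names introduced for the example. It shows a call being re-issued when the previous attempt fails, for instance because its connection was closed mid-flight.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {

    // Re-invokes the call when the returned future fails, up to maxRetries extra attempts.
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> call, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> call,
                                    int retriesLeft,
                                    CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                attempt(call, retriesLeft - 1, result); // issue the call again
            } else {
                result.completeExceptionally(error);    // retries exhausted, surface the error
            }
        });
    }

    // Hypothetical stand-in for a request whose first `failures` attempts fail.
    static Supplier<CompletableFuture<String>> flakyCall(AtomicInteger attempts, int failures) {
        return () -> {
            if (attempts.incrementAndGet() <= failures) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("connection closed"));
                return failed;
            }
            return CompletableFuture.completedFuture("response");
        };
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String reply = withRetry(flakyCall(attempts, 2), 3).join();
        System.out.println(reply + " after " + attempts.get() + " attempts");
    }
}
```

With Reactor, the same effect is usually obtained with a `retry` operator on the request publisher instead of hand-written recursion; bounding the number of retries avoids looping forever when every connection is failing.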

@pckeyan
Author

pckeyan commented Nov 26, 2019

@OlegDokuka Thanks, I appreciate your time. I configured my application like your sample. I use TLS (two-way SSL). If I raise the call count to even 2, it fails; it works only for one call. I am getting the error below. Please advise.

Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 302
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ac ed 00 05 73 72 00 24 63 6f 6d 2e 72 73 6f 63 |....sr.$com.rsoc|
|00000010| 6b 65 74 2e 65 78 61 6d 70 6c 65 73 2e 52 6f 75 |ket.examples.Rou|
|00000020| 74 69 6e 67 4d 65 74 61 64 61 74 61 ee a5 77 87 |tingMetadata..w.|
|00000030| 29 05 ba b8 02 00 03 4c 00 0b 63 6c 75 73 74 65 |)......L..cluste|
|00000040| 72 4e 61 6d 65 74 00 12 4c 6a 61 76 61 2f 6c 61 |rNamet..Ljava/la|
|00000050| 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 09 72 6f 75 |ng/String;L..rou|
|00000060| 74 65 4e 61 6d 65 71 00 7e 00 01 4c 00 0b 73 65 |teNameq.~..L..se|
|00000070| 72 76 69 63 65 4e 61 6d 65 71 00 7e 00 01 78 70 |rviceNameq.~..xp|
|00000080| 74 00 03 53 42 43 74 00 0b 61 64 64 45 6d 70 6c |t..SBCt..addEmpl|
|00000090| 6f 79 65 65 74 00 0f 65 6d 70 6c 6f 79 65 65 53 |oyeet..employeeS|
|000000a0| 65 72 76 69 63 65                               |ervice          |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ac ed 00 05 73 72 00 21 63 6f 6d 2e 72 73 6f 63 |....sr.!com.rsoc|
|00000010| 6b 65 74 2e 65 78 61 6d 70 6c 65 73 2e 45 6d 70 |ket.examples.Emp|
|00000020| 6c 6f 79 65 65 44 61 74 61 74 86 c9 f1 a1 aa 86 |loyeeDatat......|
|00000030| 57 02 00 03 49 00 03 61 67 65 4c 00 09 66 69 72 |W...I..ageL..fir|
|00000040| 73 74 4e 61 6d 65 74 00 12 4c 6a 61 76 61 2f 6c |stNamet..Ljava/l|
|00000050| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 6c 61 |ang/String;L..la|
|00000060| 73 74 4e 61 6d 65 71 00 7e 00 01 78 70 00 00 00 |stNameq.~..xp...|
|00000070| 16 74 00 05 4a 6f 68 6e 31 74 00 03 44 6f 65    |.t..John1t..Doe |
+--------+-------------------------------------------------+----------------+
2019-11-26 14:31:36.573 DEBUG 15117 --- [actor-tcp-nio-4] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 302
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ac ed 00 05 73 72 00 24 63 6f 6d 2e 72 73 6f 63 |....sr.$com.rsoc|
|00000010| 6b 65 74 2e 65 78 61 6d 70 6c 65 73 2e 52 6f 75 |ket.examples.Rou|
|00000020| 74 69 6e 67 4d 65 74 61 64 61 74 61 ee a5 77 87 |tingMetadata..w.|
|00000030| 29 05 ba b8 02 00 03 4c 00 0b 63 6c 75 73 74 65 |)......L..cluste|
|00000040| 72 4e 61 6d 65 74 00 12 4c 6a 61 76 61 2f 6c 61 |rNamet..Ljava/la|
|00000050| 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 09 72 6f 75 |ng/String;L..rou|
|00000060| 74 65 4e 61 6d 65 71 00 7e 00 01 4c 00 0b 73 65 |teNameq.~..L..se|
|00000070| 72 76 69 63 65 4e 61 6d 65 71 00 7e 00 01 78 70 |rviceNameq.~..xp|
|00000080| 74 00 03 53 42 43 74 00 0b 61 64 64 45 6d 70 6c |t..SBCt..addEmpl|
|00000090| 6f 79 65 65 74 00 0f 65 6d 70 6c 6f 79 65 65 53 |oyeet..employeeS|
|000000a0| 65 72 76 69 63 65                               |ervice          |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ac ed 00 05 73 72 00 21 63 6f 6d 2e 72 73 6f 63 |....sr.!com.rsoc|
|00000010| 6b 65 74 2e 65 78 61 6d 70 6c 65 73 2e 45 6d 70 |ket.examples.Emp|
|00000020| 6c 6f 79 65 65 44 61 74 61 74 86 c9 f1 a1 aa 86 |loyeeDatat......|
|00000030| 57 02 00 03 49 00 03 61 67 65 4c 00 09 66 69 72 |W...I..ageL..fir|
|00000040| 73 74 4e 61 6d 65 74 00 12 4c 6a 61 76 61 2f 6c |stNamet..Ljava/l|
|00000050| 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 6c 61 |ang/String;L..la|
|00000060| 73 74 4e 61 6d 65 71 00 7e 00 01 78 70 00 00 00 |stNameq.~..xp...|
|00000070| 16 74 00 05 4a 6f 68 6e 32 74 00 03 44 6f 65    |.t..John2t..Doe |
+--------+-------------------------------------------------+----------------+
In callServer make a call
java.lang.UnsupportedOperationException
	at io.rsocket.buffer.AbstractTupleByteBuf.writeBytes(AbstractTupleByteBuf.java:497)
	at io.netty.handler.ssl.SslHandler.attemptCopyToCumulation(SslHandler.java:2209)
	at io.netty.handler.ssl.SslHandler.access$2800(SslHandler.java:165)
	at io.netty.handler.ssl.SslHandler$SslHandlerCoalescingBufferQueue.compose(SslHandler.java:2173)
	at io.netty.channel.AbstractCoalescingBufferQueue.remove(AbstractCoalescingBufferQueue.java:175)
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:819)
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
	at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:607)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
In callServer make a call
java.lang.UnsupportedOperationException
	at io.rsocket.buffer.AbstractTupleByteBuf.writeBytes(AbstractTupleByteBuf.java:497)
	at io.netty.handler.ssl.SslHandler.attemptCopyToCumulation(SslHandler.java:2209)
	at io.netty.handler.ssl.SslHandler.access$2800(SslHandler.java:165)
	at io.netty.handler.ssl.SslHandler$SslHandlerCoalescingBufferQueue.compose(SslHandler.java:2173)
	at io.netty.channel.AbstractCoalescingBufferQueue.remove(AbstractCoalescingBufferQueue.java:175)
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:819)
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
	at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:607)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
2019-11-26 14:31:36.789 ERROR 15117 --- [actor-tcp-nio-4] io.rsocket.FrameLogger                   : Error receiving frame:

java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@3a4fb27e(failure: java.lang.UnsupportedOperationException)
	at io.netty.channel.AbstractChannelHandlerContext.isNotValidPromise(AbstractChannelHandlerContext.java:891) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:773) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:701) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.finishWrap(SslHandler.java:899) ~[netty-handler-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:885) ~[netty-handler-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-handler-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-handler-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727) [netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:607) ~[reactor-netty-0.8.11.RELEASE.jar:0.8.11.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) ~[netty-transport-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

@OlegDokuka
Member

Oh yeah, that was not mentioned in the issue. @pckeyan, can you please share the SSL setup you have?

Regards,
Oleh

@pckeyan
Author

pckeyan commented Nov 26, 2019

@OlegDokuka My apologies, I think I found the problem. One of my connections was using a block(); after I removed it, things seem to be working. Let me test with larger numbers and I will update you.

@pckeyan
Author

pckeyan commented Nov 26, 2019

@OlegDokuka When I bump the count up to 5000, with or without SSL, I get a Connection Reset by Peer exception. Below are the repos with the exact code I am using to test this without SSL. Please help me understand how we can fix it.

https://github.com/pckeyan/rsocket-model.git
https://github.com/pckeyan/rsocket-load-balancer.git
https://github.com/pckeyan/rsocket-sample-server.git
https://github.com/pckeyan/rsocket-sample-client.git

@OlegDokuka
Member

OlegDokuka commented Nov 26, 2019

@pckeyan, Connection Reset by Peer is expected behavior. As I said earlier, LoadBalancedRSocket closes the slowest RSocket when it does not meet the configured SLA (see https://github.com/rsocket/rsocket-java/blob/develop/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java#L359).

Feel free to tune the parameters using the following method, https://github.com/rsocket/rsocket-java/blob/develop/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java#L173, in order to avoid the RSocket being closed under high load.

@OlegDokuka
Member

@pckeyan
I'm closing the issue.

Feel free to reopen it if you observe any related issues.

Regards,
Oleh

@pckeyan
Author

pckeyan commented Nov 29, 2019

@OlegDokuka Can you please let me know what the lower and upper bound values are and how they are used, so that I can configure them? I am not able to find any documentation other than the code-level docs. Please advise for my scenario.

@pckeyan
Author

pckeyan commented Dec 5, 2019

@OlegDokuka After adding two-way SSL to the sample you provided (https://github.com/OlegDokuka/rsocket-issue-717), calls failed in block(). I changed the client as below:

 suppliers.addAll(Arrays.stream(server1)
                .mapToObj(port -> new RSocketSupplier(() -> {
                    System.err.println(
                            "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Happened!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
                    return RSocketFactory
                            .connect().resumeCleanupOnKeepAlive()
                            .keepAlive(Duration.ofSeconds(300), Duration.ofSeconds(600), 10)
                            .frameDecoder(PayloadDecoder.ZERO_COPY)
                            //                                    .transport(TcpClientTransport.create("localhost", port))
                            .transport(() -> {
                                        TcpClient tcpClient = TcpClient.create().host("localhost").port(12001);
                                        return TcpClientTransport.create(
                                                tcpClient.secure(sslContextSpec -> sslContextSpec
                                                        .sslContext(
                                                                SslContextBuilder.forClient()
                                                                        .keyManager(keyCertChainFile, keyFile)
                                                                        .trustManager(
                                                                                new File(
                                                                                        "cacert.pem")))));
                                    }
                            )
                            .start().retry().cache();
                }))
                .collect(
                        Collectors.toSet()));

Now I get a response back only if CALLS_COUNT=1. If I increase it to 10, I see failures with io.netty.handler.codec.DecoderException: java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@869d7cd(failure: java.lang.UnsupportedOperationException). I tried keepAlive and resumeCleanupOnKeepAlive, but with no success. Please advise.

@OlegDokuka
Member

OlegDokuka commented Dec 5, 2019

@pckeyan let me check that quickly

@pckeyan
Author

pckeyan commented Dec 6, 2019

@OlegDokuka It seems that non-blocking calls have an issue establishing the SSL handshake. For my use case it is a blocker. Can we reopen this issue?
