Skip to content

No keep-alive acks in the process of receiving data #712

@Zeymo

Description

@Zeymo

use requestChannel to upload large file in a weak network environment which upstream package 50% is lost , no keep-alive acks in the process of receiving data when using 1.0.0-RC5

resume is disabled , when enable resume also throw Exception cos's missmatch position

server log

18:23:03.130 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending ->
Frame => Stream ID: 0 Type: KEEPALIVE Flags: 0b10000000 Length: 14

...

18:24:20.092 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending ->
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10
Data:

18:24:21.591 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - receiving ->
Frame => Stream ID: 1 Type: NEXT Flags: 0b10100000 Length: 1024
Data:

io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 90000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115)
	at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110)
	at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146)
	at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:186)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:627)
	at java.lang.Thread.run(Thread.java:882)

[reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0xabc57eaa, L:/xx.xx.xx.xx ! R:/xx.xx.xx.xx:59782] Non Removed handler: RSocketLengthCodec, context: ChannelHandlerContext(RSocketLengthCodec, [id: 0xabc57eaa, L:/xx.xx.xx.xx:5900 ! R:/xx.xx.xx.xx:59782]), pipeline: DefaultChannelPipeline{(RSocketLengthCodec = io.rsocket.transport.netty.RSocketLengthCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

[parallel-1] DEBUG reactor.core.publisher.Operators - onNextDropped: Tuple2ByteBuf{capacity=14, one=PooledUnsafeDirectByteBuf(ridx: 0, widx: 14, cap: 256), two=EmptyByteBufBE, allocator=PooledByteBufAllocator(directByDefault: true), oneReadIndex=0, twoReadIndex=0, oneReadableBytes=14, twoReadableBytes=0, twoRelativeIndex=14}

cilent log

receive ack 1014

client code

public static void main(String[] args) throws InterruptedException {

        int mtu = Integer.getInteger("mtu", 1024 * 16);
        int chunkSize = Integer.getInteger("chunk", 1024 * 100);
        String fileName = System.getProperty("path","receive.txt");
        boolean resume = Boolean.getBoolean("resume");
        int timeoutPerSeconds = Integer.getInteger("timeout",3 * 60);
        RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect();

        if (resume){
            factory.resume()
                .resumeStrategy(
                    () -> new VerboseResumeStrategy(new PeriodicResumeStrategy(Duration.ofSeconds(1))))
                .resumeSessionDuration(Duration.ofMinutes(5));
        }

        if (mtu > 0) {
            factory.fragment(mtu);
        }

        RSocket client =
            factory.transport(TcpClientTransport.create("11.165.70.66", 5900))
                .start()
                .block();

        long start = System.currentTimeMillis();
        client
            .requestChannel(Files.fileSource(fileName, chunkSize).map(DefaultPayload::create))
            .timeout(Duration.ofSeconds(timeoutPerSeconds))
            .subscribe(new Subscriber<Payload>() {
                Subscription s;

                @Override
                public void onSubscribe(Subscription s) {
                    this.s = s;
                    this.s.request(1);
                }

                @Override
                public void onNext(Payload payload) {
                    System.out.println("receive ack " + payload.getDataUtf8());
                    payload.release();
                    this.s.request(1);
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println("client Throwable" + t);
                }

                @Override
                public void onComplete() {
                    System.out.println("client upload complete, cost "+ (System.currentTimeMillis() - start));
                }
            });
        client.onClose().block();
    }

server code

@Override
    public Flux<Payload> requestChannel(Publisher<Payload> payload) {
        AtomicBoolean compeletly = new AtomicBoolean(false);
        AtomicInteger received = new AtomicInteger(0);
        Flux.from(payload).subscribeOn(Schedulers.single()).subscribe(new Subscriber<Payload>() {
            int windowSize = 4;
            Subscription s;
            int requests = windowSize;
            OutputStream outputStream;
            int receivedBytes = 0;
            int receivedCount = 0;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                this.s.request(requests);
            }

            @Override
            public void onNext(Payload payload) {
                ByteBuf data = payload.data();
                receivedBytes += data.readableBytes();
                received.set(receivedBytes);
                receivedCount += 1;
                //System.out.println(
                //    "Received file chunk: " + receivedCount + ". Total size: " + receivedBytes+","+ Thread.currentThread());
                if (outputStream == null) {
                    outputStream = open(fileName);
                }
                write(outputStream, data);
                payload.release();
                requests--;
                if (requests == windowSize / 2) {
                    requests += windowSize;
                    s.request(windowSize);
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("onError " + t);
                close(outputStream);
            }

            @Override
            public void onComplete() {
                System.out.println("server receive onComplete");
                close(outputStream);
                compeletly.compareAndSet(false, true);
            }

            private void write(OutputStream outputStream, ByteBuf byteBuf) {
                try {
                    byteBuf.readBytes(outputStream, byteBuf.readableBytes());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            private OutputStream open(String filename) {
                try {
                    /*do not buffer for demo purposes*/
                    return new FileOutputStream(filename);
                } catch (FileNotFoundException e) {
                    throw new RuntimeException(e);
                }
            }

            private void close(OutputStream stream) {
                if (stream != null) {
                    try {
                        stream.close();
                    } catch (IOException e) {
                    }
                }
            }

        });

        Flux<Payload> receivedFlux = Flux.generate(sink -> {
            int receivedBytes = received.get();
            if (compeletly.get()) {
                sink.next(DefaultPayload.create(String.valueOf(receivedBytes)));
                sink.complete();
            }else{
                sink.next(DefaultPayload.create(String.valueOf(receivedBytes)));
            }
        });

        return receivedFlux.subscribeOn(Schedulers.elastic());
    }

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions