diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java index a8f9eba05..415a77f92 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import java.time.Duration; -class ClientResume { +public class ClientResume { private final Duration sessionDuration; private final ByteBuf resumeToken; diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java new file mode 100644 index 000000000..e6867f8b5 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/Files.java @@ -0,0 +1,134 @@ +package io.rsocket.examples.transport.tcp.resume; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.Payload; +import java.io.*; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SynchronousSink; + +class Files { + + public static Flux fileSource(String fileName, int chunkSizeBytes) { + return Flux.generate( + () -> new FileState(fileName, chunkSizeBytes), FileState::consumeNext, FileState::dispose); + } + + public static Subscriber fileSink(String fileName, int windowSize) { + return new Subscriber() { + Subscription s; + int requests = windowSize; + OutputStream outputStream; + int receivedBytes; + int receivedCount; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + this.s.request(requests); + } + + @Override + public void onNext(Payload payload) { + ByteBuf data = payload.data(); + receivedBytes += data.readableBytes(); + receivedCount += 1; + System.out.println( + "Received file chunk: " + receivedCount + ". Total size: " + receivedBytes); + if (outputStream == null) { + outputStream = open(fileName); + } + write(outputStream, data); + payload.release(); + + requests--; + if (requests == windowSize / 2) { + requests += windowSize; + s.request(windowSize); + } + } + + private void write(OutputStream outputStream, ByteBuf byteBuf) { + try { + byteBuf.readBytes(outputStream, byteBuf.readableBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onError(Throwable t) { + close(outputStream); + } + + @Override + public void onComplete() { + close(outputStream); + } + + private OutputStream open(String filename) { + try { + /*do not buffer for demo purposes*/ + return new FileOutputStream(filename); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + } + + private void close(OutputStream stream) { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + } + } + } + }; + } + + private static class FileState { + private final String fileName; + private final int chunkSizeBytes; + private BufferedInputStream inputStream; + private byte[] chunkBytes; + + public FileState(String fileName, int chunkSizeBytes) { + this.fileName = fileName; + this.chunkSizeBytes = chunkSizeBytes; + } + + public FileState consumeNext(SynchronousSink sink) { + if (inputStream == null) { + InputStream in = getClass().getClassLoader().getResourceAsStream(fileName); + if (in == null) { + sink.error(new FileNotFoundException(fileName)); + return this; + } + this.inputStream = new BufferedInputStream(in); + this.chunkBytes = new byte[chunkSizeBytes]; + } + try { + int consumedBytes = inputStream.read(chunkBytes); + if (consumedBytes == -1) { + sink.complete(); + } else { + sink.next(Unpooled.copiedBuffer(chunkBytes, 0, consumedBytes)); + } + } catch (IOException e) { + sink.error(e); + } + return this; + } + + public void dispose() { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + } + } + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java new file mode 100644 index 000000000..df8e801a6 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java @@ -0,0 +1,120 @@ +package io.rsocket.examples.transport.tcp.resume; + +import io.rsocket.AbstractRSocket; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.resume.ClientResume; +import io.rsocket.resume.PeriodicResumeStrategy; +import io.rsocket.resume.ResumeStrategy; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class ResumeFileTransfer { + + public static void main(String[] args) { + RequestCodec requestCodec = new RequestCodec(); + + CloseableChannel server = + RSocketFactory.receive() + .resume() + .resumeSessionDuration(Duration.ofMinutes(5)) + .acceptor((setup, rSocket) -> Mono.just(new FileServer(requestCodec))) + .transport(TcpServerTransport.create("localhost", 8000)) + .start() + .block(); + + RSocket client = + RSocketFactory.connect() + .resume() + .resumeStrategy( + () -> new VerboseResumeStrategy(new PeriodicResumeStrategy(Duration.ofSeconds(1)))) + .resumeSessionDuration(Duration.ofMinutes(5)) + .transport(TcpClientTransport.create("localhost", 8001)) + .start() + .block(); + + client + .requestStream(requestCodec.encode(new Request(16, "lorem.txt"))) + .doFinally(s -> server.dispose()) + .subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", 256)); + + server.onClose().block(); + } + + private static class FileServer extends AbstractRSocket { + private final RequestCodec requestCodec; + + public FileServer(RequestCodec requestCodec) { + this.requestCodec = requestCodec; + } + + @Override + public Flux requestStream(Payload payload) { + Request request = requestCodec.decode(payload); + payload.release(); + String fileName = request.getFileName(); + int chunkSize = request.getChunkSize(); + + Flux ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop(); + + return Files.fileSource(fileName, chunkSize) + .map(DefaultPayload::create) + .zipWith(ticks, (p, tick) -> p); + } + } + + private static class VerboseResumeStrategy implements ResumeStrategy { + private final ResumeStrategy resumeStrategy; + + public VerboseResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + @Override + public Publisher apply(ClientResume clientResume, Throwable throwable) { + return Flux.from(resumeStrategy.apply(clientResume, throwable)) + .doOnNext(v -> System.out.println("Disconnected. Trying to resume connection...")); + } + } + + private static class RequestCodec { + + public Payload encode(Request request) { + String encoded = request.getChunkSize() + ":" + request.getFileName(); + return DefaultPayload.create(encoded); + } + + public Request decode(Payload payload) { + String encoded = payload.getDataUtf8(); + String[] chunkSizeAndFileName = encoded.split(":"); + int chunkSize = Integer.parseInt(chunkSizeAndFileName[0]); + String fileName = chunkSizeAndFileName[1]; + return new Request(chunkSize, fileName); + } + } + + private static class Request { + private final int chunkSize; + private final String fileName; + + public Request(int chunkSize, String fileName) { + this.chunkSize = chunkSize; + this.fileName = fileName; + } + + public int getChunkSize() { + return chunkSize; + } + + public String getFileName() { + return fileName; + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md new file mode 100644 index 000000000..55e761fe8 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/readme.md @@ -0,0 +1,29 @@ +1. Start socat. It is used for emulation of transport disconnects + +`socat -d TCP-LISTEN:8001,fork,reuseaddr TCP:localhost:8000` + +2. start `ResumeFileTransfer.main` + +3. terminate/start socat periodically for session resumption + +`ResumeFileTransfer` output is as follows + +``` +Received file chunk: 7. Total size: 112 +Received file chunk: 8. Total size: 128 +Received file chunk: 9. Total size: 144 +Received file chunk: 10. Total size: 160 +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Disconnected. Trying to resume connection... +Received file chunk: 11. Total size: 176 +Received file chunk: 12. Total size: 192 +Received file chunk: 13. Total size: 208 +Received file chunk: 14. Total size: 224 +Received file chunk: 15. Total size: 240 +Received file chunk: 16. Total size: 256 +``` + +It transfers file from `resources/lorem.txt` to `build/out/lorem_output.txt` in chunks of 16 bytes every 500 millis diff --git a/rsocket-examples/src/main/resources/lorem.txt b/rsocket-examples/src/main/resources/lorem.txt new file mode 100644 index 000000000..e035ea86d --- /dev/null +++ b/rsocket-examples/src/main/resources/lorem.txt @@ -0,0 +1,32 @@ +Alteration literature to or an sympathize mr imprudence. Of is ferrars subject as enjoyed or tedious cottage. +Procuring as in resembled by in agreeable. Next long no gave mr eyes. Admiration advantages no he celebrated so pianoforte unreserved. +Not its herself forming charmed amiable. Him why feebly expect future now. + +Situation admitting promotion at or to perceived be. Mr acuteness we as estimable enjoyment up. +An held late as felt know. Learn do allow solid to grave. Middleton suspicion age her attention. +Chiefly several bed its wishing. Is so moments on chamber pressed to. Doubtful yet way properly answered humanity its desirous. + Minuter believe service arrived civilly add all. Acuteness allowance an at eagerness favourite in extensive exquisite ye. + + Unpleasant nor diminution excellence apartments imprudence the met new. Draw part them he an to he roof only. + Music leave say doors him. Tore bred form if sigh case as do. Staying he no looking if do opinion. + Sentiments way understood end partiality and his. + + Ladyship it daughter securing procured or am moreover mr. Put sir she exercise vicinity cheerful wondered. + Continual say suspicion provision you neglected sir curiosity unwilling. Simplicity end themselves increasing led day sympathize yet. + General windows effects not are drawing man garrets. Common indeed garden you his ladies out yet. Preference imprudence contrasted to remarkably in on. + Taken now you him trees tears any. Her object giving end sister except oppose. + + No comfort do written conduct at prevent manners on. Celebrated contrasted discretion him sympathize her collecting occasional. + Do answered bachelor occasion in of offended no concerns. Supply worthy warmth branch of no ye. Voice tried known to as my to. + Though wished merits or be. Alone visit use these smart rooms ham. No waiting in on enjoyed placing it inquiry. + + So insisted received is occasion advanced honoured. Among ready to which up. Attacks smiling and may out assured moments man nothing outward. + Thrown any behind afford either the set depend one temper. Instrument melancholy in acceptance collecting frequently be if. + Zealously now pronounce existence add you instantly say offending. Merry their far had widen was. Concerns no in expenses raillery formerly. + + As am hastily invited settled at limited civilly fortune me. Really spring in extent an by. Judge but built gay party world. + Of so am he remember although required. Bachelor unpacked be advanced at. Confined in declared marianne is vicinity. + + In alteration insipidity impression by travelling reasonable up motionless. Of regard warmth by unable sudden garden ladies. + No kept hung am size spot no. Likewise led and dissuade rejoiced welcomed husbands boy. Do listening on he suspected resembled. + Water would still if to. Position boy required law moderate was may. \ No newline at end of file