Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServerSentEventDecoder optimization #30

Closed
NiteshKant opened this issue Jan 29, 2014 · 1 comment
Closed

ServerSentEventDecoder optimization #30

NiteshKant opened this issue Jan 29, 2014 · 1 comment
Milestone

Comments

@NiteshKant
Copy link
Member

The ServerSentEventDecoder can be optimized as follows:

It currently does a data copy as it converts data to a string. Instead you can read only the attribute name & the rest can be a view of the underlying ByteBuf. This can help in cases where people want to handle raw ByteBuf. You can have a utility method on Message to get data/value as String.
It will be more optimal not to add the created message to the out list as it gets accumulated by the ByteToMessageDecoder. Instead you can just fire a channel read event which will send the created message upstream instantly.

NiteshKant pushed a commit to NiteshKant/RxNetty that referenced this issue Nov 3, 2014
ReactiveX#205 (Move SSE related classes to http): This is a new implementation of SSE for some optimizations and spec compliance.
ReactiveX#209 (Deprecated all SSE classes in text pkg): SSE is only applicable to HTTP.
ReactiveX#220 (SSE Output confusion): Default case only emits data event.
ReactiveX#222 (Improved SSE API): Better construction semantics.
ReactiveX#30 (Optimized SSE decoder): Rewrite of the existing decoder.
@NiteshKant
Copy link
Member Author

(Copying comments from #267 for reference here)

PR #266 fixes this issue as a new implementation. Existing implementation is preserved and is deprecated (Issue #209)

Comparison of object allocation between the new and old implementation is:

Old Implementation

image

New Implementation

image

The above benchmark was done for a ServerSentEvent data size of 10KB. The code for the test is below:

Server

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Func1;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public final class TestSSEServerStart {

    private static final ByteBuf data;
    static {
        final byte[] dataBytes = new byte[10 * 1024];
        Arrays.fill(dataBytes, (byte) 'c');
        data = Unpooled.buffer().writeBytes(dataBytes).retain();
    }

    public static final byte[] DATA_PREFIX = "data: ".getBytes();
    public static final byte[] EOL = "\n\n".getBytes();

    public static void main(String[] args) {
        RxNetty.createHttpServer(8091, new RequestHandler<ByteBuf, ByteBuf>() {
            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                           final HttpServerResponse<ByteBuf> response) {
                return Observable.interval(1, TimeUnit.SECONDS)
                                 .flatMap(new Func1<Long, Observable<Void>>() {
                                     @Override
                                     public Observable<Void> call(Long interval) {
                                         for (int i = 0; i < 5000; i++) {
                                             response.writeBytes(DATA_PREFIX);
                                             response.writeBytes(data.retain());
                                             response.writeBytes(EOL);
                                         }
                                         return response.flush();
                                     }
                                 });
            }
        }).startAndWait();
    }
}

Client (Old)

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicLong;

public final class TestSSEDecoderMemoryOld {

    public static void main(String[] args) {
        testOldSSEDecoder(8091);
    }

    private static void testOldSSEDecoder(int serverPort) {
        System.out.println("Testing old SSE decoder. Server port: " + serverPort);
        final AtomicLong counter = new AtomicLong();
        RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder("localhost", serverPort)
               .pipelineConfigurator(PipelineConfigurators.<ByteBuf>sseClientConfigurator())
               .build()
               .submit(HttpClientRequest.createGet("/"))
               .flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
                   @Override
                   public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> clientResponse) {
                       return clientResponse.getContent()
                                            .doOnNext(new Action1<ServerSentEvent>() {
                                                @Override
                                                public void call(ServerSentEvent event) {
                                                    if (counter.incrementAndGet() % 1000 == 0) {
                                                        System.out.println("Received events count: " + counter.get());
                                                    }
                                                }
                                            });
                   }
               }).toBlocking().last();
    }
}

Client (New)

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.concurrent.atomic.AtomicLong;

public final class TestSSEDecoderMemoryNew {

    public static void main(String[] args) {
        testNewSSEDecoder(8091);
    }

    private static void testNewSSEDecoder(final int serverPort) {
        System.out.println("Testing new SSE decoder. Server port: " + serverPort);
        final AtomicLong counter = new AtomicLong();
        RxNetty.<ByteBuf, io.reactivex.netty.protocol.http.sse.ServerSentEvent>newHttpClientBuilder("localhost",
                                                                                                    serverPort)
               .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
               .build()
               .submit(HttpClientRequest.createGet("/"))
               .flatMap(
                       new Func1<HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent>, Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent>>() {
                           @Override
                           public Observable<io.reactivex.netty.protocol.http.sse.ServerSentEvent> call(
                                   HttpClientResponse<io.reactivex.netty.protocol.http.sse.ServerSentEvent> response) {
                               return response.getContent()
                                              .doOnNext(
                                                      new Action1<io.reactivex.netty.protocol.http.sse.ServerSentEvent>() {
                                                          @Override
                                                          public void call(
                                                                  io.reactivex.netty.protocol.http.sse.ServerSentEvent serverSentEvent) {
                                                              if (counter.incrementAndGet() % 1000 == 0) {
                                                                  System.out.println(
                                                                          "Received events count: " + counter.get());
                                                              }
                                                          }
                                                      });
                           }
                       })
               .toBlocking().last();
    }
}

@NiteshKant NiteshKant modified the milestone: 0.3.17 Nov 3, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant