Skip to content

Commit

Permalink
[RESTEASY-2670] Ensure immutability in some SSE implementation classes (
Browse files Browse the repository at this point in the history
#2516)

* [RESTEASY-2670] Ensure immutability in some SSE implementation classes

* [RESTEASY-2670] Remove SseEventOutputProvider from javax.ws.rs.ext.Providers.
  • Loading branch information
ronsigal committed Sep 11, 2020
1 parent 67c46dc commit 610288a
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status.Family;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.ext.Providers;
import javax.ws.rs.sse.SseEventSource;

import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
Expand Down Expand Up @@ -49,7 +50,7 @@ private enum State {

private final List<Runnable> onCompleteConsumers = new CopyOnWriteArrayList<>();

private boolean alwaysReconnect;
private final boolean alwaysReconnect;

private volatile ClientResponse response;

Expand All @@ -63,6 +64,9 @@ public static class SourceBuilder extends Builder

private ScheduledExecutorService executor;

//tck requires this default behavior
private boolean alwaysReconnect = true;

public SourceBuilder()
{
//NOOP
Expand All @@ -76,7 +80,7 @@ public Builder named(String name)

public SseEventSource build()
{
return new SseEventSourceImpl(target, name, reconnect, false, executor);
return new SseEventSourceImpl(target, name, reconnect, false, executor, alwaysReconnect);
}

@Override
Expand All @@ -102,6 +106,11 @@ public Builder executor(ScheduledExecutorService executor)
this.executor = executor;
return this;
}

public Builder alwaysReconnect(boolean alwaysReconnect) {
this.alwaysReconnect = alwaysReconnect;
return this;
}
}

public SseEventSourceImpl(final WebTarget target)
Expand All @@ -111,19 +120,18 @@ public SseEventSourceImpl(final WebTarget target)

public SseEventSourceImpl(final WebTarget target, final boolean open)
{
this(target, null, RECONNECT_DEFAULT, open, null);
this(target, null, RECONNECT_DEFAULT, open, null, true);
}

private SseEventSourceImpl(final WebTarget target, final String name, final long reconnectDelay, final boolean open, final ScheduledExecutorService executor)
private SseEventSourceImpl(final WebTarget target, final String name, final long reconnectDelay, final boolean open, final ScheduledExecutorService executor, final boolean alwaysReconnect)
{
if (target == null)
{
throw new IllegalArgumentException(Messages.MESSAGES.webTargetIsNotSetForEventSource());
}
this.target = target;
this.reconnectDelay = reconnectDelay;
//tck requries this
this.alwaysReconnect = true;
this.alwaysReconnect = alwaysReconnect;

if (executor == null)
{
Expand Down Expand Up @@ -266,17 +274,6 @@ private void internalClose()
onCompleteConsumers.forEach(Runnable::run);
}

@Deprecated
public void setAlwasyReconnect(boolean always)
{
setAlwaysReconnect(always);
}

public void setAlwaysReconnect(boolean always)
{
this.alwaysReconnect = always;
}

private class EventHandler implements Runnable
{

Expand Down Expand Up @@ -376,6 +373,7 @@ public void run()
return;
}

final Providers providers = (ClientConfiguration) target.getConfiguration();
while (!Thread.currentThread().isInterrupted() && state.get() == State.OPEN)
{
if (eventInput == null || eventInput.isClosed())
Expand All @@ -385,8 +383,7 @@ public void run()
}
try
{
eventInput.setProviders((ClientConfiguration) target.getConfiguration());
InboundSseEvent event = eventInput.read();
InboundSseEvent event = eventInput.read(providers);
if (event != null)
{
onEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class InboundSseEventImpl implements InboundSseEvent

private final MultivaluedMap<String, String> headers;

private Providers providers;
private final Providers providers;

static class Builder
{
Expand All @@ -56,6 +56,8 @@ static class Builder

private final StringBuilder commentBuilder;

private Providers providers;

Builder(final Annotation[] annotations, final MediaType mediaType, final MultivaluedMap<String, String> headers)
{
this.annotations = annotations;
Expand Down Expand Up @@ -111,20 +113,26 @@ public Builder write(byte[] data)
return this;
}

public Builder providers(Providers providers)
{
this.providers = providers;
return this;
}

public InboundSseEvent build()
{
//from https://html.spec.whatwg.org/multipage/server-sent-events.html#processField
//If the data buffer's last character is a U+000A LINE FEED (LF) character,
//then remove the last character from the data buffer
return new InboundSseEventImpl(name, id, commentBuilder.length() > 0 ? commentBuilder.substring(0,
commentBuilder.length() - 1) : null, reconnectDelay, dataStream.toByteArray(), annotations, mediaType,
headers);
headers, providers);
}
}

private InboundSseEventImpl(final String name, final String id, final String comment, final long reconnectDelay,
final byte[] data, final Annotation[] annotations, final MediaType mediaType,
final MultivaluedMap<String, String> headers)
final MultivaluedMap<String, String> headers, final Providers providers)
{
this.name = name;
this.id = id;
Expand All @@ -134,6 +142,7 @@ private InboundSseEventImpl(final String name, final String id, final String com
this.annotations = annotations;
this.mediaType = mediaType;
this.headers = headers;
this.providers = providers;
}

public String getName()
Expand Down Expand Up @@ -259,10 +268,4 @@ public MediaType getMediaType()
return mediaType;
}

public void setProvider(Providers providers)
{
this.providers = providers;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,21 @@

public class SseEventInputImpl implements EventInput, Closeable
{
private Annotation[] annotations;
private final Annotation[] annotations;

private MediaType mediaType;
private final MediaType mediaType;

private boolean textLike;
private final boolean textLike;

private MultivaluedMap<String, String> httpHeaders;
private final MultivaluedMap<String, String> httpHeaders;

private InputStream inputStream;
private final InputStream inputStream;

private volatile boolean isClosed = false;

private boolean lastFieldWasData;
private final boolean escape;

private boolean escape = false;

private Providers providers;

private final String DELIMITER = new String(SseConstants.EVENT_DELIMITER, StandardCharsets.UTF_8);
private static final String DELIMITER = new String(SseConstants.EVENT_DELIMITER, StandardCharsets.UTF_8);

public SseEventInputImpl(final Annotation[] annotations, final MediaType streamType, final MediaType elementType,
final MultivaluedMap<String, String> httpHeaders, final InputStream inputStream)
Expand All @@ -66,6 +62,12 @@ public boolean isClosed()

public InboundSseEvent read() throws IOException
{
return read(null);
}

public InboundSseEvent read(Providers providers) throws IOException
{
boolean lastFieldWasData = false;
byte[] chunk = null;
try
{
Expand Down Expand Up @@ -160,7 +162,7 @@ public InboundSseEvent read() throws IOException
}
}

processField(eventBuilder, fieldName, mediaType, temSave.toByteArray());
lastFieldWasData = processField(lastFieldWasData, eventBuilder, fieldName, mediaType, temSave.toByteArray());
temSave.reset();
currentState = SseConstants.EVENT.START;
continue;
Expand All @@ -171,11 +173,8 @@ public InboundSseEvent read() throws IOException
throw new IOException(Messages.MESSAGES.readEventException(), e);
}
}
InboundSseEventImpl event = (InboundSseEventImpl) eventBuilder.build();
if (this.providers != null)
{
event.setProvider(this.providers);
}

InboundSseEventImpl event = (InboundSseEventImpl) eventBuilder.providers(providers).build();
return event;
}

Expand Down Expand Up @@ -206,7 +205,7 @@ else if (out != null)
return b;
}

private void processField(final InboundSseEventImpl.Builder inboundEventBuilder, final String name,
private static boolean processField(boolean lastFieldWasData, final InboundSseEventImpl.Builder inboundEventBuilder, final String name,
final MediaType mediaType, final byte[] value)
{
Charset charset = StandardCharsets.UTF_8;
Expand Down Expand Up @@ -246,7 +245,7 @@ else if ("retry".equals(name))
{
LogMessages.LOGGER.skipUnkownFiled(name);
}
lastFieldWasData = newLastFieldWasData;
return newLastFieldWasData;
}

public byte[] readEvent(final InputStream in) throws IOException
Expand Down Expand Up @@ -303,9 +302,4 @@ public byte[] readEvent(final InputStream in) throws IOException
}
return null;
}

public void setProviders(Providers providers)
{
this.providers = providers;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ org.jboss.resteasy.plugins.interceptors.encoding.ClientContentEncodingAnnotation
org.jboss.resteasy.plugins.interceptors.encoding.ServerContentEncodingAnnotationFeature
org.jboss.resteasy.plugins.interceptors.encoding.MessageSanitizerContainerResponseFilter
org.jboss.resteasy.plugins.providers.sse.SseEventProvider
org.jboss.resteasy.plugins.providers.sse.SseEventOutputProvider
org.jboss.resteasy.plugins.providers.sse.SseEventSinkInterceptor

Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ private SseEventSourceImpl getEventSource()
{
builder.executor(executorService);
}
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
sseEventSource.setAlwaysReconnect(false);
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.alwaysReconnect(false).build();
return sseEventSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ private SseEventSourceImpl getEventSource()
{
builder.executor(executorService);
}
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
sseEventSource.setAlwaysReconnect(false);
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.alwaysReconnect(false).build();
return sseEventSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ private SseEventSourceImpl getEventSource()
{
builder.executor(executorService);
}
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.build();
sseEventSource.setAlwaysReconnect(false);
SseEventSourceImpl sseEventSource = (SseEventSourceImpl) builder.alwaysReconnect(false).build();
return sseEventSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl;
import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl.SourceBuilder;
import org.jboss.resteasy.utils.PermissionUtil;
import org.jboss.resteasy.utils.PortProviderUtil;
import org.jboss.resteasy.utils.TestUtil;
Expand Down Expand Up @@ -496,8 +497,8 @@ public void testNoReconnectAfterEventSinkClose() throws Exception
List<String> results = new ArrayList<String>();
Client client = new ResteasyClientBuilder().connectionPoolSize(10).build();
WebTarget target = client.target(generateURL("/service/server-sent-events/closeAfterSent"));
SseEventSourceImpl sourceImpl = (SseEventSourceImpl)SseEventSource.target(target).build();
sourceImpl.setAlwaysReconnect(false);
SourceBuilder builder = (SourceBuilder) SseEventSource.target(target);
SseEventSource sourceImpl = builder.alwaysReconnect(false).build();
try (SseEventSource source = sourceImpl)
{
source.register(event -> results.add(event.readData()));
Expand All @@ -513,8 +514,8 @@ public void testNoReconnectAfterEventSinkClose() throws Exception
//test for [Resteasy-1863]:SseEventSourceImpl should not close Client instance
results.clear();
WebTarget target2 = client.target(generateURL("/service/server-sent-events/closeAfterSent"));
SseEventSourceImpl sourceImpl2 = (SseEventSourceImpl)SseEventSource.target(target2).build();
sourceImpl2.setAlwaysReconnect(false);
SourceBuilder builder2 = (SourceBuilder) SseEventSource.target(target2);
SseEventSource sourceImpl2 = builder2.alwaysReconnect(false).build();
try (SseEventSource source = sourceImpl2)
{
source.register(event -> results.add(event.readData()));
Expand All @@ -535,8 +536,8 @@ public void testNoContent() throws Exception
Client client = ClientBuilder.newBuilder().build();
final AtomicInteger errors = new AtomicInteger(0);
WebTarget target = client.target(generateURL("/service/server-sent-events/noContent"));
SseEventSourceImpl sourceImpl = (SseEventSourceImpl)SseEventSource.target(target).build();
sourceImpl.setAlwaysReconnect(false);
SourceBuilder builder = (SourceBuilder) SseEventSource.target(target);
SseEventSource sourceImpl = builder.alwaysReconnect(false).build();
try (SseEventSource source = sourceImpl)
{
source.register(event -> {
Expand Down

0 comments on commit 610288a

Please sign in to comment.