Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
asoldano committed Apr 21, 2017
1 parent 74518ad commit c3765b8
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 28 deletions.
4 changes: 4 additions & 0 deletions resteasy-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import org.jboss.resteasy.client.jaxrs.i18n.Messages;
import org.jboss.resteasy.client.jaxrs.internal.ClientConfiguration;
import org.jboss.resteasy.client.jaxrs.internal.ClientWebTarget;
import org.jboss.resteasy.client.jaxrs.engines.ApacheHttpClient4Engine;
import org.jboss.resteasy.spi.NotImplementedYetException;
import org.jboss.resteasy.spi.ResteasyProviderFactory;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
Expand All @@ -19,7 +16,7 @@
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
Expand All @@ -29,11 +26,22 @@ public class ResteasyClient implements Client
{
protected volatile ClientHttpEngine httpEngine;
protected volatile ExecutorService asyncInvocationExecutor;
protected volatile ScheduledExecutorService scheduledExecutorService;
protected ClientConfiguration configuration;
protected boolean closed;
protected boolean cleanupExecutor;


ResteasyClient(ClientHttpEngine httpEngine, ExecutorService asyncInvocationExecutor, boolean cleanupExecutor,
ScheduledExecutorService scheduledExecutorService, ClientConfiguration configuration)
{
this.cleanupExecutor = cleanupExecutor;
this.httpEngine = httpEngine;
this.asyncInvocationExecutor = asyncInvocationExecutor;
this.configuration = configuration;
this.scheduledExecutorService = scheduledExecutorService;
}

ResteasyClient(ClientHttpEngine httpEngine, ExecutorService asyncInvocationExecutor, boolean cleanupExecutor, ClientConfiguration configuration)
{
this.cleanupExecutor = cleanupExecutor;
Expand All @@ -54,6 +62,11 @@ public ExecutorService asyncInvocationExecutor()
return asyncInvocationExecutor;
}

public ScheduledExecutorService getScheduledExecutor()
{
return this.scheduledExecutorService;
}

public void abortIfClosed()
{
if (isClosed()) throw new IllegalStateException(Messages.MESSAGES.clientIsClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.jboss.resteasy.client.jaxrs.internal.ClientConfiguration;
import org.jboss.resteasy.client.jaxrs.internal.LocalResteasyProviderFactory;
import org.jboss.resteasy.plugins.providers.RegisterBuiltin;
import org.jboss.resteasy.spi.NotImplementedYetException;
import org.jboss.resteasy.spi.ResteasyProviderFactory;

import javax.net.ssl.HostnameVerifier;
Expand Down Expand Up @@ -95,6 +94,7 @@ public static enum HostnameVerificationPolicy
protected HostnameVerificationPolicy policy = HostnameVerificationPolicy.WILDCARD;
protected ResteasyProviderFactory providerFactory;
protected ExecutorService asyncExecutor;
protected ScheduledExecutorService scheduledExecutorService;
protected boolean cleanupExecutor;
protected SSLContext sslContext;
protected Map<String, Object> properties = new HashMap<String, Object>();
Expand Down Expand Up @@ -145,6 +145,7 @@ public ResteasyClientBuilder asyncExecutor(ExecutorService asyncExecutor)
* @param cleanupExecutor true if the Client should close the executor when it is closed
* @return
*/
@Deprecated
public ResteasyClientBuilder asyncExecutor(ExecutorService asyncExecutor, boolean cleanupExecutor)
{
this.asyncExecutor = asyncExecutor;
Expand Down Expand Up @@ -397,7 +398,7 @@ public ResteasyClient buildOld()
if (engine == null) {
engine = initDefaultEngine();
}
return new ResteasyClient(engine, executor, cleanupExecutor, config);
return new ResteasyClient(engine, executor, cleanupExecutor, scheduledExecutorService, config);

}

Expand Down Expand Up @@ -425,7 +426,7 @@ public ResteasyClient build()
if (engine == null) {
engine = HttpClientBuilder43.initDefaultEngine43(this);
}
return new ResteasyClient(engine, executor, cleanupExecutor, config);
return new ResteasyClient(engine, executor, cleanupExecutor, scheduledExecutorService, config);

}

Expand Down Expand Up @@ -688,6 +689,7 @@ public ClientBuilder executorService(ExecutorService executorService)
@Override
public ClientBuilder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
{
throw new NotImplementedYetException();
this.scheduledExecutorService = scheduledExecutorService;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.jboss.resteasy.plugins.providers.sse;
package org.jboss.resteasy.plugins.providers.sse.client;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -18,29 +18,31 @@
import javax.ws.rs.sse.SseEventSource;

import org.apache.http.HttpHeaders;
import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
import org.jboss.resteasy.plugins.providers.sse.SseConstants;
import org.jboss.resteasy.plugins.providers.sse.SseEventInputImpl;
import org.jboss.resteasy.resteasy_jaxrs.i18n.Messages;


public class SseEventSourceImpl implements SseEventSource
{
public static final long RECONNECT_DEFAULT = 500;

private enum State {
READY, OPEN, CLOSED
}

private final WebTarget target;
private final long reconnectDelay;
private final boolean disableKeepAlive;
private final ScheduledExecutorService executor;
private final AtomicReference<State> state = new AtomicReference<>(State.READY);
private enum State {
PENDING, OPEN, CLOSED
}
private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
private final List<Consumer<InboundSseEvent>> onEventConsumers = new CopyOnWriteArrayList<>();
private final List<Consumer<Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();
private final List<Runnable> onCompleteConsumers = new CopyOnWriteArrayList<>();

protected static class SourceBuilder extends Builder
{
private WebTarget endpoint = null;
private WebTarget target = null;
private long reconnect = RECONNECT_DEFAULT;
private String name = null;
private boolean disableKeepAlive = false;
Expand All @@ -58,16 +60,16 @@ public Builder named(String name)

public SseEventSource build()
{
return new SseEventSourceImpl(endpoint, name, reconnect, disableKeepAlive, false);
return new SseEventSourceImpl(target, name, reconnect, disableKeepAlive, false);
}

@Override
public Builder target(WebTarget endpoint)
public Builder target(WebTarget target)
{
if (endpoint == null) {
if (target == null) {
throw new NullPointerException();
}
this.endpoint = endpoint;
this.target = target;
return this;
}

Expand All @@ -79,14 +81,14 @@ public Builder reconnectingEvery(long delay, TimeUnit unit)
}
}

public SseEventSourceImpl(final WebTarget endpoint)
public SseEventSourceImpl(final WebTarget target)
{
this(endpoint, true);
this(target, true);
}

public SseEventSourceImpl(final WebTarget endpoint, final boolean open)
public SseEventSourceImpl(final WebTarget target, final boolean open)
{
this(endpoint, null, RECONNECT_DEFAULT, false, open);
this(target, null, RECONNECT_DEFAULT, false, open);
}

private SseEventSourceImpl(final WebTarget target, String name, long reconnectDelay, final boolean disableKeepAlive,
Expand All @@ -104,7 +106,11 @@ private SseEventSourceImpl(final WebTarget target, String name, long reconnectDe
{
name = String.format("sse-event-source(%s)", target.getUri());
}
this.executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
ScheduledExecutorService scheduledExecutor = null;
if (target instanceof ResteasyWebTarget) {
scheduledExecutor = ((ResteasyWebTarget)target).getResteasyClient().getScheduledExecutor();
}
this.executor = scheduledExecutor != null ? scheduledExecutor : Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
if (open)
{
open();
Expand Down Expand Up @@ -136,15 +142,15 @@ public Thread newThread(Runnable r)
@Override
public void open()
{
if (!state.compareAndSet(State.READY, State.OPEN))
if (!state.compareAndSet(State.PENDING, State.OPEN))
{
throw new IllegalStateException(Messages.MESSAGES.eventSourceIsNotReadyForOpen());
}
EventHandler handler = new EventHandler(reconnectDelay, null);
executor.submit(handler);
handler.awaitConnected();
}

@Override
public boolean isOpen()
{
return state.get() == State.OPEN;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl$SourceBuilder

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.jboss.arquillian.container.test.api.RunAsClient;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.resteasy.plugins.providers.sse.SseEventSourceImpl;
import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl;
import org.jboss.resteasy.utils.PortProviderUtil;
import org.jboss.resteasy.utils.TestUtil;
import org.jboss.shrinkwrap.api.Archive;
Expand Down

0 comments on commit c3765b8

Please sign in to comment.