Skip to content

Commit

Permalink
[eclipse-archived#50] Fix concurrency issue when adding message inter…
Browse files Browse the repository at this point in the history
…ceptors.
  • Loading branch information
Kai Hudalla committed Nov 24, 2016
1 parent 99d56f2 commit 832c896
Showing 1 changed file with 63 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
Expand Down Expand Up @@ -166,112 +166,125 @@ public class CoapEndpoint implements Endpoint {
private boolean started;

/** The list of endpoint observers (has nothing to do with CoAP observe relations) */
private List<EndpointObserver> observers = new ArrayList<>(0);
private List<EndpointObserver> observers = new CopyOnWriteArrayList<>();

/** The list of interceptors */
private List<MessageInterceptor> interceptors = new ArrayList<>(0);
private List<MessageInterceptor> interceptors = new CopyOnWriteArrayList<>();

/** The list of Notification listener (use for CoAP observer relations) */
private List<NotificationListener> notificationListeners = new ArrayList<NotificationListener>(0);
private List<NotificationListener> notificationListeners = new CopyOnWriteArrayList<>();

private final MessageExchangeStore exchangeStore;

/**
* Instantiates a new endpoint with an ephemeral port.
* Creates a new <em>coap</em> endpoint using default configuration.
* <p>
* The endpoint will bind to all network interfaces and listen on an ephemeral port.
*/
public CoapEndpoint() {
this(0);
}

/**
* Instantiates a new endpoint with the specified port
* Creates a new <em>coap</em> endpoint using default configuration.
* <p>
* The endpoint will bind to all network interfaces.
*
* @param port the port
* @param port The port to listen on.
*/
public CoapEndpoint(final int port) {
this(new InetSocketAddress(port));
}

/**
* Instantiates a new endpoint with the specified address.
* Creates a new <em>coap</em> endpoint using default configuration.
*
* @param address the address
* @param address The IP address and port to bind to.
*/
public CoapEndpoint(final InetSocketAddress address) {
this(address, NetworkConfig.getStandard());
}

/**
* Creates a new endpoint based on given network configuration parameters.
* Creates a new <em>coap</em> endpoint for a configuration.
* <p>
* The endpoint will bind to all network interfaces and listen on an ephemeral port.
* </p>
*
* @param config the configuration to use.
* @param config The configuration values to use.
*/
public CoapEndpoint(final NetworkConfig config) {
this(new InetSocketAddress(0), config);
}

/**
* Instantiates a new endpoint with the specified port and configuration.
* Creates a new <em>coap</em> endpoint for a port and configuration.
* <p>
* The endpoint will bind to all network interfaces and listen on an ephemeral port.
*
* @param port the UDP port
* @param config the network configuration
* @param port The port to listen on.
* @param config The configuration values to use.
*/
public CoapEndpoint(final int port, final NetworkConfig config) {
this(new InetSocketAddress(port), config);
}

/**
* Instantiates a new endpoint with the specified address and configuration.
* Creates a new <em>coap</em> endpoint for a configuration.
*
* @param address the address
* @param config the network configuration
* @param address The IP address and port to bind to.
* @param config The configuration values to use.
*/
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config) {
this(createUDPConnector(address, config), config, null, null);
}

/**
* Creates a new UDP endpoint for a bind address, configuration and message exchange store.
* Creates a new <em>coap</em> endpoint for a configuration and message exchange store.
*
* @param address the address
* @param config the network configuration
* @param exchangeStore the store to use for keeping track of message exchanges.
* @param address The IP address and port to bind to.
* @param config The configuration values to use.
* @param exchangeStore The store to use for keeping track of message exchanges.
*/
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config, final MessageExchangeStore exchangeStore) {
this(createUDPConnector(address, config), config, null, exchangeStore);
}

/**
* Instantiates a new endpoint with the specified connector and
* configuration.
*
* @param connector the connector
* @param config the config
* Creates a new endpoint for a connector and configuration.
* <p>
* The endpoint will support the connector's implemented scheme and will bind to
* the IP address and port the connector is configured for.
*
* @param connector The connector to use.
* @param config The configuration values to use.
*/
public CoapEndpoint(Connector connector, NetworkConfig config) {
public CoapEndpoint(final Connector connector, final NetworkConfig config) {
this(connector, config, null, null);
}

/**
* Instantiates a new endpoint with the specified address and configuration
* and observe request store (used to persist observation).
*
* @param connector the connector
* @param config the config
* Creates a new <em>coap</em> endpoint for a configuration and observation store.
*
* @param address The IP address and port to bind to.
* @param config The configuration values to use.
* @param store The store to use for keeping track of observations initiated by this
* endpoint.
*/
public CoapEndpoint(InetSocketAddress address, NetworkConfig config, ObservationStore store) {
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config, final ObservationStore store) {
this(createUDPConnector(address, config), config, store, null);
}

/**
* Instantiates a new endpoint with the specified connector and configuration
* and observe request store (used to persist observation).
* Creates a new endpoint for a connector, configuration, message exchange and observation store.
* <p>
* The endpoint will support the connector's implemented scheme and will bind to
* the IP address and port the connector is configured for.
*
* @param connector the connector
* @param config the config
* @param connector The connector to use.
* @param config The configuration values to use.
* @param store The store to use for keeping track of observations initiated by this
* endpoint.
* @param exchangeStore The store to use for keeping track of message exchanges.
*/
public CoapEndpoint(Connector connector, NetworkConfig config, ObservationStore store, MessageExchangeStore exchangeStore) {
this.config = config;
Expand Down Expand Up @@ -318,9 +331,6 @@ private static Connector createUDPConnector(final InetSocketAddress address, fin
return c;
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#start()
*/
@Override
public synchronized void start() throws IOException {
if (started) {
Expand All @@ -335,6 +345,8 @@ public synchronized void start() throws IOException {
if (this.executor == null) {
LOGGER.log(Level.CONFIG, "Endpoint [{0}] requires an executor to start, using default single-threaded daemon executor", getAddress());

// in production environments the executor should be set to a multi threaded version
// in order to utilize all cores of the processor
setExecutor(Executors.newSingleThreadScheduledExecutor(
new Utils.DaemonThreadFactory("CoapEndpoint-" + connector.getAddress() + '#'))); //$NON-NLS-1$
addObserver(new EndpointObserver() {
Expand All @@ -354,8 +366,7 @@ public void destroyed(final Endpoint endpoint) {
}

if (exchangeStore == null) {
InMemoryMessageExchangeStore inMemoryMessageExchangeStore = new InMemoryMessageExchangeStore(config);
matcher.setMessageExchangeStore(inMemoryMessageExchangeStore);
matcher.setMessageExchangeStore(new InMemoryMessageExchangeStore(config));
} else {
matcher.setMessageExchangeStore(exchangeStore);
}
Expand Down Expand Up @@ -394,9 +405,6 @@ public void run() {
});
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#stop()
*/
@Override
public synchronized void stop() {
if (!started) {
Expand All @@ -413,9 +421,6 @@ public synchronized void stop() {
}
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#destroy()
*/
@Override
public synchronized void destroy() {
LOGGER.log(Level.INFO, "Destroying endpoint at address {0}", getAddress());
Expand All @@ -429,97 +434,58 @@ public synchronized void destroy() {
}
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#clear()
*/
@Override
public void clear() {
matcher.clear();
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#isStarted()
*/
@Override
public boolean isStarted() {
public synchronized boolean isStarted() {
return started;
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#setExecutor(java.util.concurrent.ScheduledExecutorService)
*/
@Override
public synchronized void setExecutor(final ScheduledExecutorService executor) {
// TODO: don't we need to stop and shut down the previous executor?
this.executor = executor;
this.coapstack.setExecutor(executor);
}

/*
* (non-Javadoc)
*
* @see
* org.eclipse.californium.core.network.Endpoint#addNotificationListener(org.eclipse.californium.core.observe.NotificationListener)
*/
@Override
public void addNotificationListener(NotificationListener lis) {
public void addNotificationListener(final NotificationListener lis) {
notificationListeners.add(lis);
}

/*
* (non-Javadoc)
*
* @see
* org.eclipse.californium.core.network.Endpoint#removeNotificationListener(org.eclipse.californium.core.observe.NotificationListener)
*/
@Override
public void removeNotificationListener(NotificationListener lis) {
public void removeNotificationListener(final NotificationListener lis) {
notificationListeners.remove(lis);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#addObserver(org.eclipse.californium.core.network.EndpointObserver)
*/
@Override
public void addObserver(final EndpointObserver observer) {
observers.add(observer);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#removeObserver(org.eclipse.californium.core.network.EndpointObserver)
*/
@Override
public void removeObserver(final EndpointObserver observer) {
observers.remove(observer);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#addInterceptor(org.eclipse.californium.core.network.MessageIntercepter)
*/
@Override
public void addInterceptor(final MessageInterceptor interceptor) {
interceptors.add(interceptor);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#removeInterceptor(org.eclipse.californium.core.network.MessageIntercepter)
*/
@Override
public void removeInterceptor(final MessageInterceptor interceptor) {
interceptors.remove(interceptor);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#getInterceptors()
*/
@Override
public List<MessageInterceptor> getInterceptors() {
return Collections.unmodifiableList(interceptors);
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#sendRequest(org.eclipse.californium.core.coap.Request)
*/
@Override
public void sendRequest(final Request request) {
// always use endpoint executor
Expand All @@ -531,9 +497,6 @@ public void run() {
});
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#sendResponse(org.eclipse.californium.core.network.Exchange, org.eclipse.californium.core.coap.Response)
*/
@Override
public void sendResponse(final Exchange exchange, final Response response) {
if (exchange.hasCustomExecutor()) {
Expand All @@ -550,9 +513,6 @@ public void run() {
}
}

/* (non-Javadoc)
* @see org.eclipse.californium.core.network.Endpoint#sendEmptyMessage(org.eclipse.californium.core.network.Exchange, org.eclipse.californium.core.coap.EmptyMessage)
*/
@Override
public void sendEmptyMessage(final Exchange exchange, final EmptyMessage message) {
// send empty messages right away in the same thread to ensure execution order
Expand Down Expand Up @@ -592,8 +552,11 @@ public NetworkConfig getConfig() {

private class NotificationDispatcher implements NotificationListener {
@Override
public void onNotification(Request request, Response response) {
for (NotificationListener notificationListener : new ArrayList<>(notificationListeners)) {
public void onNotification(final Request request, final Response response) {

// we can rely on the fact that the CopyOnWriteArrayList just provides a
// "snapshot" iterator over the notification listeners
for (NotificationListener notificationListener : notificationListeners) {
notificationListener.onNotification(request, response);
}
}
Expand Down Expand Up @@ -856,7 +819,7 @@ private void receiveEmptyMessage(final EmptyMessage message, final RawData raw)
if (!message.isCanceled()) {
// CoAP Ping
if (message.getType() == Type.CON || message.getType() == Type.NON) {
LOGGER.log(Level.FINER, "Responding to ping by {0}", raw.getInetSocketAddress());
LOGGER.log(Level.FINER, "responding to ping from {0}", raw.getInetSocketAddress());
reject(message);
} else {
Exchange exchange = matcher.receiveEmptyMessage(message);
Expand Down

0 comments on commit 832c896

Please sign in to comment.