Skip to content

Commit

Permalink
fixing test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
lolocohen committed Apr 10, 2019
1 parent 9ec2183 commit ffb2f33
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public void handleProviderResponse(final AsyncResourceRequest request) throws Ex
public void sendResponse(final JPPFResourceWrapper response) throws Exception {
if (local) {
setLocalResponse(response);
AsyncNodeClassMessageWriter.handleResponseSent(this, response);
} else {
final ClassLoaderNioMessage message = serializeResource(response);
offerMessageToSend(message);
Expand All @@ -203,11 +204,23 @@ public int getNbPendingResponses() {
*
*/
void close() {
lockResponse.lock();
try {
pendingResponses.clear();
} finally {
lockResponse.unlock();
if (closed.compareAndSet(false, true)) {
lockResponse.lock();
try {
pendingResponses.clear();
currentNodeRequests.clear();
} finally {
lockResponse.unlock();
}
sendQueue.clear();
if (local) {
localLock.lock();
try {
responseSent.signalAll();
} finally {
localLock.unlock();
}
}
}
}

Expand All @@ -226,7 +239,7 @@ public void setLocalRequest(final JPPFResourceWrapper localRequest) throws Excep
public JPPFResourceWrapper awaitLocalResponse() throws Exception {
localLock.lock();
try {
while (localResponse == null) responseSent.await();
while ((localResponse == null) && !closed.get()) responseSent.await();
return localResponse;
} finally {
localLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import java.util.concurrent.atomic.AtomicLong;

import org.jppf.classloader.*;
import org.jppf.nio.ClassLoaderNioMessage;
import org.jppf.server.JPPFDriver;
import org.jppf.server.nio.classloader.*;
import org.jppf.server.nio.classloader.client.*;
import org.jppf.server.nio.nodeserver.BaseNodeContext;
import org.jppf.utils.TraversalList;
import org.jppf.utils.*;
import org.jppf.utils.configuration.JPPFProperties;
import org.jppf.utils.stats.JPPFStatisticsHelper;
import org.slf4j.*;
Expand Down Expand Up @@ -96,12 +95,7 @@ void handshakeRequest(final AsyncNodeClassContext context, final JPPFResourceWra
resource.setState(context.isPeer() ? JPPFResourceWrapper.State.NODE_INITIATION : JPPFResourceWrapper.State.NODE_RESPONSE);
resource.setProviderUuid(driver.getUuid());
if (debugEnabled) log.debug("sending handshake response {} to {}, providerUuid={}", resource, context, resource.getProviderUuid());
if (context.isLocal()) {
context.setLocalResponse(resource);
} else {
final ClassLoaderNioMessage message = context.serializeResource(resource);
context.offerMessageToSend(message);
}
context.sendResponse(resource);
}

/**
Expand Down Expand Up @@ -129,6 +123,7 @@ void closeChannelRequest(final AsyncNodeClassContext context, final JPPFResource
void nodeRequest(final AsyncNodeClassContext context, final JPPFResourceWrapper resource) throws Exception {
if (debugEnabled) log.debug("read resource request {} from node: {}", resource, context);
resource.setRequestStartTime(System.nanoTime());
if (context.isLocal()) Thread.sleep(10L);
final long id = resourceSequence.incrementAndGet();
final String uuid = driver.getUuid();
resource.setResourceId(uuid, id);
Expand All @@ -142,12 +137,8 @@ void nodeRequest(final AsyncNodeClassContext context, final JPPFResourceWrapper
allDefinitionsFound &= b;
}
if (allDefinitionsFound) {
if (context.isLocal()) {
context.setLocalResponse(resource);
} else {
context.removeNodeRequest(resource);
context.sendResponse(resource);
}
if (!context.isLocal()) context.removeNodeRequest(resource);
context.sendResponse(resource);
}
if (debugEnabled) log.debug("pending responses {} for node: {}", context.getNbPendingResponses(), context);
}
Expand Down Expand Up @@ -255,7 +246,7 @@ private AsyncClientClassContext findProviderConnection(final String uuid) throws
}

/**
* Called when a repsonse has been sent to a node.
* Called when a response has been sent to a node.
* @param context represents the node channel.
* @param resource the response that was sent.
* @throws Exception if any error occurs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,20 @@ protected void postWrite(final AsyncNodeClassContext context, final NioMessage d
final ClassLoaderNioMessage msg = (ClassLoaderNioMessage) data;
final JPPFResourceWrapper resource = msg.getResource();
if (debugEnabled) log.debug("fully sent message {} for resource = {} from context {}", data, resource, context);
handleResponseSent(context, resource);
}

/**
*
* @param context the node channel.
* @param resource the repsosnse that was sent.
* @throws Exception if any error occurs.
*/
static void handleResponseSent(final AsyncNodeClassContext context, final JPPFResourceWrapper resource) throws Exception {
if ((resource != null) && (resource.getState() == JPPFResourceWrapper.State.NODE_RESPONSE)) {
NioHelper.getGlobalexecutor().execute(() -> {
try {
((AsyncNodeClassNioServer) server).getMessageHandler().responseSent(context, resource);
context.getServer().getMessageHandler().responseSent(context, resource);
} catch (final Exception e) {
context.handleException(e);
}
Expand Down

0 comments on commit ffb2f33

Please sign in to comment.