Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
The problem was that the system didn't deal correctly with an inner call for 2 different partitions, but mapped to the same thread. The error was that the OperationRunnerImpl for the inner operation was not obtained using
the partition id of the inner operation, but by accessing the OperationThread.currentOperationRunner. So the inner operation would run on the outer OperationRunner and then you get the exception.# Also tests have been
added for local and remote calls where this behaviour is tested. 1 change is ClassicOperationScheduler and 6 changes are in the tests.
  • Loading branch information
pveentjer authored and sancar committed May 27, 2015
1 parent 096ece2 commit 9680f5b
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,14 @@ public int getGenericOperationThreadCount() {
@Override
public void execute(Operation op) {
checkNotNull(op, "op can't be null");

execute(op, op.getPartitionId(), op.isUrgent());
}

@Override
public void execute(PartitionSpecificRunnable task) {
checkNotNull(task, "task can't be null");

execute(task, task.getPartitionId(), false);
}

Expand All @@ -346,10 +348,7 @@ public void runOnCallingThreadIfPossible(Operation op) {
@Override
public void execute(Packet packet) {
checkNotNull(packet, "packet can't be null");

if (!packet.isHeaderSet(Packet.HEADER_OP)) {
throw new IllegalStateException("Packet " + packet + " doesn't have Packet.HEADER_OP set");
}
checkOpPacket(packet);

if (packet.isHeaderSet(Packet.HEADER_RESPONSE)) {
// it's a response packet
Expand All @@ -362,6 +361,12 @@ public void execute(Packet packet) {
}
}

private void checkOpPacket(Packet packet) {
if (!packet.isHeaderSet(Packet.HEADER_OP)) {
throw new IllegalStateException("Packet " + packet + " doesn't have Packet.HEADER_OP set");
}
}

@Override
public void runOnCallingThread(Operation operation) {
checkNotNull(operation, "operation can't be null");
Expand All @@ -371,17 +376,27 @@ public void runOnCallingThread(Operation operation) {
+ Thread.currentThread());
}

// TODO: we need to find the correct operation handler
OperationRunner operationRunner = getCurrentThreadOperationRunner();
OperationRunner operationRunner = getOperationRunner(operation);
operationRunner.run(operation);
}

public OperationRunner getCurrentThreadOperationRunner() {
OperationRunner getOperationRunner(Operation operation) {
checkNotNull(operation, "operation can't be null");

if (operation.getPartitionId() >= 0) {
// retrieving an OperationRunner for a partition specific operation is easy; we can just use the partition id.
return partitionOperationRunners[operation.getPartitionId()];
}

Thread thread = Thread.currentThread();
if (!(thread instanceof OperationThread)) {
// if thread is not an operation thread, we return the adHocOperationRunner
return adHocOperationRunner;
}

// It is a generic operation and we are running on an operation-thread. So we can just return the operation-runner
// for that thread. There won't be any partition-conflict since generic operations are allowed to be executed by
// a partition-specific operation-runner.
OperationThread operationThread = (OperationThread) thread;
return operationThread.getCurrentOperationRunner();
}
Expand All @@ -403,7 +418,7 @@ private void execute(Object task, int partitionId, boolean priority) {
}
}

private int toPartitionThreadIndex(int partitionId) {
public int toPartitionThreadIndex(int partitionId) {
return partitionId % partitionOperationThreads.length;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,46 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class GetCurrentThreadOperationRunnerTest extends AbstractClassicOperationExecutorTest {
public class GetOperationRunnerTest extends AbstractClassicOperationExecutorTest {

@Test(expected = NullPointerException.class)
public void test_whenNull() {
initExecutor();

executor.getOperationRunner(null);
}

@Test
public void test_whenCallerIsNormalThread() {
public void test_whenCallerIsNormalThread_andGenericOperation_thenReturnAdHocRunner() {
initExecutor();

OperationRunner operationRunner = executor.getCurrentThreadOperationRunner();
Operation op = new DummyOperation(-1);
OperationRunner operationRunner = executor.getOperationRunner(op);

DummyOperationRunnerFactory f = (DummyOperationRunnerFactory) handlerFactory;

assertSame(f.adhocHandler, operationRunner);
}

@Test
public void test_whenCallerIsPartitionOperationThread() {
public void test_whenPartitionSpecificOperation_thenReturnCorrectPartitionOperationRunner() {
initExecutor();

final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation();
op.setPartitionId(0);
int partitionId = 0;

executor.execute(op);
Operation op = new DummyOperation(partitionId);
OperationRunner runner = executor.getOperationRunner(op);

assertTrueEventually(new AssertTask() {
@Override
public void run() throws Exception {
OperationRunner expected = executor.getPartitionOperationRunners()[0];
OperationRunner actual = op.getResponse();
assertSame(expected, actual);
}
});
assertSame(executor.getPartitionOperationRunners()[partitionId], runner);
}

@Test
public void test_whenCallerIsGenericOperationThread() {
initExecutor();

final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation();
Operation nestedOp = new DummyOperation(-1);


final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation(nestedOp);
op.setPartitionId(Operation.GENERIC_PARTITION_ID);

executor.execute(op);
Expand All @@ -69,15 +72,31 @@ public void run() throws Exception {

public class GetCurrentThreadOperationHandlerOperation extends AbstractOperation {
volatile OperationRunner operationRunner;
final Operation op;

public GetCurrentThreadOperationHandlerOperation(Operation op) {
this.op = op;
}

@Override
public void run() throws Exception {
operationRunner = executor.getCurrentThreadOperationRunner();
operationRunner = executor.getOperationRunner(op);
}

@Override
public OperationRunner getResponse() {
return operationRunner;
}
}

class DummyOperation extends AbstractOperation {
DummyOperation(int partitionId) {
setPartitionId(partitionId);
}

@Override
public void run() throws Exception {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
import com.hazelcast.spi.impl.operationexecutor.OperationRunner;
import com.hazelcast.spi.impl.operationexecutor.OperationRunnerFactory;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
public class OperationThreadTest extends AbstractClassicOperationExecutorTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import static org.junit.Assert.assertTrue;

public class RunOperationOnCallingThreadTest extends AbstractClassicOperationExecutorTest {
public class RunOnCallingThreadTest extends AbstractClassicOperationExecutorTest {

@Test(expected = NullPointerException.class)
public void test_whenNull() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.hazelcast.spi.impl.operationservice.impl;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.OperationService;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@RunWith(HazelcastParallelClassRunner.class)
@Category(QuickTest.class)
public class InvocationNestedLocalTest extends InvocationNestedTest {

private final String response = "someresponse";

@Test
public void whenPartition_callsGeneric() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

InnerOperation innerOperation = new InnerOperation(response, -1);
OuterOperation outerOperation = new OuterOperation(innerOperation, 0);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

assertEquals(response, f.getSafely());
}

@Test
public void whenPartition_callsCorrectPartition() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int partitionId = 0;
InnerOperation innerOperation = new InnerOperation(response, partitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, partitionId);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

assertEquals(response, f.getSafely());
}

@Test
public void whenPartition_callsIncorrectPartition() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int outerPartitionId = 0;
int innerPartitionId = 1;
for (; innerPartitionId < hz.getPartitionService().getPartitions().size(); innerPartitionId++) {
if (!mappedToSameThread(operationService, outerPartitionId, innerPartitionId)) {
break;
}
}

InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, outerPartitionId);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

try {
f.getSafely();
fail();
} catch (IllegalThreadStateException e) {
}
}

@Test
public void whenPartition_callsDifferentPartition_butMappedToSameThread() throws ExecutionException, InterruptedException {
Config config = new Config();
config.setProperty(GroupProperties.PROP_PARTITION_COUNT, "2");
config.setProperty(GroupProperties.PROP_PARTITION_OPERATION_THREAD_COUNT, "1");
HazelcastInstance hz = createHazelcastInstance(config);
final OperationService operationService = getOperationService(hz);

int innerPartitionId = 0;
int outerPartitionId = 1;
InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, outerPartitionId);

Future f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());
assertEquals(response, f.get());
}

@Test
public void whenGeneric_callsGeneric() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

InnerOperation innerOperation = new InnerOperation(response, -1);
OuterOperation outerOperation = new OuterOperation(innerOperation, -1);

InternalCompletableFuture f = operationService.invokeOnTarget(null, outerOperation, getAddress(hz));

assertEquals(response, f.getSafely());
}

@Test
public void whenGeneric_callsPartitionSpecific() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int innerPartitionId = 0;
InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, -1);

InternalCompletableFuture f = operationService.invokeOnTarget(null, outerOperation, getAddress(hz));

assertEquals(response, f.getSafely());
}
}
Loading

0 comments on commit 9680f5b

Please sign in to comment.