Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
pveentjer committed May 22, 2015
1 parent f5df32a commit 6268552
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,14 @@ public int getGenericOperationThreadCount() {
@Override
public void execute(Operation op) {
checkNotNull(op, "op can't be null");

execute(op, op.getPartitionId(), op.isUrgent());
}

@Override
public void execute(PartitionSpecificRunnable task) {
checkNotNull(task, "task can't be null");

execute(task, task.getPartitionId(), false);
}

Expand All @@ -346,10 +348,7 @@ public void runOnCallingThreadIfPossible(Operation op) {
@Override
public void execute(Packet packet) {
checkNotNull(packet, "packet can't be null");

if (!packet.isHeaderSet(Packet.HEADER_OP)) {
throw new IllegalStateException("Packet " + packet + " doesn't have Packet.HEADER_OP set");
}
checkOpPacket(packet);

if (packet.isHeaderSet(Packet.HEADER_RESPONSE)) {
// it's a response packet
Expand All @@ -362,6 +361,12 @@ public void execute(Packet packet) {
}
}

private void checkOpPacket(Packet packet) {
if (!packet.isHeaderSet(Packet.HEADER_OP)) {
throw new IllegalStateException("Packet " + packet + " doesn't have Packet.HEADER_OP set");
}
}

@Override
public void runOnCallingThread(Operation operation) {
checkNotNull(operation, "operation can't be null");
Expand All @@ -371,17 +376,27 @@ public void runOnCallingThread(Operation operation) {
+ Thread.currentThread());
}

// TODO: we need to find the correct operation handler
OperationRunner operationRunner = getCurrentThreadOperationRunner();
OperationRunner operationRunner = getOperationRunner(operation);
operationRunner.run(operation);
}

public OperationRunner getCurrentThreadOperationRunner() {
OperationRunner getOperationRunner(Operation operation) {
checkNotNull(operation, "operation can't be null");

if (operation.getPartitionId() >= 0) {
// retrieving an OperationRunner for a partition specific operation is easy; we can just use the partition id.
return partitionOperationRunners[operation.getPartitionId()];
}

Thread thread = Thread.currentThread();
if (!(thread instanceof OperationThread)) {
// if thread is not an operation thread, we return the adHocOperationRunner
return adHocOperationRunner;
}

// It is a generic operation and we are running on an operation-thread. So we can just return the operation-runner
// for that thread. There won't be any partition-conflict since generic operations are allowed to be executed by
// a partition-specific operation-runner.
OperationThread operationThread = (OperationThread) thread;
return operationThread.getCurrentOperationRunner();
}
Expand All @@ -403,7 +418,7 @@ private void execute(Object task, int partitionId, boolean priority) {
}
}

private int toPartitionThreadIndex(int partitionId) {
public int toPartitionThreadIndex(int partitionId) {
return partitionId % partitionOperationThreads.length;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,46 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class GetCurrentThreadOperationRunnerTest extends AbstractClassicOperationExecutorTest {
public class GetOperationRunnerTest extends AbstractClassicOperationExecutorTest {

@Test(expected = NullPointerException.class)
public void test_whenNull() {
initExecutor();

executor.getOperationRunner(null);
}

@Test
public void test_whenCallerIsNormalThread() {
public void test_whenCallerIsNormalThread_andGenericOperation_thenReturnAdHocRunner() {
initExecutor();

OperationRunner operationRunner = executor.getCurrentThreadOperationRunner();
Operation op = new DummyOperation(-1);
OperationRunner operationRunner = executor.getOperationRunner(op);

DummyOperationRunnerFactory f = (DummyOperationRunnerFactory) handlerFactory;

assertSame(f.adhocHandler, operationRunner);
}

@Test
public void test_whenCallerIsPartitionOperationThread() {
public void test_whenPartitionSpecificOperation_thenReturnCorrectPartitionOperationRunner() {
initExecutor();

final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation();
op.setPartitionId(0);
int partitionId = 0;

executor.execute(op);
Operation op = new DummyOperation(partitionId);
OperationRunner runner = executor.getOperationRunner(op);

assertTrueEventually(new AssertTask() {
@Override
public void run() throws Exception {
OperationRunner expected = executor.getPartitionOperationRunners()[0];
OperationRunner actual = op.getResponse();
assertSame(expected, actual);
}
});
assertSame(executor.getPartitionOperationRunners()[partitionId], runner);
}

@Test
public void test_whenCallerIsGenericOperationThread() {
initExecutor();

final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation();
Operation nestedOp = new DummyOperation(-1);


final GetCurrentThreadOperationHandlerOperation op = new GetCurrentThreadOperationHandlerOperation(nestedOp);
op.setPartitionId(Operation.GENERIC_PARTITION_ID);

executor.execute(op);
Expand All @@ -69,15 +72,31 @@ public void run() throws Exception {

public class GetCurrentThreadOperationHandlerOperation extends AbstractOperation {
volatile OperationRunner operationRunner;
final Operation op;

public GetCurrentThreadOperationHandlerOperation(Operation op) {
this.op = op;
}

@Override
public void run() throws Exception {
operationRunner = executor.getCurrentThreadOperationRunner();
operationRunner = executor.getOperationRunner(op);
}

@Override
public OperationRunner getResponse() {
return operationRunner;
}
}

class DummyOperation extends AbstractOperation {
DummyOperation(int partitionId) {
setPartitionId(partitionId);
}

@Override
public void run() throws Exception {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
import com.hazelcast.spi.impl.operationexecutor.OperationRunner;
import com.hazelcast.spi.impl.operationexecutor.OperationRunnerFactory;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
public class OperationThreadTest extends AbstractClassicOperationExecutorTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import static org.junit.Assert.assertTrue;

public class RunOperationOnCallingThreadTest extends AbstractClassicOperationExecutorTest {
public class RunOnCallingThreadTest extends AbstractClassicOperationExecutorTest {

@Test(expected = NullPointerException.class)
public void test_whenNull() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.hazelcast.spi.impl.operationservice.impl;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.OperationService;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@RunWith(HazelcastParallelClassRunner.class)
@Category(QuickTest.class)
public class InvocationNestedLocalTest extends InvocationNestedTest {

private final String response = "someresponse";

@Test
public void whenPartition_callsGeneric() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

InnerOperation innerOperation = new InnerOperation(response, -1);
OuterOperation outerOperation = new OuterOperation(innerOperation, 0);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

assertEquals(response, f.getSafely());
}

@Test
public void whenPartition_callsCorrectPartition() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int partitionId = 0;
InnerOperation innerOperation = new InnerOperation(response, partitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, partitionId);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

assertEquals(response, f.getSafely());
}

@Test
public void whenPartition_callsIncorrectPartition() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int outerPartitionId = 0;
int innerPartitionId = 1;
for (; innerPartitionId < hz.getPartitionService().getPartitions().size(); innerPartitionId++) {
if (!mappedToSameThread(operationService, outerPartitionId, innerPartitionId)) {
break;
}
}

InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, outerPartitionId);

InternalCompletableFuture f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());

try {
f.getSafely();
fail();
} catch (IllegalThreadStateException e) {
}
}

@Test
public void whenPartition_callsDifferentPartition_butMappedToSameThread() throws ExecutionException, InterruptedException {
Config config = new Config();
config.setProperty(GroupProperties.PROP_PARTITION_COUNT, "2");
config.setProperty(GroupProperties.PROP_PARTITION_OPERATION_THREAD_COUNT, "1");
HazelcastInstance hz = createHazelcastInstance(config);
final OperationService operationService = getOperationService(hz);

int innerPartitionId = 0;
int outerPartitionId = 1;
InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, outerPartitionId);

Future f = operationService.invokeOnPartition(null, outerOperation, outerOperation.getPartitionId());
assertEquals(response, f.get());
}

@Test
public void whenGeneric_callsGeneric() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

InnerOperation innerOperation = new InnerOperation(response, -1);
OuterOperation outerOperation = new OuterOperation(innerOperation, -1);

InternalCompletableFuture f = operationService.invokeOnTarget(null, outerOperation, getAddress(hz));

assertEquals(response, f.getSafely());
}

@Test
public void whenGeneric_callsPartitionSpecific() {
HazelcastInstance hz = createHazelcastInstance();
OperationService operationService = getOperationService(hz);

int innerPartitionId = 0;
InnerOperation innerOperation = new InnerOperation(response, innerPartitionId);
OuterOperation outerOperation = new OuterOperation(innerOperation, -1);

InternalCompletableFuture f = operationService.invokeOnTarget(null, outerOperation, getAddress(hz));

assertEquals(response, f.getSafely());
}
}
Loading

0 comments on commit 6268552

Please sign in to comment.