Skip to content

Commit

Permalink
Make reloading the owned partitions in map service context thread safe
Browse files Browse the repository at this point in the history
Since the migration finalizations can be called concurrently, the
owned partitions might be reloaded concurrently. This means that the
set of owned partitions first might be set to a newer version and
then to an older version, leading to an incorrect set of owned
partitions.
This affects the query engine when it performs queries off the
partition thread as every member reports its own set of owned
partitions which is in this case incorrect. If the results from the
actual partition owner are received by the query engine later than
from the "lying" partition owner, they are discarded. This can cause
the query engine to return incorrect results until the partitions are
reloaded again on an another migration.
The fix reloads the partitions in a CAS loop ensuring that the newest
partition state will always be applied.

Also, added some type parameters and improved javadoc.

Fixes :
hazelcast#10107
hazelcast#9870
hazelcast#10776
  • Loading branch information
Matko Medenjak committed Sep 28, 2017
1 parent 93016e6 commit 7290048
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
Expand Up @@ -175,12 +175,16 @@ boolean isMigrationAllowed() {
}

/**
* Finalizes a migration that has finished with {@link MigrationStatus#SUCCESS} or {@link MigrationStatus#FAILED}
* by invoking {@link FinalizeMigrationOperation} locally if this is the source or destination and removes the active
* migration. Clears the migration flag if this node is the partition owner of a backup migration.
* Otherwise, the migration flag is cleared asynchronously within {@link FinalizeMigrationOperation}
* Finalizes a migration that has finished with {@link MigrationStatus#SUCCESS}
* or {@link MigrationStatus#FAILED} by invoking {@link FinalizeMigrationOperation}
* locally if this is the source or destination. The finalization is asynchronous
* and there might be other ongoing migration finalizations.
* <p>
* This method should not be called on a node which is not the source, destination or partition owner for this migration.
* It will also cleanup the migration state by removing the active migration and
* clearing the migration flag on the partition owner.
* <p>
* This method should not be called on a node which is not the source, destination
* or partition owner for this migration.
*
* @param migrationInfo the migration to be finalized
*/
Expand Down
Expand Up @@ -39,6 +39,7 @@
* Invoked locally on the source or destination of the migration to finalize the migration.
* This will notify the {@link MigrationAwareService}s that the migration finished, updates the replica versions,
* clears the migration flag and notifies the node engine when successful.
* There might be ongoing concurrent finalization operations for different or even for the same partition.
*/
public final class FinalizeMigrationOperation extends AbstractPartitionOperation
implements PartitionAwareOperation, MigrationCycleOperation {
Expand Down
Expand Up @@ -124,6 +124,9 @@ public interface MapServiceContext extends MapServiceContextInterceptorSupport,

Collection<Integer> getOwnedPartitions();

/**
* Reloads the cached collection of partitions owned by this node.
*/
void reloadOwnedPartitions();

AtomicInteger getWriteBehindQueueItemCounter();
Expand Down
Expand Up @@ -88,6 +88,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -437,11 +438,23 @@ public Collection<Integer> getOwnedPartitions() {
return partitions;
}

/**
* {@inheritDoc}
* <p>
* The method will set the owned partition set in a CAS loop because
* this method can be called concurrently.
*/
@Override
public void reloadOwnedPartitions() {
IPartitionService partitionService = nodeEngine.getPartitionService();
Collection<Integer> partitions = partitionService.getMemberPartitions(nodeEngine.getThisAddress());
ownedPartitions.set(Collections.unmodifiableSet(new LinkedHashSet<Integer>(partitions)));
final IPartitionService partitionService = nodeEngine.getPartitionService();
for (; ; ) {
final Collection<Integer> expected = ownedPartitions.get();
final Collection<Integer> partitions = partitionService.getMemberPartitions(nodeEngine.getThisAddress());
final Set<Integer> newSet = Collections.unmodifiableSet(new LinkedHashSet<Integer>(partitions));
if (ownedPartitions.compareAndSet(expected, newSet)) {
return;
}
}
}

@Override
Expand Down
Expand Up @@ -102,6 +102,8 @@ public interface MigrationAwareService {
* and master member receives success response from all participants.
* <p>
* Commit is not expected to fail at this point, all exceptions will be suppressed and logged.
* Implementations of this method must be thread safe as this method may be called concurrently
* for different migrations (for different and even for the same partitions).
*
* @param event migration event
*/
Expand All @@ -114,6 +116,8 @@ public interface MigrationAwareService {
* or failure(s) of any of the migration participants; either master or source or destination.
* <p>
* Rollback is not expected to fail at this point, all exceptions will be suppressed and logged.
* Implementations of this method must be thread safe as this method may be called concurrently
* for different migrations (for different and even for the same partitions).
*
* @param event migration event
*/
Expand Down
Expand Up @@ -119,7 +119,7 @@ public static class QueryRunnable implements Runnable {
// query age min-max range, min is randomized, max = min+1000
private final Random random = new Random();
private final int numberOfResults = 1000;
private IMap map;
private IMap<String, SampleTestObjects.Employee> map;

public QueryRunnable(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
Expand Down

0 comments on commit 7290048

Please sign in to comment.