Skip to content

Commit

Permalink
Use follower primary term when applying operations
Browse files Browse the repository at this point in the history
The primary shard copy on the follower has authority over the replication
operations that occur on the following side in cross-cluster
replication. Yet today we are using the primary term directly from the
operations on the leader side. Instead we should be replacing the
primary term on the operations with the primary term of the primary
on the following side. This commit does this by copying the translog
operations with the corrected primary term. This ensures that we use
this primary term while applying the operations on the primary, and when
replicating them across to the replica (the replica request now carries
operations with the primary term of the primary shard copy on the follower).
  • Loading branch information
jasontedor committed Jun 5, 2018
1 parent 530089f commit ddae214
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]";
ensureWriteAllowed(origin);
Engine.Index operation;
try {
Expand Down Expand Up @@ -741,7 +741,7 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long versio
private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]";
ensureWriteAllowed(origin);
// When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@
*/
package org.elasticsearch.index.shard;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -57,10 +52,8 @@
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -155,37 +148,61 @@ public Directory newDirectory() throws IOException {
}

/**
* creates a new initializing shard. The shard will have its own unique data path.
* Creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
* @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* another shard)
*/
protected IndexShard newShard(boolean primary) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting);
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory());
}

/**
* creates a new initializing shard. The shard will have its own unique data path.
* Creates a new initializing shard. The shard will have its own unique data path.
*
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param listeners an optional set of listeners to add to the shard
* @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* another shard)
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
*/
protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException {
    final ShardId shardId = new ShardId("index", "_na_", 0);
    final String nodeId = randomAlphaOfLength(10);
    // a primary recovers from an empty store whereas a replica recovers from a peer
    final RecoverySource recoverySource;
    if (primary) {
        recoverySource = RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE;
    } else {
        recoverySource = RecoverySource.PeerRecoverySource.INSTANCE;
    }
    final ShardRouting initializingRouting =
            TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, recoverySource);
    return newShard(initializingRouting, settings, engineFactory);
}

/**
 * Creates a new initializing shard for the given routing, using empty settings and a new {@link InternalEngineFactory}.
 *
 * @param shardRouting the {@link ShardRouting} to use for this shard
 * @param listeners    an optional set of listeners to add to the shard
 */
protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException {
    final Settings noSettings = Settings.EMPTY;
    final EngineFactory defaultEngineFactory = new InternalEngineFactory();
    return newShard(shardRouting, noSettings, defaultEngineFactory, listeners);
}

/**
* Creates a new initializing shard. The shard will have its own unique data path.
*
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
final ShardRouting shardRouting,
final Settings settings,
final EngineFactory engineFactory,
final IndexingOperationListener... listeners) throws IOException {
assert shardRouting.initializing() : shardRouting;
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(settings)
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
.settings(indexSettings)
.primaryTerm(0, primaryTerm)
.putMapping("_doc", "{ \"properties\": {} }");
return newShard(shardRouting, metaData.build(), listeners);
return newShard(shardRouting, metaData.build(), engineFactory, listeners);
}

/**
Expand All @@ -200,7 +217,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperatio
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(5), primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, listeners);
return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners);
}

/**
Expand Down Expand Up @@ -240,9 +257,10 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
protected IndexShard newShard(
ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners);
return newShard(routing, indexMetaData, null, engineFactory, () -> {}, listeners);
}

/**
Expand Down Expand Up @@ -342,19 +360,31 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
}

/**
* creates a new empyu shard and starts it. The shard will be either a replica or a primary.
* Creates a new empty shard and starts it. The shard will randomly be a replica or a primary.
*/
protected IndexShard newStartedShard() throws IOException {
    // randomly pick between a primary and a replica
    final boolean primary = randomBoolean();
    return newStartedShard(primary);
}

/**
* creates a new empty shard and starts it.
* Creates a new empty shard and starts it.
*
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(boolean primary) throws IOException {
IndexShard shard = newShard(primary);
protected IndexShard newStartedShard(final boolean primary) throws IOException {
return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory());
}

/**
* Creates a new empty shard with the specified settings and engine factory and starts it.
*
* @param primary controls whether the shard will be a primary or a replica.
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
*/
protected IndexShard newStartedShard(
final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException {
IndexShard shard = newShard(primary, settings, engineFactory);
if (primary) {
recoverShardFromStore(shard);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.bulk;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
Expand All @@ -14,12 +16,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;

public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
Expand Down Expand Up @@ -52,27 +56,72 @@ public TransportBulkShardOperationsAction(
@Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY);
return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger);
return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
}

/**
 * Applies the given operations to the primary shard copy after replacing the primary term on each operation with the
 * primary term of that primary. The rewritten operations are applied with origin {@link Engine.Operation.Origin#PRIMARY}
 * and are also the operations carried by the replica request of the returned result.
 *
 * @param shardId          the shard ID used to build the replica request
 * @param sourceOperations the operations to apply; each is copied rather than modified in place
 * @param primary          the primary shard copy to apply the operations to, and whose primary term is used
 * @param logger           the logger handed to the returned result
 * @return the primary result holding a replica request that carries the rewritten operations
 * @throws IOException           if applying the operations to the shard fails with an I/O error
 * @throws IllegalStateException if an operation has a type other than index, delete, or no-op
 */
static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
        final ShardId shardId,
        final Translog.Operation[] sourceOperations,
        final IndexShard primary,
        final Logger logger) throws IOException {
    final Translog.Operation[] targetOperations = Arrays.stream(sourceOperations).map(operation -> {
        final Translog.Operation operationWithPrimaryTerm;
        switch (operation.opType()) {
            case INDEX:
                final Translog.Index index = (Translog.Index) operation;
                operationWithPrimaryTerm = new Translog.Index(
                        index.type(),
                        index.id(),
                        index.seqNo(),
                        primary.getPrimaryTerm(),
                        index.version(),
                        index.versionType(),
                        // do not use toBytesRef().bytes directly: the backing array can be larger than the source, or the
                        // source can start at a non-zero offset; toBytes honors the offset and length of the reference
                        org.elasticsearch.common.bytes.BytesReference.toBytes(index.source()),
                        index.routing(),
                        index.getAutoGeneratedIdTimestamp());
                break;
            case DELETE:
                final Translog.Delete delete = (Translog.Delete) operation;
                operationWithPrimaryTerm = new Translog.Delete(
                        delete.type(),
                        delete.id(),
                        delete.uid(),
                        delete.seqNo(),
                        primary.getPrimaryTerm(),
                        delete.version(),
                        delete.versionType());
                break;
            case NO_OP:
                final Translog.NoOp noOp = (Translog.NoOp) operation;
                operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason());
                break;
            default:
                throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
        }
        return operationWithPrimaryTerm;
    }).toArray(Translog.Operation[]::new);
    final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
    // replicate the rewritten operations so that the replicas see the same primary term as was applied on the primary
    final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
    return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger);
}

@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA);
final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

private Translog.Location applyTranslogOperations(
final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
private static Translog.Location applyTranslogOperations(
final Translog.Operation[] operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
Translog.Location location = null;
for (final Translog.Operation operation : request.getOperations()) {
for (final Translog.Operation operation : operations) {
final Engine.Result result = shard.applyTranslogOperation(operation, origin);
assert result.getSeqNo() == operation.seqNo();
assert result.getResultType() == Engine.Result.Type.SUCCESS;
location = locationToSync(location, result.getTranslogLocation());
}
assert request.getOperations().length == 0 || location != null;
assert operations.length == 0 || location != null;
return location;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.bulk;

import org.apache.lucene.index.Term;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.hamcrest.Matchers.equalTo;

public class BulkShardOperationsTests extends IndexShardTestCase {

    private static final byte[] SOURCE = "{}".getBytes(StandardCharsets.UTF_8);

    /**
     * Tests that we use the primary term on the follower when applying operations from the leader: both the operations in
     * the translog of the follower primary and the operations on the replica request must carry the follower primary term.
     */
    public void testPrimaryTermFromFollower() throws IOException {
        final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
        final IndexShard followerPrimary = newStartedShard(true, settings, new FollowingEngineFactory());
        // close the shard even when an assertion below fails so that a failure does not also leak the shard
        try {
            // we use this primary term on the operations yet we expect the applied operations to have the primary term of the follower
            final long primaryTerm = randomLongBetween(1, followerPrimary.getPrimaryTerm());

            final Translog.Operation[] operations = new Translog.Operation[randomIntBetween(0, 127)];
            for (int i = 0; i < operations.length; i++) {
                final String id = Integer.toString(i);
                final long seqNo = i;
                // pick any operation type other than CREATE
                final Translog.Operation.Type type =
                        randomValueOtherThan(Translog.Operation.Type.CREATE, () -> randomFrom(Translog.Operation.Type.values()));
                switch (type) {
                    case INDEX:
                        operations[i] = new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1);
                        break;
                    case DELETE:
                        operations[i] =
                                new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL);
                        break;
                    case NO_OP:
                        operations[i] = new Translog.NoOp(seqNo, primaryTerm, "test");
                        break;
                    default:
                        throw new IllegalStateException("unexpected operation type [" + type + "]");
                }
            }

            final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
                    TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);

            // every operation written to the translog of the follower primary must carry the follower primary term
            try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) {
                assertThat(snapshot.totalOperations(), equalTo(operations.length));
                Translog.Operation operation;
                while ((operation = snapshot.next()) != null) {
                    assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
                }
            }

            // every operation on the replica request must carry the follower primary term too
            for (final Translog.Operation operation : result.replicaRequest().getOperations()) {
                assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
            }
        } finally {
            closeShards(followerPrimary);
        }
    }

}

0 comments on commit ddae214

Please sign in to comment.