From ddae2146c161fc67de6280cccc17bcf4aa6b2280 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 5 Jun 2018 14:49:37 -0400 Subject: [PATCH] Use follower primary term when applying operations The primary shard copy on the follower has authority over the replication operations that occur on the following side in cross-cluster replication. Yet today we are using the primary term directly from the operations on the leader side. Instead we should be replacing the primary term carried by those operations with the primary term of the primary shard copy on the following side. This commit does this by copying the translog operations with the corrected primary term. This ensures that we use this primary term while applying the operations on the primary, and when replicating them across to the replica (where the replica request was carrying the primary term of the primary shard copy on the follower). --- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/shard/IndexShardTestCase.java | 88 +++++++++++++------ .../TransportBulkShardOperationsAction.java | 63 +++++++++++-- .../action/bulk/BulkShardOperationsTests.java | 82 +++++++++++++++++ 4 files changed, 199 insertions(+), 38 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0ff8cff04473e..e90b98902b148 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -653,7 +653,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + 
"]"; - assert versionType.validateVersionForWrites(version); + assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]"; ensureWriteAllowed(origin); Engine.Index operation; try { @@ -741,7 +741,7 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long versio private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); + assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]"; ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differenciate foo#1 from bar#1. 
This is especially an issue diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 83e8b3bb1a50a..00c63bb352e3b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -18,13 +18,8 @@ */ package org.elasticsearch.index.shard; -import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; @@ -57,10 +52,8 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; -import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.similarity.SimilarityService; @@ -155,37 +148,61 @@ public Directory newDirectory() throws IOException { } /** - * creates a new initializing shard. The shard will have its own unique data path. + * Creates a new initializing shard. The shard will have its own unique data path. 
* - * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica - * (ready to recover from another shard) + * @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica (ready to recover + * from another shard) */ protected IndexShard newShard(boolean primary) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, - ShardRoutingState.INITIALIZING, - primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting); + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory()); } /** - * creates a new initializing shard. The shard will have its own unique data path. + * Creates a new initializing shard. The shard will have its own unique data path. * - * @param shardRouting the {@link ShardRouting} to use for this shard - * @param listeners an optional set of listeners to add to the shard + * @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica (ready to recover + * from another shard) + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + */ + protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException { + ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, + ShardRoutingState.INITIALIZING, + primary ? 
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, settings, engineFactory); + } + + protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException { + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners); + } + + /** + * Creates a new initializing shard. The shard will have its own unique data path. + * + * @param shardRouting the {@link ShardRouting} to use for this shard + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard( final ShardRouting shardRouting, + final Settings settings, + final EngineFactory engineFactory, final IndexingOperationListener... listeners) throws IOException { assert shardRouting.initializing() : shardRouting; - Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .build(); + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(settings) + .build(); IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName()) - .settings(settings) + .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("_doc", "{ \"properties\": {} }"); - return newShard(shardRouting, metaData.build(), listeners); + return newShard(shardRouting, metaData.build(), engineFactory, listeners); } /** @@ -200,7 +217,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperatio ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(5), 
primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, listeners); + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners); } /** @@ -240,9 +257,10 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I * @param indexMetaData indexMetaData for the shard, including any mapping * @param listeners an optional set of listeners to add to the shard */ - protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) + protected IndexShard newShard( + ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners); + return newShard(routing, indexMetaData, null, engineFactory, () -> {}, listeners); } /** @@ -342,19 +360,31 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index } /** - * creates a new empyu shard and starts it. The shard will be either a replica or a primary. + * Creates a new empty shard and starts it. The shard will randomly be a replica or a primary. */ protected IndexShard newStartedShard() throws IOException { return newStartedShard(randomBoolean()); } /** - * creates a new empty shard and starts it. + * Creates a new empty shard and starts it. * * @param primary controls whether the shard will be a primary or a replica. */ - protected IndexShard newStartedShard(boolean primary) throws IOException { - IndexShard shard = newShard(primary); + protected IndexShard newStartedShard(final boolean primary) throws IOException { + return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory()); + } + + /** + * Creates a new empty shard with the specified settings and engine factory and starts it. 
+ * + * @param primary controls whether the shard will be a primary or a replica. + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + */ + protected IndexShard newStartedShard( + final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException { + IndexShard shard = newShard(primary, settings, engineFactory); if (primary) { recoverShardFromStore(shard); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 8991e758621ea..b4c4e298d4be0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -3,8 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. 
*/ + package org.elasticsearch.xpack.ccr.action.bulk; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -14,12 +16,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -52,27 +56,72 @@ public TransportBulkShardOperationsAction( @Override protected WritePrimaryResult shardOperationOnPrimary( final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { - final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY); - return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger); + return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); + } + + static WritePrimaryResult shardOperationOnPrimary( + final ShardId shardId, + final Translog.Operation[] sourceOperations, + final IndexShard primary, + final Logger logger) throws IOException { + final Translog.Operation[] targetOperations = Arrays.stream(sourceOperations).map(operation -> { + final Translog.Operation operationWithPrimaryTerm; + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + operationWithPrimaryTerm = new Translog.Index( + index.type(), + index.id(), + index.seqNo(), + primary.getPrimaryTerm(), + index.version(), + index.versionType(), + 
org.elasticsearch.common.bytes.BytesReference.toBytes(index.source()), // honors slice offset/length + index.routing(), + index.getAutoGeneratedIdTimestamp()); + break; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + operationWithPrimaryTerm = new Translog.Delete( + delete.type(), + delete.id(), + delete.uid(), + delete.seqNo(), + primary.getPrimaryTerm(), + delete.version(), + delete.versionType()); + break; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason()); + break; + default: + throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); + } + return operationWithPrimaryTerm; + }).toArray(Translog.Operation[]::new); + final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY); + final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations); + return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { - final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA); + final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA); return new WriteReplicaResult<>(request, location, null, replica, logger); } - private Translog.Location applyTranslogOperations( - final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { + private static Translog.Location applyTranslogOperations( + final Translog.Operation[] operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { Translog.Location location = null; - for (final Translog.Operation 
operation : request.getOperations()) { + for (final Translog.Operation operation : operations) { final Engine.Result result = shard.applyTranslogOperation(operation, origin); assert result.getSeqNo() == operation.seqNo(); assert result.getResultType() == Engine.Result.Type.SUCCESS; location = locationToSync(location, result.getTranslogLocation()); } - assert request.getOperations().length == 0 || location != null; + assert operations.length == 0 || location != null; return location; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java new file mode 100644 index 0000000000000..6155e9b2f34e6 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.apache.lucene.index.Term; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.Matchers.equalTo; + +public class BulkShardOperationsTests extends IndexShardTestCase { + + private static final byte[] SOURCE = "{}".getBytes(StandardCharsets.UTF_8); + + // test that we use the primary term on the follower when applying operations from the leader + public void testPrimaryTermFromFollower() throws IOException { + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); + final IndexShard followerPrimary = newStartedShard(true, settings, new FollowingEngineFactory()); + + // we use this primary term on the operations yet we expect the applied operations to have the primary term of the follower + final long primaryTerm = randomLongBetween(1, followerPrimary.getPrimaryTerm()); + + final Translog.Operation[] operations = new Translog.Operation[randomIntBetween(0, 127)]; + for (int i = 0; i < operations.length; i++) { + final String id = Integer.toString(i); + final long seqNo = i; + final Translog.Operation.Type type = + 
randomValueOtherThan(Translog.Operation.Type.CREATE, () -> randomFrom(Translog.Operation.Type.values())); + switch (type) { + case INDEX: + operations[i] = new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1); + break; + case DELETE: + operations[i] = + new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL); + break; + case NO_OP: + operations[i] = new Translog.NoOp(seqNo, primaryTerm, "test"); + break; + default: + throw new IllegalStateException("unexpected operation type [" + type + "]"); + } + } + + final TransportWriteAction.WritePrimaryResult result = + TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger); + + try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) { + assertThat(snapshot.totalOperations(), equalTo(operations.length)); + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); + } + } + + for (final Translog.Operation operation : result.replicaRequest().getOperations()) { + assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); + } + + closeShards(followerPrimary); + } + +}