diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0ff8cff04473e..e90b98902b148 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -653,7 +653,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); + assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]"; ensureWriteAllowed(origin); Engine.Index operation; try { @@ -741,7 +741,7 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long versio private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); + assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]"; ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differenciate foo#1 from bar#1. This is especially an issue diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 83e8b3bb1a50a..00c63bb352e3b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -18,13 +18,8 @@ */ package org.elasticsearch.index.shard; -import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.LeafReader; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; @@ -57,10 +52,8 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; -import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.similarity.SimilarityService; @@ -155,37 +148,61 @@ public Directory newDirectory() throws IOException { } /** - * creates a new initializing shard. The shard will have its own unique data path. + * Creates a new initializing shard. The shard will have its own unique data path. * - * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica - * (ready to recover from another shard) + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from + * another shard) */ protected IndexShard newShard(boolean primary) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, - ShardRoutingState.INITIALIZING, - primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting); + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory()); } /** - * creates a new initializing shard. The shard will have its own unique data path. + * Creates a new initializing shard. The shard will have its own unique data path. * - * @param shardRouting the {@link ShardRouting} to use for this shard - * @param listeners an optional set of listeners to add to the shard + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from + * another shard) + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + */ + protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException { + ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); + return newShard(shardRouting, settings, engineFactory); + } + + protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException { + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners); + } + + /** + * Creates a new initializing shard. The shard will have its own unique data path. + * + * @param shardRouting the {@link ShardRouting} to use for this shard + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard( final ShardRouting shardRouting, + final Settings settings, + final EngineFactory engineFactory, final IndexingOperationListener... listeners) throws IOException { assert shardRouting.initializing() : shardRouting; - Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .build(); + Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(settings) + .build(); IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName()) - .settings(settings) + .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("_doc", "{ \"properties\": {} }"); - return newShard(shardRouting, metaData.build(), listeners); + return newShard(shardRouting, metaData.build(), engineFactory, listeners); } /** @@ -200,7 +217,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, IndexingOperatio ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(5), primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, listeners); + return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners); } /** @@ -240,9 +257,10 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I * @param indexMetaData indexMetaData for the shard, including any mapping * @param listeners an optional set of listeners to add to the shard */ - protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) + protected IndexShard newShard( + ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners); + return newShard(routing, indexMetaData, null, engineFactory, () -> {}, listeners); } /** @@ -342,19 +360,31 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index } /** - * creates a new empyu shard and starts it. The shard will be either a replica or a primary. + * Creates a new empty shard and starts it. The shard will randomly be a replica or a primary. */ protected IndexShard newStartedShard() throws IOException { return newStartedShard(randomBoolean()); } /** - * creates a new empty shard and starts it. + * Creates a new empty shard and starts it. * * @param primary controls whether the shard will be a primary or a replica. */ - protected IndexShard newStartedShard(boolean primary) throws IOException { - IndexShard shard = newShard(primary); + protected IndexShard newStartedShard(final boolean primary) throws IOException { + return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory()); + } + + /** + * Creates a new empty shard with the specified settings and engine factory and starts it. + * + * @param primary controls whether the shard will be a primary or a replica. + * @param settings the settings to use for this shard + * @param engineFactory the engine factory to use for this shard + */ + protected IndexShard newStartedShard( + final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException { + IndexShard shard = newShard(primary, settings, engineFactory); if (primary) { recoverShardFromStore(shard); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 8991e758621ea..b4c4e298d4be0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -3,8 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.ccr.action.bulk; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -14,12 +16,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -52,27 +56,72 @@ public TransportBulkShardOperationsAction( @Override protected WritePrimaryResult shardOperationOnPrimary( final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { - final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY); - return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger); + return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); + } + + static WritePrimaryResult shardOperationOnPrimary( + final ShardId shardId, + final Translog.Operation[] sourceOperations, + final IndexShard primary, + final Logger logger) throws IOException { + final Translog.Operation[] targetOperations = Arrays.stream(sourceOperations).map(operation -> { + final Translog.Operation operationWithPrimaryTerm; + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + operationWithPrimaryTerm = new Translog.Index( + index.type(), + index.id(), + index.seqNo(), + primary.getPrimaryTerm(), + index.version(), + index.versionType(), + index.source().toBytesRef().bytes, + index.routing(), + index.getAutoGeneratedIdTimestamp()); + break; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + operationWithPrimaryTerm = new Translog.Delete( + delete.type(), + delete.id(), + delete.uid(), + delete.seqNo(), + primary.getPrimaryTerm(), + delete.version(), + delete.versionType()); + break; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason()); + break; + default: + throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); + } + return operationWithPrimaryTerm; + }).toArray(Translog.Operation[]::new); + final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY); + final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations); + return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { - final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA); + final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA); return new WriteReplicaResult<>(request, location, null, replica, logger); } - private Translog.Location applyTranslogOperations( - final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { + private static Translog.Location applyTranslogOperations( + final Translog.Operation[] operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { Translog.Location location = null; - for (final Translog.Operation operation : request.getOperations()) { + for (final Translog.Operation operation : operations) { final Engine.Result result = shard.applyTranslogOperation(operation, origin); assert result.getSeqNo() == operation.seqNo(); assert result.getResultType() == Engine.Result.Type.SUCCESS; location = locationToSync(location, result.getTranslogLocation()); } - assert request.getOperations().length == 0 || location != null; + assert operations.length == 0 || location != null; return location; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java new file mode 100644 index 0000000000000..6155e9b2f34e6 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.bulk; + +import org.apache.lucene.index.Term; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.Matchers.equalTo; + +public class BulkShardOperationsTests extends IndexShardTestCase { + + private static final byte[] SOURCE = "{}".getBytes(StandardCharsets.UTF_8); + + // test that we use the primary term on the follower when applying operations from the leader + public void testPrimaryTermFromFollower() throws IOException { + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); + final IndexShard followerPrimary = newStartedShard(true, settings, new FollowingEngineFactory()); + + // we use this primary on the operations yet we expect the applied operations to have the primary term of the follower + final long primaryTerm = randomLongBetween(1, followerPrimary.getPrimaryTerm()); + + final Translog.Operation[] operations = new Translog.Operation[randomIntBetween(0, 127)]; + for (int i = 0; i < operations.length; i++) { + final String id = Integer.toString(i); + final long seqNo = i; + final Translog.Operation.Type type = + randomValueOtherThan(Translog.Operation.Type.CREATE, () -> randomFrom(Translog.Operation.Type.values())); + switch (type) { + case INDEX: + operations[i] = new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1); + break; + case DELETE: + operations[i] = + new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL); + break; + case NO_OP: + operations[i] = new Translog.NoOp(seqNo, primaryTerm, "test"); + break; + default: + throw new IllegalStateException("unexpected operation type [" + type + "]"); + } + } + + final TransportWriteAction.WritePrimaryResult result = + TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger); + + try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) { + assertThat(snapshot.totalOperations(), equalTo(operations.length)); + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); + } + } + + for (final Translog.Operation operation : result.replicaRequest().getOperations()) { + assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm())); + } + + closeShards(followerPrimary); + } + +}