Skip to content

Commit

Permalink
Merge branch 'main' into main-telemetry-inst-inbound
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <gagandeepjuneja@gmail.com>
  • Loading branch information
Gaganjuneja committed Oct 4, 2023
2 parents 68ce05b + e5024a8 commit 438f66a
Show file tree
Hide file tree
Showing 58 changed files with 923 additions and 146 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down Expand Up @@ -125,6 +126,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143))
- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356))

### Deprecated

Expand All @@ -142,4 +144,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -192,4 +194,13 @@ protected ByteSizeValue chunkSize() {
public boolean isReadOnly() {
return readonly;
}

@Override
public List<Setting<?>> getRestrictedSystemRepositorySettings() {
List<Setting<?>> restrictedSettings = new ArrayList<>();
restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings());
restrictedSettings.add(Repository.BASE_PATH_SETTING);
restrictedSettings.add(Repository.LOCATION_MODE_SETTING);
return restrictedSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.AfterClass;

import java.util.List;

import reactor.core.scheduler.Schedulers;

import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -179,4 +183,21 @@ public void testChunkSize() {
);
}

public void testSystemRepositoryDefault() {
assertThat(azureRepository(Settings.EMPTY).isSystemRepository(), is(false));
}

public void testSystemRepositoryOn() {
assertThat(azureRepository(Settings.builder().put("system_repository", true).build()).isSystemRepository(), is(true));
}

public void testRestrictedSettingsDefault() {
List<Setting<?>> restrictedSettings = azureRepository(Settings.EMPTY).getRestrictedSystemRepositorySettings();
assertThat(restrictedSettings.size(), is(5));
assertTrue(restrictedSettings.contains(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING));
assertTrue(restrictedSettings.contains(BlobStoreRepository.READONLY_SETTING));
assertTrue(restrictedSettings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
assertTrue(restrictedSettings.contains(AzureRepository.Repository.BASE_PATH_SETTING));
assertTrue(restrictedSettings.contains(AzureRepository.Repository.LOCATION_MODE_SETTING));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand Down Expand Up @@ -138,6 +140,15 @@ protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
public List<Setting<?>> getRestrictedSystemRepositorySettings() {
List<Setting<?>> restrictedSettings = new ArrayList<>();
restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings());
restrictedSettings.add(BUCKET);
restrictedSettings.add(BASE_PATH);
return restrictedSettings;
}

/**
* Get a given setting from the repository settings, throwing a {@link RepositoryException} if the setting does not exist or is empty.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -388,6 +390,15 @@ protected ByteSizeValue chunkSize() {
return chunkSize;
}

@Override
public List<Setting<?>> getRestrictedSystemRepositorySettings() {
List<Setting<?>> restrictedSettings = new ArrayList<>();
restrictedSettings.addAll(super.getRestrictedSystemRepositorySettings());
restrictedSettings.add(BUCKET_SETTING);
restrictedSettings.add(BASE_PATH_SETTING);
return restrictedSettings;
}

@Override
protected void doClose() {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.test.OpenSearchTestCase;
import org.hamcrest.Matchers;

import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -133,6 +136,19 @@ public void testDefaultBufferSize() {
}
}

public void testRestrictedSettingsDefault() {
final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", Settings.EMPTY);
try (S3Repository s3repo = createS3Repo(metadata)) {
List<Setting<?>> restrictedSettings = s3repo.getRestrictedSystemRepositorySettings();
assertThat(restrictedSettings.size(), is(5));
assertTrue(restrictedSettings.contains(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING));
assertTrue(restrictedSettings.contains(BlobStoreRepository.READONLY_SETTING));
assertTrue(restrictedSettings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
assertTrue(restrictedSettings.contains(S3Repository.BUCKET_SETTING));
assertTrue(restrictedSettings.contains(S3Repository.BASE_PATH_SETTING));
}
}

private S3Repository createS3Repo(RepositoryMetadata metadata) {
return new S3Repository(
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,15 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
}
}
};
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener);
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
final IndexShard newShard = newIndexShard(
indexService,
shard,
wrapper,
getInstanceFromNode(CircuitBreakerService.class),
env.nodeId(),
listener
);
shardRef.set(newShard);
recoverShard(newShard);

Expand All @@ -674,6 +682,7 @@ public static final IndexShard newIndexShard(
final IndexShard shard,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper,
final CircuitBreakerService cbs,
final String nodeId,
final IndexingOperationListener... listeners
) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
Expand Down Expand Up @@ -702,7 +711,8 @@ public static final IndexShard newIndexShard(
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {

Expand Down Expand Up @@ -107,11 +106,11 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
.build();
}

protected void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
logger.info("--> Deleting the repository={}", TRANSLOG_REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(TRANSLOG_REPOSITORY_NAME));
protected void cleanupRepo() {
logger.info("--> Cleanup the repository={}", REPOSITORY_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).execute().actionGet();
logger.info("--> Cleanup the repository={}", TRANSLOG_REPOSITORY_NAME);
clusterAdmin().prepareCleanupRepository(TRANSLOG_REPOSITORY_NAME).execute().actionGet();
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
Expand All @@ -125,6 +124,8 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep
settings.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT);
}

disableRepoConsistencyCheck("Remote Store Creates System Repository");

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteIndexRecoveryIT extends IndexRecoveryIT {
Expand Down Expand Up @@ -57,7 +56,7 @@ public Settings indexSettings() {

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setup() {

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(BASE_REMOTE_REPO));
clusterAdmin().prepareCleanupRepository(BASE_REMOTE_REPO).get();
}

@Override
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionException, InterruptedException {
public void testRestoreShallowSnapshotRepository() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
Expand Down Expand Up @@ -464,22 +464,7 @@ public void testRestoreShallowSnapshotRepositoryOverriden() throws ExecutionExce
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

createRepository(BASE_REMOTE_REPO, "fs", absolutePath2);

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.get();

assertTrue(restoreSnapshotResponse.getRestoreInfo().failedShards() > 0);

ensureRed(restoredIndexName1);

client().admin().indices().close(Requests.closeIndexRequest(restoredIndexName1)).get();
client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void validateBackpressure(
stats = stats();
indexDocAndRefresh(initialSource, initialDocsToIndex);
assertEquals(rejectionCount, stats.rejectionCount);
deleteRepo();
cleanupRepo();
}

private RemoteSegmentTransferTracker.Stats stats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -50,7 +51,6 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -314,8 +314,8 @@ public void teardown() {
clusterSettingsSuppliedByTest = false;
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
}

public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
Expand Down Expand Up @@ -343,11 +343,18 @@ public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
.custom(RepositoriesMetadata.TYPE);
RepositoryMetadata actualRepository = repositories.repository(repositoryName);

final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);

for (String nodeName : internalCluster().getNodeNames()) {
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = clusterService.localNode();
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);
assertTrue(actualRepository.equalsIgnoreGenerations(expectedRepository));

// Validated that all the restricted settings are entact on all the nodes.
repository.getRestrictedSystemRepositorySettings()
.stream()
.forEach(setting -> assertEquals(setting.get(actualRepository.settings()), setting.get(expectedRepository.settings())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
logger.info("Local files = {}, Repo files = {}", sortedFilesInLocal, sortedFilesInRepo);
assertTrue(filesInRepo.containsAll(filesInLocal));
}, 90, TimeUnit.SECONDS);
deleteRepo();
cleanupRepo();
}

public void testRemoteRefreshSegmentPressureSettingChanged() {
Expand Down

0 comments on commit 438f66a

Please sign in to comment.