Skip to content

Commit

Permalink
[Remote State] Create service to publish cluster state to remote store (
Browse files Browse the repository at this point in the history
#9160) (#9657)

* Upload all index metadata to remote store using BlobStoreRepository interface

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
(cherry picked from commit 79e5aee)
  • Loading branch information
soosinha committed Aug 31, 2023
1 parent ae42deb commit a3548ba
Show file tree
Hide file tree
Showing 10 changed files with 1,443 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -664,7 +665,11 @@ public void apply(Settings value, Settings current, Settings previous) {

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
Expand Down
138 changes: 129 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -85,19 +88,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
* the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
* ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
* non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
* loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
* stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
* receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {

/**
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
* stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
* restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
* it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
* cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

Expand Down Expand Up @@ -235,8 +238,8 @@ Metadata upgradeMetadataForNode(
}

/**
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current
* version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService
* might also update obsolete settings if needed.
*
* @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
*/
Expand Down Expand Up @@ -600,4 +603,121 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}

/**
* Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
*/
public static class RemotePersistedState implements PersistedState {

private static final Logger logger = LogManager.getLogger(RemotePersistedState.class);

private ClusterState lastAcceptedState;
private ClusterMetadataManifest lastAcceptedManifest;
private final RemoteClusterStateService remoteClusterStateService;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
public long getCurrentTerm() {
return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
}

@Override
public ClusterState getLastAcceptedState() {
return lastAcceptedState;
}

@Override
public void setCurrentTerm(long currentTerm) {
// no-op
// For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
// But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
manifest = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest);
}
assert verifyManifestAndClusterState(manifest, clusterState) == true : "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifest;
lastAcceptedState = clusterState;
} catch (RepositoryMissingException e) {
// TODO This logic needs to be modified once PR for repo registration during bootstrap is pushed
// https://github.com/opensearch-project/OpenSearch/pull/9105/
// After the above PR is pushed, we can remove this silent failure and throw the exception instead.
logger.error("Remote repository is not yet registered");
lastAcceptedState = clusterState;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
assert manifest != null : "ClusterMetadataManifest is null";
assert clusterState != null : "ClusterState is null";
assert clusterState.metadata().indices().size() == manifest.getIndices().size()
: "Number of indices in last accepted state and manifest are different";
manifest.getIndices().stream().forEach(md -> {
assert clusterState.metadata().indices().containsKey(md.getIndexName())
: "Last accepted state does not contain the index : " + md.getIndexName();
assert clusterState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
: "Last accepted state and manifest do not have same UUID for index : " + md.getIndexName();
});
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
}

@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
);
lastAcceptedManifest = committedManifest;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

@Override
public void close() throws IOException {
remoteClusterStateService.close();
}

private void handleExceptionOnWrite(Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
}

0 comments on commit a3548ba

Please sign in to comment.