Skip to content

Commit

Permalink
incremental metadata upload
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 11, 2023
1 parent 2e53375 commit 9806f28
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public class RemoteClusterStateService {

public static final String METADATA_NAME_FORMAT = "meta-%s.dat";
public static final String METADATA_NAME_FORMAT = "%s.dat";

public static final String METADATA_MARKER_NAME_FORMAT = "%s";

Expand Down Expand Up @@ -54,29 +54,29 @@ public RemoteClusterStateService(Supplier<RepositoriesService> repositoriesServi
this.settings = settings;
}

public void writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException {
public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException {
if (!clusterState.nodes().isLocalNodeElectedClusterManager()) {
logger.error("Local node is not elected cluster manager. Exiting");
return;
return null;
}
setRepository();
if (blobStoreRepository == null) {
logger.error("Unable to set repository");
return;
return null;
}

final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>();
//todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
}
uploadMarker(clusterState, allUploadedIndexMetadata);
return uploadMarker(clusterState, allUploadedIndexMetadata);
}

private void setRepository() {
Expand All @@ -97,8 +97,38 @@ private void setRepository() {
}
}

public void writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) {
//todo
/**
 * Uploads metadata only for indices that are new or whose metadata version differs from the one in
 * {@code previousClusterState}, then uploads a marker that references both the freshly uploaded
 * entries and the entries carried over from {@code previousMarker}. Indices present in the previous
 * state but absent from {@code clusterState} are dropped from the new marker.
 *
 * @param currentTerm          current term; not read by this method
 * @param previousClusterState state the previous marker was written for; asserted to share the term of {@code clusterState}
 * @param clusterState         state to persist
 * @param previousMarker       marker returned by the previous upload; its index entries seed the new marker, must not be null
 * @return the marker that was uploaded, or {@code null} when the local node is not the elected cluster manager
 * @throws IOException propagated from the index metadata upload or the marker upload
 */
public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataMarker previousMarker) throws IOException {
    // Same guard as writeFullMetadata: a node that is not the elected cluster manager must not upload state.
    if (!clusterState.nodes().isLocalNodeElectedClusterManager()) {
        logger.error("Local node is not elected cluster manager. Exiting");
        return null;
    }
    assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

    // Versions of every index in the previous state, keyed by index name. Entries are removed as the
    // current state is walked, so whatever is left afterwards belongs to indices that were deleted.
    final Map<String, Long> indexMetadataVersionByName = new HashMap<>();
    for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
        indexMetadataVersionByName.putIfAbsent(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
    }

    int numIndicesUpdated = 0;
    int numIndicesUnchanged = 0;
    // Start from the previous marker so unchanged indices keep pointing at their already uploaded files.
    final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices());
    for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
        // todo: confirm that the index name (rather than the index UUID) is a safe key here
        final String indexName = indexMetadata.getIndex().getName();
        final Long previousVersion = indexMetadataVersionByName.get(indexName);
        if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
            logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion,
                indexMetadata.getVersion());
            numIndicesUpdated++;
            final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
                indexMetadata, indexMetadataFileName(indexMetadata));
            final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexName, indexMetadata.getIndexUUID(),
                indexMetadataKey);
            allUploadedIndexMetadata.put(indexName, uploadedIndexMetadata);
        } else {
            numIndicesUnchanged++;
        }
        indexMetadataVersionByName.remove(indexName);
    }

    // Names still in the map existed in the previous state only, i.e. the index was removed.
    for (final String removedIndexName : indexMetadataVersionByName.keySet()) {
        allUploadedIndexMetadata.remove(removedIndexName);
    }
    logger.trace("writing incremental metadata: [{}] indices updated, [{}] unchanged, [{}] removed", numIndicesUpdated,
        numIndicesUnchanged, indexMetadataVersionByName.size());
    return uploadMarker(clusterState, allUploadedIndexMetadata);
}

public ClusterState getLatestClusterState(String clusterUUID) {
Expand All @@ -107,13 +137,14 @@ public ClusterState getLatestClusterState(String clusterUUID) {
}

//todo exception handling
public void uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata) throws IOException {
public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata) throws IOException {
synchronized (this) {
String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
return marker;
}
}

Expand All @@ -131,8 +162,8 @@ public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterM

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER, String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), "marker");
return String.join(DELIMITER,"marker", String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()));
}


Expand Down
50 changes: 28 additions & 22 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.store.ClusterMetadataMarker;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -86,20 +87,21 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
* the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
* ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
* non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
* loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
* stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
* receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {

private static final Logger logger = LogManager.getLogger(GatewayMetaState.class);

/**
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
* stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
* restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
* it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
* cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

Expand Down Expand Up @@ -248,8 +250,8 @@ Metadata upgradeMetadataForNode(
}

/**
* This method calls {@link MetadataIndexUpgradeService} to make sure that indices are compatible with the current
* version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
* This method calls {@link MetadataIndexUpgradeService} to make sure that indices are compatible with the current version. The MetadataIndexUpgradeService
* might also update obsolete settings if needed.
*
* @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
*/
Expand Down Expand Up @@ -622,7 +624,9 @@ public static class RemotePersistedState implements PersistedState {
//todo check diff between currentTerm and clusterState term
private long currentTerm;
private ClusterState lastAcceptedState;
private ClusterMetadataMarker lastAcceptedMarker;
private final RemoteClusterStateService remoteClusterStateService;
//todo Is this needed?
private boolean writeNextStateFully;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService, final long currentTerm, final ClusterState lastAcceptedState) {
Expand All @@ -635,7 +639,6 @@ public RemotePersistedState(final RemoteClusterStateService remoteClusterStateSe

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
this.writeNextStateFully = true;
}

@Override
Expand All @@ -656,22 +659,26 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
// if (writeNextStateFully) {
remoteClusterStateService.writeFullMetadata(currentTerm, clusterState);
// writeNextStateFully = false;
// } else {
// if (clusterState.term() != lastAcceptedState.term()) {
// assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
// remoteClusterStateService.writeFullStateAndCommit(currentTerm, clusterState);
// } else {
// remoteClusterStateService.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
// }
// }
final ClusterMetadataMarker marker;
if (shouldWriteFullClusterState(clusterState)) {
marker = remoteClusterStateService.writeFullMetadata(currentTerm, clusterState);
} else {
marker = remoteClusterStateService.writeIncrementalMetadata(currentTerm, lastAcceptedState, clusterState, lastAcceptedMarker);
}
lastAcceptedState = clusterState;
lastAcceptedMarker = marker;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

/**
 * Tells whether the given cluster state has to be uploaded in full.
 *
 * @param clusterState the state about to be persisted
 * @return {@code true} when no previously accepted state or marker is held, or when the term of
 *         {@code clusterState} differs from the term of the last accepted state; {@code false} otherwise
 */
private boolean shouldWriteFullClusterState(ClusterState clusterState) {
    // An incremental write needs both the previous state and the marker that was uploaded for it.
    final boolean hasBaseline = lastAcceptedState != null && lastAcceptedMarker != null;
    return !hasBaseline || lastAcceptedState.term() != clusterState.term();
}

@Override
public void markLastAcceptedStateAsCommitted() {
// no-op
Expand All @@ -683,7 +690,6 @@ public void close() throws IOException {
}

private void handleExceptionOnWrite(Exception e) {
writeNextStateFully = true;
throw ExceptionsHelper.convertToRuntime(e);
}
}
Expand Down

0 comments on commit 9806f28

Please sign in to comment.