Skip to content

Commit

Permalink
Refactor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 10, 2023
1 parent dbe33d9 commit 495ec89
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -183,7 +182,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final RemoteClusterStateService remoteClusterStateService;
private final Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier;

/**
Expand All @@ -206,7 +204,6 @@ public Coordinator(
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
RemoteClusterStateService remoteClusterStateService,
Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier
) {
this.settings = settings;
Expand Down Expand Up @@ -292,7 +289,6 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.remoteClusterStateService = remoteClusterStateService;
this.remotePersistedStateSupplier = remotePersistedStateSupplier;
this.localNodeCommissioned = true;
}
Expand Down Expand Up @@ -1318,11 +1314,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
lagDetector.setTrackedNodes(publishNodes);

PersistedState remotePersistedState = remotePersistedStateSupplier.get();
if (remotePersistedState == null) {
logger.error("remote persisted state is null");
} else {
remotePersistedState.setLastAcceptedState(clusterState);
}
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.setLastAcceptedState(clusterState);
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

/**
* Marker file which contains the details of the uploaded entity metadata
*
* @opensearch.internal
*/
public class ClusterMetadataMarker implements Writeable, ToXContentFragment {

private static final ParseField INDICES_FIELD = new ParseField("indices");
Expand Down Expand Up @@ -101,6 +106,11 @@ public static ClusterMetadataMarker fromXContent(XContentParser parser) throws I
return PARSER.parse(parser, null);
}

/**
* Builder for ClusterMetadataMarker
*
* @opensearch.internal
*/
public static class Builder {

private final Map<String, UploadedIndexMetadata> indices;
Expand Down Expand Up @@ -140,30 +150,59 @@ public ClusterMetadataMarker build() {

}

/**
* Metadata for uploaded index metadata
*
* @opensearch.internal
*/
public static class UploadedIndexMetadata implements Writeable, ToXContentFragment {

private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name");
private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid");
private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename");

private static String uploadedFilename(Object[] fields) {
private static String indexName(Object[] fields) {
return (String) fields[0];
}

private static String indexUUID(Object[] fields) {
return (String) fields[1];
}

private static String uploadedFilename(Object[] fields) {
return (String) fields[2];
}

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>("uploaded_index_metadata",
fields -> new UploadedIndexMetadata(uploadedFilename(fields)));
fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)));

private final String indexName;
private final String indexUUID;
private final String uploadedFilename;

public UploadedIndexMetadata(String uploadedFileName) {
public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) {
this.indexName = indexName;
this.indexUUID = indexUUID;
this.uploadedFilename = uploadedFileName;
}

public String getUploadedFilename() {
return uploadedFilename;
}

public String getIndexName() {
return indexName;
}

public String getIndexUUID() {
return indexUUID;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename());
return builder.startObject()
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename())
.endObject();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,62 @@
package org.opensearch.cluster.store;

import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
*
* @opensearch.internal
*/
public class RemoteClusterStateService {

public static final String METADATA_NAME_FORMAT = "meta-%s.dat";

public static final String METADATA_MARKER_NAME_FORMAT = "%s";

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
IndexMetadata::fromXContent
);

public static final ChecksumBlobStoreFormat<ClusterMetadataMarker> CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-marker",
METADATA_MARKER_NAME_FORMAT,
ClusterMetadataMarker::fromXContent
);
private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

private static final String DELIMITER = "__";

private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private final Settings settings;
private BlobStoreRepository blobStoreRepository;

public RemoteClusterStateService(Supplier<RepositoriesService> repositoriesService, ClusterSettings clusterSettings) {
public RemoteClusterStateService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
this.repositoriesService = repositoriesService;
this.clusterSettings = clusterSettings;
this.settings = settings;
}

public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException {
public void writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException {
if (!clusterState.nodes().isLocalNodeElectedClusterManager()) {
logger.error("local node is not electer cluster manager. Exiting");
logger.error("Local node is not elected cluster manager. Exiting");
return;
}
setRepository();
Expand All @@ -43,23 +65,27 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState)
return;
}

Map<String, String> indexMetadataKeys = new HashMap<>();
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>();
//todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
String indexMetadataKey = blobStoreRepository.writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
indexMetadataKeys.put(indexMetadata.getIndex().getName(), indexMetadataKey);
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
}
uploadMarker(clusterState, indexMetadataKeys);
uploadMarker(clusterState, allUploadedIndexMetadata);
}

private void setRepository() {
try {
if (blobStoreRepository != null) {
return;
}
if (clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING)) {
String remoteStoreRepo = clusterSettings.get(CLUSTER_REMOTE_STORE_REPOSITORY_SETTING);
if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) {
String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings);
Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
Expand All @@ -71,12 +97,7 @@ private void setRepository() {
}
}

public void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) {
//todo
}

// why do we need this ?
public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) {
public void writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) {
//todo
}

Expand All @@ -86,18 +107,28 @@ public ClusterState getLatestClusterState(String clusterUUID) {
}

//todo exception handling
public void uploadMarker(ClusterState clusterState, Map<String, String> indexMetadataKeys) throws IOException {
public void uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata) throws IOException {
synchronized (this) {
String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndices = indexMetadataKeys.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ClusterMetadataMarker.UploadedIndexMetadata(e.getValue())));
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndices, clusterState.term(), clusterState.getVersion(),
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
blobStoreRepository.writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
}
}

public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) throws IOException {
BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID());
INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
// returning full path
return indexMetadataContainer.path().buildAsString() + fileName;
}

public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) throws IOException {
BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID);
RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor());
}

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER, String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
Expand All @@ -109,5 +140,17 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID));
}

public BlobContainer markerContainer(String clusterName, String clusterUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
return blobStoreRepository.blobStore()
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker"));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Package containing class to perform operations on remote cluster state
*/
package org.opensearch.cluster.store;
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(
IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public DiscoveryModule(
rerouteService,
electionStrategy,
nodeHealthService,
remoteClusterStateService,
gatewayMetaState::getRemotePersistedState
);
} else {
Expand Down

0 comments on commit 495ec89

Please sign in to comment.