Skip to content

Commit

Permalink
Update serialization logic for ClusterMetadataMarker
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 15, 2023
1 parent 2386402 commit 3873b77
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package org.opensearch.cluster.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;

/**
* Marker file which contains the details of the uploaded entity metadata
Expand All @@ -32,8 +29,8 @@ public class ClusterMetadataMarker implements Writeable, ToXContentFragment {
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");

private static List<UploadedIndexMetadata> indices(Object[] fields) {
return new ArrayList<>((List<UploadedIndexMetadata>) fields[0]);
private static Map<String, UploadedIndexMetadata> indices(Object[] fields) {
return (Map<String, UploadedIndexMetadata>) fields[0];
}

private static long term(Object[] fields) {
Expand All @@ -52,13 +49,18 @@ private static String stateUUID(Object[] fields) {
return (String) fields[4];
}

private static final ConstructingObjectParser<ClusterMetadataMarker, Void> PARSER = new ConstructingObjectParser<>(
"cluster_metadata_marker",
fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields))
);
private static final ConstructingObjectParser<ClusterMetadataMarker, Void> PARSER = new ConstructingObjectParser<>("cluster_metadata_marker",
fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields)));

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), INDICES_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, UploadedIndexMetadata> uploadMetadataMap = new HashMap<>();
while (p.nextToken() != Token.END_OBJECT) {
UploadedIndexMetadata uploadMetadata = UploadedIndexMetadata.fromXContent(p);
uploadMetadataMap.put(uploadMetadata.getIndexName(), uploadMetadata);
}
return uploadMetadataMap;
}, INDICES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
Expand Down Expand Up @@ -99,33 +101,23 @@ public ClusterMetadataMarker(Map<String, UploadedIndexMetadata> indices, long te
this.stateUUID = stateUUID;
}

public ClusterMetadataMarker(List<UploadedIndexMetadata> indices, long term, long version, String clusterUUID, String stateUUID) {
this.indices = Collections.unmodifiableMap(toMap(indices));
this.term = term;
this.version = version;
this.clusterUUID = clusterUUID;
this.stateUUID = stateUUID;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(INDICES_FIELD.getPreferredName());
builder.startObject(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices.values()) {
uploadedIndexMetadata.toXContent(builder, params);
}
}
builder.endArray();
builder.field(TERM_FIELD.getPreferredName(), getTerm())
.field(VERSION_FIELD.getPreferredName(), getVersion())
.field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
.field(STATE_UUID_FIELD.getPreferredName(), getStateUUID());
builder.endObject();
builder.field(TERM_FIELD.getPreferredName(), getTerm()).field(VERSION_FIELD.getPreferredName(), getVersion())
.field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()).field(STATE_UUID_FIELD.getPreferredName(), getStateUUID());
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(indices.values());
out.writeMap(indices, StreamOutput::writeString, (stream, uploadedMetadata) -> uploadedMetadata.writeTo(stream));
out.writeVLong(term);
out.writeVLong(version);
out.writeString(clusterUUID);
Expand All @@ -150,16 +142,13 @@ public int hashCode() {
return Objects.hash(indices, term, version, clusterUUID, stateUUID);
}

public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

private static Map<String, UploadedIndexMetadata> toMap(final Collection<UploadedIndexMetadata> uploadedIndexMetadataList) {
// use a linked hash map to preserve order
return uploadedIndexMetadataList.stream().collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity(), (left, right) -> {
assert left.getIndexName().equals(right.getIndexName()) : "expected [" + left.getIndexName() + "] to equal [" + right.getIndexName() + "]";
throw new IllegalStateException("duplicate index name [" + left.getIndexName() + "]");
}, LinkedHashMap::new));
public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

/**
Expand Down Expand Up @@ -262,15 +251,15 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename())
return builder.startObject(getIndexName()).field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()).field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename())
.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeString(indexUUID);
out.writeString(uploadedFilename);
}

Expand All @@ -292,6 +281,11 @@ public int hashCode() {
return Objects.hash(indexName, indexUUID, uploadedFilename);
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

Expand Down Expand Up @@ -55,7 +56,7 @@ public RemoteClusterStateService(Supplier<RepositoriesService> repositoriesServi
}

public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException {
if (!clusterState.nodes().isLocalNodeElectedClusterManager()) {
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
Expand Down Expand Up @@ -85,19 +86,20 @@ private void setRepository() {
return;
}
if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) {
String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings);
Repository repository = repositoriesService.get().repository(remoteStoreRepo);
final String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings);
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
} else {
logger.info("remote store is not enabled");
}
} catch (Exception e) {
logger.error("set repo exception", e);
} catch (RepositoryMissingException e) {
logger.error("Remote state repository is missing", e);
}
}

public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataMarker previousMarker) throws IOException {
public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState,
ClusterMetadataMarker previousMarker) throws IOException {
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();
final Map<String, Long> indexMetadataVersionByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
Expand All @@ -108,15 +110,14 @@ public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterS
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataMarker.UploadedIndexMetadata> allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices());
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
// Is it okay to use indexName as key ?
final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion,
indexMetadata.getVersion());
numIndicesUpdated++;
String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(),
indexMetadataKey);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
} else {
Expand All @@ -136,11 +137,11 @@ public ClusterState getLatestClusterState(String clusterUUID) {
return null;
}

//todo exception handling
public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata) throws IOException {
public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndexMetadata)
throws IOException {
synchronized (this) {
String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
final ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
Expand All @@ -149,28 +150,17 @@ public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map<String,
}

public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) throws IOException {
BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID());
final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID());
INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
// returning full path
return indexMetadataContainer.path().buildAsString() + fileName;
}

public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) throws IOException {
BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID);
final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID);
RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor());
}

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER,"marker", String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()));
}


private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
Expand All @@ -183,5 +173,16 @@ public BlobContainer markerContainer(String clusterName, String clusterUUID) {
.blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker"));
}

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER, "marker", String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()));
}


private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}


}

0 comments on commit 3873b77

Please sign in to comment.