Skip to content

Commit

Permalink
Refactor reader and add failure test
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed May 14, 2024
1 parent 7aeecc8 commit 7b2bc79
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
clusterUUIDCommitted(fields),
routingTableVersion(fields),
indicesRouting(fields)
)
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;
Expand Down Expand Up @@ -287,8 +287,22 @@ public ClusterMetadataManifest(
String previousClusterUUID,
boolean clusterUUIDCommitted
) {
this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion,
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, -1, new ArrayList<>());
this(
clusterTerm,
version,
clusterUUID,
stateUUID,
opensearchVersion,
nodeId,
committed,
codecVersion,
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted,
-1,
new ArrayList<>()
);
}

public ClusterMetadataManifest(
Expand All @@ -306,7 +320,7 @@ public ClusterMetadataManifest(
boolean clusterUUIDCommitted,
long routingTableVersion,
List<UploadedIndexMetadata> indicesRouting
) {
) {
this.clusterTerm = clusterTerm;
this.stateVersion = version;
this.clusterUUID = clusterUUID;
Expand Down Expand Up @@ -694,8 +708,7 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws
this(indexRoutingTable, BUFFER_SIZE);
}

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size)
throws IOException {
public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size) throws IOException {
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName());
Expand All @@ -85,33 +84,34 @@ private void initialFill(int shardCount) throws IOException {
indexRoutingTableHeader.write(out);
out.writeVInt(shardCount);

System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length());
System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0, buf, 0, bytesStreamOutput.bytes().length());
count = bytesStreamOutput.bytes().length();
bytesStreamOutput.reset();
fill(buf);
}

private void fill(byte[] buf) throws IOException {
if (leftOverBuf != null) {
if(leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf.
if (leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in
// leftOverBuf.
System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count);
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count, tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
leftOverBuf = tempLeftOverBuffer;
count = buf.length - count;
} else {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
count += leftOverBuf.length;
count += leftOverBuf.length;
leftOverBuf = null;
}
}

if (count < buf.length && shardIter.hasNext()) {
IndexShardRoutingTable next = shardIter.next();
IndexShardRoutingTable.Builder.writeTo(next, out);
//Add checksum for the file after all shards are done
if(!shardIter.hasNext()) {
// Add checksum for the file after all shards are done
if (!shardIter.hasNext()) {
out.writeLong(out.getChecksum());
}
out.flush();
Expand All @@ -125,7 +125,7 @@ private void fill(byte[] buf) throws IOException {
} else {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count);
leftOverBuf = new byte[bytesRef.length() - (buf.length - count)];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count));
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count, leftOverBuf, 0, bytesRef.length() - (buf.length - count));
count = buf.length;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexRoutingTableInputStreamReader {

Expand All @@ -34,32 +28,31 @@ public class IndexRoutingTableInputStreamReader {
private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class);

public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException {
this.streamInput = new InputStreamStreamInput(inputStream);
streamInput = new InputStreamStreamInput(inputStream);
}

public Map<String, IndexShardRoutingTable> read() throws IOException {
public IndexRoutingTable readIndexRoutingTable(Index index) throws IOException {
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) {
// Read the Table Header first
IndexRoutingTableHeader.read(in);
int shards = in.readVInt();
logger.info("Number of Index Routing Table {}", shards);
Map<String, IndexShardRoutingTable> indicesRouting = new HashMap<String, IndexShardRoutingTable>(Collections.EMPTY_MAP);
for(int i=0; i<shards; i++)
{
// Read the Table Header first and confirm the index
IndexRoutingTableHeader indexRoutingTableHeader = IndexRoutingTableHeader.read(in);
assert indexRoutingTableHeader.getIndexName().equals(index.getName());

int numberOfShardRouting = in.readVInt();
logger.debug("Number of Index Routing Table {}", numberOfShardRouting);
IndexRoutingTable.Builder indicesRoutingTable = IndexRoutingTable.builder(index);
for (int idx = 0; idx < numberOfShardRouting; idx++) {
IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
logger.info("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRouting.put(indexShardRoutingTable.getShardId().getIndexName(), indexShardRoutingTable);
logger.debug("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRoutingTable.addIndexShard(indexShardRoutingTable);

}
verifyCheckSum(in);
// Return indices Routing table
return indicesRouting;
return indicesRoutingTable.build();
}
} catch (EOFException e) {
throw new IOException("Indices Routing table is corrupted", e);
}

}

private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,21 @@

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.seqno.ReplicationTrackerTestCase;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexRoutingTableInputStreamTests extends OpenSearchTestCase {

public void testRoutingTableInputStream(){
public void testRoutingTableInputStream() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
Expand All @@ -47,15 +34,40 @@ public void testRoutingTableInputStream(){
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read();
IndexRoutingTable indexRoutingTable = reader.readIndexRoutingTable(metadata.index("test").getIndex());

assertEquals(1, indexShardRoutingTableMap.size());
assertNotNull(indexShardRoutingTableMap.get("test"));
assertEquals(2,indexShardRoutingTableMap.get("test").shards().size());
assertEquals(1, indexRoutingTable.getShards().size());
assertEquals(indexRoutingTable.getIndex(), metadata.index("test").getIndex());
assertEquals(indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED).size(), 2);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

public void testRoutingTableInputStreamWithInvalidIndex() {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("invalid-index").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
AtomicInteger assertionError = new AtomicInteger();
initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> {
try {
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
reader.readIndexRoutingTable(metadata.index("invalid-index").getIndex());

} catch (AssertionError e) {
assertionError.getAndIncrement();
} catch (IOException e) {
throw new RuntimeException(e);
}
});

assertEquals(1, assertionError.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
Expand All @@ -55,7 +56,6 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.test.OpenSearchTestCase;

Expand Down

0 comments on commit 7b2bc79

Please sign in to comment.