Skip to content

Commit

Permalink
Add read flow for IndexRoutingTable
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed May 8, 2024
1 parent 441f520 commit ee70dca
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,18 @@ public void write(StreamOutput out) throws IOException {

/**
* Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader}
* @param inBytes
* @param source
* @return
* @param in
* @return IndexRoutingTableHeader
* @throws IOException
*/
public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOException {
public static IndexRoutingTableHeader read(BufferedChecksumStreamInput in) throws IOException {
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) {
readHeaderVersion(in);
final long version = in.readLong();
final int nodeVersion = in.readInt();
final String name = in.readString();
assert version >= 0 : "Version must be non-negative [" + version + "]";
assert in.readByte() == -1 : "Header is not fully read";
return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion));
}
} catch (EOFException e) {
throw new IOException("index routing header truncated", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexRoutingTableInputStreamReader {

private final StreamInput streamInput;

private static final Logger logger = LogManager.getLogger(IndexRoutingTableInputStreamReader.class);

public IndexRoutingTableInputStreamReader(InputStream inputStream) throws IOException {
this.streamInput = new InputStreamStreamInput(inputStream);
}

public Map<String, IndexShardRoutingTable> read() throws IOException {
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(streamInput, "assertion")) {
// Read the Table Header first
IndexRoutingTableHeader.read(in);
int shards = in.readVInt();
logger.info("Number of Index Routing Table {}", shards);
Map<String, IndexShardRoutingTable> indicesRouting = new HashMap<String, IndexShardRoutingTable>(Collections.EMPTY_MAP);
for(int i=0; i<shards; i++)
{
IndexShardRoutingTable indexShardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
logger.info("Index Shard Routing Table reading {}", indexShardRoutingTable);
indicesRouting.put(indexShardRoutingTable.getShardId().getIndexName(), indexShardRoutingTable);

}
verifyCheckSum(in);
// Return indices Routing table
return indicesRouting;
}
} catch (EOFException e) {
throw new IOException("Indices Routing table is corrupted", e);
}

}

private void verifyCheckSum(BufferedChecksumStreamInput in) throws IOException {
long expectedChecksum = in.getChecksum();
long readChecksum = in.readLong();
if (readChecksum != expectedChecksum) {
throw new IOException(
"checksum verification failed - expected: 0x"
+ Long.toHexString(expectedChecksum)
+ ", got: 0x"
+ Long.toHexString(readChecksum)
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.seqno.ReplicationTrackerTestCase;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;

public class IndexRoutingTableInputStreamTests extends ReplicationTrackerTestCase {

public void testRoutingTableInputStream() throws IOException {
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> {
try {
logger.info("IndexShardRoutingTables: {}", indexShardRoutingTables);
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables,
initialRoutingTable.version(), Version.CURRENT);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read();

logger.info("indexShardRoutingTableMap: {}", indexShardRoutingTableMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

}

0 comments on commit ee70dca

Please sign in to comment.