Skip to content

Commit

Permalink
Fixing IndexRoutingTableInputStream and moving checksum to end to file
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
Himshikha Gupta committed May 7, 2024
1 parent f65b102 commit 441f520
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public ClusterMetadataManifest(
boolean clusterUUIDCommitted
) {
this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion,
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, null);
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>());
}

public ClusterMetadataManifest(
Expand Down Expand Up @@ -355,7 +355,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
Expand All @@ -369,9 +371,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder;
}
Expand All @@ -391,7 +396,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
} else if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
}
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeCollection(indicesRouting);
}
}
Expand Down Expand Up @@ -659,11 +665,10 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
return builder
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
.endObject();
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -50,26 +51,16 @@ public IndexRoutingTableHeader(long routingTableVersion, String indexName, Versi

/**
* Returns the bytes reference for the {@link IndexRoutingTableHeader}
* @return the {@link BytesReference}
* @throws IOException
*/
public BytesReference write() throws IOException {
BytesReference bytesReference;
try (
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)
) {
public void write(StreamOutput out) throws IOException {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION);
// Write version
out.writeLong(routingTableVersion);
out.writeInt(nodeVersion.id);
out.writeString(indexName);
// Checksum header
out.writeInt((int) out.getChecksum());

out.flush();
bytesReference = bytesStreamOutput.bytes();
}
return bytesReference;
}

/**
Expand All @@ -83,10 +74,9 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce
try {
try (BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(new BytesStreamInput(inBytes), source)) {
readHeaderVersion(in);
final int version = in.readInt();
final long version = in.readLong();
final int nodeVersion = in.readInt();
final String name = in.readString();
verifyChecksum(in);
assert version >= 0 : "Version must be non-negative [" + version + "]";
assert in.readByte() == -1 : "Header is not fully read";
return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion));
Expand All @@ -96,20 +86,6 @@ public IndexRoutingTableHeader read(byte[] inBytes, String source) throws IOExce
}
}

static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException {
// This absolutely must come first, or else reading the checksum becomes part of the checksum
long expectedChecksum = in.getChecksum();
long readChecksum = Integer.toUnsignedLong(in.readInt());
if (readChecksum != expectedChecksum) {
throw new IOException(
"checksum verification failed - expected: 0x"
+ Long.toHexString(expectedChecksum)
+ ", got: 0x"
+ Long.toHexString(readChecksum)
);
}
}

static int readHeaderVersion(final StreamInput in) throws IOException {
final int version;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ public class IndexRoutingTableInputStream extends InputStream {
private static final int BUFFER_SIZE = 8192;

private final IndexRoutingTableHeader indexRoutingTableHeader;

private final Iterator<IndexShardRoutingTable> shardIter;
private final BytesStreamOutput bytesStreamOutput;
private final BufferedChecksumStreamOutput out;

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException {
this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE);
Expand All @@ -66,7 +67,10 @@ public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long ve
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion);
initialFill();
this.bytesStreamOutput = new BytesStreamOutput();
this.out = new BufferedChecksumStreamOutput(bytesStreamOutput);

initialFill(indexRoutingTable.shards().size());
}

@Override
Expand All @@ -78,39 +82,52 @@ public int read() throws IOException {
return buf[pos++] & 0xff;
}

private void initialFill() throws IOException {
BytesReference bytesReference = indexRoutingTableHeader.write();
buf = bytesReference.toBytesRef().bytes;
count = bytesReference.length();
private void initialFill(int shardCount) throws IOException {
indexRoutingTableHeader.write(out);
out.writeVInt(shardCount);

System.arraycopy(bytesStreamOutput.bytes().toBytesRef().bytes, 0 , buf, 0, bytesStreamOutput.bytes().length());
count = bytesStreamOutput.bytes().length();
bytesStreamOutput.reset();
fill(buf);
}

private void fill(byte[] buf) throws IOException {
if (leftOverBuf != null) {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
if(leftOverBuf.length > buf.length - count) {
// leftOverBuf has more content than length of buf, so we need to copy only based on buf length and keep the remaining in leftOverBuf.
System.arraycopy(leftOverBuf, 0, buf, count, buf.length - count);
byte[] tempLeftOverBuffer = new byte[leftOverBuf.length - (buf.length - count)];
System.arraycopy(leftOverBuf, buf.length - count , tempLeftOverBuffer, 0, leftOverBuf.length - (buf.length - count));
leftOverBuf = tempLeftOverBuffer;
count = buf.length - count;
} else {
System.arraycopy(leftOverBuf, 0, buf, count, leftOverBuf.length);
count += leftOverBuf.length;
leftOverBuf = null;
}
}

if (count < buf.length && shardIter.hasNext()) {
IndexShardRoutingTable next = shardIter.next();
BytesReference bytesRef;
try (
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(bytesStreamOutput)
) {
IndexShardRoutingTable.Builder.writeTo(next, out);
// Checksum header
out.writeInt((int) out.getChecksum());
out.flush();
bytesRef = bytesStreamOutput.bytes();
IndexShardRoutingTable.Builder.writeTo(next, out);
//Add checksum for the file after all shards are done
if(!shardIter.hasNext()) {
out.writeLong(out.getChecksum());
}
out.flush();
BytesReference bytesRef = bytesStreamOutput.bytes();
bytesStreamOutput.reset();

if (bytesRef.length() < buf.length - count) {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, bytesRef.length());
count += bytesRef.length();
leftOverBuf = null;
} else {
System.arraycopy(bytesRef.toBytesRef().bytes, 0, buf, count, buf.length - count);
count += buf.length - count;
leftOverBuf = new byte[bytesRef.length() - count];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count + 1, leftOverBuf, 0, bytesRef.length() - count);
leftOverBuf = new byte[bytesRef.length() - (buf.length - count)];
System.arraycopy(bytesRef.toBytesRef().bytes, buf.length - count , leftOverBuf, 0, bytesRef.length() - (buf.length - count));
count = buf.length;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.InputStreamDataInput;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

public class IndexRoutingTableHeaderTests extends OpenSearchTestCase {

public void testWrite() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
IndexRoutingTableHeader header = new IndexRoutingTableHeader(1, "dummyIndex", Version.V_3_0_0);
header.write(out);

BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes);
CodecUtil.checkHeader(new InputStreamDataInput(in),IndexRoutingTableHeader.INDEX_ROUTING_HEADER_CODEC, IndexRoutingTableHeader.INITIAL_VERSION, IndexRoutingTableHeader.CURRENT_VERSION );
assertEquals(1, in.readLong());
assertEquals(Version.V_3_0_0.id, in.readInt());
assertEquals("dummyIndex", in.readString());
}

}

0 comments on commit 441f520

Please sign in to comment.