Skip to content

Commit

Permalink
[Bugfix] Fix incorrect index file (Tencent#92)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Modify the method that calculates the offset in the index file.

### Why are the changes needed?
Without this patch, query24a fails when we run the 10TB TPC-DS benchmark.
<img width="361" alt="企业微信截图_6dc451cf-dbf4-4257-b680-e79346cd582d" src="https://user-images.githubusercontent.com/8159038/157178756-d8a39b3f-0ea6-4864-ac68-ee382a88bb0f.png">
When we write a large amount of data to dataOutputStream, dataOutputStream.size() stops increasing: it is an int counter, and
once it overflows it always returns Integer.MAX_VALUE, so any offset derived from it is wrong past that point.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new unit tests.

Co-authored-by: roryqi <roryqi@tencent.com>
  • Loading branch information
jerqi and jerqi committed Mar 8, 2022
1 parent 84410a4 commit bdcd5f0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Expand Up @@ -30,21 +30,19 @@ public class LocalFileWriter implements Closeable {

// Data stream layered over fileOutputStream; writeData writes through it.
private DataOutputStream dataOutputStream;
// Underlying file stream, opened in append mode by the constructor.
private FileOutputStream fileOutputStream;
// Length of the file at the time this writer was constructed.
private long initSize;
// Running byte offset: starts at the file's length and is updated by writeData.
private long nextOffset;

public LocalFileWriter(File file) throws IOException {
fileOutputStream = new FileOutputStream(file, true);
// init fsDataOutputStream
dataOutputStream = new DataOutputStream(fileOutputStream);
initSize = file.length();
nextOffset = initSize;
nextOffset = file.length();
}

/**
 * Writes {@code data} to the underlying stream and advances the tracked offset
 * by the number of bytes written. A {@code null} or empty array is ignored.
 *
 * @param data the bytes to append; may be {@code null} or empty
 * @throws IOException if the underlying stream rejects the write
 */
public void writeData(byte[] data) throws IOException {
  if (data == null || data.length == 0) {
    return;
  }
  dataOutputStream.write(data);
  // Accumulate in a long instead of reading DataOutputStream.size(): that
  // counter is an int and sticks at Integer.MAX_VALUE once it overflows.
  nextOffset += data.length;
}

Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.tencent.rss.storage.handler.api.ShuffleWriteHandler;
import com.tencent.rss.storage.util.ShuffleStorageUtils;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -53,6 +54,7 @@ public class LocalFileHandlerTest {
@Test
public void writeTest() throws Exception {
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String[] basePaths = new String[]{dataDir1.getAbsolutePath(),
Expand Down Expand Up @@ -111,6 +113,21 @@ public void writeTest() throws Exception {
}
}

/**
 * Writes more than Integer.MAX_VALUE bytes through a single writer and checks
 * that the reported offset equals the total number of bytes written.
 */
@Test
public void writeBigDataTest() throws IOException {
  File tmpDir = Files.createTempDir();
  tmpDir.deleteOnExit();
  File writeFile = new File(tmpDir, "writetest");
  // Registered after tmpDir: deleteOnExit runs in reverse registration order,
  // so the file is removed first and the then-empty directory can be deleted.
  writeFile.deleteOnExit();

  int size = Integer.MAX_VALUE / 100;
  int rounds = 200;
  byte[] data = new byte[size];
  // 200 chunks of Integer.MAX_VALUE / 100 bytes is about twice Integer.MAX_VALUE.
  long totalSize = (long) rounds * size;

  // Close the writer even if a write or the assertion fails.
  try (LocalFileWriter writer = new LocalFileWriter(writeFile)) {
    for (int i = 0; i < rounds; i++) {
      writer.writeData(data);
    }
    assertEquals(totalSize, writer.nextOffset());
  }
}


private void writeTestData(
ShuffleWriteHandler writeHandler,
Expand Down

0 comments on commit bdcd5f0

Please sign in to comment.