Skip to content

Commit

Permalink
Cleanup staging files in Raptor
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Nov 12, 2015
1 parent 15000b5 commit 0d3742b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 12 deletions.
Expand Up @@ -115,7 +115,6 @@ public Collection<Slice> commit()
public void rollback()
{
storagePageSink.rollback();
// TODO: clean up any written files
}

/**
Expand Down
Expand Up @@ -465,6 +465,7 @@ private class OrcStoragePageSink
private final List<Long> columnIds;
private final List<Type> columnTypes;

private final List<File> stagingFiles = new ArrayList<>();
private final List<ShardInfo> shards = new ArrayList<>();
private final List<CompletableFuture<?>> futures = new ArrayList<>();

Expand Down Expand Up @@ -548,12 +549,20 @@ public List<ShardInfo> commit()
return ImmutableList.copyOf(shards);
}

@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void rollback()
{
if (writer != null) {
writer.close();
writer = null;
try {
if (writer != null) {
writer.close();
writer = null;
}
}
finally {
for (File file : stagingFiles) {
file.delete();
}
}
}

Expand All @@ -563,6 +572,7 @@ private void createWriterIfNecessary()
shardUuid = UUID.randomUUID();
File stagingFile = storageService.getStagingFile(shardUuid);
storageService.createParents(stagingFile);
stagingFiles.add(stagingFile);
writer = new OrcFileWriter(columnIds, columnTypes, stagingFile);
}
}
Expand Down
Expand Up @@ -68,6 +68,26 @@ public List<ShardInfo> compact(long transactionId, Set<UUID> uuids, List<ColumnI
List<Type> columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList());

StoragePageSink storagePageSink = storageManager.createStoragePageSink(transactionId, columnIds, columnTypes);

List<ShardInfo> shardInfos;
try {
shardInfos = compact(storagePageSink, uuids, columnIds, columnTypes);
}
catch (IOException | RuntimeException e) {
storagePageSink.rollback();
throw e;
}

inputShardsPerCompaction.add(uuids.size());
outputShardsPerCompaction.add(shardInfos.size());
compactionLatencyMillis.add(Duration.nanosSince(start).toMillis());

return shardInfos;
}

private List<ShardInfo> compact(StoragePageSink storagePageSink, Set<UUID> uuids, List<Long> columnIds, List<Type> columnTypes)
throws IOException
{
for (UUID uuid : uuids) {
try (ConnectorPageSource pageSource = storageManager.getPageSource(uuid, columnIds, columnTypes, TupleDomain.all(), readerAttributes)) {
while (!pageSource.isFinished()) {
Expand All @@ -82,13 +102,7 @@ public List<ShardInfo> compact(long transactionId, Set<UUID> uuids, List<ColumnI
}
}
}
List<ShardInfo> shardInfos = storagePageSink.commit();

inputShardsPerCompaction.add(uuids.size());
outputShardsPerCompaction.add(shardInfos.size());
compactionLatencyMillis.add(Duration.nanosSince(start).toMillis());

return shardInfos;
return storagePageSink.commit();
}

public List<ShardInfo> compactSorted(long transactionId, Set<UUID> uuids, List<ColumnInfo> columns, List<Long> sortColumnIds, List<SortOrder> sortOrders)
Expand Down Expand Up @@ -136,8 +150,11 @@ public List<ShardInfo> compactSorted(long transactionId, Set<UUID> uuids, List<C

return shardInfos;
}
catch (IOException | RuntimeException e) {
outputPageSink.rollback();
throw e;
}
finally {
outputPageSink.flush();
rowSources.stream().forEach(SortedRowSource::closeQuietly);
}
}
Expand Down
Expand Up @@ -368,6 +368,40 @@ public void testRewriter()
assertEquals(recordedShards.get(1).getNodeIdentifier(), CURRENT_NODE);
}

@Test
public void testWriterRollback()
throws Exception
{
// verify staging directory does not exist
File staging = new File(new File(temporary, "data"), "staging");
assertFalse(staging.exists());

// create a shard in staging
OrcStorageManager manager = createOrcStorageManager();

List<Long> columnIds = ImmutableList.of(3L, 7L);
List<Type> columnTypes = ImmutableList.<Type>of(BIGINT, VARCHAR);

StoragePageSink sink = createStoragePageSink(manager, columnIds, columnTypes);
List<Page> pages = rowPagesBuilder(columnTypes)
.row(123, "hello")
.row(456, "bye")
.build();
sink.appendPages(pages);

sink.flush();

// verify shard exists in staging
String[] files = staging.list();
assertEquals(files.length, 1);
assertTrue(files[0].endsWith(".orc"));

// rollback should cleanup staging files
sink.rollback();

assertEquals(staging.list(), new String[] {});
}

@Test
public void testShardStatsBigint()
{
Expand Down

0 comments on commit 0d3742b

Please sign in to comment.