Skip to content

Commit

Permalink
Expose counters for ORC writer memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
highker committed Mar 13, 2018
1 parent 475fbce commit 1ae1910
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
10 changes: 10 additions & 0 deletions presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
Expand Up @@ -111,6 +111,7 @@ public class OrcWriter
private long columnWritersRetainedBytes;
private long closedStripesRetainedBytes;
// Initialized in the constructor: 0 when there is no validation builder, otherwise VALIDATOR_INSTANCE_SIZE.
private long validatorRetainedBytes;
// The getRetainedBytes() value most recently reported to stats. write() reports only the
// difference from this value, and close() subtracts it from stats and resets it to 0.
private long previouslyRecordedSizeInBytes;
// Set to true by close(); a close() call that finds it already set returns early.
private boolean closed;

@Nullable
Expand Down Expand Up @@ -195,6 +196,9 @@ public OrcWriter(
}

this.validatorRetainedBytes = validationBuilder == null ? 0 : VALIDATOR_INSTANCE_SIZE;

this.previouslyRecordedSizeInBytes = getRetainedBytes();
stats.updateSizeInBytes(previouslyRecordedSizeInBytes);
}

public int getBufferedBytes()
Expand Down Expand Up @@ -235,6 +239,10 @@ public void write(Page page)
}
writeChunk(chunk);
}

long recordedSizeInBytes = getRetainedBytes();
stats.updateSizeInBytes(recordedSizeInBytes - previouslyRecordedSizeInBytes);
previouslyRecordedSizeInBytes = recordedSizeInBytes;
}

private void writeChunk(Page chunk)
Expand Down Expand Up @@ -383,6 +391,8 @@ public void close()
return;
}
closed = true;
stats.updateSizeInBytes(-previouslyRecordedSizeInBytes);
previouslyRecordedSizeInBytes = 0;

writeStripe(CLOSED);

Expand Down
Expand Up @@ -16,6 +16,8 @@
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.orc.OrcWriterStats.FlushReason.CLOSED;
import static com.facebook.presto.orc.OrcWriterStats.FlushReason.DICTIONARY_FULL;
import static com.facebook.presto.orc.OrcWriterStats.FlushReason.MAX_BYTES;
Expand All @@ -34,13 +36,19 @@ public enum FlushReason
// Each per-reason stats object is constructed with the name of the FlushReason it tracks,
// so maxBytesFlush is labeled MAX_BYTES (not MAX_ROWS) to match the field it backs.
private final OrcWriterFlushStats maxBytesFlush = new OrcWriterFlushStats(MAX_BYTES.name());
private final OrcWriterFlushStats dictionaryFullFlush = new OrcWriterFlushStats(DICTIONARY_FULL.name());
private final OrcWriterFlushStats closedFlush = new OrcWriterFlushStats(CLOSED.name());
// Running sum of every delta passed to updateSizeInBytes(); read by getWriterSizeInBytes() and toString().
private final AtomicLong writerSizeInBytes = new AtomicLong();

/**
 * Records one written stripe against both the stats for the given flush reason
 * and the aggregate {@code allFlush} stats.
 *
 * @param flushReason reason the stripe was flushed; selects the per-reason stats via {@code getFlushStats}
 * @param stripeBytes value forwarded unchanged as the stripe's byte count
 * @param stripeRows value forwarded unchanged as the stripe's row count
 * @param dictionaryBytes value forwarded unchanged as the stripe's dictionary byte count
 */
public void recordStripeWritten(FlushReason flushReason, long stripeBytes, int stripeRows, int dictionaryBytes)
{
    // Per-reason stats are updated first, then the all-reasons aggregate.
    OrcWriterFlushStats reasonStats = getFlushStats(flushReason);
    reasonStats.recordStripeWritten(stripeBytes, stripeRows, dictionaryBytes);

    allFlush.recordStripeWritten(stripeBytes, stripeRows, dictionaryBytes);
}

/**
 * Atomically adds a signed delta to the tracked writer size.
 *
 * @param deltaInBytes amount to add to the counter; callers pass a negative value to decrease it
 */
public void updateSizeInBytes(long deltaInBytes)
{
    // The returned value is not needed; only the atomic add matters.
    writerSizeInBytes.getAndAdd(deltaInBytes);
}

@Managed
@Nested
public OrcWriterFlushStats getAllFlush()
Expand Down Expand Up @@ -76,6 +84,12 @@ public OrcWriterFlushStats getClosedFlush()
return closedFlush;
}

/**
 * Returns the current value of the writer size counter, i.e. the sum of all
 * deltas passed to {@code updateSizeInBytes} so far.
 *
 * @return the current counter value in bytes
 */
@Managed
public long getWriterSizeInBytes()
{
    long currentSizeInBytes = writerSizeInBytes.get();
    return currentSizeInBytes;
}

private OrcWriterFlushStats getFlushStats(FlushReason flushReason)
{
switch (flushReason) {
Expand All @@ -101,6 +115,7 @@ public String toString()
.add("maxBytesFlush", maxBytesFlush)
.add("dictionaryFullFlush", dictionaryFullFlush)
.add("closedFlush", closedFlush)
.add("writerSizeInBytes", writerSizeInBytes.get())
.toString();
}
}
Expand Up @@ -439,6 +439,7 @@ public void assertRoundTrip(Type type, List<?> readValues)
public void assertRoundTrip(Type type, List<?> readValues, boolean verifyWithHiveReader)
throws Exception
{
OrcWriterStats stats = new OrcWriterStats();
for (Format format : formats) {
if (!format.supportsType(type)) {
return;
Expand All @@ -458,7 +459,7 @@ public void assertRoundTrip(Type type, List<?> readValues, boolean verifyWithHiv

// write Presto, read Hive and Presto
try (TempFile tempFile = new TempFile()) {
writeOrcColumnPresto(tempFile.getFile(), format, compression, type, readValues.iterator());
writeOrcColumnPresto(tempFile.getFile(), format, compression, type, readValues.iterator(), stats);

if (verifyWithHiveReader && hiveSupported) {
assertFileContentsHive(type, tempFile, format, readValues);
Expand All @@ -476,6 +477,8 @@ public void assertRoundTrip(Type type, List<?> readValues, boolean verifyWithHiv
}
}
}

assertEquals(stats.getWriterSizeInBytes(), 0);
}

private static void assertFileContentsPresto(
Expand Down Expand Up @@ -614,7 +617,7 @@ static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, OrcEncodin
return orcReader.createRecordReader(ImmutableMap.of(0, type), predicate, HIVE_STORAGE_TIME_ZONE, newSimpleAggregatedMemoryContext());
}

private static void writeOrcColumnPresto(File outputFile, Format format, CompressionKind compression, Type type, Iterator<?> values)
private static void writeOrcColumnPresto(File outputFile, Format format, CompressionKind compression, Type type, Iterator<?> values, OrcWriterStats stats)
throws Exception
{
ImmutableMap.Builder<String, String> metadata = ImmutableMap.builder();
Expand All @@ -632,7 +635,7 @@ private static void writeOrcColumnPresto(File outputFile, Format format, Compres
ImmutableMap.of(),
HIVE_STORAGE_TIME_ZONE,
true,
new OrcWriterStats());
stats);

BlockBuilder blockBuilder = type.createBlockBuilder(new BlockBuilderStatus(), 1024);
while (values.hasNext()) {
Expand Down

0 comments on commit 1ae1910

Please sign in to comment.