Skip to content

Commit

Permalink
Account memory for OrcDeletedRows
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Nov 18, 2021
1 parent 5cda4cd commit e3560ed
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Expand Up @@ -14,6 +14,8 @@
package io.trino.plugin.hive.orc;

import com.google.common.collect.ImmutableSet;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.orc.OrcCorruptionException;
import io.trino.plugin.hive.AcidInfo;
import io.trino.plugin.hive.HdfsEnvironment;
Expand All @@ -30,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -43,6 +46,7 @@

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.sizeOfObjectArray;
import static io.trino.plugin.hive.BackgroundHiveSplitLoader.hasAttemptId;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
Expand All @@ -65,6 +69,7 @@ public class OrcDeletedRows
private final HdfsEnvironment hdfsEnvironment;
private final AcidInfo acidInfo;
private final OptionalInt bucketNumber;
private final LocalMemoryContext systemMemoryUsage;

@Nullable
private Set<RowId> deletedRows;
Expand All @@ -76,7 +81,8 @@ public OrcDeletedRows(
Configuration configuration,
HdfsEnvironment hdfsEnvironment,
AcidInfo acidInfo,
OptionalInt bucketNumber)
OptionalInt bucketNumber,
AggregatedMemoryContext systemMemoryContext)
{
this.sourceFileName = requireNonNull(sourceFileName, "sourceFileName is null");
this.pageSourceFactory = requireNonNull(pageSourceFactory, "pageSourceFactory is null");
Expand All @@ -85,6 +91,7 @@ public OrcDeletedRows(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.acidInfo = requireNonNull(acidInfo, "acidInfo is null");
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.systemMemoryUsage = requireNonNull(systemMemoryContext, "systemMemoryContext is null").newLocalMemoryContext(OrcDeletedRows.class.getSimpleName());
}

public MaskDeletedRowsFunction getMaskDeletedRowsFunction(Page sourcePage, OptionalLong startRowId)
Expand Down Expand Up @@ -262,10 +269,19 @@ private Set<RowId> getDeletedRows()
throw new TrinoException(HIVE_CURSOR_ERROR, "Failed to read ORC delete delta file: " + path, e);
}
}

deletedRows = deletedRowsBuilder.build();
// Memory usage is deliberately not updated inside the loop while deletedRows is being built: the recorded value
// is propagated to the operator memory context via OrcPageSource only once processing of a page has finished.
systemMemoryUsage.setBytes(memorySizeOfRowIdsArray(deletedRows.size()));
return deletedRows;
}

/**
 * Estimates the number of bytes needed to hold {@code rowCount} row ids.
 * <p>
 * The estimate is the size of an object array with {@code rowCount} slots plus
 * {@code rowCount} times the instance size of a single {@code RowId}.
 *
 * @param rowCount number of row ids to account for
 * @return estimated size in bytes
 */
private long memorySizeOfRowIdsArray(int rowCount)
{
    long referenceArrayBytes = sizeOfObjectArray(rowCount);
    // Widen before multiplying so the product is computed in long arithmetic.
    long rowIdInstancesBytes = (long) rowCount * RowId.INSTANCE_SIZE;
    return referenceArrayBytes + rowIdInstancesBytes;
}

private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo deleteDeltaInfo, String fileName)
{
Path directory = new Path(acidInfo.getPartitionLocation(), deleteDeltaInfo.getDirectoryName());
Expand All @@ -285,6 +301,8 @@ private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo delet

private static class RowId
{
public static final int INSTANCE_SIZE = ClassLayout.parseClass(RowId.class).instanceSize();

private final long originalTransaction;
private final int bucket;
private final int statementId;
Expand Down
Expand Up @@ -390,7 +390,8 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
configuration,
hdfsEnvironment,
info,
bucketNumber));
bucketNumber,
systemMemoryUsage));

Optional<Long> originalFileRowId = acidInfo
.filter(OrcPageSourceFactory::hasOriginalFilesAndDeleteDeltas)
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.OptionalLong;
import java.util.Set;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -167,7 +168,8 @@ private static OrcDeletedRows createOrcDeletedRows(AcidInfo acidInfo, String sou
configuration,
HDFS_ENVIRONMENT,
acidInfo,
OptionalInt.of(0));
OptionalInt.of(0),
newSimpleAggregatedMemoryContext());
}

private Page createTestPage(int originalTransactionStart, int originalTransactionEnd)
Expand Down

0 comments on commit e3560ed

Please sign in to comment.