From 12c03ec2703379c70623958c6695db244e8a384c Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Wed, 8 Dec 2021 13:24:49 -0500 Subject: [PATCH] Refactor KuduSplit Remove KuduTableHandle from the KuduSplit to simplify memory accounting. KuduTableHandle contains KuduTable, a class defined by the Kudu client library. Implementing memory accounting correctly for that object is challenging. --- .../java/io/trino/plugin/kudu/KuduClientSession.java | 2 +- .../java/io/trino/plugin/kudu/KuduRecordSet.java | 7 ++++++- .../main/java/io/trino/plugin/kudu/KuduSplit.java | 12 +++++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java index 3c25f72feea49..1b08fc9aaca8a 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java @@ -603,7 +603,7 @@ private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token, { try { byte[] serializedScanToken = token.serialize(); - return new KuduSplit(tableHandle, primaryKeyColumnCount, serializedScanToken, bucketNumber); + return new KuduSplit(tableHandle.getSchemaTableName(), primaryKeyColumnCount, serializedScanToken, bucketNumber); } catch (IOException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, e); diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java index a2241f96437cf..f688cb0c40349 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java @@ -34,6 +34,8 @@ public class KuduRecordSet private final KuduSplit kuduSplit; private final List columns; + private KuduTable kuduTable; + public KuduRecordSet(KuduClientSession clientSession, KuduSplit kuduSplit, List columns) { this.clientSession = clientSession; @@ -70,7 +72,10 @@ public RecordCursor cursor() KuduTable getTable() { - return kuduSplit.getTableHandle().getTable(clientSession); + if (kuduTable == null) { + kuduTable = clientSession.openTable(kuduSplit.getSchemaTableName()); + } + return kuduTable; } KuduClientSession getClientSession() diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java index 5e2c73fec2b7a..06ab4d4b476da 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.SchemaTableName; import java.util.List; @@ -27,18 +28,19 @@ public class KuduSplit implements ConnectorSplit { - private final KuduTableHandle tableHandle; + private final SchemaTableName schemaTableName; private final int primaryKeyColumnCount; private final byte[] serializedScanToken; private final int bucketNumber; @JsonCreator - public KuduSplit(@JsonProperty("tableHandle") KuduTableHandle tableHandle, + public KuduSplit( + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("primaryKeyColumnCount") int primaryKeyColumnCount, @JsonProperty("serializedScanToken") byte[] serializedScanToken, @JsonProperty("bucketNumber") int bucketNumber) { - this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); this.primaryKeyColumnCount = primaryKeyColumnCount; this.serializedScanToken = requireNonNull(serializedScanToken, "serializedScanToken is null"); checkArgument(bucketNumber >= 0, "bucketNumber is negative"); @@ -46,9 +48,9 @@ public KuduSplit(@JsonProperty("tableHandle") KuduTableHandle tableHandle, } @JsonProperty - public KuduTableHandle getTableHandle() + public SchemaTableName getSchemaTableName() { - return tableHandle; + return schemaTableName; } @JsonProperty