diff --git a/connector/src/main/java/tech/ydb/spark/connector/YdbTable.java b/connector/src/main/java/tech/ydb/spark/connector/YdbTable.java
index e227ec6..3bff2b1 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/YdbTable.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -33,7 +33,6 @@
 import tech.ydb.spark.connector.common.KeysRange;
 import tech.ydb.spark.connector.common.OperationOption;
 import tech.ydb.spark.connector.common.PartitionOption;
-import tech.ydb.spark.connector.common.ReadMethod;
 import tech.ydb.spark.connector.read.YdbReadTable;
 import tech.ydb.spark.connector.read.YdbScanTable;
 import tech.ydb.spark.connector.write.YdbRowLevelBuilder;
@@ -216,14 +215,14 @@ public Transform[] partitioning() {
 
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-        ReadMethod method = OperationOption.READ_METHOD.readEnum(options, ReadMethod.QUERY);
+        boolean useReadTable = OperationOption.USE_READ_TABLE.readBoolean(options, false);
         switch (type) {
             case COLUMN:
                 return new YdbScanTable(this, options);
             case ROW:
             case INDEX:
             default:
-                if (method == ReadMethod.READ_TABLE) {
+                if (useReadTable) {
                     return new YdbReadTable(this, options);
                 }
                 return new YdbScanTable(this, options);
diff --git a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java
index a27cd19..acb2a0c 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java
@@ -43,9 +43,9 @@
     INGEST_METHOD("method"),
 
     /**
-     * YDB data reading method: ReadTable/QueryService/QueryService with ApacheArrow
+     * Force the use of ReadTable for row-oriented tables
      */
-    READ_METHOD("read.method"),
+    USE_READ_TABLE("useReadTable"),
 
     /**
      * YDB max batch rows for ingestion.
diff --git a/connector/src/main/java/tech/ydb/spark/connector/common/ReadMethod.java b/connector/src/main/java/tech/ydb/spark/connector/common/ReadMethod.java
deleted file mode 100644
index 778c453..0000000
--- a/connector/src/main/java/tech/ydb/spark/connector/common/ReadMethod.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package tech.ydb.spark.connector.common;
-
-/**
- *
- * @author Aleksandr Gorshenin
- */
-public enum ReadMethod {
-    QUERY,
-    READ_TABLE,
-    QUERY_APACHE_ARROW,
-}
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/PrimaryKeyExpression.java b/connector/src/main/java/tech/ydb/spark/connector/read/PrimaryKeyExpression.java
new file mode 100644
index 0000000..c534b39
--- /dev/null
+++ b/connector/src/main/java/tech/ydb/spark/connector/read/PrimaryKeyExpression.java
@@ -0,0 +1,63 @@
+package tech.ydb.spark.connector.read;
+
+import tech.ydb.spark.connector.YdbTypes;
+import tech.ydb.spark.connector.common.FieldInfo;
+import tech.ydb.spark.connector.common.KeysRange;
+import tech.ydb.table.values.TupleValue;
+import tech.ydb.table.values.Value;
+
+/**
+ *
+ * @author Aleksandr Gorshenin
+ */
+public class PrimaryKeyExpression implements YdbPartition {
+    private static final long serialVersionUID = 3138686052261512975L;
+
+    private final String expression;
+    private final String valueName;
+    private final Value value;
+
+    private PrimaryKeyExpression(FieldInfo[] columns, TupleValue value, String valueName, String operation) {
+        StringBuilder ex = new StringBuilder();
+        if (value.size() > 1) {
+            ex.append("(");
+        }
+        String del = "";
+        for (int idx = 0; idx < columns.length && idx < value.size(); idx++) {
+            ex.append(del);
+            ex.append('`').append(columns[idx].getName()).append('`');
+            del = ", ";
+        }
+        if (value.size() > 1) {
+            ex.append(")");
+        }
+        ex.append(" ").append(operation).append(" ").append(valueName);
+
+        this.expression = ex.toString();
+        this.valueName = valueName;
+        this.value = value.size() > 1 ? value : value.get(0);
+    }
+
+    @Override
+    public SelectQuery makeQuery(SelectQuery origin) {
+        return origin.addExpressionWithParam(expression, valueName, value);
+    }
+
+    public static PrimaryKeyExpression keyRangeFrom(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
+        if (!keyRange.hasFromValue()) {
+            return null;
+        }
+
+        TupleValue fromValue = keyRange.readFromValue(types, columns);
+        return new PrimaryKeyExpression(columns, fromValue, "$f", keyRange.includesFromValue() ? ">=" : ">");
+    }
+
+    public static PrimaryKeyExpression keyRangeTo(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
+        if (!keyRange.hasToValue()) {
+            return null;
+        }
+
+        TupleValue toValue = keyRange.readToValue(types, columns);
+        return new PrimaryKeyExpression(columns, toValue, "$t", keyRange.includesToValue() ? "<=" : "<");
+    }
+}
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/LazyReader.java b/connector/src/main/java/tech/ydb/spark/connector/read/StreamReader.java
similarity index 95%
rename from connector/src/main/java/tech/ydb/spark/connector/read/LazyReader.java
rename to connector/src/main/java/tech/ydb/spark/connector/read/StreamReader.java
index 14d6fda..f4466ba 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/read/LazyReader.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/read/StreamReader.java
@@ -23,8 +23,8 @@
  *
  * @author Aleksandr Gorshenin
  */
-abstract class LazyReader implements PartitionReader {
-    private static final Logger logger = LoggerFactory.getLogger(LazyReader.class);
+abstract class StreamReader implements PartitionReader {
+    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
     private static final AtomicInteger COUNTER = new AtomicInteger(0);
 
     private final String[] outColumns;
@@ -39,7 +39,7 @@ abstract class LazyReader implements PartitionReader {
     private volatile QueueItem currentItem = null;
     private volatile Status finishStatus = null;
 
-    protected LazyReader(YdbTypes types, int maxQueueSize, StructType schema) {
+    protected StreamReader(YdbTypes types, int maxQueueSize, StructType schema) {
         this.types = types;
         this.queue = new ArrayBlockingQueue<>(maxQueueSize);
         this.outColumns = schema.fieldNames();
@@ -86,7 +86,7 @@ public boolean next() {
         if (id == null) {
             startedAt = System.currentTimeMillis();
             id = start();
-            logger.info("[{}] started, {} total", id, COUNTER.incrementAndGet());
+            logger.debug("[{}] started, {} total", id, COUNTER.incrementAndGet());
         }
         while (true) {
             if (finishStatus != null) {
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartition.java b/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartition.java
index 274ed0f..f7bb482 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartition.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartition.java
@@ -2,10 +2,72 @@
 
 import org.apache.spark.sql.connector.read.InputPartition;
 
+import tech.ydb.spark.connector.YdbTypes;
+import tech.ydb.spark.connector.common.FieldInfo;
+import tech.ydb.spark.connector.common.KeysRange;
+
 /**
  *
  * @author Aleksandr Gorshenin
 */
 public interface YdbPartition extends InputPartition {
     SelectQuery makeQuery(SelectQuery origin);
+
+    static YdbPartition none() {
+        return new YdbPartition() {
+            private static final long serialVersionUID = -7536076317892048979L;
+            @Override
+            public SelectQuery makeQuery(SelectQuery origin) {
+                return origin.copy().addExpression("1 = 0"); // disable all
+            }
+        };
+    }
+
+    static YdbPartition unrestricted() {
+        return new YdbPartition() {
+            private static final long serialVersionUID = -7536076317892048980L;
+            @Override
+            public SelectQuery makeQuery(SelectQuery origin) {
+                return origin.copy();
+            }
+        };
+    }
+
+    static YdbPartition tabletId(String tabletId) {
+        return new YdbPartition() {
+            private static final long serialVersionUID = -7536076317892048981L;
+            @Override
+            public SelectQuery makeQuery(SelectQuery origin) {
+                return origin.copy().setWithExpression("TabletId='" + tabletId + "'");
+            }
+        };
+    }
+
+    static YdbPartition keysRange(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
+        if (columns.length == 0 || keyRange.isUnrestricted()) {
+            return unrestricted();
+        }
+
+        if (keyRange.isEmpty()) {
+            return none();
+        }
+
+        final PrimaryKeyExpression from = PrimaryKeyExpression.keyRangeFrom(types, columns, keyRange);
+        final PrimaryKeyExpression to = PrimaryKeyExpression.keyRangeTo(types, columns, keyRange);
+
+        return new YdbPartition() {
+            private static final long serialVersionUID = -7536076317892048982L;
+            @Override
+            public SelectQuery makeQuery(SelectQuery origin) {
+                SelectQuery query = origin.copy();
+                if (from != null) {
+                    query = from.makeQuery(query);
+                }
+                if (to != null) {
+                    query = to.makeQuery(query);
+                }
+                return query;
+            }
+        };
+    }
 }
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartitions.java b/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartitions.java
deleted file mode 100644
index 7e584bb..0000000
--- a/connector/src/main/java/tech/ydb/spark/connector/read/YdbPartitions.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package tech.ydb.spark.connector.read;
-
-
-import tech.ydb.spark.connector.YdbTypes;
-import tech.ydb.spark.connector.common.FieldInfo;
-import tech.ydb.spark.connector.common.KeysRange;
-import tech.ydb.table.values.TupleValue;
-import tech.ydb.table.values.Value;
-
-
-/**
- *
- * @author Aleksandr Gorshenin
- */
-public class YdbPartitions {
-    private YdbPartitions() { }
-
-    public static YdbPartition none() {
-        return new Unrestricted();
-    }
-
-    public static YdbPartition tabletId(String id) {
-        return new TabletId(id);
-    }
-
-    public static YdbPartition keysRange(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
-        if (columns.length == 0 || keyRange.isUnrestricted()) {
-            return new Unrestricted();
-        }
-
-        if (keyRange.isEmpty()) {
-            return new Empty();
-        }
-
-        StringBuilder keys = new StringBuilder("");
-        String del = "";
-        for (FieldInfo fi: columns) {
-            keys.append(del);
-            keys.append('`').append(fi.getName()).append('`');
-            del = ", ";
-        }
-
-        TupleValue from = keyRange.hasFromValue() ? keyRange.readFromValue(types, columns) : null;
-        TupleValue to = keyRange.hasToValue() ? keyRange.readToValue(types, columns) : null;
-        boolean includeFrom = keyRange.includesFromValue();
-        boolean includeTo = keyRange.includesToValue();
-
-        String cols = keys.toString();
-        if (columns.length == 1) {
-            Value fromValue = from != null ? from.get(0) : null;
-            Value toValue = to != null ? to.get(0) : null;
-            return new KeyRangeShard(cols, fromValue, toValue, includeFrom, includeTo);
-        }
-
-        return new KeyRangeShard("(" + cols + ")", from, to, includeFrom, includeTo);
-    }
-
-    private static class Unrestricted implements YdbPartition {
-        private static final long serialVersionUID = 2842737463454106189L;
-
-        @Override
-        public SelectQuery makeQuery(SelectQuery origin) {
-            return origin.copy();
-        }
-    }
-
-    private static class Empty implements YdbPartition {
-        private static final long serialVersionUID = 8062207972148236947L;
-
-        @Override
-        public SelectQuery makeQuery(SelectQuery origin) {
-            return origin.copy().addExpression("1 = 0"); // disable all
-        }
-    }
-
-    private static class TabletId implements YdbPartition {
-        private static final long serialVersionUID = 8869357112576530212L;
-
-        private final String tabletId;
-
-        TabletId(String tabletId) {
-            this.tabletId = tabletId;
-        }
-
-        @Override
-        public SelectQuery makeQuery(SelectQuery origin) {
-            return origin.copy().setWithExpression("TabletId='" + tabletId + "'");
-        }
-    }
-
-    private static class KeyRangeShard implements YdbPartition {
-        private static final long serialVersionUID = -8344489162515121573L;
-
-        private final String columns;
-        private final Value from;
-        private final Value to;
-        private final boolean includeFrom;
-        private final boolean includeTo;
-
-        KeyRangeShard(String columns, Value from, Value to, boolean includeFrom, boolean includeTo) {
-            this.columns = columns;
-            this.from = from;
-            this.to = to;
-            this.includeFrom = includeFrom;
-            this.includeTo = includeTo;
-        }
-
-        @Override
-        public SelectQuery makeQuery(SelectQuery origin) {
-            SelectQuery query = origin.copy();
-            if (from != null) {
-                String exp = columns + (includeFrom ? " >= " : " > ") + "$f";
-                query = query.addExpressionWithParam(exp, "$f", from);
-            }
-            if (to != null) {
-                String exp = columns + (includeTo ? " <= " : " < ") + "$t";
-                query = query.addExpressionWithParam(exp, "$t", to);
-            }
-            return query;
-        }
-    }
-}
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/YdbReadTable.java b/connector/src/main/java/tech/ydb/spark/connector/read/YdbReadTable.java
index f859aa9..001655a 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/read/YdbReadTable.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/read/YdbReadTable.java
@@ -65,7 +65,7 @@ public YdbReadTable(YdbTable table, CaseInsensitiveStringMap options) {
 
         this.table = table;
         this.types = new YdbTypes(options);
-        this.queueMaxSize = LazyReader.readQueueMaxSize(options);
+        this.queueMaxSize = StreamReader.readQueueMaxSize(options);
         this.keys = table.getKeyColumns();
 
         this.predicateRange = KeysRange.UNRESTRICTED;
@@ -321,7 +321,7 @@ static final class Lyzer {
         }
     }
 
-    private final class ReadTableReader extends LazyReader {
+    private final class ReadTableReader extends StreamReader {
        private final String id;
        private final GrpcReadStream stream;
 
diff --git a/connector/src/main/java/tech/ydb/spark/connector/read/YdbScanTable.java b/connector/src/main/java/tech/ydb/spark/connector/read/YdbScanTable.java
index 560dd9a..61d08a1 100644
--- a/connector/src/main/java/tech/ydb/spark/connector/read/YdbScanTable.java
+++ b/connector/src/main/java/tech/ydb/spark/connector/read/YdbScanTable.java
@@ -57,7 +57,7 @@ public YdbScanTable(YdbTable table, CaseInsensitiveStringMap options) {
 
         this.query = new SelectQuery(table);
         this.types = new YdbTypes(options);
-        this.queueMaxSize = LazyReader.readQueueMaxSize(options);
+        this.queueMaxSize = StreamReader.readQueueMaxSize(options);
         this.readSchema = table.schema();
     }
 
@@ -174,7 +174,8 @@ public InputPartition[] planInputPartitions() {
             InputPartition[] partitions = new InputPartition[tablets.size()];
             int idx = 0;
             for (String id: tablets) {
-                partitions[idx++] = YdbPartitions.tabletId(id);
+                logger.debug("create tablet {} partition", id);
+                partitions[idx++] = YdbPartition.tabletId(id);
             }
             return shuffle(partitions);
         }
@@ -186,17 +187,18 @@ public InputPartition[] planInputPartitions() {
             if (ranges.length > 0) {
                 InputPartition[] partitions = new InputPartition[ranges.length];
                 for (int idx = 0; idx < ranges.length; idx++) {
-                    partitions[idx] = YdbPartitions.keysRange(types, table.getKeyColumns(), ranges[idx]);
+                    logger.debug("create range {} partition", ranges[idx]);
+                    partitions[idx] = YdbPartition.keysRange(types, table.getKeyColumns(), ranges[idx]);
                 }
                 return shuffle(partitions);
             }
             break;
         }
 
-        return new InputPartition[]{YdbPartitions.none()};
+        return new InputPartition[]{YdbPartition.unrestricted()};
     }
 
-    private final class QueryServiceReader extends LazyReader {
+    private final class QueryServiceReader extends StreamReader {
         private final String query;
         private final Params params;
         private volatile QueryStream stream = null;
@@ -220,7 +222,11 @@ protected String start() {
                 onComplete(res.getStatus(), th);
             });
 
-            return query;
+            StringBuilder sb = new StringBuilder(query);
+            params.values().forEach((k, v) -> {
+                sb.append(", ").append(k).append("=").append(v);
+            });
+            return sb.toString();
         }
 
         @Override
diff --git a/connector/src/test/java/tech/ydb/spark/connector/DataFramesTest.java b/connector/src/test/java/tech/ydb/spark/connector/DataFramesTest.java
index 989e58c..2aec74b 100644
--- a/connector/src/test/java/tech/ydb/spark/connector/DataFramesTest.java
+++ b/connector/src/test/java/tech/ydb/spark/connector/DataFramesTest.java
@@ -126,7 +126,7 @@ public void countRowTableTest() {
         long count2 = spark.read().format("ydb").option("url", ydbURL).load("row_table").count();
         Assert.assertEquals(count, count2);
 
-        long count3 = spark.read().format("ydb").option("url", ydbURL).option("read.method", "READ_TABLE")
+        long count3 = spark.read().format("ydb").option("url", ydbURL).option("useReadTable", "true")
                 .load("row_table").count();
         Assert.assertEquals(count2, count3);
     }
@@ -148,7 +148,7 @@ public void countSplittedTableTest() {
         long count2 = spark.read().format("ydb").option("url", ydbURL).load("dir/splitted").count();
         Assert.assertEquals(count, count2);
 
-        long count3 = spark.read().format("ydb").option("url", ydbURL).option("read.method", "READ_TABLE")
+        long count3 = spark.read().format("ydb").option("url", ydbURL).option("useReadTable", "true")
                 .load("dir/splitted").count();
         Assert.assertEquals(count2, count3);
     }
diff --git a/connector/src/test/java/tech/ydb/spark/connector/PredicatesTest.java b/connector/src/test/java/tech/ydb/spark/connector/PredicatesTest.java
index 7e7a345..c05afa5 100644
--- a/connector/src/test/java/tech/ydb/spark/connector/PredicatesTest.java
+++ b/connector/src/test/java/tech/ydb/spark/connector/PredicatesTest.java
@@ -89,7 +89,7 @@ private static void prepareTables(YdbExecutor executor) {
                + " PRIMARY KEY(sv, cv)"
                + ") WITH ("
                + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 5, "
-               + " PARTITION_AT_KEYS = (500, 800, 900, 950) "
+               + " PARTITION_AT_KEYS = ((500), (700), (900, 2), (950, 10)) "
                + ")").join().expectSuccess("cannot create row_table1 table");
         executor.executeSchemeQuery("CREATE TABLE row_table2 ("
                + " sv Uint32 NOT NULL,"
@@ -190,11 +190,11 @@ public void test03_loadColumnTable() {
     @Test
     public void test04_count() {
         long count1 = readYdb().load("row_table1").count();
-        long count2 = readYdb().option("read.method", "READ_TABLE").load("row_table1").count();
+        long count2 = readYdb().option("useReadTable", "true").load("row_table1").count();
         Assert.assertEquals(count1, count2);
 
         long count3 = readYdb().load("row_table2").count();
-        long count4 = readYdb().option("read.method", "READ_TABLE").load("row_table2").count();
+        long count4 = readYdb().option("useReadTable", "true").load("row_table2").count();
         Assert.assertEquals(count3, count4);
 
        long count5 = readYdb().load("column_table").count();
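
For reference, reads that previously selected the ReadTable path via option("read.method", "READ_TABLE") now request it with the boolean useReadTable option, as the updated tests above show; omitting the option keeps the default QueryService-based scan. A minimal usage sketch (the SparkSession "spark", the "ydbUrl" connection string and the table name are placeholders, not part of this change):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Read a row-oriented table through the ReadTable path instead of the default scan.
    Dataset<Row> rows = spark.read()
            .format("ydb")
            .option("url", ydbUrl)              // placeholder connection URL
            .option("useReadTable", "true")     // was: .option("read.method", "READ_TABLE")
            .load("row_table");
    long total = rows.count();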