Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import tech.ydb.spark.connector.common.KeysRange;
import tech.ydb.spark.connector.common.OperationOption;
import tech.ydb.spark.connector.common.PartitionOption;
import tech.ydb.spark.connector.common.ReadMethod;
import tech.ydb.spark.connector.read.YdbReadTable;
import tech.ydb.spark.connector.read.YdbScanTable;
import tech.ydb.spark.connector.write.YdbRowLevelBuilder;
Expand Down Expand Up @@ -215,13 +216,17 @@ public Transform[] partitioning() {

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
ReadMethod method = OperationOption.READ_METHOD.readEnum(options, ReadMethod.QUERY);
switch (type) {
case COLUMN:
return new YdbScanTable(this, options);
case ROW:
case INDEX:
default:
return new YdbReadTable(this, options);
if (method == ReadMethod.READ_TABLE) {
return new YdbReadTable(this, options);
}
return new YdbScanTable(this, options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public enum OperationOption implements SparkOption {
*/
INGEST_METHOD("method"),

/**
* YDB data reading method: ReadTable, QueryService, or QueryService with Apache Arrow.
*/
READ_METHOD("read.method"),

/**
* YDB max batch rows for ingestion.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package tech.ydb.spark.connector.common;

/**
 * Strategy used to read data from YDB, selected via the {@code read.method}
 * table option (see {@code OperationOption.READ_METHOD}).
 *
 * @author Aleksandr Gorshenin
 */
public enum ReadMethod {
/** Read through the QueryService; the default when no option is given. */
QUERY,
/** Read through the legacy ReadTable API. */
READ_TABLE,
/** Read through the QueryService with Apache Arrow encoding. */
QUERY_APACHE_ARROW,
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.ydb.table.Session;
import tech.ydb.table.SessionSupplier;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.description.TableOptionDescription;
import tech.ydb.table.query.DataQuery;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand All @@ -38,6 +39,7 @@
import tech.ydb.table.settings.CopyTablesSettings;
import tech.ydb.table.settings.CreateTableSettings;
import tech.ydb.table.settings.DeleteSessionSettings;
import tech.ydb.table.settings.DescribeTableOptionsSettings;
import tech.ydb.table.settings.DescribeTableSettings;
import tech.ydb.table.settings.DropTableSettings;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
Expand Down Expand Up @@ -282,4 +284,9 @@ public CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings setti
public CompletableFuture<Status> delete(DeleteSessionSettings settings) {
throw new UnsupportedOperationException("Not supported for implicit sessions");
}

// NOTE(review): like delete()/keepAlive() above, table-options introspection is
// rejected here — presumably because implicit sessions have no server-side
// session state to describe against; confirm against the SessionSupplier docs.
@Override
public CompletableFuture<Result<TableOptionDescription>> describeTableOptions(DescribeTableOptionsSettings opts) {
throw new UnsupportedOperationException("Not supported for implicit sessions");
}
}
139 changes: 139 additions & 0 deletions connector/src/main/java/tech/ydb/spark/connector/read/SelectQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package tech.ydb.spark.connector.read;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import tech.ydb.spark.connector.YdbTable;
import tech.ydb.spark.connector.common.FieldInfo;
import tech.ydb.table.query.Params;
import tech.ydb.table.values.Value;

/**
 * Incremental builder for a YQL SELECT statement over a single YDB table.
 *
 * <p>Collects the projected columns ("predicates"), WHERE expressions together
 * with their declared parameters, an optional table hint (WITH) and an
 * optional row limit, and renders the final statement in {@link #toQuery()}.
 * Instances are serializable so they can be shipped to Spark executors.
 *
 * @author Aleksandr Gorshenin
 */
public class SelectQuery implements Serializable {
    private static final long serialVersionUID = -7754646422175214023L;

    private final String tableName;
    // Back-quoted column names forming the SELECT projection.
    private final ArrayList<String> predicates;
    // Rendered boolean expressions, joined with AND in the WHERE clause.
    private final ArrayList<String> expressions;
    private final ArrayList<String> groupBy;
    // Parameter name ("$x") -> value; each entry is emitted as a DECLARE.
    private final HashMap<String, Value<?>> params;

    // Table hint, rendered as "FROM `table` WITH <expression>"; at most one.
    private String withExpression;
    // Negative means "no limit".
    private long rowLimit;

    /**
     * Creates a query that initially projects the table's key columns.
     *
     * @param table source table descriptor
     */
    public SelectQuery(YdbTable table) {
        this.tableName = table.getTablePath();

        FieldInfo[] keys = table.getKeyColumns();
        this.predicates = new ArrayList<>(keys.length);
        for (FieldInfo key : keys) { // was re-reading table.getKeyColumns() per iteration
            this.predicates.add("`" + key.getName() + "`");
        }

        this.expressions = new ArrayList<>();
        this.groupBy = new ArrayList<>();
        this.params = new HashMap<>();
        this.withExpression = null;
        this.rowLimit = -1;
    }

    private SelectQuery(String tableName, ArrayList<String> predicates, ArrayList<String> expressions,
            ArrayList<String> groupBy, HashMap<String, Value<?>> params, String withExpression, long rowLimit) {
        this.tableName = tableName;
        this.expressions = expressions;
        this.predicates = predicates;
        this.groupBy = groupBy;
        this.params = params;
        this.withExpression = withExpression;
        this.rowLimit = rowLimit;
    }

    /**
     * @return an independent deep copy; mutating either instance afterwards
     *         does not affect the other
     */
    public SelectQuery copy() {
        return new SelectQuery(tableName, new ArrayList<>(predicates), new ArrayList<>(expressions),
                new ArrayList<>(groupBy), new HashMap<>(params), withExpression, rowLimit);
    }

    /**
     * Sets the single table hint; may be called at most once per instance.
     *
     * @param expression hint text, e.g. {@code TabletId='...'}
     * @return this builder
     * @throws IllegalArgumentException if a hint was already set
     */
    public SelectQuery setWithExpression(String expression) {
        if (withExpression != null) {
            throw new IllegalArgumentException("Cannot rewrite WITH expression");
        }
        withExpression = expression;
        return this;
    }

    /** Appends a WHERE expression; a null argument is silently ignored. */
    public SelectQuery addExpression(String exp) {
        if (exp != null) {
            expressions.add(exp);
        }
        return this;
    }

    /**
     * Appends a WHERE expression that references a declared parameter.
     * Ignored when either the expression or the value is null.
     *
     * @param exp expression text referencing {@code prmName}
     * @param prmName parameter name including the leading {@code $}
     * @param prmValue parameter value (also supplies the DECLARE type)
     */
    public SelectQuery addExpressionWithParam(String exp, String prmName, Value<?> prmValue) {
        if (exp != null && prmValue != null) {
            expressions.add(exp);
            params.put(prmName, prmValue);
        }
        return this;
    }

    /** Sets the LIMIT clause; values {@code <= 0} disable the limit. */
    public SelectQuery withRowLimit(int limit) {
        this.rowLimit = limit;
        return this;
    }

    /** Replaces the SELECT projection with the given column names (back-quoted). */
    public SelectQuery replacePredicates(String[] columnNames) {
        this.predicates.clear();
        for (String name : columnNames) {
            this.predicates.add("`" + name + "`");
        }
        return this;
    }

    /**
     * Renders the statement: DECLARE section, SELECT projection, FROM with an
     * optional table hint, WHERE conjunction and LIMIT.
     *
     * @return the YQL text
     */
    public String toQuery() {
        StringBuilder sb = new StringBuilder();

        for (Map.Entry<String, Value<?>> entry : params.entrySet()) {
            sb.append("DECLARE ")
                    .append(entry.getKey())
                    .append(" AS ")
                    .append(entry.getValue().getType().toString())
                    .append("; ");
        }

        sb.append("SELECT");
        char pDep = ' ';
        for (String col : predicates) {
            sb.append(pDep);
            sb.append(col);
            pDep = ',';
        }
        sb.append(" FROM `").append(tableName).append("`");

        // FIX: YQL table hints must follow the table reference and precede
        // WHERE ("SELECT ... FROM `t` WITH <hint> WHERE ..."). The previous
        // version appended WITH after the WHERE conjunction, which yields
        // invalid YQL whenever a hint is combined with pushed-down filters.
        if (withExpression != null) {
            sb.append(" WITH ").append(withExpression);
        }

        String eDep = " WHERE ";
        for (String exp : expressions) {
            sb.append(eDep);
            sb.append(exp);
            eDep = " AND ";
        }

        if (rowLimit > 0) {
            sb.append(" LIMIT ").append(rowLimit);
        }

        return sb.toString();
    }

    /** @return an immutable snapshot of the declared parameters */
    public Params toQueryParams() {
        return Params.copyOf(params);
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package tech.ydb.spark.connector.read;

import org.apache.spark.sql.connector.read.InputPartition;

/**
 * A single YDB input partition that scopes a table-wide {@link SelectQuery}
 * down to its own slice of the data (a key range, a tablet, or no
 * restriction at all).
 *
 * @author Aleksandr Gorshenin
 */
public interface YdbPartition {
/**
 * Produces a partition-scoped query from the given template; {@code origin}
 * itself must not be mutated (implementations are expected to work on a copy).
 *
 * @param origin the table-wide query template
 * @return a new query restricted to this partition
 */
SelectQuery makeQuery(SelectQuery origin);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package tech.ydb.spark.connector.read;


import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.spark.connector.common.FieldInfo;
import tech.ydb.spark.connector.common.KeysRange;
import tech.ydb.table.values.TupleValue;
import tech.ydb.table.values.Value;


/**
 * Factory for the {@link YdbPartition} flavours used by the connector:
 * no restriction, an always-empty slice, a single tablet, or a
 * primary-key range.
 *
 * @author Aleksandr Gorshenin
 */
public class YdbPartitions {
    private YdbPartitions() { }

    /** @return a partition covering the whole table, with no extra restriction */
    public static YdbPartition none() {
        return new Unrestricted();
    }

    /** @return a partition pinned to one tablet via a table hint */
    public static YdbPartition tabletId(String id) {
        return new TabletId(id);
    }

    /**
     * Builds a partition bounded by a primary-key range.
     *
     * @param types    type-conversion helper used to decode the range bounds
     * @param columns  key columns, in key order
     * @param keyRange the raw range; may be unrestricted or empty
     * @return a partition restricting the query to the given key range
     */
    public static YdbPartition keysRange(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
        if (columns.length == 0 || keyRange.isUnrestricted()) {
            return new Unrestricted();
        }
        if (keyRange.isEmpty()) {
            return new Empty();
        }

        // Back-quoted, comma-separated list of the key column names.
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (FieldInfo column : columns) {
            if (!first) {
                joined.append(", ");
            }
            joined.append('`').append(column.getName()).append('`');
            first = false;
        }
        String cols = joined.toString();

        boolean includeFrom = keyRange.includesFromValue();
        boolean includeTo = keyRange.includesToValue();
        TupleValue lower = keyRange.hasFromValue() ? keyRange.readFromValue(types, columns) : null;
        TupleValue upper = keyRange.hasToValue() ? keyRange.readToValue(types, columns) : null;

        if (columns.length == 1) {
            // A single-column key is compared as a scalar, not a 1-tuple.
            Value<?> lowerScalar = lower != null ? lower.get(0) : null;
            Value<?> upperScalar = upper != null ? upper.get(0) : null;
            return new KeyRangeShard(cols, lowerScalar, upperScalar, includeFrom, includeTo);
        }

        // Composite keys are compared as tuples: "(`a`, `b`) >= $f".
        return new KeyRangeShard("(" + cols + ")", lower, upper, includeFrom, includeTo);
    }

    /** Whole-table partition: the query template is used verbatim. */
    private static class Unrestricted implements YdbPartition {
        private static final long serialVersionUID = 2842737463454106189L;

        @Override
        public SelectQuery makeQuery(SelectQuery origin) {
            return origin.copy();
        }
    }

    /** Degenerate partition: an always-false predicate yields zero rows. */
    private static class Empty implements YdbPartition {
        private static final long serialVersionUID = 8062207972148236947L;

        @Override
        public SelectQuery makeQuery(SelectQuery origin) {
            return origin.copy().addExpression("1 = 0"); // disable all
        }
    }

    /** Restricts the scan to a single tablet using a WITH table hint. */
    private static class TabletId implements YdbPartition {
        private static final long serialVersionUID = 8869357112576530212L;

        private final String tabletId;

        TabletId(String tabletId) {
            this.tabletId = tabletId;
        }

        @Override
        public SelectQuery makeQuery(SelectQuery origin) {
            return origin.copy().setWithExpression("TabletId='" + tabletId + "'");
        }
    }

    /** Restricts the scan to a key range via parameterized comparisons. */
    private static class KeyRangeShard implements YdbPartition {
        private static final long serialVersionUID = -8344489162515121573L;

        // Rendered column expression: a scalar name or a parenthesized tuple.
        private final String columns;
        private final Value<?> from;  // null = unbounded below
        private final Value<?> to;    // null = unbounded above
        private final boolean includeFrom;
        private final boolean includeTo;

        KeyRangeShard(String columns, Value<?> from, Value<?> to, boolean includeFrom, boolean includeTo) {
            this.columns = columns;
            this.from = from;
            this.to = to;
            this.includeFrom = includeFrom;
            this.includeTo = includeTo;
        }

        @Override
        public SelectQuery makeQuery(SelectQuery origin) {
            SelectQuery bounded = origin.copy();
            if (from != null) {
                bounded = bounded.addExpressionWithParam(
                        columns + (includeFrom ? " >= " : " > ") + "$f", "$f", from);
            }
            if (to != null) {
                bounded = bounded.addExpressionWithParam(
                        columns + (includeTo ? " <= " : " < ") + "$t", "$t", to);
            }
            return bounded;
        }
    }
}
Loading