Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import tech.ydb.spark.connector.common.KeysRange;
import tech.ydb.spark.connector.common.OperationOption;
import tech.ydb.spark.connector.common.PartitionOption;
import tech.ydb.spark.connector.common.ReadMethod;
import tech.ydb.spark.connector.read.YdbReadTable;
import tech.ydb.spark.connector.read.YdbScanTable;
import tech.ydb.spark.connector.write.YdbRowLevelBuilder;
Expand Down Expand Up @@ -216,14 +215,14 @@ public Transform[] partitioning() {

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
ReadMethod method = OperationOption.READ_METHOD.readEnum(options, ReadMethod.QUERY);
boolean useReadTable = OperationOption.USE_READ_TABLE.readBoolean(options, false);
switch (type) {
case COLUMN:
return new YdbScanTable(this, options);
case ROW:
case INDEX:
default:
if (method == ReadMethod.READ_TABLE) {
if (useReadTable) {
return new YdbReadTable(this, options);
}
return new YdbScanTable(this, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public enum OperationOption implements SparkOption {
INGEST_METHOD("method"),

/**
* YDB data reading method: ReadTable/QueryService/QueryService with ApacheArrow
* Forces usage of the ReadTable API for row-oriented tables
*/
READ_METHOD("read.method"),
USE_READ_TABLE("useReadTable"),

/**
* YDB max batch rows for ingestion.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package tech.ydb.spark.connector.read;

import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.spark.connector.common.FieldInfo;
import tech.ydb.spark.connector.common.KeysRange;
import tech.ydb.table.values.TupleValue;
import tech.ydb.table.values.Value;

/**
*
* @author Aleksandr Gorshenin
*/
public class PrimaryKeyExpression implements YdbPartition {
    private static final long serialVersionUID = 3138686052261512975L;

    // Rendered YQL comparison, e.g. "(`a`, `b`) >= $f" or "`a` > $t".
    private final String expression;
    // Name of the declared query parameter referenced by the expression.
    private final String valueName;
    // Bound value: the whole tuple for composite keys, the single element otherwise.
    private final Value<?> value;

    /**
     * Builds a key-bound comparison against the given primary-key columns.
     *
     * @param columns   primary-key columns, in key order
     * @param tuple     bound value(s); only the first {@code min(columns.length, tuple.size())}
     *                  columns participate in the comparison
     * @param paramName query parameter name the bound value is passed under
     * @param operation comparison operator ({@code >}, {@code >=}, {@code <} or {@code <=})
     */
    private PrimaryKeyExpression(FieldInfo[] columns, TupleValue tuple, String paramName, String operation) {
        final boolean composite = tuple.size() > 1;
        final int count = Math.min(columns.length, tuple.size());

        StringBuilder sb = new StringBuilder();
        if (composite) {
            sb.append("(");
        }
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append('`').append(columns[i].getName()).append('`');
        }
        if (composite) {
            sb.append(")");
        }
        sb.append(" ").append(operation).append(" ").append(paramName);

        this.expression = sb.toString();
        this.valueName = paramName;
        // A single-element tuple is unwrapped so the parameter has a scalar type.
        this.value = composite ? tuple : tuple.get(0);
    }

    @Override
    public SelectQuery makeQuery(SelectQuery origin) {
        return origin.addExpressionWithParam(expression, valueName, value);
    }

    /**
     * Lower-bound expression for the range, or {@code null} when the range has no lower bound.
     * Uses {@code >=} for an inclusive bound and {@code >} otherwise; parameter name {@code $f}.
     */
    public static PrimaryKeyExpression keyRangeFrom(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
        if (!keyRange.hasFromValue()) {
            return null;
        }
        TupleValue bound = keyRange.readFromValue(types, columns);
        String op = keyRange.includesFromValue() ? ">=" : ">";
        return new PrimaryKeyExpression(columns, bound, "$f", op);
    }

    /**
     * Upper-bound expression for the range, or {@code null} when the range has no upper bound.
     * Uses {@code <=} for an inclusive bound and {@code <} otherwise; parameter name {@code $t}.
     */
    public static PrimaryKeyExpression keyRangeTo(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
        if (!keyRange.hasToValue()) {
            return null;
        }
        TupleValue bound = keyRange.readToValue(types, columns);
        String op = keyRange.includesToValue() ? "<=" : "<";
        return new PrimaryKeyExpression(columns, bound, "$t", op);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
*
* @author Aleksandr Gorshenin
*/
abstract class LazyReader implements PartitionReader<InternalRow> {
private static final Logger logger = LoggerFactory.getLogger(LazyReader.class);
abstract class StreamReader implements PartitionReader<InternalRow> {
private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
private static final AtomicInteger COUNTER = new AtomicInteger(0);

private final String[] outColumns;
Expand All @@ -39,7 +39,7 @@ abstract class LazyReader implements PartitionReader<InternalRow> {
private volatile QueueItem currentItem = null;
private volatile Status finishStatus = null;

protected LazyReader(YdbTypes types, int maxQueueSize, StructType schema) {
protected StreamReader(YdbTypes types, int maxQueueSize, StructType schema) {
this.types = types;
this.queue = new ArrayBlockingQueue<>(maxQueueSize);
this.outColumns = schema.fieldNames();
Expand Down Expand Up @@ -86,7 +86,7 @@ public boolean next() {
if (id == null) {
startedAt = System.currentTimeMillis();
id = start();
logger.info("[{}] started, {} total", id, COUNTER.incrementAndGet());
logger.debug("[{}] started, {} total", id, COUNTER.incrementAndGet());
}
while (true) {
if (finishStatus != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,72 @@

import org.apache.spark.sql.connector.read.InputPartition;

import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.spark.connector.common.FieldInfo;
import tech.ydb.spark.connector.common.KeysRange;

/**
*
* @author Aleksandr Gorshenin
*/
public interface YdbPartition extends InputPartition {
    /** Applies this partition's restriction to a copy of the given base query. */
    SelectQuery makeQuery(SelectQuery origin);

    /** Partition that selects no rows at all. */
    static YdbPartition none() {
        return new YdbPartition() {
            private static final long serialVersionUID = -7536076317892048979L;
            @Override
            public SelectQuery makeQuery(SelectQuery origin) {
                return origin.copy().addExpression("1 = 0"); // disable all
            }
        };
    }

    /** Partition that selects every row (no extra restriction). */
    static YdbPartition unrestricted() {
        return new YdbPartition() {
            private static final long serialVersionUID = -7536076317892048980L;
            @Override
            public SelectQuery makeQuery(SelectQuery origin) {
                return origin.copy();
            }
        };
    }

    /** Partition restricted to a single tablet via a WITH clause. */
    static YdbPartition tabletId(String tabletId) {
        return new YdbPartition() {
            private static final long serialVersionUID = -7536076317892048981L;
            @Override
            public SelectQuery makeQuery(SelectQuery origin) {
                return origin.copy().setWithExpression("TabletId='" + tabletId + "'");
            }
        };
    }

    /**
     * Partition restricted to the given primary-key range. Degenerates to
     * {@link #unrestricted()} when there are no key columns or the range has
     * no bounds, and to {@link #none()} when the range is empty.
     */
    static YdbPartition keysRange(YdbTypes types, FieldInfo[] columns, KeysRange keyRange) {
        if (columns.length == 0 || keyRange.isUnrestricted()) {
            return unrestricted();
        }

        if (keyRange.isEmpty()) {
            return none();
        }

        final PrimaryKeyExpression from = PrimaryKeyExpression.keyRangeFrom(types, columns, keyRange);
        final PrimaryKeyExpression to = PrimaryKeyExpression.keyRangeTo(types, columns, keyRange);

        return new YdbPartition() {
            private static final long serialVersionUID = -7536076317892048982L;
            @Override
            public SelectQuery makeQuery(SelectQuery origin) {
                // BUG FIX: chain both bounds onto the same accumulated query.
                // The previous code passed `origin` to each bound, so the initial
                // copy() was discarded and the upper bound overwrote the lower one.
                SelectQuery query = origin.copy();
                if (from != null) {
                    query = from.makeQuery(query);
                }
                if (to != null) {
                    query = to.makeQuery(query);
                }
                return query;
            }
        };
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public YdbReadTable(YdbTable table, CaseInsensitiveStringMap options) {
this.table = table;

this.types = new YdbTypes(options);
this.queueMaxSize = LazyReader.readQueueMaxSize(options);
this.queueMaxSize = StreamReader.readQueueMaxSize(options);
this.keys = table.getKeyColumns();

this.predicateRange = KeysRange.UNRESTRICTED;
Expand Down Expand Up @@ -321,7 +321,7 @@ static final class Lyzer {
}
}

private final class ReadTableReader extends LazyReader {
private final class ReadTableReader extends StreamReader {
private final String id;
private final GrpcReadStream<ReadTablePart> stream;

Expand Down
Loading