Skip to content
4 changes: 3 additions & 1 deletion ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class TKqpQueryBlocksTransformer : public TGraphTransformerBase {
}

TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
return QueryBlockTransformer->ApplyAsyncChanges(input, output, ctx);
YQL_ENSURE(CurrentBlock < input->ChildrenSize());
output = input;
return QueryBlockTransformer->ApplyAsyncChanges(input->Child(CurrentBlock), output->ChildRef(CurrentBlock), ctx);
}

private:
Expand Down
17 changes: 13 additions & 4 deletions ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,11 +699,14 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
.TypeMappingSettings(typeMappingSettings);

auto listSplitsBuilder = mockClient->ExpectListSplits();
listSplitsBuilder
auto fillListSplitExpectation = listSplitsBuilder
.ValidateArgs(settings.ValidateListSplitsArgs)
.Select()
.DataSourceInstance(GetMockConnectorSourceInstance())
.Table(settings.TableName);
.Table(settings.TableName)
.What();

FillMockConnectorRequestColumns(fillListSplitExpectation, settings.Columns);

for (ui64 i = 0; i < settings.DescribeCount; ++i) {
auto responseBuilder = describeTableBuilder.Response();
Expand Down Expand Up @@ -1944,6 +1947,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
));

{ // Prepare connector mock

const std::vector<TColumn> columns = {
{"fqdn", Ydb::Type::STRING},
{"payload", Ydb::Type::STRING}
Expand All @@ -1952,7 +1956,10 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
.TableName = ydbTable,
.Columns = columns,
.DescribeCount = 2,
.ListSplitsCount = 2
// For stream queries type annotation is executed twice, but
// now List Split is done after type annotation optimization.
// That is why only single call to List Split is expected.
.ListSplitsCount = 1
});

const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"};
Expand Down Expand Up @@ -2054,7 +2061,9 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
.TableName = ydbTable,
.Columns = columns,
.DescribeCount = 2,
.ListSplitsCount = 5,
// Now List Split is done after type annotation, that is the
// reason why this value equal to 4 not 5
.ListSplitsCount = 4,
.ValidateListSplitsArgs = false
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,65 @@ namespace NKikimr::NKqp {
return databaseAsyncResolverMock;
}

///
/// Fixture that prepares mocks and services for a provider.
///
/// TODO:
/// Make it reusable, currently it fails if multiple
/// expects are applied to mock
///
class TQueryExecutorFixture : public NUnitTest::TBaseFixture {
public:
TQueryExecutorFixture(EProviderType providerType)
: DataSourceInstance(MakeDataSourceInstance(providerType))
, ClientMock(std::make_shared<TConnectorClientMock>())
{
auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType);
auto appConfig = CreateDefaultAppConfig();
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();

Kikimr = MakeKikimrRunner(
false,
ClientMock,
databaseAsyncResolverMock,
appConfig,
s3ActorsFactory,
{.CredentialsFactory = CreateCredentialsFactory()}
);

CreateExternalDataSource(providerType, Kikimr);
QueryClient = Kikimr->GetQueryClient();
}

TQueryClient GetQueryClient() {
return *QueryClient;
}

TAsyncExecuteQueryResult ExecuteQuery(const TString& query) {
return GetQueryClient()
.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings());
}

NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script) {
return GetQueryClient()
.ExecuteScript(script);
}

TConnectorClientMock::TSelectBuilder<> GetSelectBuilder() {
TConnectorClientMock::TSelectBuilder<> builder;
builder.DataSourceInstance(DataSourceInstance);
return builder;
}

public:
const NYql::TGenericDataSourceInstance DataSourceInstance;
std::shared_ptr<TConnectorClientMock> ClientMock;

protected:
std::shared_ptr<TKikimrRunner> Kikimr;
std::optional<TQueryClient> QueryClient;
};

Y_UNIT_TEST_SUITE(GenericFederatedQuery) {
void TestSelectAllFields(EProviderType providerType) {
// prepare mock
Expand All @@ -163,6 +222,9 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
.What()
.Column("col1", Ydb::Type::UINT16)
.Done()
.Done()
.Result()
.AddResponse(NewSuccess())
Expand Down Expand Up @@ -284,6 +346,8 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
.What()
.Done()
.Done()
.Result()
.AddResponse(NewSuccess())
Expand Down Expand Up @@ -399,6 +463,8 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
.What()
.Done()
.Done()
.Result()
.AddResponse(NewSuccess())
Expand Down Expand Up @@ -489,104 +555,122 @@ namespace NKikimr::NKqp {
TestSelectCount(EProviderType::IcebergHadoopToken);
}

void TestFilterPushdown(EProviderType providerType) {
// prepare mock
auto clientMock = std::make_shared<TConnectorClientMock>();

const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);

// clang-format off
const NApi::TSelect selectInListSplits = TConnectorClientMock::TSelectBuilder<>()
.DataSourceInstance(dataSourceInstance).GetResult();

const NApi::TSelect selectInReadSplits = TConnectorClientMock::TSelectBuilder<>()
.DataSourceInstance(dataSourceInstance)
///
/// Test a filter pushdown for a provider
///
/// @param[in] providerType Provider's type
/// @param[in] where Where clause that will be appended to a sql query
/// @param[in] expectedWhere Where clause that will be expected in a list split and read split requests
///
void TestFilterPushdown(EProviderType providerType, const TString& where, NApi::TSelect::TWhere& expectedWhere) {
auto f = std::make_shared<TQueryExecutorFixture>(providerType);
auto expectedSelect = f->GetSelectBuilder()
.What()
.NullableColumn("data_column", Ydb::Type::STRING)
.NullableColumn("filtered_column", Ydb::Type::INT32)
.Done()
.Where()
.Filter()
.Equal()
.Column("filtered_column")
.Value<i32>(42)
.Done()
.Done()
.NullableColumn("colDate", Ydb::Type::DATE)
.NullableColumn("colInt32", Ydb::Type::INT32)
.NullableColumn("colString", Ydb::Type::STRING)
.Done()
.Where(expectedWhere)
.GetResult();
// clang-format on

// step 1: DescribeTable
// clang-format off
clientMock->ExpectDescribeTable()
.DataSourceInstance(dataSourceInstance)
f->ClientMock->ExpectDescribeTable()
.DataSourceInstance(f->DataSourceInstance)
.TypeMappingSettings(MakeTypeMappingSettings(NYql::NConnector::NApi::STRING_FORMAT))
.Response()
.NullableColumn("filtered_column", Ydb::Type::INT32)
.NullableColumn("data_column", Ydb::Type::STRING);
// clang-format on
.NullableColumn("colDate", Ydb::Type::DATE)
.NullableColumn("colInt32", Ydb::Type::INT32)
.NullableColumn("colString", Ydb::Type::STRING);

// step 2: ListSplits
// clang-format off
clientMock->ExpectListSplits()
.Select(selectInListSplits)
f->ClientMock->ExpectListSplits()
.Select(expectedSelect)
.Result()
.AddResponse(NewSuccess())
.Description("some binary description")
.Select(selectInReadSplits);
// clang-format on
.Select(expectedSelect);

// step 3: ReadSplits
// Return data such that it contains values not satisfying the filter conditions.
// Then check that, despite that connector reads additional data,
// our generic provider then filters it out.
std::vector<std::string> colData = {"Filtered text", "Text"};
std::vector<i32> filterColumnData = {42, 24};
// clang-format off
clientMock->ExpectReadSplits()
std::vector<std::string> colString = {"Filtered text", "Text"};
std::vector<ui16> colDate = {20326, 20329};
std::vector<i32> colInt32 = {42, 24};

f->ClientMock->ExpectReadSplits()
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL)
.Split()
.Description("some binary description")
.Select(selectInReadSplits)
.Select(expectedSelect)
.Done()
.Result()
.AddResponse(MakeRecordBatch(
MakeArray<arrow::BinaryBuilder>("data_column", colData, arrow::binary()),
MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())),
MakeArray<arrow::UInt16Builder>("colDate", colDate, arrow::uint16()),
MakeArray<arrow::Int32Builder>("colInt32", colInt32, arrow::int32()),
MakeArray<arrow::BinaryBuilder>("colString", colString, arrow::binary())),
NewSuccess());
// clang-format on

// prepare database resolver mock
auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType);

// run test
auto appConfig = CreateDefaultAppConfig();
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory,
{.CredentialsFactory = CreateCredentialsFactory()});

CreateExternalDataSource(providerType, kikimr);

const TString query = fmt::format(
R"(
PRAGMA generic.UsePredicatePushdown="true";
SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42;
SELECT colDate, colInt32, colString FROM {data_source_name}.{table_name} WHERE {table_where};
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
"table_name"_a = DEFAULT_TABLE);
"table_name"_a = DEFAULT_TABLE,
"table_where"_a = where
);

auto db = kikimr->GetQueryClient();
auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
auto queryResult = f->ExecuteQuery(query).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());

// Check a query result
TResultSetParser resultSet(queryResult.GetResultSetParser(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);

// check every row
// Check that, despite returning nonfiltered data in connector, response will be correct
std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions
MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString);
// Check values for the query result
std::vector<std::optional<TInstant>> colDateResults = {TInstant::Days(20326)};
std::vector<std::optional<int>> colInt32Result = {42};
std::vector<std::optional<TString>> colStringResult = {"Filtered text"};

for (size_t i = 0; i < colDateResults.size(); ++i) {
resultSet.TryNextRow();

MATCH_OPT_RESULT_WITH_VAL_IDX(colDateResults[i], resultSet, GetOptionalDate, 0);
MATCH_OPT_RESULT_WITH_VAL_IDX(colInt32Result[i], resultSet, GetOptionalInt32, 1);
MATCH_OPT_RESULT_WITH_VAL_IDX(colStringResult[i], resultSet, GetOptionalString, 2);
}
}

///
/// Test a filter pushdown for a provider
///
/// @param[in] providerType Provider's type
///
void TestFilterPushdown(EProviderType providerType) {
using namespace NYql::NConnector::NTest;

auto expectedWhereInt = TConnectorClientMock::TWhereBuilder<>()
.Filter().Equal()
.Column("colInt32")
.Value<i32>(42)
.Done()
.Done()
.GetResult();

TestFilterPushdown(providerType, "colInt32 = 42", expectedWhereInt);
TestFilterPushdown(providerType, "colInt32 = EvaluateExpr(44 - 2)", expectedWhereInt);
TestFilterPushdown(providerType, "colInt32 = 44 - 2", expectedWhereInt);

auto expectedWhereDate = TConnectorClientMock::TWhereBuilder<>()
.Filter().Equal()
.Column("colDate")
.Value<ui32>(20326, ::Ydb::Type::DATE)
.Done()
.Done()
.GetResult();

TestFilterPushdown(providerType, "colDate = Date('2025-08-26')", expectedWhereDate);
TestFilterPushdown(providerType, "colDate = EvaluateExpr(Date('2025-08-27') - Interval(\"P1D\"))", expectedWhereDate);
TestFilterPushdown(providerType, "colDate = Date('2025-08-27') - Interval(\"P1D\")", expectedWhereDate);
}

Y_UNIT_TEST(PostgreSQLFilterPushdown) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/providers/common/pushdown/collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ class TPredicateMarkup {
node.Maybe<TCoUint64>()) {
return true;
}
if (Settings.IsEnabled(EFlag::DateCtor) && node.Maybe<TCoDate>()) {
return true;
}
if (Settings.IsEnabled(EFlag::TimestampCtor) && node.Maybe<TCoTimestamp>()) {
return true;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/common/pushdown/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct TSettings {
MinMax = 1 << 27,
NonDeterministic = 1 << 28,
DecimalCtor = 1 << 29,
DateCtor = 1 << 30,
};

explicit TSettings(NLog::EComponent logComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ namespace NYql::NConnector::NTest {
DEFINE_SIMPLE_TYPE_SETTER(i64, INT64, int64_value);
DEFINE_SIMPLE_TYPE_SETTER(ui64, UINT64, uint64_value);

template <>
void SetValue(const ui32& value, Ydb::TypedValue* proto, const ::Ydb::Type::PrimitiveTypeId& typeId, bool optional) {
*proto->mutable_type() = MakeYdbType(typeId, optional);
proto->mutable_value()->Y_CAT(set_, uint32_value)(value);
}

void CreatePostgreSQLExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
const TString& dataSourceName,
Expand Down
Loading
Loading