diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 18a8a90262e9..8f8c17b339a8 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -2,6 +2,11 @@ #include #include +#include +#include +#include +#include + #include #include #include @@ -25,7 +30,6 @@ #include #include -#include namespace NKikimr::NKqp { @@ -42,6 +46,54 @@ namespace { return sinkSettings.GetAtomicUploadCommit(); } + NThreading::TFuture GetSchemeEntryTypeImpl( + TActorSystem* actorSystem, + const std::optional& federatedQuerySetup, + const TString& endpoint, + const TString& database, + bool useTls, + const TString& structuredTokenJson, + const TString& path, + bool addRoot) { + if (!federatedQuerySetup || !federatedQuerySetup->Driver || !endpoint || !database) { + LOG_NOTICE_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::KQP_GATEWAY, "Skipped describe for path '" << path << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "'"); + return NThreading::MakeFuture(TGetSchemeEntryResult{.EntryType = NYdb::NScheme::ESchemeEntryType::Table}); + } + std::shared_ptr credentialsProviderFactory = NYql::CreateCredentialsProviderFactoryForStructuredToken(nullptr, structuredTokenJson, false); + auto driver = federatedQuerySetup->Driver; + + NYdb::TCommonClientSettings opts; + opts + .DiscoveryEndpoint(endpoint) + .Database(addRoot ? "/Root" + database : database) + .SslCredentials(NYdb::TSslCredentials(useTls)) + .DiscoveryMode(NYdb::EDiscoveryMode::Async) + .CredentialsProviderFactory(credentialsProviderFactory); + auto schemeClient = std::make_shared(*driver, opts); + + return schemeClient->DescribePath(addRoot ? "/Root" + path : path) + .Apply([actorSystem, p = path, sc = schemeClient, database, endpoint, f = federatedQuerySetup, useTls, structuredTokenJson, addRoot](const NThreading::TFuture& result) { + auto describePathResult = result.GetValue(); + TGetSchemeEntryResult res; + if (!describePathResult.IsSuccess()) { + if (describePathResult.GetStatus() == NYdb::EStatus::CLIENT_UNAUTHENTICATED && !addRoot) { + return GetSchemeEntryTypeImpl(actorSystem, f, endpoint, database, useTls, structuredTokenJson, p, true); + } + TString message = TStringBuilder() << "Describe path '" << p << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "' failed."; + LOG_WARN_S(*actorSystem, NKikimrServices::KQP_GATEWAY, message + describePathResult.GetIssues().ToString()); + auto rootIssue = NYql::TIssue(message); + for (const auto& issue : describePathResult.GetIssues()) { + rootIssue.AddSubIssue(MakeIntrusive(NYdb::NAdapters::ToYqlIssue(issue))); + } + res.Issues.AddIssue(rootIssue); + } else { + NYdb::NScheme::TSchemeEntry entry = describePathResult.GetEntry(); + res.EntryType = entry.Type; + } + return NThreading::MakeFuture(res); + }); + } + } // anonymous namespace bool CheckNestingDepth(const google::protobuf::Message& message, ui32 maxDepth) { @@ -371,43 +423,7 @@ namespace { bool useTls, const TString& structuredTokenJson, const TString& path) { - - if (!federatedQuerySetup || !federatedQuerySetup->Driver || !endpoint || !database) { - LOG_NOTICE_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::KQP_GATEWAY, "Skipped describe for path '" << path << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "'"); - return NThreading::MakeFuture(TGetSchemeEntryResult{.EntryType = NYdb::NScheme::ESchemeEntryType::Table}); - } - - std::shared_ptr credentialsProviderFactory = NYql::CreateCredentialsProviderFactoryForStructuredToken(nullptr, structuredTokenJson, false); - auto driver = federatedQuerySetup->Driver; - - NYdb::TCommonClientSettings opts; - opts - .DiscoveryEndpoint(endpoint) - .Database(database) - .SslCredentials(NYdb::TSslCredentials(useTls)) - .DiscoveryMode(NYdb::EDiscoveryMode::Async) - .CredentialsProviderFactory(credentialsProviderFactory); - auto schemeClient = std::make_shared(*driver, opts); - - return schemeClient->DescribePath(path) - .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), p = path, sc = schemeClient, database, endpoint](const NThreading::TFuture& result) { - auto describePathResult = result.GetValue(); - TGetSchemeEntryResult res; - if (!describePathResult.IsSuccess()) { - TString message = TStringBuilder() << "Describe path '" << p << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "' failed."; - LOG_WARN_S(*actorSystem, NKikimrServices::KQP_GATEWAY, message + describePathResult.GetIssues().ToString()); - - auto rootIssue = NYql::TIssue(message); - for (const auto& issue : describePathResult.GetIssues()) { - rootIssue.AddSubIssue(MakeIntrusive(NYdb::NAdapters::ToYqlIssue(issue))); - } - res.Issues.AddIssue(rootIssue); - } else { - NYdb::NScheme::TSchemeEntry entry = describePathResult.GetEntry(); - res.EntryType = entry.Type; - } - return NThreading::MakeFuture(res); - }); + return GetSchemeEntryTypeImpl(NActors::TActivationContext::ActorSystem(), federatedQuerySetup, endpoint, NKikimr::CanonizePath(database), useTls, structuredTokenJson, path, false); }; std::vector FilterExternalSinksWithEffects(const std::vector& sinks) { diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index 88c0bb494f90..54f2a16271a7 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -1001,7 +1001,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) { UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::GENERIC_ERROR, status.GetIssues().ToOneLineString()); const auto& issues = status.GetIssues().ToString(); UNIT_ASSERT_STRING_CONTAINS(issues, "Couldn't determine external YDB entity type"); - UNIT_ASSERT_STRING_CONTAINS(issues, "Describe path 'local/topicName' in external YDB database 'local'"); + UNIT_ASSERT_STRING_CONTAINS(issues, "Describe path 'local/topicName' in external YDB database '/local'"); } Y_UNIT_TEST_F(ReadTopic, TStreamingTestFixture) {