Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

#include <ydb/core/base/counters.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/base/path.h>
#include <ydb/core/protos/auth.pb.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/kqp_physical.pb.h>

#include <ydb/core/fq/libs/db_id_async_resolver_impl/database_resolver.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/http_proxy.h>
Expand All @@ -25,7 +30,6 @@
#include <yt/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>

#include <util/system/file.h>
#include <util/stream/file.h>

namespace NKikimr::NKqp {

Expand All @@ -42,6 +46,54 @@ namespace {
return sinkSettings.GetAtomicUploadCommit();
}

NThreading::TFuture<TGetSchemeEntryResult> GetSchemeEntryTypeImpl(
TActorSystem* actorSystem,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TString& endpoint,
const TString& database,
bool useTls,
const TString& structuredTokenJson,
const TString& path,
bool addRoot) {
if (!federatedQuerySetup || !federatedQuerySetup->Driver || !endpoint || !database) {
LOG_NOTICE_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::KQP_GATEWAY, "Skipped describe for path '" << path << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "'");
return NThreading::MakeFuture<TGetSchemeEntryResult>(TGetSchemeEntryResult{.EntryType = NYdb::NScheme::ESchemeEntryType::Table});
}
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYql::CreateCredentialsProviderFactoryForStructuredToken(nullptr, structuredTokenJson, false);
auto driver = federatedQuerySetup->Driver;

NYdb::TCommonClientSettings opts;
opts
.DiscoveryEndpoint(endpoint)
.Database(addRoot ? "/Root" + database : database)
.SslCredentials(NYdb::TSslCredentials(useTls))
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
.CredentialsProviderFactory(credentialsProviderFactory);
auto schemeClient = std::make_shared<NYdb::NScheme::TSchemeClient>(*driver, opts);

return schemeClient->DescribePath(addRoot ? "/Root" + path : path)
.Apply([actorSystem, p = path, sc = schemeClient, database, endpoint, f = federatedQuerySetup, useTls, structuredTokenJson, addRoot](const NThreading::TFuture<NYdb::NScheme::TDescribePathResult>& result) {
auto describePathResult = result.GetValue();
TGetSchemeEntryResult res;
if (!describePathResult.IsSuccess()) {
if (describePathResult.GetStatus() == NYdb::EStatus::CLIENT_UNAUTHENTICATED && !addRoot) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А имеет ли смысл когда-либо делать /Root/Root? (соответственно, можно смотреть на .starts_with("/Root")); впрочем, как сейчас тоже нормально

return GetSchemeEntryTypeImpl(actorSystem, f, endpoint, database, useTls, structuredTokenJson, p, true);
}
TString message = TStringBuilder() << "Describe path '" << p << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "' failed.";
LOG_WARN_S(*actorSystem, NKikimrServices::KQP_GATEWAY, message + describePathResult.GetIssues().ToString());
auto rootIssue = NYql::TIssue(message);
for (const auto& issue : describePathResult.GetIssues()) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(NYdb::NAdapters::ToYqlIssue(issue)));
}
res.Issues.AddIssue(rootIssue);
} else {
NYdb::NScheme::TSchemeEntry entry = describePathResult.GetEntry();
res.EntryType = entry.Type;
}
return NThreading::MakeFuture<TGetSchemeEntryResult>(res);
});
}

} // anonymous namespace

bool CheckNestingDepth(const google::protobuf::Message& message, ui32 maxDepth) {
Expand Down Expand Up @@ -371,43 +423,7 @@ namespace {
bool useTls,
const TString& structuredTokenJson,
const TString& path) {

if (!federatedQuerySetup || !federatedQuerySetup->Driver || !endpoint || !database) {
LOG_NOTICE_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::KQP_GATEWAY, "Skipped describe for path '" << path << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "'");
return NThreading::MakeFuture<TGetSchemeEntryResult>(TGetSchemeEntryResult{.EntryType = NYdb::NScheme::ESchemeEntryType::Table});
}

std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYql::CreateCredentialsProviderFactoryForStructuredToken(nullptr, structuredTokenJson, false);
auto driver = federatedQuerySetup->Driver;

NYdb::TCommonClientSettings opts;
opts
.DiscoveryEndpoint(endpoint)
.Database(database)
.SslCredentials(NYdb::TSslCredentials(useTls))
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
.CredentialsProviderFactory(credentialsProviderFactory);
auto schemeClient = std::make_shared<NYdb::NScheme::TSchemeClient>(*driver, opts);

return schemeClient->DescribePath(path)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), p = path, sc = schemeClient, database, endpoint](const NThreading::TFuture<NYdb::NScheme::TDescribePathResult>& result) {
auto describePathResult = result.GetValue();
TGetSchemeEntryResult res;
if (!describePathResult.IsSuccess()) {
TString message = TStringBuilder() << "Describe path '" << p << "' in external YDB database '" << database << "' with endpoint '" << endpoint << "' failed.";
LOG_WARN_S(*actorSystem, NKikimrServices::KQP_GATEWAY, message + describePathResult.GetIssues().ToString());

auto rootIssue = NYql::TIssue(message);
for (const auto& issue : describePathResult.GetIssues()) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(NYdb::NAdapters::ToYqlIssue(issue)));
}
res.Issues.AddIssue(rootIssue);
} else {
NYdb::NScheme::TSchemeEntry entry = describePathResult.GetEntry();
res.EntryType = entry.Type;
}
return NThreading::MakeFuture<TGetSchemeEntryResult>(res);
});
return GetSchemeEntryTypeImpl(NActors::TActivationContext::ActorSystem(), federatedQuerySetup, endpoint, NKikimr::CanonizePath(database), useTls, structuredTokenJson, path, false);
};

std::vector<NKqpProto::TKqpExternalSink> FilterExternalSinksWithEffects(const std::vector<NKqpProto::TKqpExternalSink>& sinks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) {
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::GENERIC_ERROR, status.GetIssues().ToOneLineString());
const auto& issues = status.GetIssues().ToString();
UNIT_ASSERT_STRING_CONTAINS(issues, "Couldn't determine external YDB entity type");
UNIT_ASSERT_STRING_CONTAINS(issues, "Describe path 'local/topicName' in external YDB database 'local'");
UNIT_ASSERT_STRING_CONTAINS(issues, "Describe path 'local/topicName' in external YDB database '/local'");
}

Y_UNIT_TEST_F(ReadTopic, TStreamingTestFixture) {
Expand Down
Loading