Skip to content

Commit

Permalink
Merge 68f347c into 661e989
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Mar 11, 2024
2 parents 661e989 + 68f347c commit f23e3be
Show file tree
Hide file tree
Showing 26 changed files with 246 additions and 53 deletions.
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
},
{
ToString(NYql::EDatabaseType::YT),
CreateExternalDataSource(TString{NYql::YtProviderName}, {"NONE"}, {}, hostnamePatternsRegEx)
CreateExternalDataSource(TString{NYql::YtProviderName}, {"TOKEN"}, {}, hostnamePatternsRegEx)
}
});
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);

return task;
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {


protected:
void FillSecureParamsFromStage(THashMap<TString, TString>& secureParams, const NKqpProto::TKqpPhyStage& stage) {
for (const auto& [secretName, authInfo] : stage.GetSecureParams()) {
const auto& structuredToken = NYql::CreateStructuredTokenParser(authInfo).ToBuilder().ReplaceReferences(SecureParams).ToJson();
const auto& structuredTokenParser = NYql::CreateStructuredTokenParser(structuredToken);
YQL_ENSURE(structuredTokenParser.HasIAMToken(), "only token authentification supported for compute tasks");
secureParams.emplace(secretName, structuredTokenParser.GetIAMToken());
}
}

void BuildSysViewScanTasks(TStageInfo& stageInfo) {
Y_DEBUG_ABORT_UNLESS(stageInfo.Meta.IsSysView());

Expand Down Expand Up @@ -873,6 +882,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
task.Meta.Type = TTaskMeta::TTaskType::Compute;

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);

LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
Expand Down Expand Up @@ -959,6 +969,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (structuredToken) {
task.Meta.SecureParams.emplace(sourceName, structuredToken);
}
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

if (resourceSnapshot.empty()) {
task.Meta.Type = TTaskMeta::TTaskType::Compute;
Expand Down Expand Up @@ -1051,6 +1062,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
YQL_ENSURE(!shardsResolved);
task.Meta.ShardId = taskLocation;
}
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

const auto& stageSource = stage.GetSources(0);
auto& input = task.Inputs[stageSource.GetInputIndex()];
Expand Down Expand Up @@ -1306,6 +1318,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.Type = TTaskMeta::TTaskType::Compute;
task.Meta.ExecuterId = SelfId();
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
Expand Down Expand Up @@ -1441,13 +1454,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
THashMap<ui64, ui64>& assignedShardsCount,
const bool sorted, const bool isOlapScan)
{
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui64 nodeId = ShardIdToNodeId.at(shardId);
if (stageInfo.Meta.IsOlap() && sorted) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
return task;
}

Expand All @@ -1459,6 +1474,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
tasks.push_back(task.Id);
++cnt;
return task;
Expand Down Expand Up @@ -1552,6 +1568,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
task.Meta.ScanTask = true;
task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.SetMetaId(metaGlueingId);
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
aws.SetAwsAccessKeyIdSecretName(GetOrEmpty(settings, "aws_access_key_id_secret_name"));
aws.SetAwsSecretAccessKeySecretName(GetOrEmpty(settings, "aws_secret_access_key_secret_name"));
aws.SetAwsRegion(GetOrEmpty(settings, "aws_region"));
} else if (authMethod == "TOKEN") {
auto& token = *externaDataSourceDesc.MutableAuth()->MutableToken();
token.SetTokenSecretName(GetOrEmpty(settings, "token_secret_name"));
} else {
ythrow yexception() << "Internal error. Unknown auth method: " << authMethod;
}
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour
externalDataSourceMetadata.Metadata->ExternalSource.AwsSecretAccessKey = objectDescription.SecretValues[1];
return;
}
case NKikimrSchemeOp::TAuth::kToken: {
if (objectDescription.SecretValues.size() != 1) {
SetError(externalDataSourceMetadata, TStringBuilder{} << "Token auth contains invalid count of secrets: " << objectDescription.SecretValues.size() << " instead of 1");
return;
}
externalDataSourceMetadata.Metadata->ExternalSource.Token = objectDescription.SecretValues[0];
return;
}
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: {
SetError(externalDataSourceMetadata, "identity case is not specified in case of update external data source secrets");
return;
Expand Down Expand Up @@ -494,6 +502,13 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSo
return promise.GetFuture();
}

case NKikimrSchemeOp::TAuth::kToken: {
const TString& tokenSecretId = authDescription.GetToken().GetTokenSecretName();
auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {tokenSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
return MakeFuture(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") }));
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ TString FillAuthProperties(THashMap<TString, TString>& properties, const TExtern
properties["awsRegion"] = externalSource.DataSourceAuth.GetAws().GetAwsRegion();
return {};

case NKikimrSchemeOp::TAuth::kToken:
properties["authMethod"] = "TOKEN";
properties["token"] = externalSource.Token;
properties["tokenReference"] = externalSource.DataSourceAuth.GetToken().GetTokenSecretName();
return {};

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
return {"Identity case is not specified"};
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ struct TExternalSource {
TString Password;
TString AwsAccessKeyId;
TString AwsSecretAccessKey;
TString Token;
NKikimrSchemeOp::TAuth DataSourceAuth;
NKikimrSchemeOp::TExternalDataSourceProperties Properties;
};
Expand Down
30 changes: 30 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,36 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Properties.GetProperties().at("use_tls"), "true");
UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Properties.GetProperties().at("schema"), "public");
}

Y_UNIT_TEST(TestLoadTokenSecretValueFromExternalDataSourceMetadata) {
TKikimrRunner kikimr;
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

TString secretTokenId = "myTokenSecretId";
TString secretTokenValue = "token";
CreateSecretObject(secretTokenId, secretTokenValue, session);

TString externalDataSourceName = "/Root/ExternalDataSource";
auto query = TStringBuilder() << R"(
CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
SOURCE_TYPE="YT",
LOCATION="localhost",
AUTH_METHOD="TOKEN",
TOKEN_SECRET_NAME=")" << secretTokenId << R"("
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto responseFuture = GetIcGateway(kikimr.GetTestServer())->LoadTableMetadata(TestCluster, externalDataSourceName, IKikimrGateway::TLoadTableMetadataSettings());
responseFuture.Wait();

auto response = responseFuture.GetValue();
UNIT_ASSERT_C(response.Success(), response.Issues().ToOneLineString());
UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Token, secretTokenValue);
UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Properties.GetProperties().size(), 0);
}
}

} // namespace NYql
50 changes: 32 additions & 18 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,29 @@ void FillOlapProgram(const T& node, const NKikimr::NMiniKQL::TType* miniKqlResul
CompileOlapProgram(node.Process(), tableMeta, readProto, resultColNames, ctx);
}

THashMap<TString, TString> FindSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, TSet<TString>& SecretNames) {
THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(node, typesCtx, secureParams);

for (auto& [secretName, structuredToken] : secureParams) {
const auto& tokenParser = CreateStructuredTokenParser(structuredToken);
tokenParser.ListReferences(SecretNames);
structuredToken = tokenParser.ToBuilder().RemoveSecrets().ToJson();
}

return secureParams;
}

std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, const TString& nodeName, TSet<TString>& SecretNames) {
const auto& secureParams = FindSecureParams(node, typesCtx, SecretNames);
if (secureParams.empty()) {
return std::nullopt;
}

YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per " << nodeName << " allowed");
return *secureParams.begin();
}

class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
Expand Down Expand Up @@ -707,6 +730,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
return true;
});

const auto& secureParams = FindSecureParams(stage.Program().Ptr(), TypesCtx, SecretNames);
stageProto.MutableSecureParams()->insert(secureParams.begin(), secureParams.end());

auto result = stage.Program().Body();
auto resultType = result.Ref().GetTypeAnn();
ui32 outputsCount = 0;
Expand Down Expand Up @@ -976,15 +1002,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
externalSource.AddPartitionedTaskParams(partitionParam);
}

THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(source.Ptr(), TypesCtx, secureParams);
if (!secureParams.empty()) {
YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per source allowed");
auto it = secureParams.begin();
externalSource.SetSourceName(it->first);
auto token = it->second;
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
if (const auto& secureParams = FindOneSecureParam(source.Ptr(), TypesCtx, "source", SecretNames)) {
externalSource.SetSourceName(secureParams->first);
externalSource.SetAuthInfo(secureParams->second);
}

google::protobuf::Any& settings = *externalSource.MutableSettings();
Expand Down Expand Up @@ -1062,15 +1082,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node");
YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node");

THashMap<TString, TString> secureParams;
NYql::NCommon::FillSecureParams(sink.Ptr(), TypesCtx, secureParams);
if (!secureParams.empty()) {
YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per sink allowed");
auto it = secureParams.begin();
externalSink.SetSinkName(it->first);
auto token = it->second;
externalSink.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
if (const auto& secureParams = FindOneSecureParam(sink.Ptr(), TypesCtx, "sink", SecretNames)) {
externalSink.SetSinkName(secureParams->first);
externalSink.SetAuthInfo(secureParams->second);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1863,13 +1863,18 @@ message TBasic {
optional string PasswordSecretName = 2;
}

message TToken {
optional string TokenSecretName = 2;
}

message TAuth {
oneof identity {
TNoneAuth None = 3;
TServiceAccountAuth ServiceAccount = 4;
TBasic Basic = 5;
TMdbBasic MdbBasic = 6;
TAws Aws = 7;
TToken Token = 8;
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ message TKqpPhyStage {
repeated TKqpSource Sources = 9;
bool IsSinglePartition = 10;
repeated TKqpSink Sinks = 11;
map<string, string> SecureParams = 12;
}

message TKqpPhyResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ bool ValidateAuth(const NKikimrSchemeOp::TAuth& auth,
return CheckAuth("BASIC", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kAws:
return CheckAuth("AWS", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kToken:
return CheckAuth("TOKEN", availableAuthMethods, errStr);
case NKikimrSchemeOp::TAuth::kNone:
return CheckAuth("NONE", availableAuthMethods, errStr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ Y_UNIT_TEST_SUITE(TokenBuilderTest) {
UNIT_ASSERT_VALUES_EQUAL(R"({"basic_login":"my_login"})", b.ToJson());
}

Y_UNIT_TEST(TokenAuthWithSecret) {
TStructuredTokenBuilder b;
b.SetTokenAuthWithSecret("my_token_reference", "my_token");
UNIT_ASSERT_VALUES_EQUAL(R"({"token":"my_token","token_ref":"my_token_reference"})", b.ToJson());
TStructuredTokenParser p = CreateStructuredTokenParser(b.ToJson());
UNIT_ASSERT(!p.HasServiceAccountIdAuth());
UNIT_ASSERT(!p.HasBasicAuth());
UNIT_ASSERT(p.HasIAMToken());
UNIT_ASSERT(!p.IsNoAuth());
UNIT_ASSERT(p.GetIAMToken() == "my_token");
TSet<TString> references;
p.ListReferences(references);
UNIT_ASSERT_VALUES_EQUAL(references.size(), 1);
UNIT_ASSERT(references.contains("my_token_reference"));
b.RemoveSecrets();
UNIT_ASSERT_VALUES_EQUAL(R"({"token_ref":"my_token_reference"})", b.ToJson());
b.ReplaceReferences({{"my_token_reference", "my_token"}});
UNIT_ASSERT_VALUES_EQUAL(R"({"token":"my_token"})", b.ToJson());
}

Y_UNIT_TEST(IAMToken) {
TStructuredTokenBuilder b;
b.SetIAMToken("my_token");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::SetBasicAuthWithSecret(const T
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::SetTokenAuthWithSecret(const TString& tokenReference, const TString& token) {
Data.SetField("token_ref", tokenReference);
Data.SetField("token", token);
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::SetIAMToken(const TString& token) {
Data.SetField("token", token);
return *this;
Expand All @@ -56,12 +62,18 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const std::m
Data.ClearField("sa_id_signature_ref");
Data.SetField("sa_id_signature", secrets.at(reference));
}
if (Data.HasField("token_ref")) {
auto reference = Data.GetField("token_ref");
Data.ClearField("token_ref");
Data.SetField("token", secrets.at(reference));
}
return *this;
}

TStructuredTokenBuilder& TStructuredTokenBuilder::RemoveSecrets() {
Data.ClearField("basic_password");
Data.ClearField("sa_id_signature");
Data.ClearField("token");
return *this;
}

Expand Down Expand Up @@ -127,6 +139,9 @@ void TStructuredTokenParser::ListReferences(TSet<TString>& references) const {
if (Data.HasField("sa_id_signature_ref")) {
references.insert(Data.GetField("sa_id_signature_ref"));
}
if (Data.HasField("token_ref")) {
references.insert(Data.GetField("token_ref"));
}
}

TStructuredTokenBuilder TStructuredTokenParser::ToBuilder() const {
Expand Down Expand Up @@ -178,4 +193,16 @@ TString ComposeStructuredTokenJsonForBasicAuthWithSecret(const TString& login, c
return result.ToJson();
}

TString ComposeStructuredTokenJsonForTokenAuthWithSecret(const TString& tokenSecretName, const TString& token) {
TStructuredTokenBuilder result;

if (tokenSecretName && token) {
result.SetTokenAuthWithSecret(tokenSecretName, token);
return result.ToJson();
}

result.SetNoAuth();
return result.ToJson();
}

}
Loading

0 comments on commit f23e3be

Please sign in to comment.