Skip to content

Commit

Permalink
Support secrets: yql & kqp parts (#4476)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed May 13, 2024
1 parent 39dcd45 commit df7d588
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 7 deletions.
9 changes: 4 additions & 5 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1849,12 +1849,11 @@ class TKqpGatewayProxy : public IKikimrGateway {
if (const auto& database = settings.Settings.Database) {
params.SetDatabase(*database);
}
if (const auto& token = settings.Settings.OAuthToken) {
params.MutableOAuthToken()->SetToken(*token);
if (const auto& oauth = settings.Settings.OAuthToken) {
oauth->Serialize(*params.MutableOAuthToken());
}
if (const auto& creds = settings.Settings.StaticCredentials) {
params.MutableStaticCredentials()->SetUser(creds->UserName);
params.MutableStaticCredentials()->SetPassword(creds->Password);
if (const auto& staticCreds = settings.Settings.StaticCredentials) {
staticCreds->Serialize(*params.MutableStaticCredentials());
}

auto& targets = *config.MutableSpecific();
Expand Down
21 changes: 20 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1851,13 +1851,20 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
} else if (name == "database") {
settings.Settings.Database = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
} else if (name == "token") {
settings.Settings.OAuthToken = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
settings.Settings.EnsureOAuthToken().Token =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
} else if (name == "token_secret_name") {
settings.Settings.EnsureOAuthToken().TokenSecretName =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
} else if (name == "user") {
settings.Settings.EnsureStaticCredentials().UserName =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
} else if (name == "password") {
settings.Settings.EnsureStaticCredentials().Password =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
} else if (name == "password_secret_name") {
settings.Settings.EnsureStaticCredentials().PasswordSecretName =
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
}
}

Expand Down Expand Up @@ -1885,6 +1892,18 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
return SyncError();
}

if (const auto& x = settings.Settings.OAuthToken; x && x->Token && x->TokenSecretName) {
ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()),
TStringBuilder() << "TOKEN and TOKEN_SECRET_NAME are mutually exclusive"));
return SyncError();
}

if (const auto& x = settings.Settings.StaticCredentials; x && x->Password && x->PasswordSecretName) {
ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()),
TStringBuilder() << "PASSWORD and PASSWORD_SECRET_NAME are mutually exclusive"));
return SyncError();
}

auto cluster = TString(createReplication.DataSink().Cluster());
auto future = Gateway->CreateReplication(cluster, settings);

Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/table_index.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
#include <ydb/core/protos/replication.pb.h>

#include <util/string/split.h>
#include <util/string/strip.h>
Expand Down Expand Up @@ -54,6 +55,25 @@ TKikimrPathId TKikimrPathId::Parse(const TStringBuf& str) {
return TKikimrPathId(FromString<ui64>(ownerStr), FromString<ui64>(idStr));
}

void TReplicationSettings::TOAuthToken::Serialize(NKikimrReplication::TOAuthToken& proto) const {
if (Token) {
proto.SetToken(Token);
}
if (TokenSecretName) {
proto.SetTokenSecretName(TokenSecretName);
}
}

void TReplicationSettings::TStaticCredentials::Serialize(NKikimrReplication::TStaticCredentials& proto) const {
proto.SetUser(UserName);
if (Password) {
proto.SetPassword(Password);
}
if (PasswordSecretName) {
proto.SetPasswordSecretName(PasswordSecretName);
}
}

TFuture<IKikimrGateway::TGenericResult> IKikimrGateway::CreatePath(const TString& path, TCreateDirFunc createDir) {
auto partsHolder = std::make_shared<TVector<TString>>(NKikimr::SplitPath(path));
auto& parts = *partsHolder;
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ namespace NKikimr {
}
}

namespace NKikimrReplication {
class TOAuthToken;
class TStaticCredentials;
}

namespace NYql {

using NUdf::EDataSlot;
Expand Down Expand Up @@ -701,18 +706,36 @@ struct TReplicationSettings {
EFailoverMode FailoverMode;
};

struct TOAuthToken {
TString Token;
TString TokenSecretName;

void Serialize(NKikimrReplication::TOAuthToken& proto) const;
};

struct TStaticCredentials {
TString UserName;
TString Password;
TString PasswordSecretName;

void Serialize(NKikimrReplication::TStaticCredentials& proto) const;
};

TMaybe<TString> ConnectionString;
TMaybe<TString> Endpoint;
TMaybe<TString> Database;
TMaybe<TString> OAuthToken;
TMaybe<TOAuthToken> OAuthToken;
TMaybe<TStaticCredentials> StaticCredentials;
TMaybe<TStateDone> StateDone;

TOAuthToken& EnsureOAuthToken() {
if (!OAuthToken) {
OAuthToken = TOAuthToken();
}

return *OAuthToken;
}

TStaticCredentials& EnsureStaticCredentials() {
if (!StaticCredentials) {
StaticCredentials = TStaticCredentials();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1638,8 +1638,10 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
"endpoint",
"database",
"token",
"token_secret_name",
"user",
"password",
"password_secret_name",
};

if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) {
Expand Down
80 changes: 80 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5242,6 +5242,39 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}

// token & token_secret_name are mutually exclusive
{
auto query = R"(
--!syntax_v1
CREATE ASYNC REPLICATION `/Root/replication` FOR
`/Root/table` AS `/Root/replica`
WITH (
TOKEN = "foo",
TOKEN_SECRET_NAME = "bar"
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}

// password & password_secret_name are mutually exclusive
{
auto query = R"(
--!syntax_v1
CREATE ASYNC REPLICATION `/Root/replication` FOR
`/Root/table` AS `/Root/replica`
WITH (
USER = "user",
PASSWORD = "bar"
PASSWORD_SECRET_NAME = "baz"
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}

// unsupported setting (STATE) in CREATE
{
auto query = R"(
Expand Down Expand Up @@ -5291,6 +5324,53 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

Y_UNIT_TEST(CreateAsyncReplicationWithSecret) {
TKikimrRunner kikimr("root@builtin");
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// ok
{
auto query = Sprintf(R"(
--!syntax_v1
CREATE OBJECT mysecret (TYPE SECRET) WITH (value = "root@builtin");
CREATE ASYNC REPLICATION `/Root/replication` FOR
`/Root/table` AS `/Root/replica`
WITH (
ENDPOINT = "%s",
DATABASE = "/Root",
TOKEN_SECRET_NAME = "mysecret"
);
)", kikimr.GetEndpoint().c_str());

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

while (true) {
auto describe = session.DescribeTable("/Root/replica").GetValueSync();
if (describe.GetStatus() == EStatus::SUCCESS) {
break;
}

Sleep(TDuration::Seconds(1));
}
}

Y_UNIT_TEST(AlterAsyncReplication) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
{"endpoint", false},
{"database", false},
{"token", false},
{"token_secret_name", false},
{"user", false},
{"password", false},
{"password_secret_name", false},
{"state", true},
{"failover_mode", true},
};
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/sql/v1/sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2661,8 +2661,10 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
{"ENDPOINT", "localhost:2135"},
{"DATABASE", "/MyDatabase"},
{"TOKEN", "foo"},
{"TOKEN_SECRET_NAME", "foo_secret_name"},
{"USER", "user"},
{"PASSWORD", "bar"},
{"PASSWORD_SECRET_NAME", "bar_secret_name"},
};

for (const auto& [k, v] : settings) {
Expand Down

0 comments on commit df7d588

Please sign in to comment.