Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
26
27
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
234f6d55e1e71a242c2767a22b990d68ff7fce72
78aa6fa75eba124b91b200bfe76c67e94ee968a4
2 changes: 1 addition & 1 deletion examples/topic_reader/eventloop/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ int main(int argc, const char* argv[]) {
stopPartitionSessionEvent->Confirm();
} else if (auto* endPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
endPartitionSessionEvent->Confirm();
} else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
} else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
break;
}
}
Expand Down
14 changes: 10 additions & 4 deletions include/ydb-cpp-sdk/client/iam/common/generic_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {
protected:
using TRequestFiller = std::function<void(TRequest&)>;
using TAsyncInterface = typename TService::Stub::async_interface;
using TAsyncRpc = void (TAsyncInterface::*)(grpc::ClientContext*, const TRequest*, TResponse*, std::function<void(grpc::Status)>);
using TAsyncRpc = std::function<void(typename TService::Stub*, grpc::ClientContext*, const TRequest*, TResponse*, std::function<void(grpc::Status)>)>;

private:
class TImpl : public std::enable_shared_from_this<TGrpcIamCredentialsProvider<TRequest, TResponse, TService>::TImpl> {
Expand Down Expand Up @@ -96,7 +96,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {
context->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo());
}

(Stub_->async()->*Rpc_)(context.get(), &req, response.get(), std::move(cb));
Rpc_(Stub_.get(), context.get(), &req, response.get(), std::move(cb));

if (sync) {
resultPromise.GetFuture().Wait(2 * IamEndpoint_.RequestTimeout);
Expand Down Expand Up @@ -127,6 +127,8 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {
}
NeedStop_ = true;
}
Stub_.reset();
Channel_.reset();
}

private:
Expand Down Expand Up @@ -215,7 +217,9 @@ class TIamJwtCredentialsProvider : public TGrpcIamCredentialsProvider<TRequest,
: TGrpcIamCredentialsProvider<TRequest, TResponse, TService>(params,
[jwtParams = params.JwtParams](TRequest& req) {
req.set_jwt(MakeSignedJwt(jwtParams));
}, &TService::Stub::async_interface::Create) {}
}, [](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function<void(grpc::Status)> cb) {
stub->async()->Create(context, request, response, std::move(cb));
}) {}
};

template<typename TRequest, typename TResponse, typename TService>
Expand All @@ -225,7 +229,9 @@ class TIamOAuthCredentialsProvider : public TGrpcIamCredentialsProvider<TRequest
: TGrpcIamCredentialsProvider<TRequest, TResponse, TService>(params,
[token = params.OAuthToken](TRequest& req) {
req.set_yandex_passport_oauth_token(TStringType{token});
}, &TService::Stub::async_interface::Create) {}
}, [](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function<void(grpc::Status)> cb) {
stub->async()->Create(context, request, response, std::move(cb));
}) {}
};

template<typename TRequest, typename TResponse, typename TService>
Expand Down
39 changes: 39 additions & 0 deletions include/ydb-cpp-sdk/client/topic/control_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,28 @@ enum class EAutoPartitioningStrategy: uint32_t {
Paused = 4,
};

// 0 - unspecified
// 1 - disabeld
// 2 - database level metrics
// 3 - object level metrics
// 4 - detailed metrics
using EMetricsLevel = uint32_t;

class TConsumer {
public:
TConsumer(const Ydb::Topic::Consumer&);

const std::string& GetConsumerName() const;
bool GetImportant() const;
TDuration GetAvailabilityPeriod() const;
const TInstant& GetReadFrom() const;
const std::vector<ECodec>& GetSupportedCodecs() const;
const std::map<std::string, std::string>& GetAttributes() const;

private:
std::string ConsumerName_;
bool Important_;
TDuration AvailabilityPeriod_;
TInstant ReadFrom_;
std::map<std::string, std::string> Attributes_;
std::vector<ECodec> SupportedCodecs_;
Expand Down Expand Up @@ -307,6 +316,8 @@ class TTopicDescription {

const TTopicStats& GetTopicStats() const;

std::optional<EMetricsLevel> GetMetricsLevel() const;

void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const;
private:

Expand All @@ -330,6 +341,7 @@ class TTopicDescription {
NScheme::TVirtualTimestamp CreationTimestamp_;
std::vector<NScheme::TPermissions> Permissions_;
std::vector<NScheme::TPermissions> EffectivePermissions_;
std::optional<EMetricsLevel> MetricsLevel_;
};

class TConsumerDescription {
Expand Down Expand Up @@ -452,6 +464,7 @@ struct TConsumerSettings {

FLUENT_SETTING(std::string, ConsumerName);
FLUENT_SETTING_DEFAULT(bool, Important, false);
FLUENT_SETTING_DEFAULT(TDuration, AvailabilityPeriod, TDuration::Zero());
FLUENT_SETTING_DEFAULT(TInstant, ReadFrom, TInstant::Zero());

FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs);
Expand Down Expand Up @@ -488,6 +501,11 @@ struct TConsumerSettings {
return *this;
}

TConsumerSettings& SetAvailiabilityPeriod(TDuration availabilityPeriod) {
AvailabilityPeriod_ = availabilityPeriod;
return *this;
}

TSettings& EndAddConsumer() { return Parent_; };

private:
Expand All @@ -504,6 +522,7 @@ struct TAlterConsumerSettings {

FLUENT_SETTING(std::string, ConsumerName);
FLUENT_SETTING_OPTIONAL(bool, SetImportant);
FLUENT_SETTING_OPTIONAL(TDuration, SetAvailabilityPeriod);
FLUENT_SETTING_OPTIONAL(TInstant, SetReadFrom);

FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs);
Expand All @@ -524,6 +543,11 @@ struct TAlterConsumerSettings {
return *this;
}

TAlterConsumerSettings& SetAvailabilityPeriod(TDuration availabilityPeriod) {
SetAvailabilityPeriod_ = availabilityPeriod;
return *this;
}

TAlterTopicSettings& EndAlterConsumer() { return Parent_; };

private:
Expand Down Expand Up @@ -557,6 +581,8 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti

FLUENT_SETTING(TAttributes, Attributes);

FLUENT_SETTING_OPTIONAL(EMetricsLevel, MetricsLevel);

TCreateTopicSettings& SetSupportedCodecs(std::vector<ECodec>&& codecs) {
SupportedCodecs_ = std::move(codecs);
return *this;
Expand Down Expand Up @@ -729,7 +755,20 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting
return *this;
}

TAlterTopicSettings& SetMetricsLevel(EMetricsLevel level) {
MetricsLevel_ = level;
return *this;
}
TAlterTopicSettings& ResetMetricsLevel() {
MetricsLevel_ = true;
return *this;
}

std::optional<TAlterPartitioningSettings> AlterPartitioningSettings_;
std::variant<
bool, // Reset
EMetricsLevel // Set
> MetricsLevel_ = false;
};

inline TPartitioningSettingsBuilder TCreateTopicSettings::BeginConfigurePartitioningSettings() {
Expand Down
10 changes: 10 additions & 0 deletions include/ydb-cpp-sdk/client/value/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ enum class EPrimitiveType {
TzDatetime = 0x0035,
TzTimestamp = 0x0036,
String = 0x1001,
Bytes = 0x1001,
Utf8 = 0x1200,
Text = 0x1200,
Yson = 0x1201,
Json = 0x1202,
Uuid = 0x1203,
Expand Down Expand Up @@ -338,7 +340,9 @@ class TValueParser : public TMoveOnly {
const std::string& GetTzDatetime() const;
const std::string& GetTzTimestamp() const;
const std::string& GetString() const;
const std::string& GetBytes() const;
const std::string& GetUtf8() const;
const std::string& GetText() const;
const std::string& GetYson() const;
const std::string& GetJson() const;
TDecimalValue GetDecimal() const;
Expand Down Expand Up @@ -370,7 +374,9 @@ class TValueParser : public TMoveOnly {
std::optional<std::string> GetOptionalTzDatetime() const;
std::optional<std::string> GetOptionalTzTimestamp() const;
std::optional<std::string> GetOptionalString() const;
std::optional<std::string> GetOptionalBytes() const;
std::optional<std::string> GetOptionalUtf8() const;
std::optional<std::string> GetOptionalText() const;
std::optional<std::string> GetOptionalYson() const;
std::optional<std::string> GetOptionalJson() const;
std::optional<TDecimalValue> GetOptionalDecimal() const;
Expand Down Expand Up @@ -448,7 +454,9 @@ class TValueBuilderBase : public TMoveOnly {
TDerived& TzDatetime(const std::string& value);
TDerived& TzTimestamp(const std::string& value);
TDerived& String(const std::string& value);
TDerived& Bytes(const std::string& value);
TDerived& Utf8(const std::string& value);
TDerived& Text(const std::string& value);
TDerived& Yson(const std::string& value);
TDerived& Json(const std::string& value);
TDerived& Decimal(const TDecimalValue& value);
Expand Down Expand Up @@ -480,7 +488,9 @@ class TValueBuilderBase : public TMoveOnly {
TDerived& OptionalTzDatetime(const std::optional<std::string>& value);
TDerived& OptionalTzTimestamp(const std::optional<std::string>& value);
TDerived& OptionalString(const std::optional<std::string>& value);
TDerived& OptionalBytes(const std::optional<std::string>& value);
TDerived& OptionalUtf8(const std::optional<std::string>& value);
TDerived& OptionalText(const std::optional<std::string>& value);
TDerived& OptionalYson(const std::optional<std::string>& value);
TDerived& OptionalJson(const std::optional<std::string>& value);
TDerived& OptionalUuid(const std::optional<TUuidValue>& value);
Expand Down
6 changes: 0 additions & 6 deletions src/api/protos/ydb_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,3 @@ message VirtualTimestamp {
uint64 plan_step = 1;
uint64 tx_id = 2;
}

enum MetricsLevel {
Database = 0;
Object = 1;
Detailed = 2;
}
6 changes: 6 additions & 0 deletions src/api/protos/ydb_persqueue_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,9 @@ message TopicSettings {

// Client service type.
string service_type = 7;

// Message for this consumer will not expire due to retention for at least `availability_period` if they aren't commited.
optional google.protobuf.Duration availability_period = 8;
}

// List of consumer read rules for this topic.
Expand All @@ -1184,6 +1187,9 @@ message TopicSettings {
}
// remote mirror rule for this topic.
RemoteMirrorRule remote_mirror_rule = 11;

// Set or reset metrics level.
optional uint32 metrics_level = 16;
}

message AutoPartitioningSettings {
Expand Down
25 changes: 23 additions & 2 deletions src/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "src/api/protos/annotations/sensitive.proto";
import "src/api/protos/annotations/validation.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package Ydb.Topic;
Expand Down Expand Up @@ -155,7 +156,7 @@ message StreamWriteMessage {
// Message sequence number, provided by client for deduplication.
// Starts at 1
int64 seq_no = 1;
// Creation timestamp
// Creation timestamp.
google.protobuf.Timestamp created_at = 2;
// Compressed client message body.
bytes data = 3;
Expand Down Expand Up @@ -822,6 +823,9 @@ message Consumer {
// Bytes read statistics.
MultipleWindowsStat bytes_read = 4;
}

// Message for this consumer will not expire due to retention for at least `availability_period` if they aren't commited.
optional google.protobuf.Duration availability_period = 8;
}

// Consumer alter description.
Expand All @@ -844,6 +848,12 @@ message AlterConsumer {
// User and server attributes of consumer. Server attributes starts from "_" and will be validated by server.
// Leave the value blank to drop an attribute.
map<string, string> alter_attributes = 6;

// Change message lifetime if consumer is important.
oneof availability_period_action {
google.protobuf.Duration set_availability_period = 7;
google.protobuf.Empty reset_availability_period = 8;
}
}

enum AutoPartitioningStrategy {
Expand Down Expand Up @@ -992,6 +1002,9 @@ message CreateTopicRequest {

// Metering mode for the topic in a serverless database.
MeteringMode metering_mode = 12;

// Metrics level. If the level is unset, use database setting.
optional uint32 metrics_level = 13;
}

// Create topic response sent from server to client.
Expand Down Expand Up @@ -1125,6 +1138,9 @@ message DescribeTopicResult {
// How much bytes were written statistics.
MultipleWindowsStat bytes_written = 4;
}

// Metrics level.
optional uint32 metrics_level = 16;
}

// Describe partition request sent from client to server.
Expand Down Expand Up @@ -1252,7 +1268,6 @@ message PartitionStats {
int32 partition_node_id = 8 [deprecated=true]; //Use PartitionLocation
}


// Update existing topic request sent from client to server.
message AlterTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
Expand Down Expand Up @@ -1297,6 +1312,12 @@ message AlterTopicRequest {

// Set metering mode for topic in serverless database.
MeteringMode set_metering_mode = 14;

// Set or reset metrics level.
oneof metrics_level {
uint32 set_metrics_level = 15;
google.protobuf.Empty reset_metrics_level = 16;
}
}

// Update topic response sent from server to client.
Expand Down
4 changes: 3 additions & 1 deletion src/client/iam_private/common/iam.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ class TIamServiceCredentialsProviderFactory : public ICredentialsProviderFactory
req.set_resource_type(params.ResourceType);
req.set_target_service_account_id(params.TargetServiceAccountId);
},
&TService::Stub::async_interface::CreateForService,
[](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function<void(grpc::Status)> cb) {
stub->async()->CreateForService(context, request, response, std::move(cb));
},
params.SystemServiceAccountCredentials->CreateProvider()) {}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace NKikimr::NPersQueueTests {
createPartitionStreamEvent->Confirm();
} else if (auto* destroyPartitionStreamEvent = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*event)) {
destroyPartitionStreamEvent->Confirm();
} else if (auto* closeSessionEvent = std::get_if<NYdb::NPersQueue::TSessionClosedEvent>(&*event)) {
} else if (std::get_if<NYdb::NPersQueue::TSessionClosedEvent>(&*event)) {
return {};
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/client/persqueue_public/ut/ut_utils/ut_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
.ClusterDiscoveryMode(EClusterDiscoveryMode::On);
return settings;
}

void Write(const TString& topic, ui32 partitionId, const TString& data) {
auto settings = TWriteSessionSettings()
.Path(topic)
.MessageGroupId("src-id")
.PartitionGroupId(partitionId)
.Codec(ECodec::RAW);
auto writeSession = GetPersQueueClient().CreateSimpleBlockingWriteSession(settings);

writeSession->Write(data);

writeSession->Close();
}
};

struct TYDBClientEventLoop : public ::NPersQueue::IClientEventLoop {
Expand Down
6 changes: 5 additions & 1 deletion src/client/query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ class TExecuteQueryIterator::TReaderImpl {
};

TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() {
if (!ReaderImpl_) {
RaiseError("Attempt to read a stream result part on an invalid stream. ");
}

if (ReaderImpl_->IsFinished()) {
RaiseError("Attempt to perform read on invalid or finished stream");
RaiseError("Attempt to read a stream result part on a finished stream. ");
}

return ReaderImpl_->ReadNext(ReaderImpl_);
Expand Down
Loading
Loading