Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class IClientDataConsumer : public TThrRefBase {
using TPtr = TIntrusivePtr<IClientDataConsumer>;

public:
virtual TVector<TSchemaColumn> GetColumns() const = 0;
virtual const TVector<TSchemaColumn>& GetColumns() const = 0;
virtual const TString& GetWhereFilter() const = 0;
virtual TPurecalcCompileSettings GetPurecalcSettings() const = 0;
virtual NActors::TActorId GetClientId() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TFormatHandlerFixture : public TBaseFixture {
}

public:
TVector<TSchemaColumn> GetColumns() const override {
const TVector<TSchemaColumn>& GetColumns() const override {
return Columns;
}

Expand Down
75 changes: 45 additions & 30 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,41 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
: Self(self)
, LogPrefix(logPrefix)
, HandlerSettings(handlerSettings)
, Settings(ev->Get()->Record)
, QueryId(ev->Get()->Record.GetQueryId())
, EnabledLLVM(ev->Get()->Record.GetSource().GetEnabledLLVM())
, StartingMessageTimestampMs(ev->Get()->Record.GetStartingMessageTimestampMs())
, Predicate(ev->Get()->Record.GetSource().GetPredicate())
, Columns(GetColumns(ev->Get()->Record.GetSource()))
, ConsumerName(ev->Get()->Record.GetSource().GetConsumerName())
, UseSsl(ev->Get()->Record.GetSource().GetUseSsl())
, ReadActorId(ev->Sender)
, Counters(counters)
{
if (offset) {
NextMessageOffset = *offset;
InitialOffset = *offset;
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
Y_UNUSED(TDuration::TryParse(ev->Get()->Record.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", QueryId);
auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup));
FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true);
RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true);

const auto& source = ev->Get()->Record.GetSource();
Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types");
}

~TClientsInfo() {
Counters->RemoveSubgroup("query_id", Settings.GetQueryId());
Counters->RemoveSubgroup("query_id", QueryId);
}

static TVector<TSchemaColumn> GetColumns(const NYql::NPq::NProto::TDqPqTopicSource& source) {
TVector<TSchemaColumn> result;
result.reserve(source.ColumnsSize());
for (ui64 i = 0; i < source.ColumnsSize(); ++i) {
result.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)});
}
return result;
}

TActorId GetClientId() const override {
Expand All @@ -127,25 +145,16 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
return NextMessageOffset;
}

TVector<TSchemaColumn> GetColumns() const override {
const auto& source = Settings.GetSource();
Y_ENSURE(source.ColumnsSize() == source.ColumnTypesSize(), "Columns size and types size should be equal, but got " << source.ColumnsSize() << " columns and " << source.ColumnTypesSize() << " types");

TVector<TSchemaColumn> Columns;
Columns.reserve(source.ColumnsSize());
for (ui64 i = 0; i < source.ColumnsSize(); ++i) {
Columns.emplace_back(TSchemaColumn{.Name = source.GetColumns().Get(i), .TypeYson = source.GetColumnTypes().Get(i)});
}

const TVector<TSchemaColumn>& GetColumns() const override {
return Columns;
}

const TString& GetWhereFilter() const override {
return Settings.GetSource().GetPredicate();
return Predicate;
}

TPurecalcCompileSettings GetPurecalcSettings() const override {
return {.EnabledLLVM = Settings.GetSource().GetEnabledLLVM()};
return {.EnabledLLVM = EnabledLLVM};
}

void OnClientError(TStatus status) override {
Expand Down Expand Up @@ -185,7 +194,13 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TTopicSession& Self;
const TString& LogPrefix;
const ITopicFormatHandler::TSettings HandlerSettings;
const NFq::NRowDispatcherProto::TEvStartSession Settings;
const TString QueryId;
const bool EnabledLLVM;
const ui64 StartingMessageTimestampMs;
const TString Predicate;
const TVector<TSchemaColumn> Columns;
const TString ConsumerName;
const bool UseSsl;
const TActorId ReadActorId;
TDuration ReconnectPeriod;

Expand Down Expand Up @@ -281,9 +296,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_SESSION";

private:
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const;
NYql::ITopicClient& GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams);
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const;
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl) const;
NYql::ITopicClient& GetTopicClient(bool useSsl);
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const TString& consumerName) const;
void CreateTopicSession();
void CloseTopicSession();
void SubscribeOnNextEvent();
Expand Down Expand Up @@ -411,17 +426,17 @@ void TTopicSession::SubscribeOnNextEvent() {
});
}

NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const {
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) const {
return PqGateway->GetTopicClientSettings()
.Database(Database)
.DiscoveryEndpoint(Endpoint)
.SslCredentials(NYdb::TSslCredentials(sourceParams.GetUseSsl()))
.SslCredentials(NYdb::TSslCredentials(useSsl))
.CredentialsProviderFactory(CredentialsProviderFactory);
}

NYql::ITopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
NYql::ITopicClient& TTopicSession::GetTopicClient(bool useSsl) {
if (!TopicClient) {
TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(sourceParams));
TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings(useSsl));
}
return *TopicClient;
}
Expand All @@ -430,13 +445,13 @@ TInstant TTopicSession::GetMinStartingMessageTimestamp() const {
auto result = TInstant::Max();
Y_ENSURE(!Clients.empty());
for (const auto& [actorId, info] : Clients) {
ui64 time = info->Settings.GetStartingMessageTimestampMs();
ui64 time = info->StartingMessageTimestampMs;
result = std::min(result, TInstant::MilliSeconds(time));
}
return result;
}

NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const {
NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const TString& consumerName) const {
NYdb::NTopic::TTopicReadSettings topicReadSettings;
topicReadSettings.Path(TopicPath);
topicReadSettings.AppendPartitionIds(PartitionId);
Expand All @@ -453,7 +468,7 @@ NYdb::NTopic::TReadSessionSettings TTopicSession::GetReadSessionSettings(const N
if (Config.GetWithoutConsumer()) {
settings.WithoutConsumer();
} else {
settings.ConsumerName(sourceParams.GetConsumerName());
settings.ConsumerName(consumerName);
}
return settings;
}
Expand All @@ -465,8 +480,8 @@ void TTopicSession::CreateTopicSession() {

if (!ReadSession) {
// Use any sourceParams.
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second->Settings.GetSource();
ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams));
const auto& client = Clients.begin()->second;
ReadSession = GetTopicClient(client->UseSsl).CreateReadSession(GetReadSessionSettings(client->ConsumerName));
StartingMessageTimestamp = GetMinStartingMessageTimestamp();
SubscribeOnNextEvent();
}
Expand Down Expand Up @@ -699,7 +714,7 @@ void TTopicSession::SendData(TClientsInfo& info) {

void TTopicSession::StartClientSession(TClientsInfo& info) {
if (ReadSession) {
auto offset = GetOffset(info.Settings);
auto offset = info.GetNextMessageOffset();
if (offset && offset <= LastMessageOffset) {
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << offset << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
Metrics.RestartSessionByOffsets->Inc();
Expand Down
Loading