Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,8 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {

if (isPredicate) {
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_TXCALCPREDICATE].IncrementFor((Now() - tx->CalcPredicateTimestamp).MilliSeconds());

tx->CalcPredicateSpan.End();
tx->CalcPredicateSpan = {};

Expand Down Expand Up @@ -2195,7 +2197,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&

void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
{
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event)));
UserActionAndTransactionEvents.emplace_back(MakeSimpleShared<TTransaction>(std::move(event), Now()));
RequestWriteInfoIfRequired();
}

Expand Down
8 changes: 5 additions & 3 deletions ydb/core/persqueue/pqtablet/partition/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ class IAutopartitioningManager;
class TPartitionCompaction;

struct TTransaction {

explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TMaybe<bool> predicate = Nothing())
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TInstant calcPredicateTimestamp,
TMaybe<bool> predicate = Nothing())
: Tx(tx)
, Predicate(predicate)
, SupportivePartitionActor(tx->SupportivePartitionActor)
, CalcPredicateSpan(std::move(tx->Span))
, CalcPredicateTimestamp(calcPredicateTimestamp)
{
AFL_ENSURE(Tx);
}
Expand Down Expand Up @@ -123,6 +124,7 @@ struct TTransaction {
NWilson::TSpan CommitSpan;

TInstant WriteInfoResponseTimestamp;
TInstant CalcPredicateTimestamp;
};
class TPartitionCompaction;

Expand Down
16 changes: 0 additions & 16 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,6 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void SendChangePartitionConfig(const TConfigParams& config = {});
void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {});

TTransaction MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
TMaybe<bool> predicate = Nothing());

void SendSubDomainStatus(bool subDomainOutOfSpace = false);
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
Expand Down Expand Up @@ -1106,17 +1101,6 @@ void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigM
}
}

TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
TMaybe<bool> predicate)
{
auto event = MakeSimpleShared<TEvPQ::TEvTxCalcPredicate>(step, txId);
event->AddOperation(std::move(consumer), begin, end);

return TTransaction(event, predicate);
}

template<class TIterable>
void CompareVectors(const TVector<ui64>& expected, const TIterable& actual) {
auto i = 0u;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ enum EPercentileCounters {
COUNTER_LATENCY_PQ_SPLIT_MESSAGE_GROUP = 19 [(CounterOpts) = {Name: "LatencySplitMessageGroup"}];
COUNTER_LATENCY_PQ_PUBLISH_READ = 20 [(CounterOpts) = {Name: "LatencyPublishRead"}];
COUNTER_LATENCY_PQ_FORGET_READ = 21 [(CounterOpts) = {Name: "LatencyForgetRead"}];

COUNTER_LATENCY_PQ_TXCALCPREDICATE = 22 [(CounterOpts) = {Name: "LatencyTxCalcPredicate"}];
}


Expand Down
Loading