Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -861,18 +861,21 @@ struct TEvPQ {
};

// Local event (TEventLocal with id EvTxCalcPredicateResult) that bundles a step,
// a transaction id, a partition id, an optional boolean predicate and an issue
// message. The constructor only copies its arguments into the same-named members.
// NOTE(review): the name suggests this is the reply to a calc-predicate request;
// the sender/receiver are in partition.cpp, which is not shown here — confirm.
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
// The five-argument form has no default for issueMsg, so every construction
// site has to pass an issue message explicitly (an empty string if none).
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
const TString& issueMsg) :
Step(step),
TxId(txId),
Partition(partition),
Predicate(predicate)
Predicate(predicate),
IssueMsg(issueMsg)
{
}

ui64 Step;
ui64 TxId;
NPQ::TPartitionId Partition;
// TMaybe<bool>: may hold no value at all, in addition to true/false.
TMaybe<bool> Predicate;
// Free-form text copied from the issueMsg constructor argument.
// NOTE(review): presumably explains why the predicate failed — confirm against the producer.
TString IssueMsg;
};

struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {
Expand Down
492 changes: 329 additions & 163 deletions ydb/core/persqueue/partition.cpp

Large diffs are not rendered by default.

125 changes: 80 additions & 45 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class TPartitionCompaction;

struct TTransaction {

// Builds a transaction from a calc-predicate event.
//   tx                     - shared pointer to the event; stored in Tx.
//   calcPredicateTimestamp - stored in CalcPredicateTimestamp.
//   predicate              - optional initial predicate value; defaults to Nothing().
// Side effect: the event's Span is moved out of *tx into CalcPredicateSpan, so the
// event's own Span is left in a moved-from state after construction.
explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TMaybe<bool> predicate = Nothing())
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TInstant calcPredicateTimestamp,
TMaybe<bool> predicate = Nothing())
: Tx(tx)
, Predicate(predicate)
// NOTE(review): the next two initializers dereference tx. All member initializers
// run before the constructor body, so a null tx is dereferenced before the
// Y_ABORT_UNLESS below can report it.
, SupportivePartitionActor(tx->SupportivePartitionActor)
, CalcPredicateSpan(std::move(tx->Span))
, CalcPredicateTimestamp(calcPredicateTimestamp)
{
// Checks the stored pointer; see the note above about when this runs.
Y_ABORT_UNLESS(Tx);
}
Expand Down Expand Up @@ -128,6 +130,7 @@ struct TTransaction {
NWilson::TSpan CommitSpan;

TInstant WriteInfoResponseTimestamp;
TInstant CalcPredicateTimestamp;
};
class TPartitionCompaction;

Expand Down Expand Up @@ -333,11 +336,31 @@ class TPartition : public TActorBootstrapped<TPartition> {
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> MakeHasDataInfoResponse(ui64 lagSize, const TMaybe<ui64>& cookie, bool readingFinished = false);

void ProcessTxsAndUserActs(const TActorContext& ctx);
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
void ProcessCommitQueue();
void RunPersist();

void MoveUserActOrTxToCommitState();
enum class EProcessResult;
struct TAffectedSourceIdsAndConsumers;

void ProcessUserActionAndTxEvents();
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TTransaction>& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult ProcessUserActionAndTxEvent(TMessage& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

void MoveUserActionAndTxToPendingCommitQueue();

void ProcessUserActionAndTxPendingCommits();
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TEvKeyValue::TEvRequest* request);
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TTransaction>& tx,
TEvKeyValue::TEvRequest* request);
void ProcessUserActionAndTxPendingCommit(TMessage& msg,
TEvKeyValue::TEvRequest* request);

bool WritingCycleDoesNotExceedTheLimits() const;

void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
Expand Down Expand Up @@ -419,14 +442,13 @@ class TPartition : public TActorBootstrapped<TPartition> {
const TString& reason);
THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);

bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
bool BeginTransactionConfig();

void CommitTransaction(TSimpleSharedPtr<TTransaction>& t);
void RollbackTransaction(TSimpleSharedPtr<TTransaction>& t);


void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
void ExecChangePartitionConfig();

void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
Expand Down Expand Up @@ -717,10 +739,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
TMaybe<i16> KafkaProducerEpoch = 0;
};

THashSet<TString> TxAffectedSourcesIds;
THashSet<TString> WriteAffectedSourcesIds;
THashSet<TString> TxAffectedConsumers;
THashSet<TString> SetOffsetAffectedConsumers;
THashMap<TString, size_t> TxAffectedSourcesIds;
THashMap<TString, size_t> WriteAffectedSourcesIds;
THashMap<TString, size_t> TxAffectedConsumers;
THashMap<TString, size_t> SetOffsetAffectedConsumers;
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
THashMap<TString, TSeqNoProducerEpoch> TxInflightMaxSeqNoPerSourceId;

Expand Down Expand Up @@ -775,44 +797,61 @@ class TPartition : public TActorBootstrapped<TPartition> {

TMaybe<TUsersInfoStorage> UsersInfoStorage;

// template <class T> T& GetUserActionAndTransactionEventsFront();
// template <class T> T& GetCurrentEvent();
//TSimpleSharedPtr<TTransaction>& GetCurrentTransaction();
// Scratch record of the source ids and consumers touched by one queued
// user action, transaction or message. The PreProcess* methods receive it by
// reference (or pointer) and fill it in; a copy is kept alongside each queued
// event in TUserActionAndTransactionEvent.
struct TAffectedSourceIdsAndConsumers {
    // Source ids and consumers recorded on behalf of transactions.
    // NOTE(review): presumably the Tx* lists feed TxAffectedSourcesIds /
    // TxAffectedConsumers — confirm in partition.cpp.
    TVector<TString> TxWriteSourcesIds;
    TVector<TString> TxReadConsumers;

    // Source ids appended by the PreProcessRequest overloads for register,
    // deregister, split and write messages.
    TVector<TString> WriteSourcesIds;

    // NOTE(review): presumably consumers touched by non-transactional offset
    // updates (SetOffsetAffectedConsumers) — confirm in partition.cpp.
    TVector<TString> ReadConsumers;

    // Size counters, zero until something is accounted.
    // NOTE(review): looks related to WritingCycleDoesNotExceedTheLimits() — confirm.
    ui32 WriteKeysSize{0};
    ui32 WriteCycleSize{0};
};

void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx);
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg);
void DeleteAffectedSourceIdsAndConsumers();
void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
void DeleteFromSet(const TVector<TString>& p, THashMap<TString, size_t>& q) const;

bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event, TEvKeyValue::TEvRequest* request);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event,
TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& events, TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx, TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request);

[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx);
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act,
TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers);
void CommitUserAct(TEvPQ::TEvSetClientInfo& act);


[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
[[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
void ExecImmediateTx(TTransaction& tx);

EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TWriteMsg& msg);
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TWriteMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);

[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
[[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
EProcessResult ApplyWriteInfoResponse(TTransaction& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

bool FirstEvent = true;
bool HaveWriteMsg = false;
bool HaveData = false;
bool HaveCheckDisk = false;
Expand All @@ -833,13 +872,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
void BeginAppendHeadWithNewWrites(const TActorContext& ctx);
void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);

bool HasPendingCommitsOrPendingWrites() const;

//
// user actions and transactions
//
struct TUserActionAndTransactionEvent {
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
TMessage> Event;
TUserActionAndTransactionEvent(TSimpleSharedPtr<TTransaction>&& transaction)
: Event(std::move(transaction))
{}
Expand All @@ -849,10 +887,16 @@ class TPartition : public TActorBootstrapped<TPartition> {
TUserActionAndTransactionEvent(TMessage&& message)
: Event(std::move(message))
{}

std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
TMessage> Event;
TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers;
};

std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingWrite;
TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;

THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
Expand All @@ -875,15 +919,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
TMessageQueue Responses;
ui64 CurrentBatchSize = 0;

enum class ETxBatchingState{
PreProcessing,
Executing,
Finishing
};
ETxBatchingState BatchingState = ETxBatchingState::PreProcessing;
//
//
//
std::deque<std::pair<TString, ui64>> UpdateUserInfoTimestamp;
bool ReadingTimestamp;
TString ReadingForUser;
Expand Down Expand Up @@ -1067,15 +1102,15 @@ class TPartition : public TActorBootstrapped<TPartition> {
size_t WriteNewSizeFromSupportivePartitions = 0;

bool TryAddDeleteHeadKeysToPersistRequest();
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const;

TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);

void DumpTheSizeOfInternalQueues() const;

TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
TDeque<NWilson::TTraceId> TxForPersistTraceIds;
TDeque<NWilson::TSpan> TxForPersistSpans;
bool CanProcessUserActionAndTransactionEvents() const;
};

} // namespace NKikimr::NPQ
33 changes: 19 additions & 14 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
TxSourceIdForPostPersist.clear();
TxInflightMaxSeqNoPerSourceId.clear();

TxAffectedSourcesIds.clear();
WriteAffectedSourcesIds.clear();
TxAffectedConsumers.clear();
SetOffsetAffectedConsumers.clear();
if (UserActionAndTransactionEvents.empty()) {
WriteInfosToTx.clear();
}
Expand Down Expand Up @@ -981,7 +977,9 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx,
StartProcessChangeOwnerRequests(ctx);
}

TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) {
TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
{
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
Expand All @@ -996,7 +994,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMs
if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) {
return EProcessResult::Blocked;
}
WriteAffectedSourcesIds.insert(msg.Body.SourceId);
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId);
return EProcessResult::Continue;
}

Expand All @@ -1013,7 +1011,9 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
}

TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) {
TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
{
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
Expand All @@ -1027,7 +1027,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroup
if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) {
return EProcessResult::Blocked;
}
WriteAffectedSourcesIds.insert(msg.Body.SourceId);
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId);
return EProcessResult::Continue;
}

Expand All @@ -1036,7 +1036,9 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters&
}


TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) {
TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
{
if (!CanWrite()) {
ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode,
TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId);
Expand All @@ -1052,16 +1054,16 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg&
if (TxAffectedSourcesIds.contains(body.SourceId)) {
return EProcessResult::Blocked;
}
WriteAffectedSourcesIds.insert(body.SourceId);
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId);
}
for (auto& body : msg.Deregistrations) {
if (TxAffectedSourcesIds.contains(body.SourceId)) {
return EProcessResult::Blocked;
}
WriteAffectedSourcesIds.insert(body.SourceId);
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId);
}
return EProcessResult::Continue;

return EProcessResult::Continue;
}


Expand All @@ -1081,7 +1083,9 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para
}
}

TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers)
{
if (!CanWrite()) {
WriteInflightSize -= p.Msg.Data.size();
ScheduleReplyError(p.Cookie, false, InactivePartitionErrorCode,
Expand All @@ -1106,7 +1110,8 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) {
return EProcessResult::Blocked;
}
}
WriteAffectedSourcesIds.insert(p.Msg.SourceId);
affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(p.Msg.SourceId);

return EProcessResult::Continue;
}

Expand Down
Loading
Loading