Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -861,18 +861,21 @@ struct TEvPQ {
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate,
const TString& issueMsg) :
Step(step),
TxId(txId),
Partition(partition),
Predicate(predicate)
Predicate(predicate),
IssueMsg(issueMsg)
{
}

ui64 Step;
ui64 TxId;
NPQ::TPartitionId Partition;
TMaybe<bool> Predicate;
TString IssueMsg;
};

struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {
Expand Down
512 changes: 336 additions & 176 deletions ydb/core/persqueue/partition.cpp

Large diffs are not rendered by default.

125 changes: 80 additions & 45 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class TPartitionCompaction;

struct TTransaction {

explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TMaybe<bool> predicate = Nothing())
TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
TInstant calcPredicateTimestamp,
TMaybe<bool> predicate = Nothing())
: Tx(tx)
, Predicate(predicate)
, SupportivePartitionActor(tx->SupportivePartitionActor)
, CalcPredicateSpan(std::move(tx->Span))
, CalcPredicateTimestamp(calcPredicateTimestamp)
{
Y_ABORT_UNLESS(Tx);
}
Expand Down Expand Up @@ -128,6 +130,7 @@ struct TTransaction {
NWilson::TSpan CommitSpan;

TInstant WriteInfoResponseTimestamp;
TInstant CalcPredicateTimestamp;
};
class TPartitionCompaction;

Expand Down Expand Up @@ -333,11 +336,31 @@ class TPartition : public TActorBootstrapped<TPartition> {
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> MakeHasDataInfoResponse(ui64 lagSize, const TMaybe<ui64>& cookie, bool readingFinished = false);

void ProcessTxsAndUserActs(const TActorContext& ctx);
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
void ProcessCommitQueue();
void RunPersist();

void MoveUserActOrTxToCommitState();
enum class EProcessResult;
struct TAffectedSourceIdsAndConsumers;

void ProcessUserActionAndTxEvents();
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr<TTransaction>& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult ProcessUserActionAndTxEvent(TMessage& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

void MoveUserActionAndTxToPendingCommitQueue();

void ProcessUserActionAndTxPendingCommits();
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TEvKeyValue::TEvRequest* request);
void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr<TTransaction>& tx,
TEvKeyValue::TEvRequest* request);
void ProcessUserActionAndTxPendingCommit(TMessage& msg,
TEvKeyValue::TEvRequest* request);

bool WritingCycleDoesNotExceedTheLimits() const;

void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
Expand Down Expand Up @@ -419,14 +442,13 @@ class TPartition : public TActorBootstrapped<TPartition> {
const TString& reason);
THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);

bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
bool BeginTransactionConfig();

void CommitTransaction(TSimpleSharedPtr<TTransaction>& t);
void RollbackTransaction(TSimpleSharedPtr<TTransaction>& t);


void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config);
void ExecChangePartitionConfig();

void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
Expand Down Expand Up @@ -717,10 +739,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
TMaybe<i16> KafkaProducerEpoch = 0;
};

THashSet<TString> TxAffectedSourcesIds;
THashSet<TString> WriteAffectedSourcesIds;
THashSet<TString> TxAffectedConsumers;
THashSet<TString> SetOffsetAffectedConsumers;
THashMap<TString, size_t> TxAffectedSourcesIds;
THashMap<TString, size_t> WriteAffectedSourcesIds;
THashMap<TString, size_t> TxAffectedConsumers;
THashMap<TString, size_t> SetOffsetAffectedConsumers;
THashMap<TString, TSourceIdPostPersistInfo> TxSourceIdForPostPersist;
THashMap<TString, TSeqNoProducerEpoch> TxInflightMaxSeqNoPerSourceId;

Expand Down Expand Up @@ -775,44 +797,61 @@ class TPartition : public TActorBootstrapped<TPartition> {

TMaybe<TUsersInfoStorage> UsersInfoStorage;

// template <class T> T& GetUserActionAndTransactionEventsFront();
// template <class T> T& GetCurrentEvent();
//TSimpleSharedPtr<TTransaction>& GetCurrentTransaction();
struct TAffectedSourceIdsAndConsumers {
TVector<TString> TxWriteSourcesIds;
TVector<TString> WriteSourcesIds;
TVector<TString> TxReadConsumers;
TVector<TString> ReadConsumers;
ui32 WriteKeysSize = 0;
ui32 WriteCycleSize = 0;
};

void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx);
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg);
void DeleteAffectedSourceIdsAndConsumers();
void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
void DeleteFromSet(const TVector<TString>& p, THashMap<TString, size_t>& q) const;

bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event, TEvKeyValue::TEvRequest* request);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& event,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessUserActionOrTransaction(TMessage& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>& event,
TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>& events, TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& tx, TEvKeyValue::TEvRequest* request);
bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request);

[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx);
[[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act,
TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers);
void CommitUserAct(TEvPQ::TEvSetClientInfo& act);


[[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx);
[[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
void ExecImmediateTx(TTransaction& tx);

EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg);
EProcessResult PreProcessRequest(TWriteMsg& msg);
EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);
EProcessResult PreProcessRequest(TWriteMsg& msg,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request);

[[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe<bool>& predicate);
[[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

EProcessResult ApplyWriteInfoResponse(TTransaction& tx);
EProcessResult ApplyWriteInfoResponse(TTransaction& tx,
TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers);

bool FirstEvent = true;
bool HaveWriteMsg = false;
bool HaveData = false;
bool HaveCheckDisk = false;
Expand All @@ -833,13 +872,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
void BeginAppendHeadWithNewWrites(const TActorContext& ctx);
void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);

bool HasPendingCommitsOrPendingWrites() const;

//
// user actions and transactions
//
struct TUserActionAndTransactionEvent {
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
TMessage> Event;
TUserActionAndTransactionEvent(TSimpleSharedPtr<TTransaction>&& transaction)
: Event(std::move(transaction))
{}
Expand All @@ -849,10 +887,16 @@ class TPartition : public TActorBootstrapped<TPartition> {
TUserActionAndTransactionEvent(TMessage&& message)
: Event(std::move(message))
{}

std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
TSimpleSharedPtr<TTransaction>, // distributed transaction or update config
TMessage> Event;
TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers;
};

std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingCommit;
std::deque<TUserActionAndTransactionEvent> UserActionAndTxPendingWrite;
TVector<THolder<TEvPQ::TEvGetWriteInfoResponse>> WriteInfosApplied;

THashMap<ui64, TSimpleSharedPtr<TTransaction>> TransactionsInflight;
Expand All @@ -875,15 +919,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
TMessageQueue Responses;
ui64 CurrentBatchSize = 0;

enum class ETxBatchingState{
PreProcessing,
Executing,
Finishing
};
ETxBatchingState BatchingState = ETxBatchingState::PreProcessing;
//
//
//
std::deque<std::pair<TString, ui64>> UpdateUserInfoTimestamp;
bool ReadingTimestamp;
TString ReadingForUser;
Expand Down Expand Up @@ -1067,15 +1102,15 @@ class TPartition : public TActorBootstrapped<TPartition> {
size_t WriteNewSizeFromSupportivePartitions = 0;

bool TryAddDeleteHeadKeysToPersistRequest();
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const;

TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);

void DumpTheSizeOfInternalQueues() const;

TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
TDeque<NWilson::TTraceId> TxForPersistTraceIds;
TDeque<NWilson::TSpan> TxForPersistSpans;
bool CanProcessUserActionAndTransactionEvents() const;
};

} // namespace NKikimr::NPQ
33 changes: 15 additions & 18 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,29 +532,26 @@ enum EKeyPosition {
// Calculates the location of keys relative to each other
static EKeyPosition KeyPosition(const TKey& lhs, const TKey& rhs)
{
// Called from FilterBlobsMetaData. The keys are pre-sorted
Y_ABORT_UNLESS(lhs.GetOffset() <= rhs.GetOffset(),
"lhs: %s, rhs: %s",
lhs.ToString().data(), rhs.ToString().data());

if (lhs.GetOffset() == rhs.GetOffset()) {
if (lhs.GetPartNo() == rhs.GetPartNo()) {
Y_ABORT_UNLESS(lhs.GetCount() <= rhs.GetCount(),
"lhs: %s, rhs: %s",
lhs.ToString().data(), rhs.ToString().data());
return RhsContainsLhs;
if (lhs.GetCount() < rhs.GetCount()) {
return RhsContainsLhs;
} else if (lhs.GetCount() == rhs.GetCount()) {
if (lhs.GetInternalPartsCount() < rhs.GetInternalPartsCount()) {
return RhsContainsLhs;
} else {
return LhsContainsRhs;
}
} else {
return LhsContainsRhs;
}
} else if (lhs.GetPartNo() > rhs.GetPartNo()) {
return LhsContainsRhs;
} else {
return RhsAfterLhs;
}

// case lhs.GetOffset() == rhs.GetOffset() && lhs.GetPartNo() < rhs.GetPartNo()
Y_ABORT_UNLESS(lhs.GetPartNo() + lhs.GetInternalPartsCount() <= rhs.GetPartNo(),
"lhs: %s, rhs: %s",
lhs.ToString().data(), rhs.ToString().data());

return RhsAfterLhs;
}

// case lhs.GetOffset() < rhs.GetOffset()

if (ui64 nextOffset = lhs.GetOffset() + lhs.GetCount(); nextOffset > rhs.GetOffset()) {
return LhsContainsRhs;
} else if (nextOffset == rhs.GetOffset()) {
Expand Down
Loading
Loading