Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions ydb/core/blob_depot/assimilator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,10 @@ namespace NKikimr::NBlobDepot {
return;
}

THPTimer timer;
const ui64 endTime = GetCycleCountFast() + DurationToCycles(TDuration::MilliSeconds(10));
ui32 numItems = 0;
bool timeout = false;
bool invalidate = false;

if (!LastPlanScannedKey) {
++Self->AsStats.CopyIteration;
Expand All @@ -311,22 +312,25 @@ namespace NKikimr::NBlobDepot {
LastPlanScannedKey ? TData::TKey(*LastPlanScannedKey) : TData::TKey::Min(),
TData::TKey::Max(),
};

Self->Data->ScanRange(range, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) {
if (++numItems == 1000) {
numItems = 0;
if (TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) {
timeout = true;
return false;
}
}
if (value.GoingToAssimilate) {
Self->AsStats.BytesToCopy += key.GetBlobId().BlobSize();
Self->JsonHandler.Invalidate();
invalidate = true;
}
LastPlanScannedKey.emplace(key.GetBlobId());
return true;
if (++numItems % 1024 == 0 && endTime <= GetCycleCountFast()) {
timeout = true;
return false;
} else {
return true;
}
});

if (invalidate) {
Self->JsonHandler.Invalidate();
}

if (timeout) {
ResumeScanDataForPlanningInFlight = true;
TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForPlanning, 0, SelfId(), {}, nullptr, 0));
Expand Down
73 changes: 27 additions & 46 deletions ydb/core/blob_depot/coro_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

namespace NKikimr::NBlobDepot {

thread_local TCoroTx *TCoroTx::Current = nullptr;

enum class EOutcome {
UNSET,
FINISH_TX,
Expand All @@ -27,21 +25,24 @@ namespace NKikimr::NBlobDepot {
return size;
}

class TCoroTx::TContext : public ITrampoLine {
class TCoroTx::TContext
: public ITrampoLine
, public TContextBase
{
TMappedAllocation Stack;
TExceptionSafeContext Context;
TExceptionSafeContext *BackContext = nullptr;

EOutcome Outcome = EOutcome::UNSET;

TTokens Tokens;
std::function<void()> Body;
std::function<void(TContextBase&)> Body;

bool Finished = false;
bool Aborted = false;

public:
TContext(TTokens&& tokens, std::function<void()>&& body)
TContext(TTokens&& tokens, std::function<void(TContextBase&)>&& body)
: Stack(AlignStackSize(65536))
, Context({this, TArrayRef(Stack.Begin(), Stack.End())})
, Tokens(std::move(tokens))
Expand All @@ -55,21 +56,26 @@ namespace NKikimr::NBlobDepot {
~TContext() {
if (!Finished) {
Aborted = true;
Resume();
Resume(nullptr);
}
}

EOutcome Resume() {
EOutcome Resume(NTabletFlatExecutor::TTransactionContext *txc) {
Outcome = EOutcome::UNSET;

NTabletFlatExecutor::TTransactionContext *prevTxC = std::exchange(TxContext, txc);
Y_ABORT_UNLESS(!prevTxC);

TExceptionSafeContext returnContext;
Y_ABORT_UNLESS(!BackContext);
BackContext = &returnContext;
Y_DEBUG_ABORT_UNLESS(CurrentTx() || Aborted);
returnContext.SwitchTo(&Context);
Y_ABORT_UNLESS(BackContext == &returnContext);
BackContext = nullptr;

prevTxC = std::exchange(TxContext, nullptr);
Y_ABORT_UNLESS(prevTxC == txc);

Y_ABORT_UNLESS(Outcome != EOutcome::UNSET);
return Outcome;
}
Expand Down Expand Up @@ -100,7 +106,7 @@ namespace NKikimr::NBlobDepot {
void DoRun() override {
if (!IsExpired()) {
try {
Body();
Body(*this);
} catch (const TExDead&) {
// just do nothing
}
Expand All @@ -110,7 +116,7 @@ namespace NKikimr::NBlobDepot {
}
};

TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body)
TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void(TCoroTx::TContextBase&)> body)
: TTransactionBase(self)
, Context(std::make_unique<TContext>(std::move(tokens), std::move(body)))
{}
Expand All @@ -124,18 +130,8 @@ namespace NKikimr::NBlobDepot {
{}

bool TCoroTx::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) {
// prepare environment
Y_ABORT_UNLESS(TxContext == nullptr && Current == nullptr);
TxContext = &txc;
Current = this;

Y_ABORT_UNLESS(Context);
const EOutcome outcome = Context->Resume();

// clear environment back
Y_ABORT_UNLESS(TxContext == &txc && Current == this);
TxContext = nullptr;
Current = nullptr;
const EOutcome outcome = Context->Resume(&txc);

switch (outcome) {
case EOutcome::FINISH_TX:
Expand All @@ -150,16 +146,8 @@ namespace NKikimr::NBlobDepot {
}

void TCoroTx::Complete(const TActorContext&) {
// prepare environment
Y_ABORT_UNLESS(TxContext == nullptr && Current == nullptr);
Current = this;

Y_ABORT_UNLESS(Context);
const EOutcome outcome = Context->Resume();

// clear environment back
Y_ABORT_UNLESS(TxContext == nullptr && Current == this);
Current = nullptr;
const EOutcome outcome = Context->Resume(nullptr);

switch (outcome) {
case EOutcome::RUN_SUCCESSOR_TX:
Expand All @@ -174,28 +162,21 @@ namespace NKikimr::NBlobDepot {
}
}

TCoroTx *TCoroTx::CurrentTx() {
return Current;
}

NTabletFlatExecutor::TTransactionContext *TCoroTx::GetTxc() {
Y_ABORT_UNLESS(Current->TxContext);
return Current->TxContext;
NTabletFlatExecutor::TTransactionContext *TCoroTx::TContextBase::GetTxc() {
Y_ABORT_UNLESS(TxContext);
return TxContext;
}

void TCoroTx::FinishTx() {
Y_ABORT_UNLESS(Current);
Current->Context->Return(EOutcome::FINISH_TX);
void TCoroTx::TContextBase::FinishTx() {
static_cast<TContext*>(this)->Return(EOutcome::FINISH_TX);
}

void TCoroTx::RestartTx() {
Y_ABORT_UNLESS(Current);
Current->Context->Return(EOutcome::RESTART_TX);
void TCoroTx::TContextBase::RestartTx() {
static_cast<TContext*>(this)->Return(EOutcome::RESTART_TX);
}

void TCoroTx::RunSuccessorTx() {
Y_ABORT_UNLESS(Current);
Current->Context->Return(EOutcome::RUN_SUCCESSOR_TX);
void TCoroTx::TContextBase::RunSuccessorTx() {
static_cast<TContext*>(this)->Return(EOutcome::RUN_SUCCESSOR_TX);
}

} // NKikimr::NBlobDepot
25 changes: 15 additions & 10 deletions ydb/core/blob_depot/coro_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,31 @@ namespace NKikimr::NBlobDepot {
using TTokens = std::vector<std::weak_ptr<TToken>>;

class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
public:
class TContextBase {
protected:
TTransactionContext *TxContext = nullptr;

public:
NTabletFlatExecutor::TTransactionContext *GetTxc();
void FinishTx(); // finish this transaction; function returns on Complete() entry
void RestartTx(); // restart transaction; function returns on next Execute() entry
void RunSuccessorTx(); // restart in new transaction -- called after FinishTx()
NTabletFlatExecutor::TTransactionContext& operator *() { return *GetTxc(); }
};

private:
class TContext;
std::unique_ptr<TContext> Context;
TTransactionContext *TxContext = nullptr;
static thread_local TCoroTx *Current;

public:
TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body);
TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void(TContextBase&)> body);
TCoroTx(TCoroTx& predecessor);
~TCoroTx();

private:
bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override;
void Complete(const TActorContext&) override;

public:
static NTabletFlatExecutor::TTransactionContext *GetTxc();
static TCoroTx *CurrentTx(); // obtain pointer to current tx
static void FinishTx(); // finish this transaction; function returns on Complete() entry
static void RestartTx(); // restart transaction; function returns on next Execute() entry
static void RunSuccessorTx(); // restart in new transaction -- called after FinishTx()
};

} // NKikimr::NBlobDepot
14 changes: 11 additions & 3 deletions ydb/core/blob_depot/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,14 @@ namespace NKikimr::NBlobDepot {
ui64 TotalStoredTrashSize = 0;
ui64 InFlightTrashSize = 0;

ui64 LoadRestartTx = 0;
ui64 LoadRunSuccessorTx = 0;
ui64 LoadProcessingCycles = 0;
ui64 LoadFinishTxCycles = 0;
ui64 LoadRestartTxCycles = 0;
ui64 LoadRunSuccessorTxCycles = 0;
ui64 LoadTotalCycles = 0;

friend class TGroupAssimilator;

THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
Expand Down Expand Up @@ -709,9 +717,9 @@ namespace NKikimr::NBlobDepot {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

ui64 GetTotalStoredDataSize() const {
return TotalStoredDataSize;
}
ui64 GetTotalStoredDataSize() const { return TotalStoredDataSize; }
ui64 GetTotalStoredTrashSize() const { return TotalStoredTrashSize; }
ui64 GetInFlightTrashSize() const { return InFlightTrashSize; }

void RenderMainPage(IOutputStream& s);

Expand Down
Loading
Loading