Merged
71 changes: 35 additions & 36 deletions ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
@@ -28,20 +28,8 @@ def add_options(p):
p.add_argument('--enable-hard-switch-piles', action='store_true', help='Enable hard switch pile with setting PRIMARY')
p.add_argument('--enable-disconnect-piles', action='store_true', help='Enable disconnect pile')
p.add_argument('--fixed-pile-for-disconnect', type=int, help='Pile to disconnect')


def fetch_start_time_map(base_config):
start_time_map = {}
for node_id in {pdisk.NodeId for pdisk in base_config.PDisk}:
r = common.fetch_json_info('sysinfo', [node_id])
if len(r) != 1:
return None
k, v = r.popitem()
assert k == node_id
if 'StartTime' not in v:
return None
start_time_map[node_id] = int(v['StartTime'])
return start_time_map
p.add_argument('--weight-restarts', type=float, default=1.0, help='weight for restart action')
p.add_argument('--weight-kill-tablets', type=float, default=1.0, help='weight for kill tablets action')


def make_pdisk_key_config(pdisk_keys, node_id):
@@ -97,8 +85,6 @@ def do(args):
base_config = common.fetch_base_config()
vslot_map = common.build_vslot_map(base_config)
node_fqdn_map = common.build_node_fqdn_map(base_config)
if args.enable_pdisk_encryption_keys_changes or not args.disable_restarts:
start_time_map = fetch_start_time_map(base_config)
except Exception:
if config_retries is None:
config_retries = 3
@@ -108,7 +94,21 @@
config_retries -= 1
continue

tablets = common.fetch_json_info('tabletinfo') if args.enable_kill_tablets or args.enable_kill_blob_depot else {}
if args.enable_kill_tablets or args.enable_kill_blob_depot:
tablets = {
int(tablet['TabletId']) : tablet
for tablet in common.fetch('viewer/json/tabletinfo', dict(enums=1)).get('TabletStateInfo', [])
}
else:
tablets = {}
sysinfo = {
int(node['NodeId']): node
for node in common.fetch('viewer/json/sysinfo', dict(fields_required=-1, enums=1), cache=False).get('SystemStateInfo', [])
}
start_time_map = {
int(node['NodeId']): int(node['StartTime'])
for node in sysinfo.values()
}

config_retries = None

@@ -187,11 +187,10 @@ def match(x):
return True

def do_restart(node_id):
host = node_fqdn_map[node_id]
node = sysinfo[node_id]
if args.enable_pdisk_encryption_keys_changes:
update_pdisk_key_config(node_fqdn_map, pdisk_keys, node_id)
subprocess.call(['ssh', host, 'sudo', 'killall', '-%s' % args.kill_signal, 'kikimr'])
subprocess.call(['ssh', host, 'sudo', 'killall', '-%s' % args.kill_signal, 'ydbd'])
subprocess.call(['ssh', node['Host'], 'sudo', 'kill', '-%s' % args.kill_signal, node['PID']])
if args.enable_pdisk_encryption_keys_changes:
remove_old_pdisk_keys(pdisk_keys, pdisk_key_versions, node_id)

@@ -316,9 +315,9 @@ def do_connect_pile(pile_id, pile_id_to_hosts):
possible_actions = []

if args.enable_kill_tablets:
possible_actions.append(('kill tablet', (do_kill_tablet,)))
possible_actions.append((args.weight_kill_tablets, 'kill tablet', (do_kill_tablet,)))
if args.enable_kill_blob_depot:
possible_actions.append(('kill blob depot', (do_kill_blob_depot,)))
possible_actions.append((1.0, 'kill blob depot', (do_kill_blob_depot,)))

evicts = []
wipes = []
@@ -361,19 +360,19 @@ def pick(v):
action[0](*action[1:])

if evicts:
possible_actions.append(('evict', (pick, evicts)))
possible_actions.append((1.0, 'evict', (pick, evicts)))
if wipes:
possible_actions.append(('wipe', (pick, wipes)))
possible_actions.append((1.0, 'wipe', (pick, wipes)))
if readonlies:
possible_actions.append(('readonly', (pick, readonlies)))
possible_actions.append((1.0, 'readonly', (pick, readonlies)))
if unreadonlies:
possible_actions.append(('un-readonly', (pick, unreadonlies)))
possible_actions.append((1.0, 'un-readonly', (pick, unreadonlies)))
if pdisk_restarts:
possible_actions.append(('restart-pdisk', (pick, pdisk_restarts)))
possible_actions.append((1.0, 'restart-pdisk', (pick, pdisk_restarts)))
if make_pdisks_readonly:
possible_actions.append(('make-pdisks-readonly', (pick, make_pdisks_readonly)))
possible_actions.append((1.0, 'make-pdisks-readonly', (pick, make_pdisks_readonly)))
if make_pdisks_not_readonly:
possible_actions.append(('make-pdisks-not-readonly', (pick, make_pdisks_not_readonly)))
possible_actions.append((1.0, 'make-pdisks-not-readonly', (pick, make_pdisks_not_readonly)))

restarts = []

@@ -385,12 +384,12 @@ def pick(v):
nodes_to_restart = nodes_to_restart[:node_count//2]
for node_id in nodes_to_restart:
if args.enable_pdisk_encryption_keys_changes:
possible_actions.append(('add new pdisk key to node with id: %d' % node_id, (do_add_pdisk_key, node_id)))
possible_actions.append((1.0, 'add new pdisk key to node with id: %d' % node_id, (do_add_pdisk_key, node_id)))
if not args.disable_restarts:
restarts.append(('restart node with id: %d' % node_id, (do_restart, node_id)))

if restarts:
possible_actions.append(('restart', (pick, restarts)))
possible_actions.append((args.weight_restarts, 'restart', (pick, restarts)))

has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles
if has_pile_operations:
@@ -418,14 +417,14 @@ def pick(v):
can_hard_switch = (len(synchronized_piles) + len(promoted_piles) > 0)

if args.enable_soft_switch_piles and can_soft_switch:
possible_actions.append(('soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles))))
possible_actions.append((1.0, 'soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles))))
if args.enable_hard_switch_piles and can_hard_switch:
possible_actions.append(('hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles)))
possible_actions.append((1.0, 'hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles)))
if len(disconnected_piles) > 0:
possible_actions.append(('connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints)))
possible_actions.append((1.0, 'connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints)))
if args.enable_disconnect_piles and len(synchronized_piles) > 0:
pile_to_disconnect = args.fixed_pile_for_disconnect if args.fixed_pile_for_disconnect is not None else random.choice([primary_pile] + synchronized_piles)
possible_actions.append(('disconnect-pile', (do_disconnect_pile, pile_to_disconnect, pile_id_to_endpoints)))
possible_actions.append((1.0, 'disconnect-pile', (do_disconnect_pile, pile_to_disconnect, pile_id_to_endpoints)))

if not possible_actions:
common.print_if_not_quiet(args, 'Waiting for the next round...', file=sys.stdout)
@@ -434,7 +433,7 @@ def pick(v):

################################################################################################################

action_name, action = random.choice(possible_actions)
(_, action_name, action), = random.choices(possible_actions, weights=[w for w, _, _ in possible_actions])
print('%s %s' % (action_name, datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')))

try:
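A note on the action-selection change above: each entry of possible_actions now carries a weight as its first element, and the final uniform random.choice is replaced by a weighted draw with random.choices. The following standalone Python sketch illustrates the pattern; the tuple layout and the selection line mirror the diff, while the concrete weights and the placeholder callables are purely illustrative.

    import random

    # Each candidate is (weight, name, (callable, *args)), matching the diff's tuple layout.
    possible_actions = [
        (3.0, 'restart', (lambda node_id: print('restart node', node_id), 7)),
        (1.0, 'kill tablet', (lambda: print('kill tablet'),)),
    ]

    # random.choices returns a list of k=1 samples; the trailing comma unpacks
    # that single element into (weight, action_name, action).
    (_, action_name, action), = random.choices(
        possible_actions, weights=[w for w, _, _ in possible_actions])

    action[0](*action[1:])  # invoke the chosen action with its bound arguments

Since --weight-restarts and --weight-kill-tablets default to 1.0 and every other action is appended with weight 1.0, the default behaviour stays equivalent to the previous uniform choice; raising one of the flags (for example --weight-restarts 3, an illustrative value) only biases the draw toward that action.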
2 changes: 2 additions & 0 deletions ydb/core/base/blobstorage.h
@@ -1241,6 +1241,7 @@ struct TEvBlobStorage {
bool ReportDetailedPartMap = false;
bool PhantomCheck = false;
bool Decommission = false; // is it generated by decommission actor and should be handled by the underlying proxy?
bool DoNotReportIndexRestoreGetMissingBlobs = false;

struct TTabletData {
TTabletData() = default;
@@ -2604,6 +2605,7 @@ struct TEvBlobStorage {
: SkipBlocksUpTo(origin.SkipBlocksUpTo)
, SkipBarriersUpTo(origin.SkipBarriersUpTo)
, SkipBlobsUpTo(origin.SkipBlobsUpTo)
, IgnoreDecommitState(origin.IgnoreDecommitState)
, Reverse(origin.Reverse)
{}

42 changes: 15 additions & 27 deletions ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp
@@ -199,40 +199,27 @@ namespace NKikimr {
return CreateWithErrorReason(ev, ev->Status, ev->Id, StatusFlags, self.GroupId, ApproximateFreeSpaceShare);
}

std::unique_ptr<IEventBase> Combine(TThis& /*self*/, TEvBlobStorage::TEvGetBlockResult *ev, auto *current) {
Y_ABORT_UNLESS(!current || current->TabletId == ev->TabletId);
return CreateWithErrorReason(ev, ev->Status, ev->TabletId,
Max(current ? current->BlockedGeneration : 0, ev->BlockedGeneration));
}

template<typename TEvent>
std::unique_ptr<IEventBase> ProcessFullQuorumResponse(TThis& self, std::unique_ptr<TEvent> ev) {
// combine responses
if (CombinedResponse) {
Y_DEBUG_ABORT_UNLESS(dynamic_cast<TEvent*>(CombinedResponse.get()));
}
Y_DEBUG_ABORT_UNLESS(!CombinedResponse || dynamic_cast<TEvent*>(CombinedResponse.get()));
Y_DEBUG_ABORT_UNLESS(ev->Status != NKikimrProto::NODATA);
const bool readyToReply = !ResponsesPending || ev->Status != NKikimrProto::OK;
CombinedResponse = Combine(self, ev.get(), static_cast<TEvent*>(CombinedResponse.get()));
const bool readyToReply = !ResponsesPending ||
(ev->Status != NKikimrProto::OK && ev->Status != NKikimrProto::NODATA);
return readyToReply
? std::exchange(CombinedResponse, nullptr)
: nullptr;
}

template<typename TEvent>
std::unique_ptr<IEventBase> ProcessPrimaryPileResponse(TThis& self, std::unique_ptr<TEvent> ev,
const TBridgeInfo::TPile& pile) {
if (CombinedResponse) {
Y_DEBUG_ABORT_UNLESS(dynamic_cast<TEvent*>(CombinedResponse.get()));
}
if (ev->Status != NKikimrProto::OK && ev->Status != NKikimrProto::NODATA) {
// if any pile reports error, we finish with error
return MakeErrorFrom(self, ev.get());
}
if (pile.IsPrimary) {
return ev;
}
Y_ABORT_UNLESS(ResponsesPending);
return nullptr;
return readyToReply ? std::move(CombinedResponse) : nullptr;
}

std::unique_ptr<IEventBase> ProcessResponse(TThis& self, std::unique_ptr<TEvBlobStorage::TEvPutResult> ev,
const TBridgeInfo::TPile& /*pile*/, TRequestPayload& payload) {
if (IsRestoring) {
Y_ABORT_UNLESS(!std::holds_alternative<TPutState>(State));

if (ev->Status != NKikimrProto::OK) { // can't restore this blob
return std::visit(TOverloaded{
[&](TGetState& state) -> std::unique_ptr<IEventBase> {
@@ -463,8 +450,8 @@ namespace NKikimr {
}

std::unique_ptr<IEventBase> ProcessResponse(TThis& self, std::unique_ptr<TEvBlobStorage::TEvGetBlockResult> ev,
const TBridgeInfo::TPile& pile, TRequestPayload& /*payload*/) {
return ProcessPrimaryPileResponse(self, std::move(ev), pile);
const TBridgeInfo::TPile& /*pile*/, TRequestPayload& /*payload*/) {
return ProcessFullQuorumResponse(self, std::move(ev));
}

std::unique_ptr<IEventBase> ProcessResponse(TThis& self, std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> ev,
@@ -876,6 +863,7 @@ namespace NKikimr {

STLOG(success ? PRI_INFO : PRI_NOTICE, BS_PROXY_BRIDGE, BPB01, "request finished",
(RequestId, request->RequestId),
(Status, common->Status),
(Response, response->ToString()),
(Passed, TDuration::Seconds(request->Timer.Passed())),
(SubrequestTimings, makeSubrequestTimings()));
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/bridge/syncer/syncer.cpp
@@ -298,8 +298,10 @@ namespace NKikimr::NBridge {
for (size_t i = 0; i < numBlobs; ++i) {
q[i].Set(*jt++);
}
IssueQuery(true, std::make_unique<TEvBlobStorage::TEvGet>(q, numBlobs, TInstant::Max(),
NKikimrBlobStorage::FastRead, true, true));
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(q, numBlobs, TInstant::Max(),
NKikimrBlobStorage::FastRead, true, true);
ev->DoNotReportIndexRestoreGetMissingBlobs = true; // they may be missing, do not report errors in this case
IssueQuery(true, std::move(ev));

RestoreQueue.erase(RestoreQueue.begin(), it);
}
13 changes: 9 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
@@ -17,6 +17,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA
const TInstant Deadline;
const bool IsInternal;
const bool Decommission;
const bool DoNotReportIndexRestoreGetMissingBlobs;
const std::optional<TEvBlobStorage::TEvGet::TForceBlockTabletData> ForceBlockTabletData;

THashMap<ui64, TGroupQuorumTracker> QuorumTracker;
@@ -210,10 +211,12 @@
for (ui32 i = 0; i < getResult.ResponseSz; ++i) {
TEvBlobStorage::TEvGetResult::TResponse &response = getResult.Responses[i];
if (response.Status != NKikimrProto::OK) {
DSP_LOG_ERROR_S("DSPI07", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(status)
<< " Response[" << i << "]# " << NKikimrProto::EReplyStatus_Name(response.Status)
<< " for tablet# " << TabletId
<< " BlobStatus# " << DumpBlobStatus());
if (!DoNotReportIndexRestoreGetMissingBlobs) {
DSP_LOG_ERROR_S("DSPI07", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(status)
<< " Response[" << i << "]# " << NKikimrProto::EReplyStatus_Name(response.Status)
<< " for tablet# " << TabletId
<< " BlobStatus# " << DumpBlobStatus());
}
SetPendingResultResponseStatus(response.Id, response.Status);
}
}
@@ -245,6 +248,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA
true /*mustRestoreFirst*/, true /*isIndexOnly*/, std::nullopt /*forceBlockTabletData*/, IsInternal);
ev->RestartCounter = counter;
ev->Decommission = Decommission;
ev->DoNotReportIndexRestoreGetMissingBlobs = DoNotReportIndexRestoreGetMissingBlobs;
return ev;
}

@@ -264,6 +268,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA
, Deadline(params.Common.Event->Deadline)
, IsInternal(params.Common.Event->IsInternal)
, Decommission(params.Common.Event->Decommission)
, DoNotReportIndexRestoreGetMissingBlobs(params.Common.Event->DoNotReportIndexRestoreGetMissingBlobs)
, ForceBlockTabletData(params.Common.Event->ForceBlockTabletData)
, VGetsInFlight(0)
, GetHandleClass(params.Common.Event->GetHandleClass)
@@ -119,7 +119,7 @@ namespace NKikimr {

// only issue command if there is a progress in FreeUpToLsn queue
bool progress = false;
for (; FreeUpToLsn && FreeUpToLsn.front() < curLsn; FreeUpToLsn.pop_front()) {
for (; FreeUpToLsn && FreeUpToLsn.front() <= curLsn; FreeUpToLsn.pop_front()) {
progress = true;
}

@@ -349,7 +349,7 @@ namespace NKikimr {

bool THullHugeKeeperPersState::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn,
ui32 itemsAfterCommit) const {
return freeUpToLsn < minInFlightLsn && (PersistentLsn <= freeUpToLsn || itemsAfterCommit > 10000);
return freeUpToLsn <= minInFlightLsn && (!PersistentLsn || PersistentLsn < freeUpToLsn || itemsAfterCommit > 10000);
}

// initiate commit
14 changes: 12 additions & 2 deletions ydb/core/blobstorage/vdisk/query/assimilation.cpp
@@ -18,6 +18,8 @@ namespace NKikimr {
size_t RecordSize;
size_t RecordItems = 0;

TIntrusivePtr<TBarriersSnapshot::TBarriersEssence> BarriersEssence;

static constexpr TDuration MaxQuantumTime = TDuration::MilliSeconds(10);
static constexpr size_t MaxResultSize = 5'000'000; // may overshoot a little
static constexpr size_t MaxResultItems = 10'000;
@@ -139,7 +141,7 @@ namespace NKikimr {
RecordSize += len;
++RecordItems;

return iter.Next();
iter.Next();
}

template<bool IsForward>
@@ -192,6 +194,14 @@
void Process(TBlobIter<IsForward>& iter) {
const TKeyLogoBlob& key = iter;
const TMemRecLogoBlob& memRec = iter;

if (!BarriersEssence) {
BarriersEssence = Snap.BarriersSnap.CreateEssence(Snap.HullCtx);
}
if (auto keep = BarriersEssence->Keep(key, memRec, {}, Snap.HullCtx->AllowKeepFlags, true); !keep.KeepData) {
return iter.Next(); // we are not interested in this blob -- it is going to be collected
}

auto *pb = Result->Record.AddBlobs();

const TLogoBlobID id(key.LogoBlobID());
Expand Down Expand Up @@ -227,7 +237,7 @@ namespace NKikimr {
RecordSize += len;
++RecordItems;

return iter.Next();
iter.Next();
}

template<typename TKey, typename TMemRec, bool IsForward>