Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_http_request.h>
#include <ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/mind/bscontroller/bsc.h>
#include <ydb/core/util/actorsys_test/testactorsys.h>

#include <ydb/library/pdisk_io/sector_map.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/util/random.h>

#include <google/protobuf/text_format.h>
Expand Down Expand Up @@ -1130,6 +1133,79 @@ Y_UNIT_TEST_SUITE(TBlobStorageWardenTest) {
CheckInferredPDiskSettings(runtime, fakeWhiteboard, fakeNodeWarden,
pdiskId, 12, 2u);
}

// Sends a ChangeGroupSizeInUnits command to the BSController for the given
// storage pool and waits for the config response.
//
// runtime           - test actor runtime to drive the request through
// poolName          - storage pool whose group is being resized
// groupId           - id of the group to change
// groupSizeInUnits  - new SizeInUnits value for the group
//
// Fails the test (UNIT_ASSERT) if the controller reports failure, attaching
// the controller's error description for diagnosability.
void ChangeGroupSizeInUnits(TTestBasicRuntime& runtime, TString poolName, ui32 groupId, ui32 groupSizeInUnits) {
    TActorId edge = runtime.AllocateEdgeActor();

    // Box/generation/pool ids are read back from the live config so the
    // command passes the controller's ItemConfigGeneration check.
    auto storagePool = DescribeStoragePool(runtime, poolName);
    auto request = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
    auto& cmd = *request->Record.MutableRequest()->AddCommand()->MutableChangeGroupSizeInUnits();
    cmd.SetBoxId(storagePool.GetBoxId());
    cmd.SetItemConfigGeneration(storagePool.GetItemConfigGeneration());
    cmd.SetStoragePoolId(storagePool.GetStoragePoolId());
    cmd.AddGroupId(groupId);
    cmd.SetSizeInUnits(groupSizeInUnits);

    NTabletPipe::TClientConfig pipeConfig;
    pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
    runtime.SendToPipe(MakeBSControllerID(), edge, request.release(), 0, pipeConfig);

    auto reply = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvControllerConfigResponse>(edge);
    VERBOSE_COUT("TEvControllerConfigResponse# " << reply->ToString());
    const auto& response = reply->Get()->Record.GetResponse();
    // Include the controller's error text in the assertion message so a
    // failed reconfiguration is diagnosable from the test log alone.
    UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription());
}

// Waits (in simulated time, up to simTimeout) until the fake whiteboard
// receives a TEvVDiskStateUpdate for the given group that carries a
// GroupSizeInUnits field, then asserts that the reported group generation
// and size-in-units match the expected values.
//
// Updates for other groups, or updates without GroupSizeInUnits set, are
// skipped. Fails the test if the deadline is reached first.
void CheckVDiskStateUpdate(TTestBasicRuntime& runtime, TActorId fakeWhiteboard, ui32 groupId,
        ui32 expectedGroupGeneration, ui32 expectedGroupSizeInUnits,
        TDuration simTimeout = TDuration::Seconds(10)) {
    const TInstant deadline = runtime.GetCurrentTime() + simTimeout;
    for (;;) {
        // Bail out loudly instead of waiting forever if the expected
        // update never arrives.
        UNIT_ASSERT_LT(runtime.GetCurrentTime(), deadline);

        const auto event = runtime.GrabEdgeEventRethrow<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(
            fakeWhiteboard, deadline - runtime.GetCurrentTime());
        VERBOSE_COUT(" Got TEvVDiskStateUpdate# " << event->ToString());

        const NKikimrWhiteboard::TVDiskStateInfo& info = event->Get()->Record;
        const bool sameGroup = info.GetVDiskId().GetGroupID() == groupId;
        if (!sameGroup || !info.HasGroupSizeInUnits()) {
            continue;  // irrelevant update — keep waiting
        }

        UNIT_ASSERT_VALUES_EQUAL(info.GetVDiskId().GetGroupGeneration(), expectedGroupGeneration);
        UNIT_ASSERT_VALUES_EQUAL(info.GetGroupSizeInUnits(), expectedGroupSizeInUnits);
        return;
    }
}

// Exercises the race between VDisk local recovery and a group
// SizeInUnits reconfiguration: TEvLocalRecoveryDone is blocked while the
// group size is changed, and only after unblocking should the VDisk
// report the new generation and size via the whiteboard.
CUSTOM_UNIT_TEST(TestEvVGenerationChangeRace) {
TTestBasicRuntime runtime(1, false);
Setup(runtime, "", nullptr);
runtime.SetLogPriority(NKikimrServices::BS_PROXY, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_PROXY_PUT, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_PROXY_BLOCK, NLog::PRI_ERROR);
runtime.SetLogPriority(NKikimrServices::BS_SKELETON, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_LOCALRECOVERY, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::BS_CONTROLLER, NLog::PRI_INFO);

// Intercept whiteboard traffic on the node so VDisk state updates can be
// observed directly from the edge actor.
const ui32 nodeId = runtime.GetNodeId(0);
TActorId fakeWhiteboard = runtime.AllocateEdgeActor();
runtime.RegisterService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId), fakeWhiteboard);

VERBOSE_COUT(" Starting test");

// Hold back local-recovery completion to create the race window: the
// group reconfiguration below lands while recovery is still pending.
TBlockEvents<TEvBlobStorage::TEvLocalRecoveryDone> block(runtime);

const TString poolName = "testEvVGenerationChangeRace";
CreateStoragePool(runtime, poolName, "pool-kind-1");
ui32 groupId = GetGroupFromPool(runtime, poolName);

// Initial state: generation 1, size-in-units 0.
CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
// Resize the group while TEvLocalRecoveryDone is still blocked ...
ChangeGroupSizeInUnits(runtime, poolName, groupId, 2u);
// ... and verify the whiteboard still reports the old values.
CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
// Release recovery; the VDisk must now pick up generation 2 / size 2.
block.Stop().Unblock();
CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 2, 2u);
}

}

} // namespace NBlobStorageNodeWardenTest
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,17 @@ namespace NKikimr {
ApplyHugeBlobSize(Config->MinHugeBlobInBytes);
Y_VERIFY_S(MinHugeBlobInBytes, VCtx->VDiskLogPrefix);

if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
Y_VERIFY(PDiskCtx);
Y_VERIFY(PDiskCtx->Dsk);
ctx.Send(PDiskCtx->PDiskId,
new NPDisk::TEvYardResize(
PDiskCtx->Dsk->Owner,
PDiskCtx->Dsk->OwnerRound,
Config->GroupSizeInUnits));
}

// handle special case when donor disk starts and finds out that it has been wiped out
if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) {
// send drop donor cmd to NodeWarden
Expand Down Expand Up @@ -2448,10 +2459,11 @@ namespace NKikimr {
GInfo = msg->NewInfo;
SelfVDiskId = msg->NewVDiskId;

if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
if (PDiskCtx && Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
UpdateWhiteboard(ctx);

Y_VERIFY(PDiskCtx->Dsk);
ctx.Send(PDiskCtx->PDiskId,
new NPDisk::TEvYardResize(
PDiskCtx->Dsk->Owner,
Expand Down
99 changes: 10 additions & 89 deletions ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from ydb.tests.library.compatibility.fixtures import inter_stable_binary_path, inter_stable_name
from ydb.tests.library.compatibility.fixtures import current_binary_path, current_name
from ydb.tests.library.common.types import Erasure
import ydb.core.protos.msgbus_pb2 as msgbus
import ydb.core.protos.blobstorage_config_pb2 as blobstorage_config_pb2
from ydb.core.protos import blobstorage_config_pb2

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,96 +42,18 @@ def setup(self):
use_in_memory_pdisks=False)
next(cluster_generator)

host_configs = self.read_host_configs()
host_configs = self.cluster.client.read_host_configs()
for host_config in host_configs:
drive = host_config.Drive.add()
drive.Path = CONST_PDISK_PATH
drive.PDiskConfig.ExpectedSlotCount = CONST_EXPECTED_SLOT_COUNT
self.define_host_configs(host_configs)
self.cluster.client.define_host_configs(host_configs)

yield

def read_host_configs(self):
request = msgbus.TBlobStorageConfigRequest()
request.Domain = 1
request.Request.Command.add().ReadHostConfig.SetInParent()

response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
logger.info(f"read_host_config response: {response}")
if not response.Success:
raise RuntimeError('read_host_config request failed: %s' % response.ErrorDescription)
status = response.Status[0]
if not status.Success:
raise RuntimeError('read_host_config has failed status: %s' % status.ErrorDescription)

return status.HostConfig

def define_host_configs(self, host_configs):
"""Define host configuration with specified host config"""
request = msgbus.TBlobStorageConfigRequest()
request.Domain = 1
for host_config in host_configs:
request.Request.Command.add().DefineHostConfig.MergeFrom(host_config)

logger.info(f"define_host_config request: {request}")
response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
logger.info(f"define_host_config responce: {response}")
if not response.Success:
raise RuntimeError('define_host_config request failed: %s' % response.ErrorDescription)
for i, status in enumerate(response.Status):
if not status.Success:
raise RuntimeError('define_host_config has failed status[%d]: %s' % (i, status))

def pdisk_set_all_active(self):
"""Update all drive statuses to ACTIVE. Equivalent to
`dstool pdisk set --status=ACTIVE --pdisk-ids <pdisks>`
"""
base_config = self.query_base_config()

request = msgbus.TBlobStorageConfigRequest()
request.Domain = 1

for pdisk in base_config.BaseConfig.PDisk:
if pdisk.Path != CONST_PDISK_PATH:
continue
cmd = request.Request.Command.add().UpdateDriveStatus
cmd.HostKey.NodeId = pdisk.NodeId
cmd.PDiskId = pdisk.PDiskId
cmd.Status = blobstorage_config_pb2.EDriveStatus.ACTIVE

logger.info(f"update_all_drive_status_active request: {request}")
response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
logger.info(f"update_all_drive_status_active response: {response}")

if not response.Success:
raise RuntimeError('update_all_drive_status_active request failed: %s' % response.ErrorDescription)
for i, status in enumerate(response.Status):
if not status.Success:
raise RuntimeError('update_all_drive_status_active has failed status[%d]: %s' % (i, status))

def query_base_config(self):
request = msgbus.TBlobStorageConfigRequest()
request.Domain = 1

# Add QueryBaseConfig command
command = request.Request.Command.add()
command.QueryBaseConfig.RetrieveDevices = True
command.QueryBaseConfig.VirtualGroupsOnly = False

# Send the request
response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
if not response.Success:
raise RuntimeError('query_base_config failed: %s' % response.ErrorDescription)

status = response.Status[0]
if not status.Success:
raise RuntimeError('query_base_config failed: %s' % status.ErrorDescription)

return status

def pdisk_list(self):
"""Equivalent to `dstool pdisk list`"""
base_config = self.query_base_config()
base_config = self.cluster.client.query_base_config()

# Collect PDisk information
pdisks_info = []
Expand All @@ -152,11 +73,11 @@ def wait_and_check_pdisk_list(self, check_pdisks_fn, deadline, delay=1):
except AssertionError as e:
if time.time() > deadline:
logger.warning(f"pdisk_list incorrect: {pdisks}")
raise e from e
raise e
else:
time.sleep(delay)

def test_infer_pdisk_expected_slot_count(self):
def test(self):
assert self.current_binary_paths_index == 0
logger.info(f"Test started on {self.versions[0]} {time.time()=}")
#################################################################
Expand Down Expand Up @@ -187,18 +108,18 @@ def check_pdisks(pdisks):
######################################################################

t2 = time.time()
host_configs = self.read_host_configs()
host_configs = self.cluster.client.read_host_configs()
for host_config in host_configs:
drive = host_config.Drive[1]
assert drive.Path == CONST_PDISK_PATH
drive.ClearField('PDiskConfig')
drive.PDiskConfig.SetInParent()
drive.InferPDiskSlotCountFromUnitSize = CONST_10_GB
drive.InferPDiskSlotCountMax = 32
self.define_host_configs(host_configs)
self.cluster.client.define_host_configs(host_configs)
logger.info(f"Inferred PDisk setting applied {time.time()=}")

self.pdisk_set_all_active()
self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
logger.info(f"Drives activated {time.time()=}")

deadline = time.time() + timeout
Expand All @@ -224,7 +145,7 @@ def check_pdisks(pdisks):
logger.info(f"Restarted back on version {self.versions[0]} {time.time()=}")
###########################################################################

self.pdisk_set_all_active()
self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
logger.info(f"Drives activated {time.time()=}")

deadline = time.time() + timeout
Expand Down
Loading
Loading