diff --git a/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp b/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
index 86abd2d1a292..7cde508507a2 100644
--- a/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+++ b/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
@@ -7,10 +7,13 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
+#include
 #include
 #include
@@ -1119,6 +1122,79 @@ Y_UNIT_TEST_SUITE(TBlobStorageWardenTest) {
         CheckInferredPDiskSettings(runtime, fakeWhiteboard, fakeNodeWarden, pdiskId, 12, 2u);
     }
+
+    void ChangeGroupSizeInUnits(TTestBasicRuntime& runtime, TString poolName, ui32 groupId, ui32 groupSizeInUnits) {
+        TActorId edge = runtime.AllocateEdgeActor();
+
+        auto storagePool = DescribeStoragePool(runtime, poolName);
+        auto request = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
+        auto& cmd = *request->Record.MutableRequest()->AddCommand()->MutableChangeGroupSizeInUnits();
+        cmd.SetBoxId(storagePool.GetBoxId());
+        cmd.SetItemConfigGeneration(storagePool.GetItemConfigGeneration());
+        cmd.SetStoragePoolId(storagePool.GetStoragePoolId());
+        cmd.AddGroupId(groupId);
+        cmd.SetSizeInUnits(groupSizeInUnits);
+
+        NTabletPipe::TClientConfig pipeConfig;
+        pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
+        runtime.SendToPipe(MakeBSControllerID(), edge, request.release(), 0, pipeConfig);
+
+        auto reply = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvControllerConfigResponse>(edge);
+        VERBOSE_COUT("TEvControllerConfigResponse# " << reply->ToString());
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetResponse().GetSuccess(), true);
+    }
+
+    void CheckVDiskStateUpdate(TTestBasicRuntime& runtime, TActorId fakeWhiteboard, ui32 groupId,
+            ui32 expectedGroupGeneration, ui32 expectedGroupSizeInUnits,
+            TDuration simTimeout = TDuration::Seconds(10)) {
+        TInstant deadline = runtime.GetCurrentTime() + simTimeout;
+        while (true) {
+            UNIT_ASSERT_LT(runtime.GetCurrentTime(), deadline);
+
+            const auto ev = runtime.GrabEdgeEventRethrow<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(
+                fakeWhiteboard, deadline - runtime.GetCurrentTime());
+            VERBOSE_COUT(" Got TEvVDiskStateUpdate# " << ev->ToString());
+
+            NKikimrWhiteboard::TVDiskStateInfo vdiskInfo = ev->Get()->Record;
+            if (vdiskInfo.GetVDiskId().GetGroupID() != groupId || !vdiskInfo.HasGroupSizeInUnits()) {
+                continue;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetVDiskId().GetGroupGeneration(), expectedGroupGeneration);
+            UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetGroupSizeInUnits(), expectedGroupSizeInUnits);
+            break;
+        }
+    }
+
+    CUSTOM_UNIT_TEST(TestEvVGenerationChangeRace) {
+        TTestBasicRuntime runtime(1, false);
+        Setup(runtime, "", nullptr);
+        runtime.SetLogPriority(NKikimrServices::BS_PROXY, NLog::PRI_ERROR);
+        runtime.SetLogPriority(NKikimrServices::BS_PROXY_PUT, NLog::PRI_ERROR);
+        runtime.SetLogPriority(NKikimrServices::BS_PROXY_BLOCK, NLog::PRI_ERROR);
+        runtime.SetLogPriority(NKikimrServices::BS_SKELETON, NLog::PRI_INFO);
+        runtime.SetLogPriority(NKikimrServices::BS_LOCALRECOVERY, NLog::PRI_INFO);
+        runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_INFO);
+        runtime.SetLogPriority(NKikimrServices::BS_CONTROLLER, NLog::PRI_INFO);
+
+        const ui32 nodeId = runtime.GetNodeId(0);
+        TActorId fakeWhiteboard = runtime.AllocateEdgeActor();
+        runtime.RegisterService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId), fakeWhiteboard);
+
+        VERBOSE_COUT(" Starting test");
+
+        TBlockEvents<TEvVGenerationChange> block(runtime); // blocked event type reconstructed from the test name
+
+        const TString poolName = "testEvVGenerationChangeRace";
+        CreateStoragePool(runtime, poolName, "pool-kind-1");
+        ui32 groupId = GetGroupFromPool(runtime, poolName);
+
+        CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
+        ChangeGroupSizeInUnits(runtime, poolName, groupId, 2u);
+        CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u);
+        block.Stop().Unblock();
+        CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 2, 2u);
+    }
 }

 } // namespace NBlobStorageNodeWardenTest
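The helper above drives BSC through a tablet pipe from C++. The same ChangeGroupSizeInUnits command can be issued from Python via the msgbus client extended later in this diff; a minimal sketch, assuming `client` is that msgbus client and mirroring the field set used by the functional test below:

    from ydb.core.protos import msgbus_pb2

    def change_group_size_in_units(client, box_id, pool_id, generation, group_id, size_in_units):
        request = msgbus_pb2.TBlobStorageConfigRequest()
        request.Domain = 1
        cmd = request.Request.Command.add().ChangeGroupSizeInUnits
        cmd.BoxId = box_id
        cmd.StoragePoolId = pool_id
        cmd.ItemConfigGeneration = generation  # BSC rejects the command on generation mismatch
        cmd.GroupId.append(group_id)
        cmd.SizeInUnits = size_in_units
        response = client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
        assert response.Success, response.ErrorDescription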
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 23a46a0e4251..8a29bc440d25 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -1927,6 +1927,17 @@ namespace NKikimr {
             ApplyHugeBlobSize(Config->MinHugeBlobInBytes);
             Y_VERIFY_S(MinHugeBlobInBytes, VCtx->VDiskLogPrefix);

+            if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
+                Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
+                Y_VERIFY(PDiskCtx);
+                Y_VERIFY(PDiskCtx->Dsk);
+                ctx.Send(PDiskCtx->PDiskId,
+                    new NPDisk::TEvYardResize(
+                        PDiskCtx->Dsk->Owner,
+                        PDiskCtx->Dsk->OwnerRound,
+                        Config->GroupSizeInUnits));
+            }
+
             // handle special case when donor disk starts and finds out that it has been wiped out
             if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) {
                 // send drop donor cmd to NodeWarden
@@ -2444,10 +2455,11 @@ namespace NKikimr {
             GInfo = msg->NewInfo;
             SelfVDiskId = msg->NewVDiskId;

-            if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
+            if (PDiskCtx && Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) {
                 Config->GroupSizeInUnits = GInfo->GroupSizeInUnits;
                 UpdateWhiteboard(ctx);

+                Y_VERIFY(PDiskCtx->Dsk);
                 ctx.Send(PDiskCtx->PDiskId,
                     new NPDisk::TEvYardResize(
                         PDiskCtx->Dsk->Owner,
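Note that TEvYardResize is now sent from two places: once after local recovery completes (first hunk), which covers a group size change that happened while the VDisk was down, and once on the group-generation-change path (second hunk). The new PDiskCtx guard in the second hunk handles exactly the race the unit test above reproduces: the generation-change event can arrive before local recovery has initialized the yard context, in which case the resize is deferred to the recovery-time branch.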
diff --git a/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py
new file mode 100644
index 000000000000..20c7f4b65fac
--- /dev/null
+++ b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py
@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+import pytest
+import time
+import logging
+
+from ydb.tests.library.compatibility.fixtures import RestartToAnotherVersionFixture
+from ydb.tests.library.compatibility.fixtures import init_stable_binary_path, init_stable_name
+from ydb.tests.library.compatibility.fixtures import inter_stable_binary_path, inter_stable_name
+from ydb.tests.library.compatibility.fixtures import current_binary_path, current_name
+from ydb.tests.library.common.types import Erasure
+from ydb.core.protos import blobstorage_config_pb2
+
+logger = logging.getLogger(__name__)
+
+CONST_PDISK_PATH = "SectorMap:TestInferPDiskSettings:480"
+CONST_EXPECTED_SLOT_COUNT = 14
+CONST_480_GB = 480 * 1024**3
+CONST_10_GB = 10 * 1024**3
+
+all_binary_combinations_restart = [
+    [init_stable_binary_path, inter_stable_binary_path],
+    [inter_stable_binary_path, current_binary_path],
+    [init_stable_binary_path, current_binary_path],
+]
+all_binary_combinations_ids_restart = [
+    "restart_{}_to_{}".format(init_stable_name, inter_stable_name),
+    "restart_{}_to_{}".format(inter_stable_name, current_name),
+    "restart_{}_to_{}".format(init_stable_name, current_name),
+]
+
+
+@pytest.mark.parametrize("base_setup",
+                         argvalues=all_binary_combinations_restart,
+                         ids=all_binary_combinations_ids_restart,
+                         indirect=True)
+class TestUpgradeThenRollback(RestartToAnotherVersionFixture):
+    @pytest.fixture(autouse=True, scope="function")
+    def setup(self):
+        cluster_generator = self.setup_cluster(
+            erasure=Erasure.NONE,
+            nodes=2,
+            use_in_memory_pdisks=False)
+        next(cluster_generator)
+
+        host_configs = self.cluster.client.read_host_configs()
+        for host_config in host_configs:
+            drive = host_config.Drive.add()
+            drive.Path = CONST_PDISK_PATH
+            drive.PDiskConfig.ExpectedSlotCount = CONST_EXPECTED_SLOT_COUNT
+        self.cluster.client.define_host_configs(host_configs)
+
+        yield
+
+    def pdisk_list(self):
+        """Equivalent to `dstool pdisk list`"""
+        base_config = self.cluster.client.query_base_config()
+
+        # Collect PDisk information
+        pdisks_info = []
+        for pdisk in base_config.BaseConfig.PDisk:
+            if pdisk.Path != CONST_PDISK_PATH:
+                continue
+            pdisks_info.append(pdisk)
+        return pdisks_info
+
+    def wait_and_check_pdisk_list(self, check_pdisks_fn, deadline, delay=1):
+        while True:
+            pdisks = self.pdisk_list()
+            try:
+                check_pdisks_fn(pdisks)
+                logger.info(f"pdisk_list good: {pdisks}")
+                return
+            except AssertionError as e:
+                if time.time() > deadline:
+                    logger.warning(f"pdisk_list incorrect: {pdisks}")
+                    raise e
+                else:
+                    time.sleep(delay)
+
+    def test(self):
+        assert self.current_binary_paths_index == 0
+        logger.info(f"Test started on {self.versions[0]} {time.time()=}")
+        #################################################################
+
+        t1 = time.time()
+        timeout = 20
+
+        def check_pdisks(pdisks):
+            for pdisk in pdisks:
+                assert pdisk.Path == CONST_PDISK_PATH
+                assert pdisk.PDiskConfig.ExpectedSlotCount == CONST_EXPECTED_SLOT_COUNT
+                assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
+                assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
+                if self.versions[0] < (25, 3):
+                    assert not pdisk.PDiskMetrics.HasField('SlotCount')
+                    assert not pdisk.PDiskMetrics.HasField('SlotSizeInUnits')
+                else:
+                    assert pdisk.PDiskMetrics.SlotCount == CONST_EXPECTED_SLOT_COUNT
+                    assert pdisk.PDiskMetrics.HasField('SlotSizeInUnits') and \
+                        pdisk.PDiskMetrics.SlotSizeInUnits == 0
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t1
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < t1 + timeout
+        self.wait_and_check_pdisk_list(check_pdisks, deadline=t1 + timeout)
+
+        self.change_cluster_version()
+        assert self.current_binary_paths_index == 1
+        logger.info(f"Restarted on version {self.versions[1]} {time.time()=}")
+        ######################################################################
+
+        t2 = time.time()
+        host_configs = self.cluster.client.read_host_configs()
+        for host_config in host_configs:
+            drive = host_config.Drive[1]
+            assert drive.Path == CONST_PDISK_PATH
+            drive.ClearField('PDiskConfig')
+            drive.PDiskConfig.SetInParent()
+            drive.InferPDiskSlotCountFromUnitSize = CONST_10_GB
+            drive.InferPDiskSlotCountMax = 32
+        self.cluster.client.define_host_configs(host_configs)
+        logger.info(f"Inferred PDisk setting applied {time.time()=}")
+
+        self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
+        logger.info(f"Drives activated {time.time()=}")
+
+        deadline = time.time() + timeout
+
+        def check_pdisks(pdisks):
+            for pdisk in pdisks:
+                assert pdisk.Path == CONST_PDISK_PATH
+                assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
+                assert not pdisk.HasField('PDiskConfig')
+                assert pdisk.ExpectedSlotCount == 16  # hardcoded default
+                assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
+                assert pdisk.PDiskMetrics.SlotCount == 24
+                assert pdisk.PDiskMetrics.SlotSizeInUnits == 2
+                assert pdisk.InferPDiskSlotCountFromUnitSize == CONST_10_GB
+                assert pdisk.InferPDiskSlotCountMax == 32
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t2
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < deadline
+        self.wait_and_check_pdisk_list(check_pdisks, deadline)
+
+        t3 = time.time()
+        self.change_cluster_version()
+        assert self.current_binary_paths_index == 0
+        logger.info(f"Restarted back on version {self.versions[0]} {time.time()=}")
+        ###########################################################################
+
+        self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH)
+        logger.info(f"Drives activated {time.time()=}")
+
+        deadline = time.time() + timeout
+
+        def check_pdisks(pdisks):
+            for pdisk in pdisks:
+                assert pdisk.Path == CONST_PDISK_PATH
+                assert pdisk.DriveStatus == blobstorage_config_pb2.EDriveStatus.ACTIVE
+                assert not pdisk.HasField('PDiskConfig')
+                assert pdisk.ExpectedSlotCount == 16  # hardcoded default
+                assert pdisk.PDiskMetrics.TotalSize == CONST_480_GB
+                if self.versions[0] < (25, 3):
+                    assert not pdisk.PDiskMetrics.HasField('SlotCount')
+                    assert not pdisk.PDiskMetrics.HasField('SlotSizeInUnits')
+                    assert pdisk.InferPDiskSlotCountFromUnitSize == 0
+                    assert pdisk.InferPDiskSlotCountMax == 0
+                else:
+                    assert pdisk.PDiskMetrics.HasField('SlotCount') and pdisk.PDiskMetrics.SlotCount == 24
+                    assert pdisk.PDiskMetrics.HasField('SlotSizeInUnits') and pdisk.PDiskMetrics.SlotSizeInUnits == 2
+                    assert pdisk.InferPDiskSlotCountFromUnitSize == CONST_10_GB
+                    assert pdisk.InferPDiskSlotCountMax == 32
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 > t3
+                assert pdisk.PDiskMetrics.UpdateTimestamp * 1e-6 < deadline
+        self.wait_and_check_pdisk_list(check_pdisks, deadline)
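The asserted metrics in the middle phase decode as follows; a sketch of the arithmetic, assuming the inference doubles the slot size in units until the slot count fits under InferPDiskSlotCountMax (a rule inferred from the asserts, not stated in this diff):

    # 480 GiB disk, 10 GiB unit size, at most 32 slots
    disk_size = 480 * 1024**3        # PDiskMetrics.TotalSize
    unit_size = 10 * 1024**3         # InferPDiskSlotCountFromUnitSize
    slot_count_max = 32              # InferPDiskSlotCountMax

    slot_size_in_units = 1
    while disk_size // (unit_size * slot_size_in_units) > slot_count_max:
        slot_size_in_units *= 2      # 48 slots > 32, so widen each slot

    slot_count = disk_size // (unit_size * slot_size_in_units)
    assert slot_size_in_units == 2   # matches PDiskMetrics.SlotSizeInUnits
    assert slot_count == 24          # matches PDiskMetrics.SlotCount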
diff --git a/ydb/tests/compatibility/ya.make b/ydb/tests/compatibility/ya.make
index 0b838af44a8b..fcdc2aa2d74d 100644
--- a/ydb/tests/compatibility/ya.make
+++ b/ydb/tests/compatibility/ya.make
@@ -26,6 +26,7 @@ TEST_SRCS(
     test_node_broker_delta_protocol.py
     test_table_schema_compatibility.py
    test_workload_manager.py
+    test_infer_pdisk_expected_slot_count.py
    udf/test_datetime2.py
    udf/test_digest.py
    udf/test_digest_regression.py
diff --git a/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py b/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py
new file mode 100644
index 000000000000..11052ba52883
--- /dev/null
+++ b/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py
@@ -0,0 +1,181 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import time
+import logging
+import pytest
+import requests
+import json
+
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.library.common.types import Erasure
+from ydb.tests.library.harness.util import LogLevels
+from ydb.core.protos import msgbus_pb2
+
+logger = logging.getLogger(__name__)
+
+CONST_64_GB = 64 * 1024**3
+
+
+class TestPDiskSlotSizeInUnits(object):
+    erasure = Erasure.NONE
+    pool_name = 'test:1'
+    nodes_count = 1
+
+    @pytest.fixture(autouse=True)
+    def setup(self):
+        log_configs = {
+            'BS_NODE': LogLevels.DEBUG,
+            'BS_CONTROLLER': LogLevels.DEBUG,
+            'BS_SKELETON': LogLevels.DEBUG,
+            'BS_PDISK': LogLevels.DEBUG,
+        }
+
+        configurator = KikimrConfigGenerator(
+            erasure=self.erasure,
+            nodes=self.nodes_count,
+            use_in_memory_pdisks=False,
+            static_pdisk_size=CONST_64_GB,
+            dynamic_pdisks=[{'disk_size': CONST_64_GB, 'user_kind': 1}],
+            dynamic_pdisks_config=dict(expected_slot_count=4),
+            dynamic_storage_pools=[dict(name=self.pool_name, kind="hdd", pdisk_user_kind=1)],
+            additional_log_configs=log_configs
+        )
+
+        assert len(configurator.pdisks_info) == 2
+        self.cluster = KiKiMR(configurator)
+        self.cluster.start()
+
+        host_config = self.cluster.client.read_host_configs()[0]
+        assert len(host_config.Drive) == 2
+        assert host_config.Drive[1].PDiskConfig.ExpectedSlotCount == 4
+
+        base_config = self.cluster.client.query_base_config().BaseConfig
+        logger.info(f"{base_config=}")
+
+        self.pdisk_id = base_config.PDisk[1].PDiskId
+        self.groups = [group for group in base_config.Group if group.StoragePoolId == 1]
+        self.groups.sort(key=lambda g: g.GroupId)
+        assert len(self.groups) == 2
+        for g in self.groups:
+            assert len(g.VSlotId) == 1
+            assert g.VSlotId[0].PDiskId == self.pdisk_id
+
+        yield
+        self.cluster.stop()
+
+    def change_group_size_in_units(self, new_size, group_id=None):
+        storage_pool = self.cluster.client.read_storage_pools()[0]
+
+        request = msgbus_pb2.TBlobStorageConfigRequest()
+        request.Domain = 1
+
+        cmd = request.Request.Command.add().ChangeGroupSizeInUnits
+        cmd.BoxId = storage_pool.BoxId
+        cmd.StoragePoolId = storage_pool.StoragePoolId
+        cmd.SizeInUnits = new_size
+        if group_id:
+            cmd.GroupId.append(group_id)
+        cmd.ItemConfigGeneration = storage_pool.ItemConfigGeneration
+
+        logger.info(f"change_group_size_in_units request: {request}")
+        response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+        logger.info(f"change_group_size_in_units response: {response}")
+
+        if not response.Success:
+            raise RuntimeError(f'change_group_size_in_units request failed: {response.ErrorDescription}')
+        for i, status in enumerate(response.Status):
+            if not status.Success:
+                raise RuntimeError(f'change_group_size_in_units has failed status[{i}]: {status.ErrorDescription}')
+
+    def change_pdisk_slot_size_in_units(self, slot_size_in_units):
+        host_config = self.cluster.client.read_host_configs()[0]
+        host_config.Drive[1].PDiskConfig.SlotSizeInUnits = slot_size_in_units
+        self.cluster.client.define_host_configs([host_config])
+
+    def retriable(self, check_fn, timeout=30, delay=1):
+        deadline = time.time() + timeout
+
+        while True:
+            try:
+                return check_fn()
+            except AssertionError as e:
+                logger.info(str(e))
+                if time.time() > deadline:
+                    raise e
+                else:
+                    time.sleep(delay)
+
+    def http_get(self, url):
+        host = self.cluster.nodes[1].host
+        port = self.cluster.nodes[1].mon_port
+        return requests.get("http://%s:%s%s" % (host, port, url))
+
+    def get_storage_groups(self):
+        response = self.http_get('/storage/groups?fields_required=all&with=all').json()
+        groups = [group for group in response['StorageGroups'] if group['PoolName'] == self.pool_name]
+        groups.sort(key=lambda g: g['GroupId'])
+        assert len(groups) == 2
+        for group in groups:
+            vdisk = group['VDisks'][0]
+            assert vdisk['Whiteboard'].get('VDiskState') == 'OK'
+            assert vdisk['PDisk']['Whiteboard'].get('State') == 'Normal'
+        return groups
+
+    def get_pdisk_info(self):
+        response = self.http_get('/pdisk/info?node_id=1&pdisk_id=%s' % self.pdisk_id).json()
+        return response
+
+    def check_group(self, group, expected_vdisk_weight, expected_num_active_slots):
+        vdisk = group['VDisks'][0]
+        assert vdisk['PDisk']['Whiteboard']['NumActiveSlots'] == expected_num_active_slots
+        assert int(vdisk['Whiteboard']['AllocatedSize']) + int(vdisk['Whiteboard']['AvailableSize']) == \
+            expected_vdisk_weight * int(vdisk['PDisk']['Whiteboard']['EnforcedDynamicSlotSize'])
+
+    def check_pdisk(self, pdisk, expected_num_active_slots=None, expected_slot_size_in_units=None):
+        if expected_num_active_slots is not None:
+            assert pdisk['NumActiveSlots'] == expected_num_active_slots
+        if expected_slot_size_in_units is not None:
+            assert pdisk['SlotSizeInUnits'] == expected_slot_size_in_units
+
+    def test_change_group_size_in_units(self):
+        self.change_group_size_in_units(new_size=2, group_id=self.groups[0].GroupId)
+
+        def wait_whiteboard_updated():
+            groups = self.get_storage_groups()
+            logger.info(json.dumps(groups, indent=2))
+            self.check_group(groups[0], expected_vdisk_weight=2, expected_num_active_slots=3)
+            self.check_group(groups[1], expected_vdisk_weight=1, expected_num_active_slots=3)
+        self.retriable(wait_whiteboard_updated)
+
+        def wait_pdisk_info_updated():
+            pdisk_info = self.get_pdisk_info()
+            logger.info(json.dumps(pdisk_info, indent=2))
+            self.check_pdisk(pdisk_info['Whiteboard']['PDisk'], expected_num_active_slots=3)
+            self.check_pdisk(pdisk_info['BSC']['PDisk'], expected_num_active_slots=3, expected_slot_size_in_units=0)
+        self.retriable(wait_pdisk_info_updated)
+
+    def test_change_pdisk_slot_size_in_units(self):
+        self.change_pdisk_slot_size_in_units(slot_size_in_units=2)
+        self.change_group_size_in_units(new_size=4, group_id=self.groups[1].GroupId)
+
+        def wait_whiteboard_updated():
+            groups = self.get_storage_groups()
+            logger.info(json.dumps(groups, indent=2))
+            self.check_group(groups[0], expected_vdisk_weight=1, expected_num_active_slots=3)
+            self.check_group(groups[1], expected_vdisk_weight=2, expected_num_active_slots=3)
+        self.retriable(wait_whiteboard_updated)
+
+        def wait_bsc_updated():
+            base_config = self.cluster.client.query_base_config().BaseConfig
+            logger.info(base_config.PDisk[1])
+            assert base_config.PDisk[1].PDiskMetrics.SlotSizeInUnits == 2
+        self.retriable(wait_bsc_updated)
+
+        def wait_pdisk_info_updated():
+            pdisk_info = self.get_pdisk_info()
+            logger.info(json.dumps(pdisk_info, indent=2))
+            self.check_pdisk(pdisk_info['Whiteboard']['PDisk'], expected_num_active_slots=3)
+            self.check_pdisk(pdisk_info['BSC']['PDisk'], expected_num_active_slots=3, expected_slot_size_in_units=2)
+        self.retriable(wait_pdisk_info_updated)
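The expected weights and slot counts asserted above follow a simple model; a sketch of how they appear to be derived (the ceiling-division rule is an assumption consistent with the asserts, not taken from this diff):

    # Hypothetical helper: a VDisk's slot weight from group and pdisk unit sizes.
    def vdisk_weight(group_size_in_units, pdisk_slot_size_in_units):
        group = max(group_size_in_units, 1)      # size 0 behaves like 1 unit
        slot = max(pdisk_slot_size_in_units, 1)  # slot size 0 behaves like 1 unit
        return -(-group // slot)                 # ceiling division

    # test_change_group_size_in_units: pdisk slot size 0 (= 1 unit)
    assert vdisk_weight(2, 0) == 2   # resized group
    assert vdisk_weight(0, 0) == 1   # untouched group
    # test_change_pdisk_slot_size_in_units: pdisk slot size 2 units
    assert vdisk_weight(4, 2) == 2   # group resized to 4 units
    assert vdisk_weight(0, 2) == 1   # untouched group
    # NumActiveSlots = sum of weights plus the static slot: 2 + 1 = 3 in both tests

AllocatedSize + AvailableSize of a VDisk then equals its weight times EnforcedDynamicSlotSize, which is exactly the relation check_group verifies.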
diff --git a/ydb/tests/functional/blobstorage/ya.make b/ydb/tests/functional/blobstorage/ya.make
index 00218fbe13e9..0195f1fe4bcf 100644
--- a/ydb/tests/functional/blobstorage/ya.make
+++ b/ydb/tests/functional/blobstorage/ya.make
@@ -3,6 +3,7 @@ PY3TEST()
 INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc)

 TEST_SRCS(
     test_pdisk_format_info.py
+    test_pdisk_slot_size_in_units.py
     test_replication.py
     test_self_heal.py
     test_tablet_channel_migration.py
diff --git a/ydb/tests/library/clients/kikimr_client.py b/ydb/tests/library/clients/kikimr_client.py
index 190f186699ee..2bb3772bbe01 100644
--- a/ydb/tests/library/clients/kikimr_client.py
+++ b/ydb/tests/library/clients/kikimr_client.py
@@ -299,5 +299,89 @@ def tablet_state(self, tablet_type=None, tablet_ids=()):
         request.Alive = True
         return self.invoke(request, 'TabletStateRequest')

+    def read_host_configs(self, domain=1):
+        request = msgbus.TBlobStorageConfigRequest()
+        request.Domain = domain
+        request.Request.Command.add().ReadHostConfig.SetInParent()
+
+        response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+        if not response.Success:
+            raise RuntimeError('read_host_configs request failed: %s' % response.ErrorDescription)
+        status = response.Status[0]
+        if not status.Success:
+            raise RuntimeError('read_host_configs has failed status: %s' % status.ErrorDescription)
+
+        return status.HostConfig
+
+    def define_host_configs(self, host_configs, domain=1):
+        request = msgbus.TBlobStorageConfigRequest()
+        request.Domain = domain
+        for host_config in host_configs:
+            request.Request.Command.add().DefineHostConfig.MergeFrom(host_config)
+
+        response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+        if not response.Success:
+            raise RuntimeError('define_host_configs request failed: %s' % response.ErrorDescription)
+        for i, status in enumerate(response.Status):
+            if not status.Success:
+                raise RuntimeError('define_host_configs has failed status[%d]: %s' % (i, status))
+
+    def read_storage_pools(self, domain=1):
+        request = msgbus.TBlobStorageConfigRequest()
+        request.Domain = domain
+        cmd = request.Request.Command.add().ReadStoragePool
+        cmd.BoxId = 0xFFFFFFFFFFFFFFFF
+
+        response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+        if not response.Success:
+            raise RuntimeError('read_storage_pools request failed: %s' % response.ErrorDescription)
+
+        status = response.Status[0]
+        if not status.Success:
+            raise RuntimeError('read_storage_pools has failed status: %s' % status.ErrorDescription)
+
+        return status.StoragePool
+
+    def pdisk_set_all_active(self, pdisk_path=None, domain=1):
+        """Equivalent to `dstool pdisk set --status=ACTIVE --pdisk-ids <pdisk-ids>`"""
+        base_config = self.query_base_config(domain=domain)
+
+        request = msgbus.TBlobStorageConfigRequest()
+        request.Domain = domain
+
+        for pdisk in base_config.BaseConfig.PDisk:
+            if pdisk_path is not None and pdisk.Path != pdisk_path:
+                continue
+            cmd = request.Request.Command.add().UpdateDriveStatus
+            cmd.HostKey.NodeId = pdisk.NodeId
+            cmd.PDiskId = pdisk.PDiskId
+            # NB: assumes blobstorage_config_pb2 (ydb.core.protos) is imported at module level
+            cmd.Status = blobstorage_config_pb2.EDriveStatus.ACTIVE
+
+        response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+
+        if not response.Success:
+            raise RuntimeError('pdisk_set_all_active request failed: %s' % response.ErrorDescription)
+        for i, status in enumerate(response.Status):
+            if not status.Success:
+                raise RuntimeError('pdisk_set_all_active has failed status[%d]: %s' % (i, status))
+
+    def query_base_config(self, domain=1):
+        request = msgbus.TBlobStorageConfigRequest()
+        request.Domain = domain
+
+        command = request.Request.Command.add()
+        command.QueryBaseConfig.RetrieveDevices = True
+        command.QueryBaseConfig.VirtualGroupsOnly = False
+
+        response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse
+        if not response.Success:
+            raise RuntimeError('query_base_config failed: %s' % response.ErrorDescription)
+
+        status = response.Status[0]
+        if not status.Success:
+            raise RuntimeError('query_base_config failed: %s' % status.ErrorDescription)
+
+        return status
+
     def __del__(self):
         self.close()
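Taken together, the new client helpers support the host-config flow the compatibility test uses; a condensed usage sketch (assumes a started KiKiMR cluster, the drive path is illustrative):

    client = cluster.client

    host_configs = client.read_host_configs()        # ReadHostConfig
    for host_config in host_configs:
        drive = host_config.Drive.add()
        drive.Path = "SectorMap:example:480"         # illustrative path
        drive.PDiskConfig.ExpectedSlotCount = 14
    client.define_host_configs(host_configs)         # DefineHostConfig

    client.pdisk_set_all_active(pdisk_path="SectorMap:example:480")  # UpdateDriveStatus
    base_config = client.query_base_config().BaseConfig              # QueryBaseConfig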
diff --git a/ydb/tests/library/clients/kikimr_http_client.py b/ydb/tests/library/clients/kikimr_http_client.py
index 1fed437016f4..2112c7789453 100644
--- a/ydb/tests/library/clients/kikimr_http_client.py
+++ b/ydb/tests/library/clients/kikimr_http_client.py
@@ -177,3 +177,13 @@ def nodes_info(self):
         return self.__http_get_and_parse_json(
             "/json/nodes", timeout=self.__timeout
         )
+
+    def storage_groups(self, **kwargs):
+        return self.__http_get_and_parse_json(
+            "/storage/groups", timeout=self.__timeout, **kwargs
+        )
+
+    def pdisk_info_detailed(self, node_id, pdisk_id):
+        return self.__http_get_and_parse_json(
+            "/pdisk/info", node_id=str(node_id), pdisk_id=str(pdisk_id), timeout=self.__timeout
+        )
diff --git a/ydb/tests/library/compatibility/fixtures.py b/ydb/tests/library/compatibility/fixtures.py
index 8a012efe277a..399f61561abd 100644
--- a/ydb/tests/library/compatibility/fixtures.py
+++ b/ydb/tests/library/compatibility/fixtures.py
@@ -109,9 +109,9 @@ def setup_cluster(self, tenant_db=None, **kwargs):
         extra_feature_flags = copy.copy(extra_feature_flags)
         extra_feature_flags["suppress_compatibility_check"] = True
         self.config = KikimrConfigGenerator(
-            erasure=Erasure.MIRROR_3_DC,
+            erasure=kwargs.pop("erasure", Erasure.MIRROR_3_DC),
             binary_paths=[self.all_binary_paths[self.current_binary_paths_index]],
-            use_in_memory_pdisks=False,
+            use_in_memory_pdisks=kwargs.pop("use_in_memory_pdisks", False),
             extra_feature_flags=extra_feature_flags,
             **kwargs,
         )
@@ -262,9 +262,9 @@ def setup_cluster(self, tenant_db=None, **kwargs):
         # By default draining is not enabled to faster tests
         extra_feature_flags["enable_drain_on_shutdown"] = True
         self.config = KikimrConfigGenerator(
-            erasure=Erasure.MIRROR_3_DC,
+            erasure=kwargs.pop("erasure", Erasure.MIRROR_3_DC),
             binary_paths=[self.all_binary_paths[0]],
-            use_in_memory_pdisks=False,
+            use_in_memory_pdisks=kwargs.pop("use_in_memory_pdisks", False),
             extra_feature_flags=extra_feature_flags,
             **kwargs,
         )
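With kwargs.pop the two parameters keep their previous defaults but become overridable per test, which is what the new compatibility test relies on:

    # Usage from a fixture subclass (mirrors TestUpgradeThenRollback.setup):
    cluster_generator = self.setup_cluster(
        erasure=Erasure.NONE,        # overrides the MIRROR_3_DC default
        nodes=2,
        use_in_memory_pdisks=False,  # SectorMap-backed pdisks survive restarts
    )
    next(cluster_generator)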
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index ba7b66f1a564..9a20999efa91 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -127,8 +127,10 @@ def __init__(
         domain_name='Root',
         suppress_version_check=True,
         static_pdisk_size=PDISK_SIZE,
+        static_pdisk_config=None,
         dynamic_pdisk_size=PDISK_SIZE,
         dynamic_pdisks=[],
+        dynamic_pdisks_config=None,
         dynamic_storage_pools=[dict(name="dynamic_storage_pool:1", kind="hdd", pdisk_user_kind=0)],
         state_storage_rings=None,
         n_to_select=None,
@@ -212,6 +214,7 @@ def __init__(
         self.explicit_hosts_and_host_configs = True
         self._pdisk_store_path = pdisk_store_path
         self.static_pdisk_size = static_pdisk_size
+        self.static_pdisk_config = static_pdisk_config
         self.app_config = config_pb2.TAppConfig()
         self.port_allocator = KikimrPortManagerPortAllocator() if port_allocator is None else port_allocator
         erasure = Erasure.NONE if erasure is None else erasure
@@ -257,6 +260,7 @@ def __init__(
         self.dynamic_pdisk_size = dynamic_pdisk_size
         self.dynamic_storage_pools = dynamic_storage_pools
+        self.dynamic_pdisks_config = dynamic_pdisks_config

         self.__dynamic_pdisks = dynamic_pdisks
@@ -806,12 +810,19 @@ def _add_state_storage_config(self):
         for ring in self.state_storage_rings:
             self.yaml_config["domains_config"]["state_storage"][0]["ring"]["ring"].append({"node": ring if isinstance(ring, list) else [ring], "use_ring_specific_node_selection": True})

-    def _add_pdisk_to_static_group(self, pdisk_id, path, node_id, pdisk_category, ring):
+    def _add_pdisk_to_static_group(self, pdisk_id, path, node_id, pdisk_category, ring, pdisk_config=None):
         domain_id = len(
             self.yaml_config['blob_storage_config']["service_set"]["groups"][0]["rings"][ring]["fail_domains"])
-        self.yaml_config['blob_storage_config']["service_set"]["pdisks"].append(
-            {"node_id": node_id, "pdisk_id": pdisk_id, "path": path, "pdisk_guid": pdisk_id,
-             "pdisk_category": pdisk_category})
+        pdisk_entry = {
+            "node_id": node_id,
+            "pdisk_id": pdisk_id,
+            "path": path,
+            "pdisk_guid": pdisk_id,
+            "pdisk_category": pdisk_category
+        }
+        if pdisk_config:
+            pdisk_entry["pdisk_config"] = pdisk_config
+        self.yaml_config['blob_storage_config']["service_set"]["pdisks"].append(pdisk_entry)
         self.yaml_config['blob_storage_config']["service_set"]["vdisks"].append(
             {
                 "vdisk_id": {"group_id": 0, "group_generation": 1, "ring": ring, "domain": domain_id, "vdisk": 0},
@@ -834,6 +845,9 @@ def _initialize_pdisks_info(self):
                 'disk_size', self.dynamic_pdisk_size)
             pdisk_user_kind = 0 if pdisk_id <= 1 else self.__dynamic_pdisks[pdisk_id - 2].get('user_kind', 0)

+            # Get pdisk config based on whether it's static or dynamic
+            pdisk_config = self.static_pdisk_config if pdisk_id <= 1 else self.dynamic_pdisks_config
+
             if self.__use_in_memory_pdisks:
                 pdisk_size_gb = disk_size / (1024 * 1024 * 1024)
                 pdisk_path = "SectorMap:%d:%d" % (pdisk_id, pdisk_size_gb)
@@ -844,8 +858,17 @@ def _initialize_pdisks_info(self):
                     dir=self._pdisk_store_path)
                 pdisk_path = tmp_file.name

-            self._pdisks_info.append({'pdisk_path': pdisk_path, 'node_id': node_id, 'disk_size': disk_size,
-                                      'pdisk_user_kind': pdisk_user_kind})
+            pdisk_info = {
+                'pdisk_path': pdisk_path,
+                'node_id': node_id,
+                'disk_size': disk_size,
+                'pdisk_user_kind': pdisk_user_kind,
+                'pdisk_id': pdisk_id
+            }
+            if pdisk_config:
+                pdisk_info['pdisk_config'] = pdisk_config
+
+            self._pdisks_info.append(pdisk_info)

             if not self.use_self_management and pdisk_id == 1 and node_id <= self.static_erasure.min_fail_domains * self._rings_count:
                 self._add_pdisk_to_static_group(
                     pdisk_id,
@@ -853,6 +876,7 @@ def _initialize_pdisks_info(self):
                     pdisk_path,
                     node_id,
                     pdisk_user_kind,
                     datacenter_id - 1,
+                    pdisk_config=pdisk_config,
                 )

     def _add_host_config_and_hosts(self):
diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py
index f94c15d9a002..c7abd1a276d1 100644
--- a/ydb/tests/library/harness/kikimr_runner.py
+++ b/ydb/tests/library/harness/kikimr_runner.py
@@ -790,6 +790,12 @@ def __add_bs_box(self):
                 drive_proto.Kind = drive['pdisk_user_kind']
                 drive_proto.Type = drive.get('pdisk_type', 0)

+                for key, value in drive.get('pdisk_config', {}).items():
+                    if key == 'expected_slot_count':
+                        drive_proto.PDiskConfig.ExpectedSlotCount = value
+                    else:
+                        raise KeyError("unknown pdisk_config option %s" % key)
+
         cmd = request.Command.add()
         cmd.DefineBox.BoxId = 1
         for node_id, node in self.nodes.items():
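End-to-end, the new pdisk_config knobs flow from the config generator into the DefineBox drive proto; a sketch using the values from the functional test above (any other pdisk_config key deliberately raises KeyError in kikimr_runner):

    # Values mirror TestPDiskSlotSizeInUnits.setup above.
    configurator = KikimrConfigGenerator(
        erasure=Erasure.NONE,
        use_in_memory_pdisks=False,
        static_pdisk_size=64 * 1024**3,
        dynamic_pdisks=[{'disk_size': 64 * 1024**3, 'user_kind': 1}],
        dynamic_pdisks_config=dict(expected_slot_count=4),
        # static_pdisk_config would apply the same way to pdisk_id <= 1
    )
    # _initialize_pdisks_info stores 'pdisk_config' per pdisk; kikimr_runner's
    # __add_bs_box then maps 'expected_slot_count' onto
    # drive_proto.PDiskConfig.ExpectedSlotCount when defining the box.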