From f007abdfd56d1d4d73ccbc2172b13733a712debd Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov Date: Thu, 23 Oct 2025 17:40:28 +0300 Subject: [PATCH 1/3] Move test helpers to kikimr client --- .../test_infer_pdisk_expected_slot_count.py | 95 ++----------------- ydb/tests/library/clients/kikimr_client.py | 69 ++++++++++++++ 2 files changed, 77 insertions(+), 87 deletions(-) diff --git a/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py index 15cd594f92e3..3d3ab4cb4e28 100644 --- a/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py +++ b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py @@ -8,7 +8,6 @@ from ydb.tests.library.compatibility.fixtures import inter_stable_binary_path, inter_stable_name from ydb.tests.library.compatibility.fixtures import current_binary_path, current_name from ydb.tests.library.common.types import Erasure -import ydb.core.protos.msgbus_pb2 as msgbus import ydb.core.protos.blobstorage_config_pb2 as blobstorage_config_pb2 logger = logging.getLogger(__name__) @@ -43,96 +42,18 @@ def setup(self): use_in_memory_pdisks=False) next(cluster_generator) - host_configs = self.read_host_configs() + host_configs = self.cluster.client.read_host_configs() for host_config in host_configs: drive = host_config.Drive.add() drive.Path = CONST_PDISK_PATH drive.PDiskConfig.ExpectedSlotCount = CONST_EXPECTED_SLOT_COUNT - self.define_host_configs(host_configs) + self.cluster.client.define_host_configs(host_configs) yield - def read_host_configs(self): - request = msgbus.TBlobStorageConfigRequest() - request.Domain = 1 - request.Request.Command.add().ReadHostConfig.SetInParent() - - response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse - logger.info(f"read_host_config response: {response}") - if not response.Success: - raise RuntimeError('read_host_config request failed: %s' % response.ErrorDescription) - status = response.Status[0] - if not status.Success: - raise RuntimeError('read_host_config has failed status: %s' % status.ErrorDescription) - - return status.HostConfig - - def define_host_configs(self, host_configs): - """Define host configuration with specified host config""" - request = msgbus.TBlobStorageConfigRequest() - request.Domain = 1 - for host_config in host_configs: - request.Request.Command.add().DefineHostConfig.MergeFrom(host_config) - - logger.info(f"define_host_config request: {request}") - response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse - logger.info(f"define_host_config responce: {response}") - if not response.Success: - raise RuntimeError('define_host_config request failed: %s' % response.ErrorDescription) - for i, status in enumerate(response.Status): - if not status.Success: - raise RuntimeError('define_host_config has failed status[%d]: %s' % (i, status)) - - def pdisk_set_all_active(self): - """Update all drive statuses to ACTIVE. 
Equivalent to - `dstool pdisk set --status=ACTIVE --pdisk-ids ` - """ - base_config = self.query_base_config() - - request = msgbus.TBlobStorageConfigRequest() - request.Domain = 1 - - for pdisk in base_config.BaseConfig.PDisk: - if pdisk.Path != CONST_PDISK_PATH: - continue - cmd = request.Request.Command.add().UpdateDriveStatus - cmd.HostKey.NodeId = pdisk.NodeId - cmd.PDiskId = pdisk.PDiskId - cmd.Status = blobstorage_config_pb2.EDriveStatus.ACTIVE - - logger.info(f"update_all_drive_status_active request: {request}") - response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse - logger.info(f"update_all_drive_status_active response: {response}") - - if not response.Success: - raise RuntimeError('update_all_drive_status_active request failed: %s' % response.ErrorDescription) - for i, status in enumerate(response.Status): - if not status.Success: - raise RuntimeError('update_all_drive_status_active has failed status[%d]: %s' % (i, status)) - - def query_base_config(self): - request = msgbus.TBlobStorageConfigRequest() - request.Domain = 1 - - # Add QueryBaseConfig command - command = request.Request.Command.add() - command.QueryBaseConfig.RetrieveDevices = True - command.QueryBaseConfig.VirtualGroupsOnly = False - - # Send the request - response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse - if not response.Success: - raise RuntimeError('query_base_config failed: %s' % response.ErrorDescription) - - status = response.Status[0] - if not status.Success: - raise RuntimeError('query_base_config failed: %s' % status.ErrorDescription) - - return status - def pdisk_list(self): """Equivalent to `dstool pdisk list`""" - base_config = self.query_base_config() + base_config = self.cluster.client.query_base_config() # Collect PDisk information pdisks_info = [] @@ -156,7 +77,7 @@ def wait_and_check_pdisk_list(self, check_pdisks_fn, deadline, delay=1): else: time.sleep(delay) - def test_infer_pdisk_expected_slot_count(self): + def test(self): assert self.current_binary_paths_index == 0 logger.info(f"Test started on {self.versions[0]} {time.time()=}") ################################################################# @@ -187,7 +108,7 @@ def check_pdisks(pdisks): ###################################################################### t2 = time.time() - host_configs = self.read_host_configs() + host_configs = self.cluster.client.read_host_configs() for host_config in host_configs: drive = host_config.Drive[1] assert drive.Path == CONST_PDISK_PATH @@ -195,10 +116,10 @@ def check_pdisks(pdisks): drive.PDiskConfig.SetInParent() drive.InferPDiskSlotCountFromUnitSize = CONST_10_GB drive.InferPDiskSlotCountMax = 32 - self.define_host_configs(host_configs) + self.cluster.client.define_host_configs(host_configs) logger.info(f"Inferred PDisk setting applied {time.time()=}") - self.pdisk_set_all_active() + self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH) logger.info(f"Drives activated {time.time()=}") deadline = time.time() + timeout @@ -224,7 +145,7 @@ def check_pdisks(pdisks): logger.info(f"Restarted back on version {self.versions[0]} {time.time()=}") ########################################################################### - self.pdisk_set_all_active() + self.cluster.client.pdisk_set_all_active(pdisk_path=CONST_PDISK_PATH) logger.info(f"Drives activated {time.time()=}") deadline = time.time() + timeout diff --git a/ydb/tests/library/clients/kikimr_client.py b/ydb/tests/library/clients/kikimr_client.py index 
190f186699ee..9cdc7922067d 100644 --- a/ydb/tests/library/clients/kikimr_client.py +++ b/ydb/tests/library/clients/kikimr_client.py @@ -299,5 +299,74 @@ def tablet_state(self, tablet_type=None, tablet_ids=()): request.Alive = True return self.invoke(request, 'TabletStateRequest') + def read_host_configs(self, domain=1): + request = msgbus.TBlobStorageConfigRequest() + request.Domain = domain + request.Request.Command.add().ReadHostConfig.SetInParent() + + response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + if not response.Success: + raise RuntimeError(f'read_host_config request failed: {response.ErrorDescription}') + status = response.Status[0] + if not status.Success: + raise RuntimeError(f'read_host_config has failed status: {status.ErrorDescription}') + + return status.HostConfig + + def define_host_configs(self, host_configs, domain=1): + request = msgbus.TBlobStorageConfigRequest() + request.Domain = domain + for host_config in host_configs: + request.Request.Command.add().DefineHostConfig.MergeFrom(host_config) + + response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + if not response.Success: + raise RuntimeError(f'define_host_config request failed: {response.ErrorDescription}') + for i, status in enumerate(response.Status): + if not status.Success: + raise RuntimeError(f'define_host_config has failed status[{i}]: {status}') + + def pdisk_set_all_active(self, pdisk_path=None, domain=1): + """Equivalent to `dstool pdisk set --status=ACTIVE --pdisk-ids `""" + base_config = self.query_base_config(domain=domain) + + request = msgbus.TBlobStorageConfigRequest() + request.Domain = domain + + for pdisk in base_config.BaseConfig.PDisk: + if pdisk_path is not None and pdisk.Path != pdisk_path: + continue + cmd = request.Request.Command.add().UpdateDriveStatus + cmd.HostKey.NodeId = pdisk.NodeId + cmd.PDiskId = pdisk.PDiskId + cmd.Status = blobstorage_config_pb2.EDriveStatus.ACTIVE + + response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + + if not response.Success: + raise RuntimeError(f'update_all_drive_status_active request failed: {response.ErrorDescription}') + for i, status in enumerate(response.Status): + if not status.Success: + raise RuntimeError(f'update_all_drive_status_active has failed status[{i}]: {status}') + + def query_base_config(self, domain=1): + request = msgbus.TBlobStorageConfigRequest() + request.Domain = domain + + command = request.Request.Command.add() + command.QueryBaseConfig.RetrieveDevices = True + command.QueryBaseConfig.VirtualGroupsOnly = False + + response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + if not response.Success: + raise RuntimeError(f'query_base_config failed: {response.ErrorDescription}') + + status = response.Status[0] + if not status.Success: + raise RuntimeError(f'query_base_config failed: {status.ErrorDescription}') + + return status + + def __del__(self): self.close() From 62281b272120fd643f4d758b376a0eba083b0410 Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov Date: Thu, 23 Oct 2025 18:12:33 +0300 Subject: [PATCH 2/3] Add functional test for PDisk SlotSizeInUnits feature --- .../test_infer_pdisk_expected_slot_count.py | 4 +- .../test_pdisk_slot_size_in_units.py | 171 ++++++++++++++++++ ydb/tests/functional/blobstorage/ya.make | 1 + ydb/tests/library/clients/kikimr_client.py | 17 +- .../library/clients/kikimr_http_client.py | 10 + ydb/tests/library/harness/kikimr_config.py | 36 +++- ydb/tests/library/harness/kikimr_runner.py | 6 + 7 files 
changed, 236 insertions(+), 9 deletions(-) create mode 100644 ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py diff --git a/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py index 3d3ab4cb4e28..20c7f4b65fac 100644 --- a/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py +++ b/ydb/tests/compatibility/test_infer_pdisk_expected_slot_count.py @@ -8,7 +8,7 @@ from ydb.tests.library.compatibility.fixtures import inter_stable_binary_path, inter_stable_name from ydb.tests.library.compatibility.fixtures import current_binary_path, current_name from ydb.tests.library.common.types import Erasure -import ydb.core.protos.blobstorage_config_pb2 as blobstorage_config_pb2 +from ydb.core.protos import blobstorage_config_pb2 logger = logging.getLogger(__name__) @@ -73,7 +73,7 @@ def wait_and_check_pdisk_list(self, check_pdisks_fn, deadline, delay=1): except AssertionError as e: if time.time() > deadline: logger.warning(f"pdisk_list incorrect: {pdisks}") - raise e from e + raise e else: time.sleep(delay) diff --git a/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py b/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py new file mode 100644 index 000000000000..968c35a3796b --- /dev/null +++ b/ydb/tests/functional/blobstorage/test_pdisk_slot_size_in_units.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import time +import logging +import pytest +import requests +import json + +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.library.common.types import Erasure +from ydb.tests.library.harness.util import LogLevels +from ydb.core.protos import msgbus_pb2 + +logger = logging.getLogger(__name__) + +CONST_64_GB = 64 * 1024**3 + + +class TestPDiskSlotSizeInUnits(object): + erasure = Erasure.NONE + pool_name = 'test:1' + nodes_count = 1 + + @pytest.fixture(autouse=True) + def setup(self): + log_configs = { + 'BS_NODE': LogLevels.DEBUG, + 'BS_CONTROLLER': LogLevels.DEBUG, + 'BS_SKELETON': LogLevels.DEBUG, + 'BS_PDISK': LogLevels.DEBUG, + } + + configurator = KikimrConfigGenerator( + erasure=self.erasure, + nodes=self.nodes_count, + use_in_memory_pdisks=False, + static_pdisk_size=CONST_64_GB, + dynamic_pdisks=[{'disk_size': CONST_64_GB, 'user_kind': 1}], + dynamic_pdisks_config=dict(expected_slot_count=4), + dynamic_storage_pools=[dict(name=self.pool_name, kind="hdd", pdisk_user_kind=1)], + additional_log_configs=log_configs + ) + + assert len(configurator.pdisks_info) == 2 + self.cluster = KiKiMR(configurator) + self.cluster.start() + + host_config = self.cluster.client.read_host_configs()[0] + assert len(host_config.Drive) == 2 + assert host_config.Drive[1].PDiskConfig.ExpectedSlotCount == 4 + + base_config = self.cluster.client.query_base_config().BaseConfig + logger.info(f"{base_config=}") + + self.pdisk_id = base_config.PDisk[1].PDiskId + self.groups = [group for group in base_config.Group if group.StoragePoolId == 1] + assert len(self.groups) == 2 + for g in self.groups: + assert len(g.VSlotId) == 1 + assert g.VSlotId[0].PDiskId == self.pdisk_id + + yield + self.cluster.stop() + + def change_group_size_in_units(self, new_size, group_id=None): + storage_pool = self.cluster.client.read_storage_pools()[0] + + request = msgbus_pb2.TBlobStorageConfigRequest() + request.Domain = 1 + + cmd = request.Request.Command.add().ChangeGroupSizeInUnits + cmd.BoxId = 
storage_pool.BoxId + cmd.StoragePoolId = storage_pool.StoragePoolId + cmd.SizeInUnits = new_size + if group_id: + cmd.GroupId.append(group_id) + cmd.ItemConfigGeneration = storage_pool.ItemConfigGeneration + + logger.info(f"change_group_size_in_units request: {request}") + response = self.cluster.client.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + logger.info(f"change_group_size_in_units response: {response}") + + if not response.Success: + raise RuntimeError(f'change_group_size_in_units request failed: {response.ErrorDescription}') + + for i, status in enumerate(response.Status): + if not status.Success: + raise RuntimeError(f'change_group_size_in_units has failed status[{i}]: {status.ErrorDescription}') + + def change_pdisk_slot_size_in_units(self, slot_size_in_units): + host_config = self.cluster.client.read_host_configs()[0] + host_config.Drive[1].PDiskConfig.SlotSizeInUnits = slot_size_in_units + self.cluster.client.define_host_configs([host_config]) + + def retriable(self, check_fn, timeout=30, delay=1): + deadline = time.time() + timeout + + while True: + try: + return check_fn() + except AssertionError as e: + logger.info(str(e)) + if time.time() > deadline: + raise e + else: + time.sleep(delay) + + def http_get(self, url): + host = self.cluster.nodes[1].host + port = self.cluster.nodes[1].mon_port + return requests.get("http://%s:%s%s" % (host, port, url)) + + def get_storage_groups(self): + response = self.http_get('/storage/groups?fields_required=all&with=all').json() + groups = [group for group in response['StorageGroups'] if group['PoolName'] == self.pool_name] + assert len(groups) == 2 + for group in groups: + vdisk = group['VDisks'][0] + assert vdisk['Whiteboard'].get('VDiskState') == 'OK' + assert vdisk['PDisk']['Whiteboard'].get('State') == 'Normal' + return groups + + def get_pdisk_info(self): + response = self.http_get('/pdisk/info?node_id=1&pdisk_id=%s' % self.pdisk_id).json() + return response + + def check_group(self, group, expected_vdisk_weight, expected_num_active_slots): + vdisk = group['VDisks'][0] + assert vdisk['PDisk']['Whiteboard']['NumActiveSlots'] == expected_num_active_slots + assert int(vdisk['Whiteboard']['AllocatedSize']) + int(vdisk['Whiteboard']['AvailableSize']) == \ + expected_vdisk_weight * int(vdisk['PDisk']['Whiteboard']['EnforcedDynamicSlotSize']) + + def check_pdisk(self, pdisk, expected_num_active_slots=None, expected_slot_size_in_units=None): + if expected_num_active_slots is not None: + assert pdisk['NumActiveSlots'] == expected_num_active_slots + if expected_slot_size_in_units is not None: + assert pdisk['SlotSizeInUnits'] == expected_slot_size_in_units + + def test_change_group_size_in_units(self): + self.change_group_size_in_units(new_size=2, group_id=self.groups[0].GroupId) + + def wait_whiteboard_updated(): + groups = self.get_storage_groups() + logger.info(json.dumps(groups, indent=2)) + self.check_group(groups[0], expected_vdisk_weight=2, expected_num_active_slots=3) + self.check_group(groups[1], expected_vdisk_weight=1, expected_num_active_slots=3) + self.retriable(wait_whiteboard_updated) + + pdisk_info = self.get_pdisk_info() + logger.info(json.dumps(pdisk_info, indent=2)) + + self.check_pdisk(pdisk_info['Whiteboard']['PDisk'], expected_num_active_slots=3) + self.check_pdisk(pdisk_info['BSC']['PDisk'], expected_num_active_slots=3, expected_slot_size_in_units=0) + + def test_change_pdisk_slot_size_in_units(self): + self.change_pdisk_slot_size_in_units(slot_size_in_units=2) + 
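# The PDisk's slot size is now 2 units, so resizing the group to
+        # SizeInUnits=4 should give it a weight of 4 / 2 = 2 slots on this PDisk,
+        # as asserted through expected_vdisk_weight below.
+        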
self.change_group_size_in_units(new_size=4, group_id=self.groups[1].GroupId) + + def wait_whiteboard_updated(): + groups = self.get_storage_groups() + logger.info(json.dumps(groups, indent=2)) + self.check_group(groups[0], expected_vdisk_weight=1, expected_num_active_slots=3) + self.check_group(groups[1], expected_vdisk_weight=2, expected_num_active_slots=3) + self.retriable(wait_whiteboard_updated) + + pdisk_info = self.get_pdisk_info() + logger.info(json.dumps(pdisk_info, indent=2)) + + self.check_pdisk(pdisk_info['Whiteboard']['PDisk'], expected_num_active_slots=3) + self.check_pdisk(pdisk_info['BSC']['PDisk'], expected_num_active_slots=3, expected_slot_size_in_units=2) diff --git a/ydb/tests/functional/blobstorage/ya.make b/ydb/tests/functional/blobstorage/ya.make index 00218fbe13e9..0195f1fe4bcf 100644 --- a/ydb/tests/functional/blobstorage/ya.make +++ b/ydb/tests/functional/blobstorage/ya.make @@ -3,6 +3,7 @@ PY3TEST() INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc) TEST_SRCS( test_pdisk_format_info.py + test_pdisk_slot_size_in_units.py test_replication.py test_self_heal.py test_tablet_channel_migration.py diff --git a/ydb/tests/library/clients/kikimr_client.py b/ydb/tests/library/clients/kikimr_client.py index 9cdc7922067d..52be138fdb6d 100644 --- a/ydb/tests/library/clients/kikimr_client.py +++ b/ydb/tests/library/clients/kikimr_client.py @@ -326,6 +326,22 @@ def define_host_configs(self, host_configs, domain=1): if not status.Success: raise RuntimeError(f'define_host_config has failed status[{i}]: {status}') + def read_storage_pools(self, domain=1): + request = msgbus.TBlobStorageConfigRequest() + request.Domain = domain + cmd = request.Request.Command.add().ReadStoragePool + cmd.BoxId = 0xFFFFFFFFFFFFFFFF + + response = self.send(request, 'BlobStorageConfig').BlobStorageConfigResponse + if not response.Success: + raise RuntimeError(f'read_storage_pools request failed: {response.ErrorDescription}') + + status = response.Status[0] + if not status.Success: + raise RuntimeError(f'read_storage_pools has failed status: {status.ErrorDescription}') + + return status.StoragePool + def pdisk_set_all_active(self, pdisk_path=None, domain=1): """Equivalent to `dstool pdisk set --status=ACTIVE --pdisk-ids `""" base_config = self.query_base_config(domain=domain) @@ -367,6 +383,5 @@ def query_base_config(self, domain=1): return status - def __del__(self): self.close() diff --git a/ydb/tests/library/clients/kikimr_http_client.py b/ydb/tests/library/clients/kikimr_http_client.py index 1fed437016f4..2112c7789453 100644 --- a/ydb/tests/library/clients/kikimr_http_client.py +++ b/ydb/tests/library/clients/kikimr_http_client.py @@ -177,3 +177,13 @@ def nodes_info(self): return self.__http_get_and_parse_json( "/json/nodes", timeout=self.__timeout ) + + def storage_groups(self, **kwargs): + return self.__http_get_and_parse_json( + "/storage/groups", timeout=self.__timeout, **kwargs + ) + + def pdisk_info_detailed(self, node_id, pdisk_id): + return self.__http_get_and_parse_json( + "/pdisk/info", node_id=str(node_id), pdisk_id=str(pdisk_id), timeout=self.__timeout + ) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 2a80f4339cbb..f139e663020d 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -127,8 +127,10 @@ def __init__( domain_name='Root', suppress_version_check=True, static_pdisk_size=PDISK_SIZE, + static_pdisk_config=None, dynamic_pdisk_size=PDISK_SIZE, dynamic_pdisks=[], + 
dynamic_pdisks_config=None, dynamic_storage_pools=[dict(name="dynamic_storage_pool:1", kind="hdd", pdisk_user_kind=0)], state_storage_rings=None, n_to_select=None, @@ -212,6 +214,7 @@ def __init__( self.explicit_hosts_and_host_configs = True self._pdisk_store_path = pdisk_store_path self.static_pdisk_size = static_pdisk_size + self.static_pdisk_config = static_pdisk_config self.app_config = config_pb2.TAppConfig() self.port_allocator = KikimrPortManagerPortAllocator() if port_allocator is None else port_allocator erasure = Erasure.NONE if erasure is None else erasure @@ -257,6 +260,7 @@ def __init__( self.dynamic_pdisk_size = dynamic_pdisk_size self.dynamic_storage_pools = dynamic_storage_pools + self.dynamic_pdisks_config = dynamic_pdisks_config self.__dynamic_pdisks = dynamic_pdisks @@ -810,12 +814,19 @@ def _add_state_storage_config(self): for ring in self.state_storage_rings: self.yaml_config["domains_config"]["state_storage"][0]["ring"]["ring"].append({"node" : ring if isinstance(ring, list) else [ring], "use_ring_specific_node_selection" : True}) - def _add_pdisk_to_static_group(self, pdisk_id, path, node_id, pdisk_category, ring): + def _add_pdisk_to_static_group(self, pdisk_id, path, node_id, pdisk_category, ring, pdisk_config=None): domain_id = len( self.yaml_config['blob_storage_config']["service_set"]["groups"][0]["rings"][ring]["fail_domains"]) - self.yaml_config['blob_storage_config']["service_set"]["pdisks"].append( - {"node_id": node_id, "pdisk_id": pdisk_id, "path": path, "pdisk_guid": pdisk_id, - "pdisk_category": pdisk_category}) + pdisk_entry = { + "node_id": node_id, + "pdisk_id": pdisk_id, + "path": path, + "pdisk_guid": pdisk_id, + "pdisk_category": pdisk_category + } + if pdisk_config: + pdisk_entry["pdisk_config"] = pdisk_config + self.yaml_config['blob_storage_config']["service_set"]["pdisks"].append(pdisk_entry) self.yaml_config['blob_storage_config']["service_set"]["vdisks"].append( { "vdisk_id": {"group_id": 0, "group_generation": 1, "ring": ring, "domain": domain_id, "vdisk": 0}, @@ -838,6 +849,9 @@ def _initialize_pdisks_info(self): 'disk_size', self.dynamic_pdisk_size) pdisk_user_kind = 0 if pdisk_id <= 1 else self.__dynamic_pdisks[pdisk_id - 2].get('user_kind', 0) + # Get pdisk config based on whether it's static or dynamic + pdisk_config = self.static_pdisk_config if pdisk_id <= 1 else self.dynamic_pdisks_config + if self.__use_in_memory_pdisks: pdisk_size_gb = disk_size / (1024 * 1024 * 1024) pdisk_path = "SectorMap:%d:%d" % (pdisk_id, pdisk_size_gb) @@ -848,8 +862,17 @@ def _initialize_pdisks_info(self): dir=self._pdisk_store_path) pdisk_path = tmp_file.name - self._pdisks_info.append({'pdisk_path': pdisk_path, 'node_id': node_id, 'disk_size': disk_size, - 'pdisk_user_kind': pdisk_user_kind}) + pdisk_info = { + 'pdisk_path': pdisk_path, + 'node_id': node_id, + 'disk_size': disk_size, + 'pdisk_user_kind': pdisk_user_kind, + 'pdisk_id': pdisk_id + } + if pdisk_config: + pdisk_info['pdisk_config'] = pdisk_config + + self._pdisks_info.append(pdisk_info) if not self.use_self_management and pdisk_id == 1 and node_id <= self.static_erasure.min_fail_domains * self._rings_count: self._add_pdisk_to_static_group( pdisk_id, @@ -857,6 +880,7 @@ def _initialize_pdisks_info(self): node_id, pdisk_user_kind, datacenter_id - 1, + pdisk_config=pdisk_config, ) def _add_host_config_and_hosts(self): diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py index 4ff16d7b95e7..d42118005fad 100644 --- 
a/ydb/tests/library/harness/kikimr_runner.py
+++ b/ydb/tests/library/harness/kikimr_runner.py
@@ -808,6 +808,12 @@ def __add_bs_box(self):
             drive_proto.Kind = drive['pdisk_user_kind']
             drive_proto.Type = drive.get('pdisk_type', 0)
 
+            for key, value in drive.get('pdisk_config', {}).items():
+                if key == 'expected_slot_count':
+                    drive_proto.PDiskConfig.ExpectedSlotCount = value
+                else:
+                    raise KeyError(f"unknown pdisk_config option {key}")
+
         cmd = request.Command.add()
         cmd.DefineBox.BoxId = 1
         for node_id, node in self.nodes.items():

From cf325a227c7b1ece512eb082a9d35f725fe31e0e Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov
Date: Sat, 15 Nov 2025 00:11:23 +0300
Subject: [PATCH 3/3] Fix segfault in ChangeGroupSizeInUnits during VDisk
 LocalRecovery

---
 .../nodewarden/blobstorage_node_warden_ut.cpp | 76 +++++++++++++++++++
 .../vdisk/skeleton/blobstorage_skeleton.cpp   | 14 +++-
 2 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp b/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
index eae2b000e4db..e2e2497fa5a9 100644
--- a/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
+++ b/ydb/core/blobstorage/nodewarden/blobstorage_node_warden_ut.cpp
@@ -7,10 +7,13 @@
 #include
 #include
 #include
+#include
+#include
 
 #include
 #include
 #include
+#include
 
 #include
 #include
@@ -1130,6 +1133,79 @@ Y_UNIT_TEST_SUITE(TBlobStorageWardenTest) {
         CheckInferredPDiskSettings(runtime, fakeWhiteboard, fakeNodeWarden, pdiskId, 12, 2u);
     }
 
+    void ChangeGroupSizeInUnits(TTestBasicRuntime& runtime, TString poolName, ui32 groupId, ui32 groupSizeInUnits) {
+        TActorId edge = runtime.AllocateEdgeActor();
+
+        auto storagePool = DescribeStoragePool(runtime, poolName);
+        auto request = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
+        auto& cmd = *request->Record.MutableRequest()->AddCommand()->MutableChangeGroupSizeInUnits();
+        cmd.SetBoxId(storagePool.GetBoxId());
+        cmd.SetItemConfigGeneration(storagePool.GetItemConfigGeneration());
+        cmd.SetStoragePoolId(storagePool.GetStoragePoolId());
+        cmd.AddGroupId(groupId);
+        cmd.SetSizeInUnits(groupSizeInUnits);
+
+        NTabletPipe::TClientConfig pipeConfig;
+        pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
+        runtime.SendToPipe(MakeBSControllerID(), edge, request.release(), 0, pipeConfig);
+
+        auto reply = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvControllerConfigResponse>(edge);
+        VERBOSE_COUT("TEvControllerConfigResponse# " << reply->ToString());
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetResponse().GetSuccess(), true);
+    }
+
+    void CheckVDiskStateUpdate(TTestBasicRuntime& runtime, TActorId fakeWhiteboard, ui32 groupId,
+            ui32 expectedGroupGeneration, ui32 expectedGroupSizeInUnits,
+            TDuration simTimeout = TDuration::Seconds(10)) {
+        TInstant deadline = runtime.GetCurrentTime() + simTimeout;
+        while (true) {
+            UNIT_ASSERT_LT(runtime.GetCurrentTime(), deadline);
+
+            const auto ev = runtime.GrabEdgeEventRethrow<NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate>(fakeWhiteboard, deadline - runtime.GetCurrentTime());
+            VERBOSE_COUT(" Got TEvVDiskStateUpdate# " << ev->ToString());
+
+            NKikimrWhiteboard::TVDiskStateInfo vdiskInfo = ev->Get()->Record;
+            if (vdiskInfo.GetVDiskId().GetGroupID() != groupId || !vdiskInfo.HasGroupSizeInUnits()) {
+                continue;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetVDiskId().GetGroupGeneration(), expectedGroupGeneration);
+            UNIT_ASSERT_VALUES_EQUAL(vdiskInfo.GetGroupSizeInUnits(), expectedGroupSizeInUnits);
+            break;
+        }
+    }
+
+    CUSTOM_UNIT_TEST(TestEvVGenerationChangeRace) {
+        TTestBasicRuntime runtime(1, false);
+        Setup(runtime, "", nullptr);
+
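+        // Reproduces the race this patch fixes: a TEvVGenerationChange triggered by
+        // ChangeGroupSizeInUnits used to be applied while the VDisk was still in
+        // LocalRecovery, before PDiskCtx was initialized, and dereferenced a null
+        // PDiskCtx (see the blobstorage_skeleton.cpp hunks below).
+
+        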
runtime.SetLogPriority(NKikimrServices::BS_PROXY, NLog::PRI_ERROR); + runtime.SetLogPriority(NKikimrServices::BS_PROXY_PUT, NLog::PRI_ERROR); + runtime.SetLogPriority(NKikimrServices::BS_PROXY_BLOCK, NLog::PRI_ERROR); + runtime.SetLogPriority(NKikimrServices::BS_SKELETON, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::BS_LOCALRECOVERY, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_INFO); + runtime.SetLogPriority(NKikimrServices::BS_CONTROLLER, NLog::PRI_INFO); + + const ui32 nodeId = runtime.GetNodeId(0); + TActorId fakeWhiteboard = runtime.AllocateEdgeActor(); + runtime.RegisterService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId), fakeWhiteboard); + + VERBOSE_COUT(" Starting test"); + + TBlockEvents block(runtime); + + const TString poolName = "testEvVGenerationChangeRace"; + CreateStoragePool(runtime, poolName, "pool-kind-1"); + ui32 groupId = GetGroupFromPool(runtime, poolName); + + CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u); + ChangeGroupSizeInUnits(runtime, poolName, groupId, 2u); + CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 1, 0u); + block.Stop().Unblock(); + CheckVDiskStateUpdate(runtime, fakeWhiteboard, groupId, 2, 2u); + } + } } // namespace NBlobStorageNodeWardenTest diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 117b8cf90eef..5ad9288c634e 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -1931,6 +1931,17 @@ namespace NKikimr { ApplyHugeBlobSize(Config->MinHugeBlobInBytes); Y_VERIFY_S(MinHugeBlobInBytes, VCtx->VDiskLogPrefix); + if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) { + Config->GroupSizeInUnits = GInfo->GroupSizeInUnits; + Y_VERIFY(PDiskCtx); + Y_VERIFY(PDiskCtx->Dsk); + ctx.Send(PDiskCtx->PDiskId, + new NPDisk::TEvYardResize( + PDiskCtx->Dsk->Owner, + PDiskCtx->Dsk->OwnerRound, + Config->GroupSizeInUnits)); + } + // handle special case when donor disk starts and finds out that it has been wiped out if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) { // send drop donor cmd to NodeWarden @@ -2448,10 +2459,11 @@ namespace NKikimr { GInfo = msg->NewInfo; SelfVDiskId = msg->NewVDiskId; - if (Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) { + if (PDiskCtx && Config->GroupSizeInUnits != GInfo->GroupSizeInUnits) { Config->GroupSizeInUnits = GInfo->GroupSizeInUnits; UpdateWhiteboard(ctx); + Y_VERIFY(PDiskCtx->Dsk); ctx.Send(PDiskCtx->PDiskId, new NPDisk::TEvYardResize( PDiskCtx->Dsk->Owner,