From b62ca859c8806109cb1e6874588c393cab3f658f Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Thu, 2 Oct 2025 18:18:54 +0200 Subject: [PATCH 1/6] change local _LOGGER to LOGGER and shared.py optimized parsed zone clean tested for rand. Signed-off-by: Sandro Cantarella --- .../config/rand256_parser.py | 69 ++++++++++++++++++- SCR/valetudo_map_parser/config/shared.py | 17 +++-- SCR/valetudo_map_parser/config/types.py | 10 ++- SCR/valetudo_map_parser/config/utils.py | 32 +++++---- SCR/valetudo_map_parser/rand256_handler.py | 28 ++++---- SCR/valetudo_map_parser/reimg_draw.py | 31 ++++----- pyproject.toml | 2 +- 7 files changed, 129 insertions(+), 60 deletions(-) diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index b328e29..65b6a98 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -67,6 +67,52 @@ def _get_int32_signed(data: bytes, address: int) -> int: value = RRMapParser._get_int32(data, address) return value if value < 0x80000000 else value - 0x100000000 + @staticmethod + def _parse_area(header: bytes, data: bytes) -> list: + area_pairs = RRMapParser._get_int16(header, 0x08) + areas = [] + for area_start in range(0, area_pairs * 16, 16): + x0 = RRMapParser._get_int16(data, area_start + 0) + y0 = RRMapParser._get_int16(data, area_start + 2) + x1 = RRMapParser._get_int16(data, area_start + 4) + y1 = RRMapParser._get_int16(data, area_start + 6) + x2 = RRMapParser._get_int16(data, area_start + 8) + y2 = RRMapParser._get_int16(data, area_start + 10) + x3 = RRMapParser._get_int16(data, area_start + 12) + y3 = RRMapParser._get_int16(data, area_start + 14) + areas.append( + [ + x0, + RRMapParser.Tools.DIMENSION_MM - y0, + x1, + RRMapParser.Tools.DIMENSION_MM - y1, + x2, + RRMapParser.Tools.DIMENSION_MM - y2, + x3, + RRMapParser.Tools.DIMENSION_MM - y3, + ] + ) + return areas + + @staticmethod + def _parse_zones(data: bytes, header: bytes) -> list: + zone_pairs = RRMapParser._get_int16(header, 0x08) + zones = [] + for zone_start in range(0, zone_pairs * 8, 8): + x0 = RRMapParser._get_int16(data, zone_start + 0) + y0 = RRMapParser._get_int16(data, zone_start + 2) + x1 = RRMapParser._get_int16(data, zone_start + 4) + y1 = RRMapParser._get_int16(data, zone_start + 6) + zones.append( + [ + x0, + RRMapParser.Tools.DIMENSION_MM - y0, + x1, + RRMapParser.Tools.DIMENSION_MM - y1, + ] + ) + return zones + @staticmethod def _parse_object_position(block_data_length: int, data: bytes) -> Dict[str, Any]: """Parse object position using Xiaomi method.""" @@ -159,6 +205,12 @@ def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: blocks[block_type] = self._parse_path_block( raw, block_start_position, block_data_length ) + elif block_type == self.Types.CURRENTLY_CLEANED_ZONES.value: + blocks[block_type] = {"zones": self._parse_zones(data, header)} + elif block_type == self.Types.FORBIDDEN_ZONES.value: + blocks[block_type] = { + "forbidden_zones": self._parse_area(header, data) + } elif block_type == self.Types.GOTO_TARGET.value: blocks[block_type] = {"position": self._parse_goto_target(data)} elif block_type == self.Types.IMAGE.value: @@ -365,8 +417,21 @@ def parse_rrm_data( ] # Add missing fields to match expected JSON format - parsed_map_data["forbidden_zones"] = [] - parsed_map_data["virtual_walls"] = [] + parsed_map_data["currently_cleaned_zones"] = ( + blocks[self.Types.CURRENTLY_CLEANED_ZONES.value]["zones"] + if self.Types.CURRENTLY_CLEANED_ZONES.value in blocks + else [] + ) + parsed_map_data["forbidden_zones"] = ( + blocks[self.Types.FORBIDDEN_ZONES.value]["forbidden_zones"] + if self.Types.FORBIDDEN_ZONES.value in blocks + else [] + ) + parsed_map_data["virtual_walls"] = ( + blocks[self.Types.VIRTUAL_WALLS.value]["virtual_walls"] + if self.Types.VIRTUAL_WALLS.value in blocks + else [] + ) return parsed_map_data diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index 3a29d77..8ecd4ae 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -12,12 +12,13 @@ from .types import ( ATTR_CALIBRATION_POINTS, ATTR_CAMERA_MODE, + ATTR_CONTENT_TYPE, ATTR_MARGINS, ATTR_OBSTACLES, ATTR_POINTS, ATTR_ROOMS, ATTR_ROTATE, - ATTR_SNAPSHOT, + ATTR_IMAGE_LAST_UPDATED, ATTR_VACUUM_BATTERY, ATTR_VACUUM_CHARGING, ATTR_VACUUM_JSON_ID, @@ -179,12 +180,14 @@ async def batch_get(self, *args): def generate_attributes(self) -> dict: """Generate and return the shared attribute's dictionary.""" attrs = { + ATTR_IMAGE_LAST_UPDATED: self.image_last_updated, + ATTR_CONTENT_TYPE: self.image_format, + ATTR_VACUUM_JSON_ID: self.vac_json_id, ATTR_CAMERA_MODE: self.camera_mode, + ATTR_VACUUM_STATUS: self.vacuum_state, ATTR_VACUUM_BATTERY: f"{self.vacuum_battery}%", ATTR_VACUUM_CHARGING: self.vacuum_bat_charged(), ATTR_VACUUM_POSITION: self.current_room, - ATTR_VACUUM_STATUS: self.vacuum_state, - ATTR_VACUUM_JSON_ID: self.vac_json_id, ATTR_CALIBRATION_POINTS: self.attr_calibration_points, } if self.obstacles_pos and self.vacuum_ips: @@ -193,8 +196,6 @@ def generate_attributes(self) -> dict: ) attrs[ATTR_OBSTACLES] = self.obstacles_data - attrs[ATTR_SNAPSHOT] = self.snapshot_take if self.enable_snapshots else False - shared_attrs = { ATTR_ROOMS: self.map_rooms, ATTR_ZONES: self.map_pred_zones, @@ -211,10 +212,8 @@ def to_dict(self) -> dict: return { "image": { "binary": self.binary_image, - "pil_image_size": self.new_image.size, - "size": self.new_image.size if self.new_image else None, - "format": self.image_format, - "updated": self.image_last_updated, + "pil_image": self.new_image, + "size": self.new_image.size if self.new_image else (10, 10), }, "attributes": self.generate_attributes(), } diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index 8624a56..6be8f0c 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -18,23 +18,29 @@ LOGGER = logging.getLogger(__package__) + class Spot(TypedDict): name: str coordinates: List[int] # [x, y] + class Zone(TypedDict): name: str coordinates: List[List[int]] # [[x1, y1, x2, y2, repeats], ...] + class Room(TypedDict): name: str id: int + +# list[dict[str, str | list[int]]] | list[dict[str, str | list[list[int]]]] | list[dict[str, str | int]] | int]' class Destinations(TypedDict, total=False): spots: NotRequired[Optional[List[Spot]]] zones: NotRequired[Optional[List[Zone]]] rooms: NotRequired[Optional[List[Room]]] - updated: NotRequired[Optional[int]] + updated: NotRequired[Optional[float]] + class RoomProperty(TypedDict): number: int @@ -227,9 +233,11 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None: Point = Tuple[int, int] CAMERA_STORAGE = "valetudo_camera" +ATTR_IMAGE_LAST_UPDATED = "image_last_updated" ATTR_ROTATE = "rotate_image" ATTR_CROP = "crop_image" ATTR_MARGINS = "margins" +ATTR_CONTENT_TYPE = "content_type" CONF_OFFSET_TOP = "offset_top" CONF_OFFSET_BOTTOM = "offset_bottom" CONF_OFFSET_LEFT = "offset_left" diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 456b59e..21a2473 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -23,7 +23,7 @@ NumpyArray, PilPNG, RobotPosition, - Destinations + Destinations, ) from ..map_data import HyperMapData from .async_utils import AsyncNumPy @@ -197,26 +197,27 @@ async def _async_update_shared_data(self, destinations: Destinations | None = No """Update the shared data with the latest information.""" if hasattr(self, "get_rooms_attributes") and ( - self.shared.map_rooms is None and destinations is not None + self.shared.map_rooms is None and destinations is not None ): - ( - self.shared.map_rooms, - self.shared.map_pred_zones, - self.shared.map_pred_points, - ) = await self.get_rooms_attributes(destinations) + (self.shared.map_rooms,) = await self.get_rooms_attributes(destinations) if self.shared.map_rooms: LOGGER.debug("%s: Rand256 attributes rooms updated", self.file_name) if hasattr(self, "async_get_rooms_attributes") and ( - self.shared.map_rooms is None + self.shared.map_rooms is None ): if self.shared.map_rooms is None: self.shared.map_rooms = await self.async_get_rooms_attributes() if self.shared.map_rooms: LOGGER.debug("%s: Hyper attributes rooms updated", self.file_name) - if hasattr(self, "get_calibration_data") and self.shared.attr_calibration_points is None: - self.shared.attr_calibration_points = self.get_calibration_data(self.shared.image_rotate) + if ( + hasattr(self, "get_calibration_data") + and self.shared.attr_calibration_points is None + ): + self.shared.attr_calibration_points = self.get_calibration_data( + self.shared.image_rotate + ) if not self.shared.image_size: self.shared.image_size = self.get_img_size() @@ -228,14 +229,19 @@ async def _async_update_shared_data(self, destinations: Destinations | None = No self.shared.current_room = self.get_robot_position() - def prepare_resize_params(self, pil_img: PilPNG, rand: bool=False) -> ResizeParams: + def prepare_resize_params( + self, pil_img: PilPNG, rand: bool = False + ) -> ResizeParams: """Prepare resize parameters for image resizing.""" if self.shared.image_rotate in [0, 180]: width, height = pil_img.size else: height, width = pil_img.size - LOGGER.debug("Shared PIL image size: %s x %s", self.shared.image_ref_width, - self.shared.image_ref_height) + LOGGER.debug( + "Shared PIL image size: %s x %s", + self.shared.image_ref_width, + self.shared.image_ref_height, + ) return ResizeParams( pil_img=pil_img, width=width, diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 7a342ca..0f9a157 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -7,7 +7,6 @@ from __future__ import annotations -import logging import uuid from typing import Any @@ -15,7 +14,6 @@ from .config.async_utils import AsyncPIL -# from .config.auto_crop import AutoCrop from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.types import ( @@ -28,6 +26,7 @@ RobotPosition, RoomsProperties, RoomStore, + LOGGER, ) from .config.utils import ( BaseHandler, @@ -39,9 +38,6 @@ from .rooms_handler import RandRoomsHandler -_LOGGER = logging.getLogger(__name__) - - # noinspection PyTypeChecker class ReImageHandler(BaseHandler, AutoCrop): """ @@ -112,17 +108,17 @@ async def extract_room_properties( self.shared.map_rooms = room_ids # get the zones and points data - zone_properties = await self.async_zone_propriety(zones_data) + self.shared.map_pred_zones = await self.async_zone_propriety(zones_data) # get the points data - point_properties = await self.async_points_propriety(points_data) + self.shared.map_pred_points = await self.async_points_propriety(points_data) - if not (room_properties or zone_properties): + if not (room_properties or self.shared.map_pred_zones): self.rooms_pos = None rooms = RoomStore(self.file_name, room_properties) - return room_properties, zone_properties, point_properties + return room_properties except (RuntimeError, ValueError) as e: - _LOGGER.warning( + LOGGER.warning( "No rooms Data or Error in extract_room_properties: %s", e, exc_info=True, @@ -146,12 +142,12 @@ async def get_image_from_rrm( try: if (m_json is not None) and (not isinstance(m_json, tuple)): - _LOGGER.info("%s: Composing the image for the camera.", self.file_name) + LOGGER.info("%s: Composing the image for the camera.", self.file_name) self.json_data = m_json size_x, size_y = self.data.get_rrm_image_size(m_json) self.img_size = DEFAULT_IMAGE_SIZE self.json_id = str(uuid.uuid4()) # image id - _LOGGER.info("Vacuum Data ID: %s", self.json_id) + LOGGER.info("Vacuum Data ID: %s", self.json_id) ( img_np_array, @@ -178,7 +174,7 @@ async def get_image_from_rrm( return await self._finalize_image(pil_img) except (RuntimeError, RuntimeWarning) as e: - _LOGGER.warning( + LOGGER.warning( "%s: Runtime Error %s during image creation.", self.file_name, str(e), @@ -214,7 +210,7 @@ async def _setup_robot_and_image( colors["background"], DEFAULT_PIXEL_SIZE, ) - _LOGGER.info("%s: Completed base Layers", self.file_name) + LOGGER.info("%s: Completed base Layers", self.file_name) # Update element map for rooms if 0 < room_id <= 15: @@ -362,7 +358,7 @@ async def _draw_map_elements( async def _finalize_image(self, pil_img): if not self.shared.image_ref_width or not self.shared.image_ref_height: - _LOGGER.warning( + LOGGER.warning( "Image finalization failed: Invalid image dimensions. Returning original image." ) return pil_img @@ -515,7 +511,7 @@ def get_calibration_data(self, rotation_angle: int = 0) -> Any: """Return the map calibration data.""" if not self.calibration_data and self.crop_img_size: self.calibration_data = [] - _LOGGER.info( + LOGGER.info( "%s: Getting Calibrations points %s", self.file_name, str(self.crop_area), diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index bc82dac..7ec6649 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -6,17 +6,12 @@ from __future__ import annotations -import logging - from .config.drawable import Drawable from .config.drawable_elements import DrawableElement -from .config.types import Color, JsonType, NumpyArray +from .config.types import Color, JsonType, NumpyArray, LOGGER from .map_data import ImageData, RandImageData -_LOGGER = logging.getLogger(__name__) - - class ImageDraw: """Class to handle the image creation.""" @@ -48,7 +43,7 @@ async def async_draw_go_to_flag( ) return np_array except KeyError as e: - _LOGGER.warning( + LOGGER.warning( "%s: Error in extraction of go-to target: %s", self.file_name, e, @@ -70,7 +65,7 @@ async def async_segment_data( ) except ValueError as e: self.img_h.segment_data = None - _LOGGER.info("%s: No segments data found: %s", self.file_name, e) + LOGGER.info("%s: No segments data found: %s", self.file_name, e) async def async_draw_base_layer( self, @@ -87,13 +82,13 @@ async def async_draw_base_layer( walls_data = self.data.get_rrm_walls(m_json) floor_data = self.data.get_rrm_floor(m_json) - _LOGGER.info("%s: Empty image with background color", self.file_name) + LOGGER.info("%s: Empty image with background color", self.file_name) img_np_array = await self.draw.create_empty_image( self.img_h.img_size["x"], self.img_h.img_size["y"], color_background ) room_id = 0 if self.img_h.frame_number == 0: - _LOGGER.info("%s: Overlapping Layers", self.file_name) + LOGGER.info("%s: Overlapping Layers", self.file_name) # checking if there are segments too (sorted pixels in the raw data). await self.async_segment_data(m_json, size_x, size_y, pos_top, pos_left) @@ -148,10 +143,10 @@ async def _draw_segments( room_id = 0 rooms_list = [color_wall] if not segment_data: - _LOGGER.info("%s: No segments data found.", self.file_name) + LOGGER.info("%s: No segments data found.", self.file_name) return room_id, img_np_array - _LOGGER.info("%s: Drawing segments.", self.file_name) + LOGGER.info("%s: Drawing segments.", self.file_name) for pixels in segment_data: room_color = self.img_h.shared.rooms_colors[room_id] rooms_list.append(room_color) @@ -211,7 +206,7 @@ async def async_draw_charger( self.data.get_rrm_charger_position(m_json) ) except KeyError as e: - _LOGGER.warning("%s: No charger position found: %s", self.file_name, e) + LOGGER.warning("%s: No charger position found: %s", self.file_name, e) else: if charger_pos: charger_pos_dictionary = { @@ -238,7 +233,7 @@ async def async_draw_zones( zone_clean = None if zone_clean: - _LOGGER.info("%s: Got zones.", self.file_name) + LOGGER.info("%s: Got zones.", self.file_name) return await self.draw.zones(np_array, zone_clean, color_zone_clean) return np_array @@ -252,7 +247,7 @@ async def async_draw_virtual_restrictions( virtual_walls = None if virtual_walls: - _LOGGER.info("%s: Got virtual walls.", self.file_name) + LOGGER.info("%s: Got virtual walls.", self.file_name) np_array = await self.draw.draw_virtual_walls( np_array, virtual_walls, color_no_go ) @@ -280,7 +275,7 @@ async def async_draw_path( self.data.rrm_valetudo_path_array(path_pixel["points"]), 2 ) except KeyError as e: - _LOGGER.warning( + LOGGER.warning( "%s: Error extracting paths data: %s", self.file_name, str(e) ) finally: @@ -297,7 +292,7 @@ async def async_get_entity_data(self, m_json: JsonType) -> dict or None: except (ValueError, KeyError): entity_dict = None else: - _LOGGER.info("%s: Got the points in the json.", self.file_name) + LOGGER.info("%s: Got the points in the json.", self.file_name) return entity_dict async def async_get_robot_position(self, m_json: JsonType) -> tuple | None: @@ -310,7 +305,7 @@ async def async_get_robot_position(self, m_json: JsonType) -> tuple | None: robot_pos = self.data.rrm_coordinates_to_valetudo(robot_pos_data) angle = self.data.get_rrm_robot_angle(m_json) except (ValueError, KeyError): - _LOGGER.warning("%s No robot position found.", self.file_name) + LOGGER.warning("%s No robot position found.", self.file_name) return None, None, None finally: robot_position_angle = round(angle[0], 0) diff --git a/pyproject.toml b/pyproject.toml index b9a2747..9d1359f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.10rc6" +version = "0.1.10rc7" description = "A Python library to parse Valetudo map data returning a PIL Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0" From 537417cdf3075535ac70500065f6ee287eab0d95 Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:23:25 +0200 Subject: [PATCH 2/6] typing and other parser touches. Signed-off-by: Sandro Cantarella --- .../config/rand256_parser.py | 119 ++++++++++-------- SCR/valetudo_map_parser/hypfer_handler.py | 1 - SCR/valetudo_map_parser/map_data.py | 5 +- 3 files changed, 71 insertions(+), 54 deletions(-) diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index 65b6a98..b6b618d 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -24,6 +24,14 @@ class Types(Enum): VIRTUAL_WALLS = 10 CURRENTLY_CLEANED_BLOCKS = 11 FORBIDDEN_MOP_ZONES = 12 + OBSTACLES = 13 + IGNORED_OBSTACLES = 14 + OBSTACLES_WITH_PHOTO = 15 + IGNORED_OBSTACLES_WITH_PHOTO = 16 + CARPET_MAP = 17 + MOP_PATH = 18 + NO_CARPET_AREAS = 19 + DIGEST = 1024 class Tools: """Tools for coordinate transformations.""" @@ -33,6 +41,7 @@ class Tools: def __init__(self): """Initialize the parser.""" + self.is_valid = False self.map_data: Dict[str, Any] = {} # Xiaomi/Roborock style byte extraction methods @@ -67,6 +76,15 @@ def _get_int32_signed(data: bytes, address: int) -> int: value = RRMapParser._get_int32(data, address) return value if value < 0x80000000 else value - 0x100000000 + @staticmethod + def _parse_carpet_map(data: bytes) -> set[int]: + carpet_map = set() + + for i, v in enumerate(data): + if v: + carpet_map.add(i) + return carpet_map + @staticmethod def _parse_area(header: bytes, data: bytes) -> list: area_pairs = RRMapParser._get_int16(header, 0x08) @@ -128,6 +146,19 @@ def _parse_object_position(block_data_length: int, data: bytes) -> Dict[str, Any angle = raw_angle return {"position": [x, y], "angle": angle} + + @staticmethod + def _parse_walls(data: bytes, header: bytes) -> list: + wall_pairs = RRMapParser._get_int16(header, 0x08) + walls = [] + for wall_start in range(0, wall_pairs * 8, 8): + x0 = RRMapParser._get_int16(data, wall_start + 0) + y0 = RRMapParser._get_int16(data, wall_start + 2) + x1 = RRMapParser._get_int16(data, wall_start + 4) + y1 = RRMapParser._get_int16(data, wall_start + 6) + walls.append([x0, RRMapParser.Tools.DIMENSION_MM - y0, x1, RRMapParser.Tools.DIMENSION_MM - y1]) + return walls + @staticmethod def _parse_path_block(buf: bytes, offset: int, length: int) -> Dict[str, Any]: """Parse path block using EXACT same method as working parser.""" @@ -173,65 +204,45 @@ def parse(self, map_buf: bytes) -> Dict[str, Any]: return {} def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: - """Parse all blocks using Xiaomi method.""" blocks = {} map_header_length = self._get_int16(raw, 0x02) block_start_position = map_header_length - while block_start_position < len(raw): try: - # Parse block header using Xiaomi method block_header_length = self._get_int16(raw, block_start_position + 0x02) header = self._get_bytes(raw, block_start_position, block_header_length) block_type = self._get_int16(header, 0x00) block_data_length = self._get_int32(header, 0x04) block_data_start = block_start_position + block_header_length data = self._get_bytes(raw, block_data_start, block_data_length) - - # Parse different block types - if block_type == self.Types.ROBOT_POSITION.value: - blocks[block_type] = self._parse_object_position( - block_data_length, data - ) - elif block_type == self.Types.CHARGER_LOCATION.value: - blocks[block_type] = self._parse_object_position( - block_data_length, data - ) - elif block_type == self.Types.PATH.value: - blocks[block_type] = self._parse_path_block( - raw, block_start_position, block_data_length - ) - elif block_type == self.Types.GOTO_PREDICTED_PATH.value: - blocks[block_type] = self._parse_path_block( - raw, block_start_position, block_data_length - ) - elif block_type == self.Types.CURRENTLY_CLEANED_ZONES.value: - blocks[block_type] = {"zones": self._parse_zones(data, header)} - elif block_type == self.Types.FORBIDDEN_ZONES.value: - blocks[block_type] = { - "forbidden_zones": self._parse_area(header, data) - } - elif block_type == self.Types.GOTO_TARGET.value: - blocks[block_type] = {"position": self._parse_goto_target(data)} - elif block_type == self.Types.IMAGE.value: - # Get header length for Gen1/Gen3 detection - header_length = self._get_int8(header, 2) - blocks[block_type] = self._parse_image_block( - raw, - block_start_position, - block_data_length, - header_length, - pixels, - ) - - # Move to next block using Xiaomi method - block_start_position = ( - block_start_position + block_data_length + self._get_int8(header, 2) - ) - + match block_type: + case self.Types.DIGEST.value: + self.is_valid = True + case self.Types.ROBOT_POSITION.value | self.Types.CHARGER_LOCATION.value: + blocks[block_type] = self._parse_object_position(block_data_length, data) + case self.Types.PATH.value | self.Types.GOTO_PREDICTED_PATH.value: + blocks[block_type] = self._parse_path_block(raw, block_start_position, block_data_length) + case self.Types.CURRENTLY_CLEANED_ZONES.value: + blocks[block_type] = {"zones": self._parse_zones(data, header)} + case self.Types.FORBIDDEN_ZONES.value: + blocks[block_type] = {"forbidden_zones": self._parse_area(header, data)} + case self.Types.FORBIDDEN_MOP_ZONES.value: + blocks[block_type] = {"forbidden_mop_zones": self._parse_area(header, data)} + case self.Types.GOTO_TARGET.value: + blocks[block_type] = {"position": self._parse_goto_target(data)} + case self.Types.VIRTUAL_WALLS.value: + blocks[block_type] = {"virtual_walls": self._parse_walls(data, header)} + case self.Types.CARPET_MAP.value: + data = RRMapParser._get_bytes(raw, block_data_start, block_data_length) + blocks[block_type] = {"carpet_map": self._parse_carpet_map(data)} + case self.Types.IMAGE.value: + header_length = self._get_int8(header, 2) + blocks[block_type] = self._parse_image_block( + raw, block_start_position, block_data_length, header_length, pixels) + + block_start_position = block_start_position + block_data_length + self._get_int8(header, 2) except (struct.error, IndexError): break - return blocks def _parse_image_block( @@ -427,11 +438,22 @@ def parse_rrm_data( if self.Types.FORBIDDEN_ZONES.value in blocks else [] ) + parsed_map_data["forbidden_mop_zones"] = ( + blocks[self.Types.FORBIDDEN_MOP_ZONES.value]["forbidden_mop_zones"] + if self.Types.FORBIDDEN_MOP_ZONES.value in blocks + else [] + ) parsed_map_data["virtual_walls"] = ( blocks[self.Types.VIRTUAL_WALLS.value]["virtual_walls"] if self.Types.VIRTUAL_WALLS.value in blocks else [] ) + parsed_map_data["carpet_areas"] = ( + blocks[self.Types.CARPET_MAP.value]["carpet_map"] + if self.Types.CARPET_MAP.value in blocks + else [] + ) + parsed_map_data["is_valid"] = self.is_valid return parsed_map_data @@ -453,8 +475,3 @@ def parse_data( except (struct.error, IndexError, ValueError): return None return self.map_data - - @staticmethod - def get_int32(data: bytes, address: int) -> int: - """Get a 32-bit integer from the data - kept for compatibility.""" - return struct.unpack_from(" Any: return None @staticmethod - def get_rrm_currently_cleaned_zones(json_data: JsonType) -> dict: + def get_rrm_currently_cleaned_zones(json_data: JsonType) -> list[dict[str, Any]]: """Get the currently cleaned zones from the json.""" re_zones = json_data.get("currently_cleaned_zones", []) formatted_zones = RandImageData._rrm_valetudo_format_zone(re_zones) return formatted_zones @staticmethod - def get_rrm_forbidden_zones(json_data: JsonType) -> dict: + def get_rrm_forbidden_zones(json_data: JsonType) -> list[dict[str, Any]]: """Get the forbidden zones from the json.""" re_zones = json_data.get("forbidden_zones", []) + re_zones.extend(json_data.get("forbidden_mop_zones", [])) formatted_zones = RandImageData._rrm_valetudo_format_zone(re_zones) return formatted_zones From 6dc6b5d472b81c9db242bd0ec38f8991d26a4173 Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:34:36 +0200 Subject: [PATCH 3/6] bump mvcrender and change the async to sync usage. Signed-off-by: Sandro Cantarella --- SCR/valetudo_map_parser/hypfer_handler.py | 3 +- .../hypfer_rooms_handler.py | 599 ------------------ SCR/valetudo_map_parser/rand256_handler.py | 2 +- pyproject.toml | 2 +- 4 files changed, 3 insertions(+), 603 deletions(-) delete mode 100644 SCR/valetudo_map_parser/hypfer_rooms_handler.py diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index e9aca09..4b62699 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -14,7 +14,6 @@ from .config.async_utils import AsyncPIL -# from .config.auto_crop import AutoCrop from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.shared import CameraShared @@ -361,7 +360,7 @@ async def async_get_image_from_json( self.zooming = self.imd.img_h.zooming # Resize the image - img_np_array = self.async_auto_trim_and_zoom_image( + img_np_array = self.auto_trim_and_zoom_image( img_np_array, colors["background"], int(self.shared.margins), diff --git a/SCR/valetudo_map_parser/hypfer_rooms_handler.py b/SCR/valetudo_map_parser/hypfer_rooms_handler.py deleted file mode 100644 index 91de957..0000000 --- a/SCR/valetudo_map_parser/hypfer_rooms_handler.py +++ /dev/null @@ -1,599 +0,0 @@ -""" -Hipfer Rooms Handler Module. -Handles room data extraction and processing for Valetudo Hipfer vacuum maps. -Provides async methods for room outline extraction and properties management. -Version: 0.1.9 -""" - -from __future__ import annotations - -from math import sqrt -from typing import Any, Dict, Optional, List, Tuple - -import numpy as np - -from .config.drawable_elements import DrawableElement, DrawingConfig -from .config.types import LOGGER, RoomsProperties, RoomStore - - -class HypferRoomsHandler: - """ - Handler for extracting and managing room data from Hipfer vacuum maps. - - This class provides methods to: - - Extract room outlines using the Ramer-Douglas-Peucker algorithm - - Process room properties from JSON data - - Generate room masks and extract contours - - All methods are async for better integration with the rest of the codebase. - """ - - def __init__(self, vacuum_id: str, drawing_config: Optional[DrawingConfig] = None): - """ - Initialize the HipferRoomsHandler. - - Args: - vacuum_id: Identifier for the vacuum - drawing_config: Configuration for which elements to draw (optional) - """ - self.vacuum_id = vacuum_id - self.drawing_config = drawing_config - self.current_json_data = None # Will store the current JSON data being processed - - @staticmethod - def sublist(data: list, chunk_size: int) -> list: - return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] - - # Cache for RDP results - _rdp_cache = {} - - @staticmethod - def perpendicular_distance( - point: tuple[int, int], line_start: tuple[int, int], line_end: tuple[int, int] - ) -> float: - """Calculate the perpendicular distance from a point to a line. - Optimized for performance. - """ - # Fast path for point-to-point distance - if line_start == line_end: - dx = point[0] - line_start[0] - dy = point[1] - line_start[1] - return sqrt(dx*dx + dy*dy) - - x, y = point - x1, y1 = line_start - x2, y2 = line_end - - # Precompute differences for efficiency - dx = x2 - x1 - dy = y2 - y1 - - # Calculate the line length squared (avoid sqrt until needed) - line_length_sq = dx*dx + dy*dy - if line_length_sq == 0: - return 0 - - # Calculate the distance from the point to the line - # Using the formula: |cross_product| / |line_vector| - # This is more efficient than the original formula - cross_product = abs(dy * x - dx * y + x2 * y1 - y2 * x1) - return cross_product / sqrt(line_length_sq) - - async def rdp( - self, points: List[Tuple[int, int]], epsilon: float - ) -> List[Tuple[int, int]]: - """Ramer-Douglas-Peucker algorithm for simplifying a curve. - Optimized with caching and better performance. - """ - # Create a hashable key for caching - # Convert points to a tuple for hashing - points_tuple = tuple(points) - cache_key = (points_tuple, epsilon) - - # Check cache first - if cache_key in self._rdp_cache: - return self._rdp_cache[cache_key] - - # Base case - if len(points) <= 2: - return points - - # For very small point sets, process directly without recursion - if len(points) <= 5: - # Find the point with the maximum distance - dmax = 0 - index = 0 - for i in range(1, len(points) - 1): - d = self.perpendicular_distance(points[i], points[0], points[-1]) - if d > dmax: - index = i - dmax = d - - # If max distance is greater than epsilon, keep the point - if dmax > epsilon: - result = [points[0]] + [points[index]] + [points[-1]] - else: - result = [points[0], points[-1]] - - # Cache and return - self._rdp_cache[cache_key] = result - return result - - # For larger point sets, use numpy for faster distance calculation - if len(points) > 20: - # Convert to numpy arrays for vectorized operations - points_array = np.array(points) - start = points_array[0] - end = points_array[-1] - - # Calculate perpendicular distances in one vectorized operation - line_vector = end - start - line_length = np.linalg.norm(line_vector) - - if line_length == 0: - # If start and end are the same, use direct distance - distances = np.linalg.norm(points_array[1:-1] - start, axis=1) - else: - # Normalize line vector - line_vector = line_vector / line_length - # Calculate perpendicular distances using vector operations - vectors_to_points = points_array[1:-1] - start - # Project vectors onto line vector - projections = np.dot(vectors_to_points, line_vector) - # Calculate projected points on line - projected_points = start + np.outer(projections, line_vector) - # Calculate distances from points to their projections - distances = np.linalg.norm(points_array[1:-1] - projected_points, axis=1) - - # Find the point with maximum distance - if len(distances) > 0: - max_idx = np.argmax(distances) - dmax = distances[max_idx] - index = max_idx + 1 # +1 because we skipped the first point - else: - dmax = 0 - index = 0 - else: - # For medium-sized point sets, use the original algorithm - dmax = 0 - index = 0 - for i in range(1, len(points) - 1): - d = self.perpendicular_distance(points[i], points[0], points[-1]) - if d > dmax: - index = i - dmax = d - - # If max distance is greater than epsilon, recursively simplify - if dmax > epsilon: - # Recursive call - first_segment = await self.rdp(points[: index + 1], epsilon) - second_segment = await self.rdp(points[index:], epsilon) - - # Build the result list (avoiding duplicating the common point) - result = first_segment[:-1] + second_segment - else: - result = [points[0], points[-1]] - - # Limit cache size - if len(self._rdp_cache) > 100: # Keep only 100 most recent items - try: - self._rdp_cache.pop(next(iter(self._rdp_cache))) - except (StopIteration, KeyError): - pass - - # Cache the result - self._rdp_cache[cache_key] = result - return result - - # Cache for corner results - _corners_cache = {} - - async def async_get_corners( - self, mask: np.ndarray, epsilon_factor: float = 0.05 - ) -> List[Tuple[int, int]]: - """ - Get the corners of a room shape as a list of (x, y) tuples. - Uses contour detection and Douglas-Peucker algorithm to simplify the contour. - Optimized with caching and faster calculations. - - Args: - mask: Binary mask of the room (1 for room, 0 for background) - epsilon_factor: Controls the level of simplification (higher = fewer points) - - Returns: - List of (x, y) tuples representing the corners of the room - """ - # Create a hash of the mask and epsilon factor for caching - mask_hash = hash((mask.tobytes(), epsilon_factor)) - - # Check if we have a cached result - if mask_hash in self._corners_cache: - return self._corners_cache[mask_hash] - - # Fast path for empty masks - if not np.any(mask): - return [] - - # Find contours in the mask - this uses our optimized method with caching - contour = await self.async_moore_neighbor_trace(mask) - - if not contour: - # Fallback to bounding box if contour detection fails - y_indices, x_indices = np.where(mask > 0) - if len(x_indices) == 0 or len(y_indices) == 0: - return [] - - x_min, x_max = np.min(x_indices), np.max(x_indices) - y_min, y_max = np.min(y_indices), np.max(y_indices) - - result = [ - (x_min, y_min), # Top-left - (x_max, y_min), # Top-right - (x_max, y_max), # Bottom-right - (x_min, y_max), # Bottom-left - (x_min, y_min), # Back to top-left to close the polygon - ] - - # Cache the result - self._corners_cache[mask_hash] = result - return result - - # For small contours (less than 10 points), skip simplification - if len(contour) <= 10: - # Ensure the contour is closed - if contour[0] != contour[-1]: - contour.append(contour[0]) - - # Cache and return - self._corners_cache[mask_hash] = contour - return contour - - # For larger contours, calculate perimeter more efficiently using numpy - points = np.array(contour) - # Calculate differences between consecutive points - diffs = np.diff(points, axis=0) - # Calculate squared distances - squared_dists = np.sum(diffs**2, axis=1) - # Calculate perimeter as sum of distances - perimeter = np.sum(np.sqrt(squared_dists)) - - # Apply Douglas-Peucker algorithm to simplify the contour - epsilon = epsilon_factor * perimeter - simplified_contour = await self.rdp(contour, epsilon=epsilon) - - # Ensure the contour has at least 3 points to form a polygon - if len(simplified_contour) < 3: - # Fallback to bounding box - y_indices, x_indices = np.where(mask > 0) - x_min, x_max = int(np.min(x_indices)), int(np.max(x_indices)) - y_min, y_max = int(np.min(y_indices)), int(np.max(y_indices)) - - LOGGER.debug( - f"{self.vacuum_id}: Too few points in contour, using bounding box" - ) - result = [ - (x_min, y_min), # Top-left - (x_max, y_min), # Top-right - (x_max, y_max), # Bottom-right - (x_min, y_max), # Bottom-left - (x_min, y_min), # Back to top-left to close the polygon - ] - - # Cache the result - self._corners_cache[mask_hash] = result - return result - - # Ensure the contour is closed - if simplified_contour[0] != simplified_contour[-1]: - simplified_contour.append(simplified_contour[0]) - - # Limit cache size - if len(self._corners_cache) > 50: # Keep only 50 most recent items - try: - self._corners_cache.pop(next(iter(self._corners_cache))) - except (StopIteration, KeyError): - pass - - # Cache the result - self._corners_cache[mask_hash] = simplified_contour - return simplified_contour - - # Cache for labeled arrays to avoid redundant calculations - _label_cache = {} - _hull_cache = {} - - @staticmethod - async def async_moore_neighbor_trace(mask: np.ndarray) -> List[Tuple[int, int]]: - """ - Trace the contour of a binary mask using an optimized approach. - Uses caching and simplified algorithms for better performance. - - Args: - mask: Binary mask of the room (1 for room, 0 for background) - - Returns: - List of (x, y) tuples representing the contour - """ - # Create a hash of the mask for caching - mask_hash = hash(mask.tobytes()) - - # Check if we have a cached result - if mask_hash in HypferRoomsHandler._hull_cache: - return HypferRoomsHandler._hull_cache[mask_hash] - - # Fast path for empty masks - if not np.any(mask): - return [] - - # Find bounding box of non-zero elements (much faster than full labeling for simple cases) - y_indices, x_indices = np.where(mask > 0) - if len(x_indices) == 0 or len(y_indices) == 0: - return [] - - # For very small rooms (less than 100 pixels), just use bounding box - if len(x_indices) < 100: - x_min, x_max = np.min(x_indices), np.max(x_indices) - y_min, y_max = np.min(y_indices), np.max(y_indices) - - # Create a simple rectangle - hull_vertices = [ - (int(x_min), int(y_min)), # Top-left - (int(x_max), int(y_min)), # Top-right - (int(x_max), int(y_max)), # Bottom-right - (int(x_min), int(y_max)), # Bottom-left - (int(x_min), int(y_min)), # Back to top-left to close the polygon - ] - - # Cache and return the result - HypferRoomsHandler._hull_cache[mask_hash] = hull_vertices - return hull_vertices - - # For larger rooms, use convex hull but with optimizations - try: - # Import here to avoid overhead for small rooms - from scipy import ndimage - from scipy.spatial import ConvexHull - - # Use cached labeled array if available - if mask_hash in HypferRoomsHandler._label_cache: - labeled_array = HypferRoomsHandler._label_cache[mask_hash] - else: - # Find connected components - this is expensive - labeled_array, _ = ndimage.label(mask) - # Cache the result for future use - HypferRoomsHandler._label_cache[mask_hash] = labeled_array - - # Limit cache size to prevent memory issues - if len(HypferRoomsHandler._label_cache) > 50: # Keep only 50 most recent items - # Remove oldest item (first key) - try: - HypferRoomsHandler._label_cache.pop(next(iter(HypferRoomsHandler._label_cache))) - except (StopIteration, KeyError): - # Handle edge case of empty cache - pass - - # Create a mask with all components - all_components_mask = (labeled_array > 0) - - # Sample points instead of using all points for large masks - # This significantly reduces computation time for ConvexHull - if len(x_indices) > 1000: - # Sample every 10th point for very large rooms - step = 10 - elif len(x_indices) > 500: - # Sample every 5th point for medium-sized rooms - step = 5 - else: - # Use all points for smaller rooms - step = 1 - - # Sample points using the step size - sampled_y = y_indices[::step] - sampled_x = x_indices[::step] - - # Create a list of points - points = np.column_stack((sampled_x, sampled_y)) - - # Compute the convex hull - hull = ConvexHull(points) - - # Extract the vertices of the convex hull - hull_vertices = [(int(points[v, 0]), int(points[v, 1])) for v in hull.vertices] - - # Ensure the hull is closed - if hull_vertices[0] != hull_vertices[-1]: - hull_vertices.append(hull_vertices[0]) - - # Cache and return the result - HypferRoomsHandler._hull_cache[mask_hash] = hull_vertices - - # Limit hull cache size - if len(HypferRoomsHandler._hull_cache) > 50: - try: - HypferRoomsHandler._hull_cache.pop(next(iter(HypferRoomsHandler._hull_cache))) - except (StopIteration, KeyError): - pass - - return hull_vertices - - except Exception as e: - LOGGER.warning(f"Failed to compute convex hull: {e}. Falling back to bounding box.") - - # Fallback to bounding box if convex hull fails - x_min, x_max = np.min(x_indices), np.max(x_indices) - y_min, y_max = np.min(y_indices), np.max(y_indices) - - # Create a simple rectangle - hull_vertices = [ - (int(x_min), int(y_min)), # Top-left - (int(x_max), int(y_min)), # Top-right - (int(x_max), int(y_max)), # Bottom-right - (int(x_min), int(y_max)), # Bottom-left - (int(x_min), int(y_min)), # Back to top-left to close the polygon - ] - - # Cache and return the result - HypferRoomsHandler._hull_cache[mask_hash] = hull_vertices - return hull_vertices - - - - async def async_extract_room_properties( - self, json_data: Dict[str, Any] - ) -> RoomsProperties: - """ - Extract room properties from the JSON data. - - Args: - json_data: JSON data from the vacuum - - Returns: - Dictionary of room properties - """ - room_properties = {} - pixel_size = json_data.get("pixelSize", 5) - height = json_data["size"]["y"] - width = json_data["size"]["x"] - vacuum_id = self.vacuum_id - room_id_counter = 0 - - # Store the JSON data for reference in other methods - self.current_json_data = json_data - - for layer in json_data.get("layers", []): - if layer.get("__class") == "MapLayer" and layer.get("type") == "segment": - meta_data = layer.get("metaData", {}) - segment_id = meta_data.get("segmentId") - name = meta_data.get("name", f"Room {segment_id}") - - # Check if this room is disabled in the drawing configuration - # The room_id_counter is 0-based, but DrawableElement.ROOM_X is 1-based - current_room_id = room_id_counter + 1 - room_id_counter = ( - room_id_counter + 1 - ) % 16 # Cycle room_id back to 0 after 15 - - if 1 <= current_room_id <= 15 and self.drawing_config is not None: - room_element = getattr( - DrawableElement, f"ROOM_{current_room_id}", None - ) - if room_element and not self.drawing_config.is_enabled( - room_element - ): - LOGGER.debug( - "%s: Room %d is disabled and will be skipped", - self.vacuum_id, - current_room_id, - ) - continue - - compressed_pixels = layer.get("compressedPixels", []) - pixels = self.sublist(compressed_pixels, 3) - - # Create a binary mask for the room - if not pixels: - LOGGER.warning(f"Skipping segment {segment_id}: no pixels found") - continue - - mask = np.zeros((height, width), dtype=np.uint8) - for x, y, length in pixels: - if 0 <= y < height and 0 <= x < width and x + length <= width: - mask[y, x : x + length] = 1 - - # Find the room outline using the improved get_corners function - # Adjust epsilon_factor to control the level of simplification (higher = fewer points) - outline = await self.async_get_corners(mask, epsilon_factor=0.05) - - if not outline: - LOGGER.warning( - f"Skipping segment {segment_id}: failed to generate outline" - ) - continue - - # Calculate the center of the room - xs, ys = zip(*outline) - x_min, x_max = min(xs), max(xs) - y_min, y_max = min(ys), max(ys) - - # Scale coordinates by pixel_size - scaled_outline = [(x * pixel_size, y * pixel_size) for x, y in outline] - - room_id = str(segment_id) - room_properties[room_id] = { - "number": segment_id, - "outline": scaled_outline, # Already includes the closing point - "name": name, - "x": ((x_min + x_max) * pixel_size) // 2, - "y": ((y_min + y_max) * pixel_size) // 2, - } - - RoomStore(vacuum_id, room_properties) - return room_properties - - async def get_room_at_position( - self, x: int, y: int, room_properties: Optional[RoomsProperties] = None - ) -> Optional[Dict[str, Any]]: - """ - Get the room at a specific position. - - Args: - x: X coordinate - y: Y coordinate - room_properties: Room properties dictionary (optional) - - Returns: - Room data dictionary or None if no room at position - """ - if room_properties is None: - room_store = RoomStore(self.vacuum_id) - room_properties = room_store.get_rooms() - - if not room_properties: - return None - - for room_id, room_data in room_properties.items(): - outline = room_data.get("outline", []) - if not outline or len(outline) < 3: - continue - - # Check if point is inside the polygon - if self.point_in_polygon(x, y, outline): - return { - "id": room_id, - "name": room_data.get("name", f"Room {room_id}"), - "x": room_data.get("x", 0), - "y": room_data.get("y", 0), - } - - return None - - @staticmethod - def point_in_polygon(x: int, y: int, polygon: List[Tuple[int, int]]) -> bool: - """ - Check if a point is inside a polygon using ray casting algorithm. - - Args: - x: X coordinate of the point - y: Y coordinate of the point - polygon: List of (x, y) tuples forming the polygon - - Returns: - True if the point is inside the polygon, False otherwise - """ - n = len(polygon) - inside = False - - p1x, p1y = polygon[0] - xinters = None # Initialize with default value - for i in range(1, n + 1): - p2x, p2y = polygon[i % n] - if y > min(p1y, p2y): - if y <= max(p1y, p2y): - if x <= max(p1x, p2x): - if p1y != p2y: - xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x - if p1x == p2x or x <= xinters: - inside = not inside - p1x, p1y = p2x, p2y - - return inside diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 0f9a157..f5e6f65 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -346,7 +346,7 @@ async def _draw_map_elements( else: self.zooming = False - img_np_array = self.async_auto_trim_and_zoom_image( + img_np_array = self.auto_trim_and_zoom_image( img_np_array, detect_colour=colors["background"], margin_size=int(self.shared.margins), diff --git a/pyproject.toml b/pyproject.toml index 9d1359f..6b506c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ python = ">=3.13" numpy = ">=1.26.4" Pillow = ">=10.3.0" scipy = ">=1.12.0" -mvcrender = ">=0.0.2" +mvcrender = ">=0.0.4" [tool.poetry.group.dev.dependencies] ruff = "*" From d76d7c16effa82526df1073c7b5b846ec60f0562 Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Sat, 4 Oct 2025 22:26:37 +0200 Subject: [PATCH 4/6] bump mvcrender isort and ruff and replace blending and some drawing function with C based from mcvrender. Signed-off-by: Sandro Cantarella --- SCR/valetudo_map_parser/__init__.py | 18 +- SCR/valetudo_map_parser/config/color_utils.py | 7 +- SCR/valetudo_map_parser/config/drawable.py | 125 ++----- .../config/drawable_elements.py | 2 - .../config/enhanced_drawable.py | 324 ------------------ .../config/rand256_parser.py | 58 +++- SCR/valetudo_map_parser/config/shared.py | 5 +- .../config/status_text/status_text.py | 1 + SCR/valetudo_map_parser/config/types.py | 6 +- SCR/valetudo_map_parser/config/utils.py | 24 +- SCR/valetudo_map_parser/hypfer_handler.py | 24 +- SCR/valetudo_map_parser/map_data.py | 20 +- SCR/valetudo_map_parser/rand256_handler.py | 23 +- SCR/valetudo_map_parser/reimg_draw.py | 2 +- SCR/valetudo_map_parser/rooms_handler.py | 11 +- pyproject.toml | 4 +- 16 files changed, 139 insertions(+), 515 deletions(-) delete mode 100644 SCR/valetudo_map_parser/config/enhanced_drawable.py diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index c5d0efa..c304492 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -6,27 +6,26 @@ from .config.colors import ColorsManagement from .config.drawable import Drawable from .config.drawable_elements import DrawableElement, DrawingConfig -from .config.enhanced_drawable import EnhancedDrawable from .config.rand256_parser import RRMapParser from .config.shared import CameraShared, CameraSharedManager +from .config.status_text.status_text import StatusText +from .config.status_text.translations import translations as STATUS_TEXT_TRANSLATIONS from .config.types import ( CameraModes, + ImageSize, + JsonType, + NumpyArray, + PilPNG, RoomsProperties, RoomStore, SnapshotStore, TrimCropData, UserLanguageStore, - JsonType, - PilPNG, - NumpyArray, - ImageSize, ) -from .config.status_text.status_text import StatusText -from .config.status_text.translations import translations as STATUS_TEXT_TRANSLATIONS from .hypfer_handler import HypferMapImageHandler -from .rand256_handler import ReImageHandler -from .rooms_handler import RoomsHandler, RandRoomsHandler from .map_data import HyperMapData +from .rand256_handler import ReImageHandler +from .rooms_handler import RandRoomsHandler, RoomsHandler def get_default_font_path() -> str: @@ -51,7 +50,6 @@ def get_default_font_path() -> str: "Drawable", "DrawableElement", "DrawingConfig", - "EnhancedDrawable", "SnapshotStore", "UserLanguageStore", "RoomStore", diff --git a/SCR/valetudo_map_parser/config/color_utils.py b/SCR/valetudo_map_parser/config/color_utils.py index 94e22e8..80d1297 100644 --- a/SCR/valetudo_map_parser/config/color_utils.py +++ b/SCR/valetudo_map_parser/config/color_utils.py @@ -1,8 +1,7 @@ """Utility functions for color operations in the map parser.""" -from typing import Optional, Tuple +from typing import Optional -from .colors import ColorsManagement from .types import Color, NumpyArray @@ -36,8 +35,8 @@ def get_blended_color( # Sample background at midpoint mid_x, mid_y = (x0 + x1) // 2, (y0 + y1) // 2 if 0 <= mid_y < arr.shape[0] and 0 <= mid_x < arr.shape[1]: - return tuple(arr[mid_y, mid_x]) - return (0, 0, 0, 0) # Default if out of bounds + return Color(arr[mid_y, mid_x]) + return Color(0, 0, 0, 0) # Default if out of bounds # Calculate direction vector for offset sampling dx = x1 - x0 diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index 919c785..33715be 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -14,10 +14,10 @@ from pathlib import Path import numpy as np +from mvcrender.blend import get_blended_color, sample_and_blend_color +from mvcrender.draw import circle_u8, line_u8 from PIL import Image, ImageDraw, ImageFont -from .color_utils import get_blended_color -from .colors import ColorsManagement from .types import Color, NumpyArray, PilPNG, Point, Tuple, Union @@ -85,7 +85,7 @@ async def from_json_to_image( and 0 <= center_x < image_array.shape[1] ): # Get blended color - blended_color = ColorsManagement.sample_and_blend_color( + blended_color = sample_and_blend_color( image_array, center_x, center_y, full_color ) # Apply blended color to the region @@ -131,9 +131,7 @@ async def battery_charger( center_x = (start_col + end_col) // 2 # Get blended color - blended_color = ColorsManagement.sample_and_blend_color( - layers, center_x, center_y, color - ) + blended_color = sample_and_blend_color(layers, center_x, center_y, color) # Apply blended color layers[start_row:end_row, start_col:end_col] = blended_color @@ -165,9 +163,7 @@ async def go_to_flag( # Blend flag color if needed if flag_alpha < 255: - flag_color = ColorsManagement.sample_and_blend_color( - layer, x, y, flag_color - ) + flag_color = sample_and_blend_color(layer, x, y, flag_color) # Create pole color with alpha pole_color: Color = ( @@ -179,9 +175,7 @@ async def go_to_flag( # Blend pole color if needed if pole_alpha < 255: - pole_color = ColorsManagement.sample_and_blend_color( - layer, x, y, pole_color - ) + pole_color = sample_and_blend_color(layer, x, y, pole_color) flag_size = 50 pole_width = 6 @@ -246,62 +240,19 @@ def point_inside(x: int, y: int, points: list[Tuple[int, int]]) -> bool: @staticmethod def _line( - layer: np.ndarray, + layer: NumpyArray, x1: int, y1: int, x2: int, y2: int, color: Color, width: int = 3, - ) -> np.ndarray: - """Draw a line on a NumPy array (layer) from point A to B using Bresenham's algorithm. - - Args: - layer: The numpy array to draw on (H, W, C) - x1, y1: Start point coordinates - x2, y2: End point coordinates - color: Color to draw with (tuple or array) - width: Width of the line in pixels - """ - x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) - - blended_color = get_blended_color(x1, y1, x2, y2, layer, color) - - dx = abs(x2 - x1) - dy = abs(y2 - y1) - sx = 1 if x1 < x2 else -1 - sy = 1 if y1 < y2 else -1 - err = dx - dy - - half_w = width // 2 - h, w = layer.shape[:2] - - while True: - # Draw a filled circle for thickness - yy, xx = np.ogrid[-half_w : half_w + 1, -half_w : half_w + 1] - mask = xx**2 + yy**2 <= half_w**2 - y_min = max(0, y1 - half_w) - y_max = min(h, y1 + half_w + 1) - x_min = max(0, x1 - half_w) - x_max = min(w, x1 + half_w + 1) - - sub_mask = mask[ - (y_min - (y1 - half_w)) : (y_max - (y1 - half_w)), - (x_min - (x1 - half_w)) : (x_max - (x1 - half_w)), - ] - layer[y_min:y_max, x_min:x_max][sub_mask] = blended_color - - if x1 == x2 and y1 == y2: - break - - e2 = 2 * err - if e2 > -dy: - err -= dy - x1 += sx - if e2 < dx: - err += dx - y1 += sy - + ) -> NumpyArray: + """Segment-aware preblend, then stamp a solid line.""" + width = int(max(1, width)) + # Preblend once for this segment + seg = get_blended_color(int(x1), int(y1), int(x2), int(y2), layer, color) + line_u8(layer, int(x1), int(y1), int(x2), int(y2), seg, width) return layer @staticmethod @@ -355,35 +306,31 @@ def _filled_circle( outline_width: int = 0, ) -> NumpyArray: """ - Draw a filled circle on the image using NumPy. - Optimized to only process the bounding box of the circle. + Draw a filled circle and optional outline using mvcrender.draw.circle_u8. + If alpha<255, preblend once at the center and stamp solid. """ - y, x = center - height, width = image.shape[:2] - - # Calculate the bounding box of the circle - min_y = max(0, y - radius - outline_width) - max_y = min(height, y + radius + outline_width + 1) - min_x = max(0, x - radius - outline_width) - max_x = min(width, x + radius + outline_width + 1) - - # Create coordinate arrays for the bounding box - y_indices, x_indices = np.ogrid[min_y:max_y, min_x:max_x] - - # Calculate distances from center - dist_sq = (y_indices - y) ** 2 + (x_indices - x) ** 2 + cy, cx = ( + int(center[0]), + int(center[1]), + ) # incoming Point is (y,x) in your codebase + h, w = image.shape[:2] + if not (0 <= cx < w and 0 <= cy < h): + return image - # Create masks for the circle and outline - circle_mask = dist_sq <= radius**2 + fill_rgba = color + if fill_rgba[3] < 255: + fill_rgba = sample_and_blend_color(image, cx, cy, fill_rgba) - # Apply the fill color - image[min_y:max_y, min_x:max_x][circle_mask] = color + circle_u8(image, int(cx), int(cy), int(radius), fill_rgba, -1) - # Draw the outline if needed - if outline_width > 0 and outline_color is not None: - outer_mask = dist_sq <= (radius + outline_width) ** 2 - outline_mask = outer_mask & ~circle_mask - image[min_y:max_y, min_x:max_x][outline_mask] = outline_color + if outline_color is not None and outline_width > 0: + out_rgba = outline_color + if out_rgba[3] < 255: + out_rgba = sample_and_blend_color(image, cx, cy, out_rgba) + # outlined stroke thickness = outline_width + circle_u8( + image, int(cx), int(cy), int(radius), out_rgba, int(outline_width) + ) return image @@ -835,9 +782,7 @@ async def async_draw_obstacles( continue if need_blending: - obs_color = ColorsManagement.sample_and_blend_color( - image, x, y, color - ) + obs_color = sample_and_blend_color(image, x, y, color) else: obs_color = color diff --git a/SCR/valetudo_map_parser/config/drawable_elements.py b/SCR/valetudo_map_parser/config/drawable_elements.py index ed7be98..f15dbc2 100644 --- a/SCR/valetudo_map_parser/config/drawable_elements.py +++ b/SCR/valetudo_map_parser/config/drawable_elements.py @@ -9,8 +9,6 @@ from enum import IntEnum from typing import Dict, List, Tuple, Union -import numpy as np - from .colors import DefaultColors, SupportedColor from .types import LOGGER diff --git a/SCR/valetudo_map_parser/config/enhanced_drawable.py b/SCR/valetudo_map_parser/config/enhanced_drawable.py deleted file mode 100644 index 549d39e..0000000 --- a/SCR/valetudo_map_parser/config/enhanced_drawable.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -Enhanced Drawable Class. -Provides drawing utilities with element selection support. -Version: 0.1.9 -""" - -from __future__ import annotations - -import logging - -# math is not used in this file -from typing import Optional, Tuple - -import numpy as np - -from .colors import ColorsManagement -from .drawable import Drawable -from .drawable_elements import ( - DrawableElement, - DrawingConfig, -) - - -# Type aliases -NumpyArray = np.ndarray -Color = Tuple[int, int, int, int] - -_LOGGER = logging.getLogger(__name__) - - -class EnhancedDrawable(Drawable): - """Enhanced drawing utilities with element selection support.""" - - def __init__(self, drawing_config: Optional[DrawingConfig] = None): - """Initialize with optional drawing configuration.""" - super().__init__() - self.drawing_config = drawing_config or DrawingConfig() - - # Color blending methods have been moved to ColorsManagement class in colors.py - - # Pixel blending methods have been moved to ColorsManagement class in colors.py - - async def draw_map( - self, map_data: dict, base_array: Optional[NumpyArray] = None - ) -> NumpyArray: - """ - Draw the map with selected elements. - - Args: - map_data: The map data dictionary - base_array: Optional base array to draw on - - Returns: - The image array with all elements drawn - """ - # Get map dimensions - size_x = map_data.get("size", {}).get("x", 1024) - size_y = map_data.get("size", {}).get("y", 1024) - - # Create empty image if none provided - if base_array is None: - background_color = self.drawing_config.get_property( - DrawableElement.FLOOR, "color", (200, 200, 200, 255) - ) - base_array = await self.create_empty_image(size_x, size_y, background_color) - - # Draw elements in order of z-index - for element in self.drawing_config.get_drawing_order(): - if element == DrawableElement.FLOOR: - base_array = await self._draw_floor(map_data, base_array) - elif element == DrawableElement.WALL: - base_array = await self._draw_walls(map_data, base_array) - elif element == DrawableElement.ROBOT: - base_array = await self._draw_robot(map_data, base_array) - elif element == DrawableElement.CHARGER: - base_array = await self._draw_charger(map_data, base_array) - elif element == DrawableElement.VIRTUAL_WALL: - base_array = await self._draw_virtual_walls(map_data, base_array) - elif element == DrawableElement.RESTRICTED_AREA: - base_array = await self._draw_restricted_areas(map_data, base_array) - elif element == DrawableElement.NO_MOP_AREA: - base_array = await self._draw_no_mop_areas(map_data, base_array) - elif element == DrawableElement.PATH: - base_array = await self._draw_path(map_data, base_array) - elif element == DrawableElement.PREDICTED_PATH: - base_array = await self._draw_predicted_path(map_data, base_array) - elif element == DrawableElement.GO_TO_TARGET: - base_array = await self._draw_go_to_target(map_data, base_array) - elif DrawableElement.ROOM_1 <= element <= DrawableElement.ROOM_15: - room_id = element - DrawableElement.ROOM_1 + 1 - base_array = await self._draw_room(map_data, room_id, base_array) - - return base_array - - async def _draw_floor(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the floor layer.""" - if not self.drawing_config.is_enabled(DrawableElement.FLOOR): - return array - - # Implementation depends on the map data format - # This is a placeholder - actual implementation would use map_data to draw floor - - return array - - async def _draw_walls(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the walls.""" - if not self.drawing_config.is_enabled(DrawableElement.WALL): - return array - - # Get wall color from drawing config - wall_color = self.drawing_config.get_property( - DrawableElement.WALL, "color", (255, 255, 0, 255) - ) - - # Implementation depends on the map data format - # For Valetudo maps, we would look at the layers with type "wall" - # This is a simplified example - in a real implementation, we would extract the actual wall pixels - - # Find wall data in map_data - wall_pixels = [] - for layer in map_data.get("layers", []): - if layer.get("type") == "wall": - # Extract wall pixels from the layer - # This is a placeholder - actual implementation would depend on the map data format - wall_pixels = layer.get("pixels", []) - break - - # Draw wall pixels with color blending - for x, y in wall_pixels: - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, x, y, wall_color - ) - if 0 <= y < array.shape[0] and 0 <= x < array.shape[1]: - array[y, x] = blended_color - - return array - - async def _draw_robot(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the robot.""" - if not self.drawing_config.is_enabled(DrawableElement.ROBOT): - return array - - # Get robot color from drawing config - robot_color = self.drawing_config.get_property( - DrawableElement.ROBOT, "color", (255, 255, 204, 255) - ) - - # Extract robot position and angle from map_data - robot_position = map_data.get("robot", {}).get("position", None) - robot_angle = map_data.get("robot", {}).get("angle", 0) - - if robot_position: - x, y = robot_position.get("x", 0), robot_position.get("y", 0) - - # Draw robot with color blending - # Create a circle around the robot position - radius = 25 # Same as in the robot drawing method - for dy in range(-radius, radius + 1): - for dx in range(-radius, radius + 1): - if dx * dx + dy * dy <= radius * radius: - map_x, map_y = int(x + dx), int(y + dy) - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, map_x, map_y, robot_color - ) - if 0 <= map_y < array.shape[0] and 0 <= map_x < array.shape[1]: - array[map_y, map_x] = blended_color - return array - - async def _draw_charger(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the charger.""" - if not self.drawing_config.is_enabled(DrawableElement.CHARGER): - return array - - # Get charger color from drawing config - charger_color = self.drawing_config.get_property( - DrawableElement.CHARGER, "color", (255, 128, 0, 255) - ) - - # Implementation depends on the map data format - # This would extract charger data from map_data and draw it - - return array - - async def _draw_virtual_walls( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw virtual walls.""" - if not self.drawing_config.is_enabled(DrawableElement.VIRTUAL_WALL): - return array - - # Get virtual wall color from drawing config - wall_color = self.drawing_config.get_property( - DrawableElement.VIRTUAL_WALL, "color", (255, 0, 0, 255) - ) - - # Implementation depends on the map data format - # This would extract virtual wall data from map_data and draw it - - return array - - async def _draw_restricted_areas( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw restricted areas.""" - if not self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): - return array - - # Get restricted area color from drawing config - area_color = self.drawing_config.get_property( - DrawableElement.RESTRICTED_AREA, "color", (255, 0, 0, 125) - ) - - # Implementation depends on the map data format - # This would extract restricted area data from map_data and draw it - - return array - - async def _draw_no_mop_areas(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw no-mop areas.""" - if not self.drawing_config.is_enabled(DrawableElement.NO_MOP_AREA): - return array - - # Get no-mop area color from drawing config - area_color = self.drawing_config.get_property( - DrawableElement.NO_MOP_AREA, "color", (0, 0, 255, 125) - ) - - # Implementation depends on the map data format - # This would extract no-mop area data from map_data and draw it - - return array - - async def _draw_path(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the robot's path.""" - if not self.drawing_config.is_enabled(DrawableElement.PATH): - return array - - # Get path color from drawing config - path_color = self.drawing_config.get_property( - DrawableElement.PATH, "color", (238, 247, 255, 255) - ) - - # Implementation depends on the map data format - # This would extract path data from map_data and draw it - - return array - - async def _draw_predicted_path( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw the predicted path.""" - if not self.drawing_config.is_enabled(DrawableElement.PREDICTED_PATH): - return array - - # Get predicted path color from drawing config - path_color = self.drawing_config.get_property( - DrawableElement.PREDICTED_PATH, "color", (238, 247, 255, 125) - ) - - # Implementation depends on the map data format - # This would extract predicted path data from map_data and draw it - - return array - - async def _draw_go_to_target(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the go-to target.""" - if not self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): - return array - - # Get go-to target color from drawing config - target_color = self.drawing_config.get_property( - DrawableElement.GO_TO_TARGET, "color", (0, 255, 0, 255) - ) - - # Implementation depends on the map data format - # This would extract go-to target data from map_data and draw it - - return array - - async def _draw_room( - self, map_data: dict, room_id: int, array: NumpyArray - ) -> NumpyArray: - """Draw a specific room.""" - element = getattr(DrawableElement, f"ROOM_{room_id}") - if not self.drawing_config.is_enabled(element): - return array - - # Get room color from drawing config - room_color = self.drawing_config.get_property( - element, - "color", - (135, 206, 250, 255), # Default light blue - ) - - # Implementation depends on the map data format - # For Valetudo maps, we would look at the layers with type "segment" - # This is a simplified example - in a real implementation, we would extract the actual room pixels - - # Find room data in map_data - room_pixels = [] - for layer in map_data.get("layers", []): - if layer.get("type") == "segment" and str( - layer.get("metaData", {}).get("segmentId") - ) == str(room_id): - # Extract room pixels from the layer - # This is a placeholder - actual implementation would depend on the map data format - # For example, it might use compressed pixels or other data structures - - # For demonstration, let's assume we have a list of (x, y) coordinates - room_pixels = layer.get("pixels", []) - break - - # Draw room pixels with color blending - for x, y in room_pixels: - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, x, y, room_color - ) - if 0 <= y < array.shape[0] and 0 <= x < array.shape[1]: - array[y, x] = blended_color - - return array diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index b6b618d..a5e2f1b 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -1,7 +1,7 @@ """New Rand256 Map Parser - Based on Xiaomi/Roborock implementation with precise binary parsing.""" -import struct import math +import struct from enum import Enum from typing import Any, Dict, List, Optional @@ -146,7 +146,6 @@ def _parse_object_position(block_data_length: int, data: bytes) -> Dict[str, Any angle = raw_angle return {"position": [x, y], "angle": angle} - @staticmethod def _parse_walls(data: bytes, header: bytes) -> list: wall_pairs = RRMapParser._get_int16(header, 0x08) @@ -156,7 +155,14 @@ def _parse_walls(data: bytes, header: bytes) -> list: y0 = RRMapParser._get_int16(data, wall_start + 2) x1 = RRMapParser._get_int16(data, wall_start + 4) y1 = RRMapParser._get_int16(data, wall_start + 6) - walls.append([x0, RRMapParser.Tools.DIMENSION_MM - y0, x1, RRMapParser.Tools.DIMENSION_MM - y1]) + walls.append( + [ + x0, + RRMapParser.Tools.DIMENSION_MM - y0, + x1, + RRMapParser.Tools.DIMENSION_MM - y1, + ] + ) return walls @staticmethod @@ -218,29 +224,53 @@ def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: match block_type: case self.Types.DIGEST.value: self.is_valid = True - case self.Types.ROBOT_POSITION.value | self.Types.CHARGER_LOCATION.value: - blocks[block_type] = self._parse_object_position(block_data_length, data) + case ( + self.Types.ROBOT_POSITION.value + | self.Types.CHARGER_LOCATION.value + ): + blocks[block_type] = self._parse_object_position( + block_data_length, data + ) case self.Types.PATH.value | self.Types.GOTO_PREDICTED_PATH.value: - blocks[block_type] = self._parse_path_block(raw, block_start_position, block_data_length) + blocks[block_type] = self._parse_path_block( + raw, block_start_position, block_data_length + ) case self.Types.CURRENTLY_CLEANED_ZONES.value: blocks[block_type] = {"zones": self._parse_zones(data, header)} case self.Types.FORBIDDEN_ZONES.value: - blocks[block_type] = {"forbidden_zones": self._parse_area(header, data)} + blocks[block_type] = { + "forbidden_zones": self._parse_area(header, data) + } case self.Types.FORBIDDEN_MOP_ZONES.value: - blocks[block_type] = {"forbidden_mop_zones": self._parse_area(header, data)} + blocks[block_type] = { + "forbidden_mop_zones": self._parse_area(header, data) + } case self.Types.GOTO_TARGET.value: blocks[block_type] = {"position": self._parse_goto_target(data)} case self.Types.VIRTUAL_WALLS.value: - blocks[block_type] = {"virtual_walls": self._parse_walls(data, header)} + blocks[block_type] = { + "virtual_walls": self._parse_walls(data, header) + } case self.Types.CARPET_MAP.value: - data = RRMapParser._get_bytes(raw, block_data_start, block_data_length) - blocks[block_type] = {"carpet_map": self._parse_carpet_map(data)} + data = RRMapParser._get_bytes( + raw, block_data_start, block_data_length + ) + blocks[block_type] = { + "carpet_map": self._parse_carpet_map(data) + } case self.Types.IMAGE.value: header_length = self._get_int8(header, 2) blocks[block_type] = self._parse_image_block( - raw, block_start_position, block_data_length, header_length, pixels) - - block_start_position = block_start_position + block_data_length + self._get_int8(header, 2) + raw, + block_start_position, + block_data_length, + header_length, + pixels, + ) + + block_start_position = ( + block_start_position + block_data_length + self._get_int8(header, 2) + ) except (struct.error, IndexError): break return blocks diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index 8ecd4ae..bffdec4 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -7,18 +7,19 @@ import asyncio import logging from typing import List + from PIL import Image from .types import ( ATTR_CALIBRATION_POINTS, ATTR_CAMERA_MODE, ATTR_CONTENT_TYPE, + ATTR_IMAGE_LAST_UPDATED, ATTR_MARGINS, ATTR_OBSTACLES, ATTR_POINTS, ATTR_ROOMS, ATTR_ROTATE, - ATTR_IMAGE_LAST_UPDATED, ATTR_VACUUM_BATTERY, ATTR_VACUUM_CHARGING, ATTR_VACUUM_JSON_ID, @@ -40,8 +41,8 @@ DEFAULT_VALUES, CameraModes, Colors, - TrimsData, PilPNG, + TrimsData, ) diff --git a/SCR/valetudo_map_parser/config/status_text/status_text.py b/SCR/valetudo_map_parser/config/status_text/status_text.py index 720ec2f..7e7942d 100644 --- a/SCR/valetudo_map_parser/config/status_text/status_text.py +++ b/SCR/valetudo_map_parser/config/status_text/status_text.py @@ -9,6 +9,7 @@ from ..types import LOGGER, PilPNG from .translations import translations + LOGGER.propagate = True diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index 6be8f0c..c6740bd 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -8,7 +8,7 @@ import logging import threading from dataclasses import asdict, dataclass -from typing import Any, Dict, Optional, Tuple, TypedDict, Union, List, NotRequired +from typing import Any, Dict, List, NotRequired, Optional, Tuple, TypedDict, Union import numpy as np from PIL import Image @@ -222,7 +222,9 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None: Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]] Colors = Dict[str, Color] CalibrationPoints = list[dict[str, Any]] -RobotPosition = Optional[dict[str, Union[int | float]]] +RobotPosition: type[tuple[Any, Any, dict[str, int | float] | None]] = tuple[ + Any, Any, dict[str, int | float] | None +] ChargerPosition = dict[str, Any] RoomsProperties = dict[str, RoomProperty] ImageSize = dict[str, int | list[int]] diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 21a2473..baf42ec 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -1,32 +1,30 @@ """Utility code for the valetudo map parser.""" import datetime -from time import time import hashlib +import io import json from dataclasses import dataclass +from time import time from typing import Callable, List, Optional, Tuple -import io import numpy as np from PIL import Image, ImageOps +from ..map_data import HyperMapData +from .async_utils import AsyncNumPy from .drawable import Drawable from .drawable_elements import DrawingConfig -from .enhanced_drawable import EnhancedDrawable from .status_text.status_text import StatusText - from .types import ( LOGGER, ChargerPosition, - Size, + Destinations, NumpyArray, PilPNG, RobotPosition, - Destinations, + Size, ) -from ..map_data import HyperMapData -from .async_utils import AsyncNumPy @dataclass @@ -79,7 +77,6 @@ def __init__(self): # Drawing components are initialized by initialize_drawing_config in handlers self.drawing_config: Optional[DrawingConfig] = None self.draw: Optional[Drawable] = None - self.enhanced_draw: Optional[EnhancedDrawable] = None def get_frame_number(self) -> int: """Return the frame number of the image.""" @@ -709,7 +706,7 @@ def initialize_drawing_config(handler): handler: The handler instance with shared data and file_name attributes Returns: - Tuple of (DrawingConfig, Drawable, EnhancedDrawable) + Tuple of (DrawingConfig, Drawable) """ # Initialize drawing configuration @@ -721,11 +718,10 @@ def initialize_drawing_config(handler): ): drawing_config.update_from_device_info(handler.shared.device_info) - # Initialize both drawable systems for backward compatibility - draw = Drawable() # Legacy drawing utilities - enhanced_draw = EnhancedDrawable(drawing_config) # New enhanced drawing system + # Initialize drawing utilities + draw = Drawable() - return drawing_config, draw, enhanced_draw + return drawing_config, draw def blend_colors(base_color, overlay_color): diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 4b62699..05a00de 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -8,24 +8,22 @@ from __future__ import annotations import asyncio -import numpy as np +import numpy as np +from mvcrender.autocrop import AutoCrop from PIL import Image from .config.async_utils import AsyncPIL - -from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.shared import CameraShared - from .config.types import ( COLORS, LOGGER, CalibrationPoints, Colors, + JsonType, RoomsProperties, RoomStore, - JsonType, ) from .config.utils import ( BaseHandler, @@ -48,9 +46,7 @@ def __init__(self, shared_data: CameraShared): self.calibration_data = None # camera shared data. self.data = ImageData # imported Image Data Module. # Initialize drawing configuration using the shared utility function - self.drawing_config, self.draw, self.enhanced_draw = initialize_drawing_config( - self - ) + self.drawing_config, self.draw = initialize_drawing_config(self) self.go_to = None # vacuum go to data self.img_hash = None # hash of the image calculated to check differences. @@ -77,7 +73,7 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: json_data ) if room_properties: - rooms = RoomStore(self.file_name, room_properties) + _ = RoomStore(self.file_name, room_properties) # Convert room_properties to the format expected by async_get_robot_in_room self.rooms_pos = [] for room_id, room_data in room_properties.items(): @@ -346,16 +342,6 @@ async def async_get_image_from_json( robot_state=self.shared.vacuum_state, ) - # Update element map for robot position - if ( - hasattr(self.shared, "element_map") - and self.shared.element_map is not None - ): - update_element_map_with_robot( - self.shared.element_map, - robot_position, - DrawableElement.ROBOT, - ) # Synchronize zooming state from ImageDraw to handler before auto-crop self.zooming = self.imd.img_h.zooming diff --git a/SCR/valetudo_map_parser/map_data.py b/SCR/valetudo_map_parser/map_data.py index 07bd753..fee7d03 100755 --- a/SCR/valetudo_map_parser/map_data.py +++ b/SCR/valetudo_map_parser/map_data.py @@ -8,22 +8,22 @@ from __future__ import annotations -import numpy as np +from dataclasses import asdict, dataclass, field from typing import ( - List, - Sequence, - TypeVar, Any, - TypedDict, - NotRequired, Literal, + NotRequired, Optional, + Sequence, + TypedDict, + TypeVar, ) -from dataclasses import dataclass, field, asdict +import numpy as np from .config.types import ImageSize, JsonType + T = TypeVar("T") # --- Common Nested Structures --- @@ -373,6 +373,11 @@ async def async_get_rooms_coordinates( Else: (min_x_mm, min_y_mm, max_x_mm, max_y_mm) """ + + def to_mm(coord): + """Convert pixel coordinates to millimeters.""" + return round(coord * pixel_size * 10) + if not pixels: raise ValueError("Pixels list cannot be empty.") @@ -393,7 +398,6 @@ async def async_get_rooms_coordinates( min_y = min(min_y, y) if rand: - to_mm = lambda v: v * pixel_size * 10 return (to_mm(max_x), to_mm(max_y)), (to_mm(min_x), to_mm(min_y)) return ( diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index f5e6f65..71dc2f2 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -11,22 +11,21 @@ from typing import Any import numpy as np +from mvcrender.autocrop import AutoCrop from .config.async_utils import AsyncPIL - -from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.types import ( COLORS, DEFAULT_IMAGE_SIZE, DEFAULT_PIXEL_SIZE, + LOGGER, Colors, JsonType, PilPNG, RobotPosition, RoomsProperties, RoomStore, - LOGGER, ) from .config.utils import ( BaseHandler, @@ -55,9 +54,7 @@ def __init__(self, shared_data): self.data = RandImageData # Image Data # Initialize drawing configuration using the shared utility function - self.drawing_config, self.draw, self.enhanced_draw = initialize_drawing_config( - self - ) + self.drawing_config, self.draw = initialize_drawing_config(self) self.go_to = None # Go to position data self.img_base_layer = None # Base image layer self.img_rotate = shared_data.image_rotate # Image rotation @@ -115,7 +112,7 @@ async def extract_room_properties( if not (room_properties or self.shared.map_pred_zones): self.rooms_pos = None - rooms = RoomStore(self.file_name, room_properties) + _ = RoomStore(self.file_name, room_properties) return room_properties except (RuntimeError, ValueError) as e: LOGGER.warning( @@ -273,7 +270,7 @@ async def _setup_robot_and_image( # Restore original rooms_pos self.rooms_pos = original_rooms_pos - except Exception as e: + except (ValueError, KeyError, TypeError): # Fallback to robot-position-based zoom if room extraction fails if ( self.shared.image_auto_zoom @@ -357,14 +354,14 @@ async def _draw_map_elements( return img_np_array async def _finalize_image(self, pil_img): - if not self.shared.image_ref_width or not self.shared.image_ref_height: - LOGGER.warning( - "Image finalization failed: Invalid image dimensions. Returning original image." - ) - return pil_img if self.check_zoom_and_aspect_ratio(): resize_params = self.prepare_resize_params(pil_img, True) pil_img = await self.async_resize_images(resize_params) + else: + LOGGER.warning( + "%s: Invalid image dimensions. Returning original image.", + self.file_name, + ) return pil_img async def get_rooms_attributes( diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index 7ec6649..63c1604 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -8,7 +8,7 @@ from .config.drawable import Drawable from .config.drawable_elements import DrawableElement -from .config.types import Color, JsonType, NumpyArray, LOGGER +from .config.types import LOGGER, Color, JsonType, NumpyArray from .map_data import ImageData, RandImageData diff --git a/SCR/valetudo_map_parser/rooms_handler.py b/SCR/valetudo_map_parser/rooms_handler.py index 08ad391..a1f5e48 100644 --- a/SCR/valetudo_map_parser/rooms_handler.py +++ b/SCR/valetudo_map_parser/rooms_handler.py @@ -7,7 +7,6 @@ from __future__ import annotations -import time from typing import Any, Dict, List, Optional, Tuple import numpy as np @@ -16,8 +15,7 @@ from .config.drawable_elements import DrawableElement, DrawingConfig from .config.types import LOGGER, RoomsProperties - -from .map_data import RandImageData, ImageData +from .map_data import RandImageData class RoomsHandler: @@ -204,7 +202,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: Returns: Dictionary of room properties """ - start_total = time.time() room_properties = {} pixel_size = json_data.get("pixelSize", 5) height = json_data["size"]["y"] @@ -217,9 +214,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: ) if room_id is not None and room_data is not None: room_properties[room_id] = room_data - - # Log timing information (kept internal, no debug output) - total_time = time.time() - start_total return room_properties @@ -395,7 +389,6 @@ async def async_extract_room_properties( Returns: Dictionary of room properties """ - start_total = time.time() room_properties = {} # Get basic map information @@ -463,6 +456,4 @@ async def async_extract_room_properties( room_properties[room_id] = room_data - # Log timing information (kept internal, no debug output) - total_time = time.time() - start_total return room_properties diff --git a/pyproject.toml b/pyproject.toml index 6b506c0..04d6ac5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.10rc7" +version = "0.1.10" description = "A Python library to parse Valetudo map data returning a PIL Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0" @@ -18,7 +18,7 @@ python = ">=3.13" numpy = ">=1.26.4" Pillow = ">=10.3.0" scipy = ">=1.12.0" -mvcrender = ">=0.0.4" +mvcrender = ">=0.0.5" [tool.poetry.group.dev.dependencies] ruff = "*" From 3eaa9b9f0fea9d56bbc02737a04ff48091d372e9 Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Sun, 5 Oct 2025 13:26:30 +0200 Subject: [PATCH 5/6] corrections done Signed-off-by: Sandro Cantarella --- SCR/valetudo_map_parser/config/drawable.py | 5 +-- SCR/valetudo_map_parser/config/utils.py | 3 +- SCR/valetudo_map_parser/map_data.py | 5 +-- SCR/valetudo_map_parser/rand256_handler.py | 38 ++++++++++++---------- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index 33715be..d963c7a 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -288,11 +288,8 @@ async def lines( if x0 == x1 and y0 == y1: continue - # Get blended color for this line segment - blended_color = get_blended_color(x0, y0, x1, y1, arr, color) - # Use the optimized line drawing method - arr = Drawable._line(arr, x0, y0, x1, y1, blended_color, width) + arr = Drawable._line(arr, x0, y0, x1, y1, color, width) return arr diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index baf42ec..56bf974 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -196,10 +196,11 @@ async def _async_update_shared_data(self, destinations: Destinations | None = No if hasattr(self, "get_rooms_attributes") and ( self.shared.map_rooms is None and destinations is not None ): - (self.shared.map_rooms,) = await self.get_rooms_attributes(destinations) + self.shared.map_rooms = await self.get_rooms_attributes(destinations) if self.shared.map_rooms: LOGGER.debug("%s: Rand256 attributes rooms updated", self.file_name) + if hasattr(self, "async_get_rooms_attributes") and ( self.shared.map_rooms is None ): diff --git a/SCR/valetudo_map_parser/map_data.py b/SCR/valetudo_map_parser/map_data.py index fee7d03..c7119ae 100755 --- a/SCR/valetudo_map_parser/map_data.py +++ b/SCR/valetudo_map_parser/map_data.py @@ -552,8 +552,9 @@ def get_rrm_currently_cleaned_zones(json_data: JsonType) -> list[dict[str, Any]] @staticmethod def get_rrm_forbidden_zones(json_data: JsonType) -> list[dict[str, Any]]: """Get the forbidden zones from the json.""" - re_zones = json_data.get("forbidden_zones", []) - re_zones.extend(json_data.get("forbidden_mop_zones", [])) + re_zones = json_data.get("forbidden_zones", []) + json_data.get( + "forbidden_mop_zones", [] + ) formatted_zones = RandImageData._rrm_valetudo_format_zone(re_zones) return formatted_zones diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 71dc2f2..6ae6d58 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -2,7 +2,7 @@ Image Handler Module for Valetudo Re Vacuums. It returns the PIL PNG image frame relative to the Map Data extrapolated from the vacuum json. It also returns calibration, rooms data to the card and other images information to the camera. -Version: 0.1.9.a6 +Version: 0.1.10 """ from __future__ import annotations @@ -93,17 +93,13 @@ async def extract_room_properties( # Update self.rooms_pos from room_properties for compatibility with other methods self.rooms_pos = [] - room_ids = [] # Collect room IDs for shared.map_rooms for room_id, room_data in room_properties.items(): self.rooms_pos.append( {"name": room_data["name"], "outline": room_data["outline"]} ) - # Store the room number (segment ID) for MQTT active zone mapping - room_ids.append(room_data["number"]) - - # Update shared.map_rooms with the room IDs for MQTT active zone mapping - self.shared.map_rooms = room_ids + # Update shared.map_rooms with the full room properties (consistent with Hypfer) + self.shared.map_rooms = room_properties # get the zones and points data self.shared.map_pred_zones = await self.async_zone_propriety(zones_data) # get the points data @@ -120,7 +116,7 @@ async def extract_room_properties( e, exc_info=True, ) - return None, None, None + return None async def get_image_from_rrm( self, @@ -185,6 +181,7 @@ async def get_image_from_rrm( async def _setup_robot_and_image( self, m_json, size_x, size_y, colors, destinations ): + """Set up the elements of the map and the image.""" ( _, robot_position, @@ -209,12 +206,6 @@ async def _setup_robot_and_image( ) LOGGER.info("%s: Completed base Layers", self.file_name) - # Update element map for rooms - if 0 < room_id <= 15: - # This is a simplification - in a real implementation we would - # need to identify the exact pixels that belong to each room - pass - if room_id > 0 and not self.room_propriety: self.room_propriety = await self.get_rooms_attributes(destinations) @@ -222,8 +213,10 @@ async def _setup_robot_and_image( if not self.rooms_pos and not self.room_propriety: self.room_propriety = await self.get_rooms_attributes(destinations) - # Always check robot position for zooming (fallback) - if self.rooms_pos and robot_position and not hasattr(self, "robot_pos"): + # Always check robot position for zooming (update if room info is missing) + if self.rooms_pos and robot_position and ( + self.robot_pos is None or "in_room" not in self.robot_pos + ): self.robot_pos = await self.async_get_robot_in_room( (robot_position[0] * 10), (robot_position[1] * 10), @@ -284,6 +277,7 @@ async def _setup_robot_and_image( async def _draw_map_elements( self, img_np_array, m_json, colors, robot_position, robot_position_angle ): + """Draw map elements on the image.""" # Draw charger if enabled if self.drawing_config.is_enabled(DrawableElement.CHARGER): img_np_array, self.charger_pos = await self.imd.async_draw_charger( @@ -354,6 +348,10 @@ async def _draw_map_elements( return img_np_array async def _finalize_image(self, pil_img): + """Finalize the image by resizing if needed.""" + if pil_img is None: + LOGGER.warning("%s: Image is None. Returning None.", self.file_name) + return None if self.check_zoom_and_aspect_ratio(): resize_params = self.prepare_resize_params(pil_img, True) pil_img = await self.async_resize_images(resize_params) @@ -368,8 +366,6 @@ async def get_rooms_attributes( self, destinations: JsonType = None ) -> tuple[RoomsProperties, Any, Any]: """Return the rooms attributes.""" - if self.room_propriety: - return self.room_propriety if self.json_data and destinations: self.room_propriety = await self.extract_room_properties( self.json_data, destinations @@ -394,6 +390,12 @@ async def async_get_robot_in_room( } # Handle active zones self.active_zones = self.shared.rand256_active_zone + LOGGER.debug( + "%s: Robot is in %s room (polygon detection). %s", + self.file_name, + self.robot_in_room["room"], + self.active_zones, + ) self.zooming = False if self.active_zones and ( self.robot_in_room["id"] in range(len(self.active_zones)) From 06508c19d4e5fa3b654c2754351afc7d0ea73e2b Mon Sep 17 00:00:00 2001 From: SCA075 <82227818+sca075@users.noreply.github.com> Date: Sun, 5 Oct 2025 13:33:19 +0200 Subject: [PATCH 6/6] corrections duplicate logger Signed-off-by: Sandro Cantarella --- SCR/valetudo_map_parser/rand256_handler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 54a32da..56f5fc9 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -19,7 +19,6 @@ COLORS, DEFAULT_IMAGE_SIZE, DEFAULT_PIXEL_SIZE, - LOGGER, Colors, JsonType, PilPNG,