diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index 3b672a2..b3b0bfd 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -54,14 +54,14 @@ CONF_OFFSET_TOP, CONF_SNAPSHOTS_ENABLE, CONF_TRIMS_SAVE, - CONF_VACUUM_CONFIG_ENTRY_ID, - CONF_VACUUM_CONNECTION_STRING, - CONF_VACUUM_ENTITY_ID, - CONF_VACUUM_IDENTIFIERS, CONF_VAC_STAT, CONF_VAC_STAT_FONT, CONF_VAC_STAT_POS, CONF_VAC_STAT_SIZE, + CONF_VACUUM_CONFIG_ENTRY_ID, + CONF_VACUUM_CONNECTION_STRING, + CONF_VACUUM_ENTITY_ID, + CONF_VACUUM_IDENTIFIERS, CONF_ZOOM_LOCK_RATIO, DECODED_TOPICS, DEFAULT_IMAGE_SIZE, diff --git a/SCR/valetudo_map_parser/config/async_utils.py b/SCR/valetudo_map_parser/config/async_utils.py index b8ef7b6..7be4d29 100644 --- a/SCR/valetudo_map_parser/config/async_utils.py +++ b/SCR/valetudo_map_parser/config/async_utils.py @@ -49,7 +49,7 @@ async def async_resize( ) -> Image.Image: """Async image resizing.""" if resample is None: - resample = Image.LANCZOS + resample = Image.Resampling.LANCZOS return await make_async(image.resize, size, resample) @staticmethod diff --git a/SCR/valetudo_map_parser/config/colors.py b/SCR/valetudo_map_parser/config/colors.py index 3f9b0d3..6640336 100644 --- a/SCR/valetudo_map_parser/config/colors.py +++ b/SCR/valetudo_map_parser/config/colors.py @@ -61,6 +61,7 @@ ) from .types import LOGGER, Color + color_transparent = (0, 0, 0, 0) color_charger = (0, 128, 0, 255) color_move = (238, 247, 255, 255) diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index 8eb0884..401b6f0 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -12,7 +12,7 @@ import logging from pathlib import Path -from typing import Tuple, Union +from typing import Union import numpy as np from mvcrender.blend import get_blended_color, sample_and_blend_color @@ -205,24 +205,6 @@ async def go_to_flag( layer = 
Drawable._line(layer, xp1, yp1, xp2, yp2, pole_color, pole_width) return layer - @staticmethod - def point_inside(x: int, y: int, points: list[Tuple[int, int]]) -> bool: - """Check if a point (x, y) is inside a polygon defined by a list of points.""" - n = len(points) - inside = False - inters_x = 0.0 - p1x, p1y = points[0] - for i in range(1, n + 1): - p2x, p2y = points[i % n] - if y > min(p1y, p2y): - if y <= max(p1y, p2y) and x <= max(p1x, p2x): - if p1y != p2y: - inters_x = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x - if p1x == p2x or x <= inters_x: - inside = not inside - p1x, p1y = p2x, p2y - return inside - @staticmethod def _line( layer: NumpyArray, @@ -329,56 +311,6 @@ def _ellipse( image[y1:y2, x1:x2] = color return image - @staticmethod - def _polygon_outline( - arr: NumpyArray, - points: list[Tuple[int, int]], - width: int, - outline_color: Color, - fill_color: Color = None, - ) -> NumpyArray: - """ - Draw the outline of a polygon on the array using _line, and optionally fill it. - Uses NumPy vectorized operations for improved performance. 
- """ - # Draw the outline - for i, _ in enumerate(points): - current_point = points[i] - next_point = points[(i + 1) % len(points)] - arr = Drawable._line( - arr, - current_point[0], - current_point[1], - next_point[0], - next_point[1], - outline_color, - width, - ) - - # Fill the polygon if a fill color is provided - if fill_color is not None: - # Get the bounding box of the polygon - min_x = max(0, min(p[0] for p in points)) - max_x = min(arr.shape[1] - 1, max(p[0] for p in points)) - min_y = max(0, min(p[1] for p in points)) - max_y = min(arr.shape[0] - 1, max(p[1] for p in points)) - - # Create a mask for the polygon region - mask = np.zeros((max_y - min_y + 1, max_x - min_x + 1), dtype=bool) - - # Adjust points to the mask's coordinate system - adjusted_points = [(p[0] - min_x, p[1] - min_y) for p in points] - - # Test each point in the grid - for i in range(mask.shape[0]): - for j in range(mask.shape[1]): - mask[i, j] = Drawable.point_inside(j, i, adjusted_points) - - # Apply the fill color to the masked region - arr[min_y : max_y + 1, min_x : max_x + 1][mask] = fill_color - - return arr - @staticmethod async def zones(layers: NumpyArray, coordinates, color: Color) -> NumpyArray: """ @@ -420,14 +352,18 @@ async def zones(layers: NumpyArray, coordinates, color: Color) -> NumpyArray: mask_rgba = np.zeros((box_h, box_w, 4), dtype=np.uint8) # Convert points to xs, ys arrays (adjusted to local bbox coordinates) - xs = np.array([int(pts[i] - min_x) for i in range(0, len(pts), 2)], dtype=np.int32) - ys = np.array([int(pts[i] - min_y) for i in range(1, len(pts), 2)], dtype=np.int32) + xs = np.array( + [int(pts[i] - min_x) for i in range(0, len(pts), 2)], dtype=np.int32 + ) + ys = np.array( + [int(pts[i] - min_y) for i in range(1, len(pts), 2)], dtype=np.int32 + ) # Draw filled polygon on mask polygon_u8(mask_rgba, xs, ys, (0, 0, 0, 0), 0, (255, 255, 255, 255)) # Extract boolean mask from first channel - zone_mask = (mask_rgba[:, :, 0] > 0) + zone_mask = mask_rgba[:, 
:, 0] > 0 del mask_rgba del xs del ys diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index 11088ca..fe4131a 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -281,6 +281,49 @@ def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: break return blocks + def _process_image_pixels( + self, + buf: bytes, + offset: int, + g3offset: int, + length: int, + pixels: bool, + parameters: Dict[str, Any], + ) -> None: + """Process image pixels sequentially - segments are organized as blocks.""" + current_segments = {} + + for i in range(length): + pixel_byte = struct.unpack( + "<B", buf[offset + g3offset + i : offset + g3offset + i + 1] + )[0] + segment_id = pixel_byte >> 3 + if segment_id == 0 and pixels: + # Floor pixel + parameters["pixels"]["floor"].append(i) + elif segment_id != 0: + # Room segment - segments are sequential blocks + if segment_id not in current_segments: + parameters["segments"]["id"].append(segment_id) + parameters["segments"]["pixels_seg_" + str(segment_id)] = [] + current_segments[segment_id] = True + + if pixels: + parameters["segments"]["pixels_seg_" + str(segment_id)].append( + i + ) + def _parse_image_block( self, buf: bytes, offset: int, length: int, hlength: int, pixels: bool = True ) -> Dict[str, Any]: @@ -330,41 +373,9 @@ def _parse_image_block( parameters["dimensions"]["height"] > 0 and parameters["dimensions"]["width"] > 0 ): - # Process data sequentially - segments are organized as blocks - current_segments = {} - - for i in range(length): - pixel_byte = struct.unpack( - "<B", buf[offset + g3offset + i : offset + g3offset + i + 1] - )[0] - segment_id = pixel_byte >> 3 - if segment_id == 0 and pixels: - # Floor pixel - parameters["pixels"]["floor"].append(i) - elif segment_id != 0: - # Room segment - segments are sequential blocks - if segment_id not in current_segments: - parameters["segments"]["id"].append(segment_id) - parameters["segments"][ - "pixels_seg_" + str(segment_id) - ] = [] - current_segments[segment_id] = True - - if pixels: - parameters["segments"][ - 
"pixels_seg_" + str(segment_id) - ].append(i) + self._process_image_pixels( + buf, offset, g3offset, length, pixels, parameters + ) parameters["segments"]["count"] = len(parameters["segments"]["id"]) return parameters @@ -377,6 +388,79 @@ def _parse_image_block( "pixels": {"floor": [], "walls": [], "segments": {}}, } + def _calculate_angle_from_points(self, points: list) -> Optional[float]: + """Calculate angle from last two points in a path.""" + if len(points) >= 2: + last_point = points[-1] + second_last = points[-2] + dx = last_point[0] - second_last[0] + dy = last_point[1] - second_last[1] + if dx != 0 or dy != 0: + angle_rad = math.atan2(dy, dx) + return math.degrees(angle_rad) + return None + + def _transform_path_coordinates(self, points: list) -> list: + """Apply coordinate transformation to path points.""" + return [[point[0], self.Tools.DIMENSION_MM - point[1]] for point in points] + + def _parse_path_data(self, blocks: dict, parsed_map_data: dict) -> list: + """Parse path data with coordinate transformation.""" + transformed_path_points = [] + if self.Types.PATH.value in blocks: + path_data = blocks[self.Types.PATH.value].copy() + transformed_path_points = self._transform_path_coordinates( + path_data["points"] + ) + path_data["points"] = transformed_path_points + + angle = self._calculate_angle_from_points(transformed_path_points) + if angle is not None: + path_data["current_angle"] = angle + parsed_map_data["path"] = path_data + return transformed_path_points + + def _parse_goto_path_data(self, blocks: dict, parsed_map_data: dict) -> None: + """Parse goto predicted path with coordinate transformation.""" + if self.Types.GOTO_PREDICTED_PATH.value in blocks: + goto_path_data = blocks[self.Types.GOTO_PREDICTED_PATH.value].copy() + goto_path_data["points"] = self._transform_path_coordinates( + goto_path_data["points"] + ) + + angle = self._calculate_angle_from_points(goto_path_data["points"]) + if angle is not None: + goto_path_data["current_angle"] = 
angle + parsed_map_data["goto_predicted_path"] = goto_path_data + + def _add_zone_data(self, blocks: dict, parsed_map_data: dict) -> None: + """Add zone and area data to parsed map.""" + parsed_map_data["currently_cleaned_zones"] = ( + blocks[self.Types.CURRENTLY_CLEANED_ZONES.value]["zones"] + if self.Types.CURRENTLY_CLEANED_ZONES.value in blocks + else [] + ) + parsed_map_data["forbidden_zones"] = ( + blocks[self.Types.FORBIDDEN_ZONES.value]["forbidden_zones"] + if self.Types.FORBIDDEN_ZONES.value in blocks + else [] + ) + parsed_map_data["forbidden_mop_zones"] = ( + blocks[self.Types.FORBIDDEN_MOP_ZONES.value]["forbidden_mop_zones"] + if self.Types.FORBIDDEN_MOP_ZONES.value in blocks + else [] + ) + parsed_map_data["virtual_walls"] = ( + blocks[self.Types.VIRTUAL_WALLS.value]["virtual_walls"] + if self.Types.VIRTUAL_WALLS.value in blocks + else [] + ) + parsed_map_data["carpet_areas"] = ( + blocks[self.Types.CARPET_MAP.value]["carpet_map"] + if self.Types.CARPET_MAP.value in blocks + else [] + ) + def parse_rrm_data( self, map_buf: bytes, pixels: bool = False ) -> Optional[Dict[str, Any]]: @@ -393,39 +477,14 @@ def parse_rrm_data( robot_data = blocks[self.Types.ROBOT_POSITION.value] parsed_map_data["robot"] = robot_data["position"] - # Parse path data with coordinate transformation FIRST - transformed_path_points = [] - if self.Types.PATH.value in blocks: - path_data = blocks[self.Types.PATH.value].copy() - # Apply coordinate transformation like current parser - transformed_path_points = [ - [point[0], self.Tools.DIMENSION_MM - point[1]] - for point in path_data["points"] - ] - path_data["points"] = transformed_path_points - - # Calculate current angle from transformed points - if len(transformed_path_points) >= 2: - last_point = transformed_path_points[-1] - second_last = transformed_path_points[-2] - dx = last_point[0] - second_last[0] - dy = last_point[1] - second_last[1] - if dx != 0 or dy != 0: - angle_rad = math.atan2(dy, dx) - path_data["current_angle"] = 
math.degrees(angle_rad) - parsed_map_data["path"] = path_data - - # Get robot angle from TRANSFORMED path data (like current implementation) - robot_angle = 0 - if len(transformed_path_points) >= 2: - last_point = transformed_path_points[-1] - second_last = transformed_path_points[-2] - dx = last_point[0] - second_last[0] - dy = last_point[1] - second_last[1] - if dx != 0 or dy != 0: - angle_rad = math.atan2(dy, dx) - robot_angle = int(math.degrees(angle_rad)) + # Parse path data with coordinate transformation + transformed_path_points = self._parse_path_data(blocks, parsed_map_data) + # Get robot angle from transformed path data + robot_angle = 0 + angle = self._calculate_angle_from_points(transformed_path_points) + if angle is not None: + robot_angle = int(angle) parsed_map_data["robot_angle"] = robot_angle # Parse charger position @@ -438,24 +497,7 @@ def parse_rrm_data( parsed_map_data["image"] = blocks[self.Types.IMAGE.value] # Parse goto predicted path - if self.Types.GOTO_PREDICTED_PATH.value in blocks: - goto_path_data = blocks[self.Types.GOTO_PREDICTED_PATH.value].copy() - # Apply coordinate transformation - goto_path_data["points"] = [ - [point[0], self.Tools.DIMENSION_MM - point[1]] - for point in goto_path_data["points"] - ] - # Calculate current angle from transformed points (like working parser) - if len(goto_path_data["points"]) >= 2: - points = goto_path_data["points"] - last_point = points[-1] - second_last = points[-2] - dx = last_point[0] - second_last[0] - dy = last_point[1] - second_last[1] - if dx != 0 or dy != 0: - angle_rad = math.atan2(dy, dx) - goto_path_data["current_angle"] = math.degrees(angle_rad) - parsed_map_data["goto_predicted_path"] = goto_path_data + self._parse_goto_path_data(blocks, parsed_map_data) # Parse goto target if self.Types.GOTO_TARGET.value in blocks: @@ -463,32 +505,8 @@ def parse_rrm_data( "position" ] - # Add missing fields to match expected JSON format - parsed_map_data["currently_cleaned_zones"] = ( - 
blocks[self.Types.CURRENTLY_CLEANED_ZONES.value]["zones"] - if self.Types.CURRENTLY_CLEANED_ZONES.value in blocks - else [] - ) - parsed_map_data["forbidden_zones"] = ( - blocks[self.Types.FORBIDDEN_ZONES.value]["forbidden_zones"] - if self.Types.FORBIDDEN_ZONES.value in blocks - else [] - ) - parsed_map_data["forbidden_mop_zones"] = ( - blocks[self.Types.FORBIDDEN_MOP_ZONES.value]["forbidden_mop_zones"] - if self.Types.FORBIDDEN_MOP_ZONES.value in blocks - else [] - ) - parsed_map_data["virtual_walls"] = ( - blocks[self.Types.VIRTUAL_WALLS.value]["virtual_walls"] - if self.Types.VIRTUAL_WALLS.value in blocks - else [] - ) - parsed_map_data["carpet_areas"] = ( - blocks[self.Types.CARPET_MAP.value]["carpet_map"] - if self.Types.CARPET_MAP.value in blocks - else [] - ) + # Add zone and area data + self._add_zone_data(blocks, parsed_map_data) parsed_map_data["is_valid"] = self.is_valid return parsed_map_data diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index 6bf7455..b811c64 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -10,7 +10,6 @@ from PIL import Image -from .utils import pil_size_rotation from ..const import ( ATTR_CALIBRATION_POINTS, ATTR_CAMERA_MODE, @@ -39,15 +38,17 @@ CONF_VAC_STAT_POS, CONF_VAC_STAT_SIZE, CONF_ZOOM_LOCK_RATIO, - NOT_STREAMING_STATES, DEFAULT_VALUES, + NOT_STREAMING_STATES, ) from .types import ( CameraModes, Colors, + FloorData, PilPNG, TrimsData, ) +from .utils import pil_size_rotation _LOGGER = logging.getLogger(__name__) @@ -120,6 +121,8 @@ def __init__(self, file_name): self.user_language = None self.trim_crop_data = None self.trims = TrimsData.from_dict(DEFAULT_VALUES["trims_data"]) + self.floors_trims: FloorData = {} + self.current_floor: str = "floor_0" self.skip_room_ids: List[str] = [] self.device_info = None self._battery_state = None @@ -127,11 +130,15 @@ def __init__(self, file_name): def vacuum_bat_charged(self) -> bool: 
"""Check if the vacuum is charging.""" if self.vacuum_state != "docked": - self._battery_state = "not_charging" - elif (self._battery_state == "charging_done") and (int(self.vacuum_battery) == 100): + self._battery_state = "not_charging" + elif (self._battery_state == "charging_done") and ( + int(self.vacuum_battery) == 100 + ): self._battery_state = "charged" else: - self._battery_state = "charging" if int(self.vacuum_battery) < 100 else "charging_done" + self._battery_state = ( + "charging" if int(self.vacuum_battery) < 100 else "charging_done" + ) return (self.vacuum_state == "docked") and (self._battery_state == "charging") @staticmethod @@ -222,9 +229,9 @@ def generate_attributes(self) -> dict: def is_streaming(self) -> bool: """Return true if the device is streaming.""" updated_status = self.vacuum_state - attr_is_streaming = ((updated_status not in NOT_STREAMING_STATES - or self.vacuum_bat_charged()) - or not self.binary_image) + attr_is_streaming = ( + updated_status not in NOT_STREAMING_STATES or self.vacuum_bat_charged() + ) or not self.binary_image return attr_is_streaming def to_dict(self) -> dict: @@ -233,7 +240,7 @@ def to_dict(self) -> dict: "image": { "binary": self.binary_image, "size": pil_size_rotation(self.image_rotate, self.new_image), - "streaming": self.is_streaming() + "streaming": self.is_streaming(), }, "attributes": self.generate_attributes(), } @@ -251,9 +258,6 @@ def __init__(self, file_name: str, device_info: dict = None): self.device_info = device_info self.update_shared_data(device_info) - # Automatically initialize shared data for the instance - # self._init_shared_data(device_info) - def update_shared_data(self, device_info): """Initialize the shared data with device_info.""" instance = self.get_instance() # Retrieve the correct instance diff --git a/SCR/valetudo_map_parser/config/status_text/__init__.py b/SCR/valetudo_map_parser/config/status_text/__init__.py new file mode 100644 index 0000000..f6b85ea --- /dev/null +++ 
b/SCR/valetudo_map_parser/config/status_text/__init__.py @@ -0,0 +1,7 @@ +"""Status text module for vacuum cleaners.""" + +from .status_text import StatusText +from .translations import translations + + +__all__ = ["StatusText", "translations"] diff --git a/SCR/valetudo_map_parser/config/status_text/status_text.py b/SCR/valetudo_map_parser/config/status_text/status_text.py index 8bbf461..8a48c4e 100644 --- a/SCR/valetudo_map_parser/config/status_text/status_text.py +++ b/SCR/valetudo_map_parser/config/status_text/status_text.py @@ -5,12 +5,14 @@ """ from __future__ import annotations + from typing import Callable -from ...const import text_size_coverage, charge_level, charging, dot +from ...const import charge_level, charging, dot, text_size_coverage from ..types import LOGGER, PilPNG from .translations import translations + LOGGER.propagate = True @@ -30,7 +32,7 @@ def __init__(self, camera_shared): self._docked_ready, self._active, self._mqtt_disconnected, - ] # static ordered sequence of compose functions + ] # static ordered sequence of compose functions @staticmethod async def _get_vacuum_status_translation( @@ -57,7 +59,9 @@ def _mqtt_disconnected(self, current_state: list[str]) -> list[str]: if not self._shared.vacuum_connection: mqtt_disc = (self._lang_map or {}).get( "mqtt_disconnected", - translations.get("en", {}).get("mqtt_disconnected", "Disconnected from MQTT?"), + translations.get("en", {}).get( + "mqtt_disconnected", "Disconnected from MQTT?" 
+ ), ) return [f"{self.file_name}: {mqtt_disc}"] return current_state @@ -72,7 +76,10 @@ def _docked_charged(self, current_state: list[str]) -> list[str]: def _docked_ready(self, current_state: list[str]) -> list[str]: """Return the translated docked and ready status.""" - if self._shared.vacuum_state == "docked" and not self._shared.vacuum_bat_charged(): + if ( + self._shared.vacuum_state == "docked" + and not self._shared.vacuum_bat_charged() + ): current_state.append(dot) current_state.append(f"{charge_level} ") ready_txt = (self._lang_map or {}).get( @@ -112,7 +119,5 @@ async def get_status_text(self, text_img: PilPNG) -> tuple[list[str], int]: status_text = func(status_text) if text_size >= 50 and getattr(text_img, "width", None): text_pixels = max(1, sum(len(text) for text in status_text)) - text_size = int( - (text_size_coverage * text_img.width) // text_pixels - ) + text_size = int((text_size_coverage * text_img.width) // text_pixels) return status_text, text_size diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index 1681406..8e703ff 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -20,21 +20,29 @@ class Spot(TypedDict): + """Type definition for a spot location.""" + name: str coordinates: List[int] # [x, y] class Zone(TypedDict): + """Type definition for a zone area.""" + name: str coordinates: List[List[int]] # [[x1, y1, x2, y2, repeats], ...] 
class Room(TypedDict): + """Type definition for a room.""" + name: str id: int class Destinations(TypedDict, total=False): + """Type definition for destinations including spots, zones, and rooms.""" + spots: NotRequired[Optional[List[Spot]]] zones: NotRequired[Optional[List[Zone]]] rooms: NotRequired[Optional[List[Room]]] @@ -42,6 +50,8 @@ class Destinations(TypedDict, total=False): class RoomProperty(TypedDict): + """Type definition for room properties including outline.""" + number: int outline: list[tuple[int, int]] name: str @@ -94,30 +104,49 @@ def from_list(data: list): class RoomStore: + """Singleton storage for room data per vacuum ID. + + Stores room properties in format: {segment_id: RoomProperty} + Example: {"16": {"number": 16, "outline": [...], "name": "Living Room", "x": 100, "y": 200}} + """ + _instances: Dict[str, "RoomStore"] = {} _lock = threading.Lock() - def __new__(cls, vacuum_id: str, rooms_data: Optional[dict] = None) -> "RoomStore": + def __new__( + cls, vacuum_id: str, rooms_data: Optional[Dict[str, RoomProperty]] = None + ) -> "RoomStore": with cls._lock: if vacuum_id not in cls._instances: instance = super(RoomStore, cls).__new__(cls) - instance.vacuum_id = vacuum_id - instance.vacuums_data = rooms_data or {} - instance.rooms_count = instance.get_rooms_count() - instance.floor = None cls._instances[vacuum_id] = instance - else: - if rooms_data is not None: - cls._instances[vacuum_id].vacuums_data = rooms_data - return cls._instances[vacuum_id] + return cls._instances[vacuum_id] - def get_rooms(self) -> dict: + def __init__( + self, vacuum_id: str, rooms_data: Optional[Dict[str, RoomProperty]] = None + ) -> None: + # Only initialize if this is a new instance (not yet initialized) + if not hasattr(self, "vacuum_id"): + self.vacuum_id: str = vacuum_id + self.vacuums_data: Dict[str, RoomProperty] = rooms_data or {} + self.rooms_count: int = self.get_rooms_count() + self.floor: Optional[str] = None + elif rooms_data is not None: + # Update 
only if new data is provided + self.vacuums_data = rooms_data + self.rooms_count = self.get_rooms_count() + + def get_rooms(self) -> Dict[str, RoomProperty]: + """Get all rooms data.""" return self.vacuums_data - def set_rooms(self, rooms_data: dict) -> None: + def set_rooms(self, rooms_data: Dict[str, RoomProperty]) -> None: + """Set rooms data and update room count.""" self.vacuums_data = rooms_data + self.rooms_count = self.get_rooms_count() def get_rooms_count(self) -> int: + """Get the number of rooms, defaulting to 1 if no rooms are present.""" if isinstance(self.vacuums_data, dict): count = len(self.vacuums_data) return count if count > 0 else DEFAULT_ROOMS @@ -125,11 +154,14 @@ def get_rooms_count(self) -> int: @property def room_names(self) -> dict: - """Return room names in format {'room_0_name': 'SegmentID: RoomName', ...}.""" + """Return room names in format {'room_0_name': 'SegmentID: RoomName', ...}. + + Maximum of 16 rooms supported. + """ result = {} if isinstance(self.vacuums_data, dict): for idx, (segment_id, room_data) in enumerate(self.vacuums_data.items()): - if idx >= 16: # Max 16 rooms + if idx >= 16: # Max 16 rooms supported break room_name = room_data.get("name", f"Room {segment_id}") result[f"room_{idx}_name"] = f"{segment_id}: {room_name}" @@ -137,6 +169,7 @@ def room_names(self) -> dict: @classmethod def get_all_instances(cls) -> Dict[str, "RoomStore"]: + """Get all RoomStore instances for all vacuum IDs.""" return cls._instances @@ -229,8 +262,9 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None: async with self._lock: self.vacuum_json_data[vacuum_id] = json_data + class CameraModes: - """Constants for the camera modes""" + """Constants for the camera modes.""" MAP_VIEW = "map_view" OBSTACLE_VIEW = "obstacle_view" @@ -277,6 +311,19 @@ def to_dict(self) -> dict: """Convert TrimData to a dictionary.""" return asdict(self) + @classmethod + def from_list(cls, crop_area: List[int], floor: Optional[str] = None): 
+ """ + Initialize TrimsData from a list [trim_up, trim_left, trim_down, trim_right] + """ + return cls( + trim_up=crop_area[0], + trim_left=crop_area[1], + trim_down=crop_area[2], + trim_right=crop_area[3], + floor=floor, + ) + def clear(self) -> dict: """Clear all the trims.""" self.floor = "" @@ -286,6 +333,27 @@ def clear(self) -> dict: self.trim_right = 0 return asdict(self) + +@dataclass +class FloorData: + """Dataclass to store floor configuration.""" + + trims: TrimsData + map_name: str = "" + + @classmethod + def from_dict(cls, data: dict): + """Initialize FloorData from a dictionary.""" + return cls( + trims=TrimsData.from_dict(data.get("trims", {})), + map_name=data.get("map_name", ""), + ) + + def to_dict(self) -> dict: + """Convert FloorData to a dictionary.""" + return {"trims": self.trims.to_dict(), "map_name": self.map_name} + + Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]] Colors = Dict[str, Color] CalibrationPoints = list[dict[str, Any]] @@ -299,4 +367,4 @@ def clear(self) -> dict: JsonType = Any # json.loads() return type is Any PilPNG = Image.Image # Keep for backward compatibility NumpyArray = np.ndarray -Point = Tuple[int, int] \ No newline at end of file +Point = Tuple[int, int] diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index fb0019d..c03facd 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -24,6 +24,7 @@ PilPNG, RobotPosition, Size, + TrimsData, ) @@ -110,80 +111,17 @@ async def async_get_image( """ try: # Backup current image to last_image before processing new one - if hasattr(self.shared, "new_image") and self.shared.new_image is not None: - # Close old last_image to free memory before replacing it - if hasattr(self.shared, "last_image") and self.shared.last_image is not None: - try: - self.shared.last_image.close() - except Exception: - pass # Ignore errors if image is already closed - self.shared.last_image = 
self.shared.new_image + self._backup_last_image() # Call the appropriate handler method based on handler type - if hasattr(self, "get_image_from_rrm"): - # This is a Rand256 handler - new_image = await self.get_image_from_rrm( - m_json=m_json, - destinations=destinations, - ) - - elif hasattr(self, "async_get_image_from_json"): - # This is a Hypfer handler - self.json_data = await HyperMapData.async_from_valetudo_json(m_json) - new_image = await self.async_get_image_from_json( - m_json=m_json, - ) - else: - LOGGER.warning( - "%s: Handler type not recognized for async_get_image", - self.file_name, - ) - return ( - self.shared.last_image - if hasattr(self.shared, "last_image") - else None - ) + new_image = await self._generate_new_image(m_json, destinations) + if new_image is None: + return self._handle_failed_image_generation() - # Store the new image in shared data - if new_image is not None: - # Update shared data - await self._async_update_shared_data(destinations) - self.shared.new_image = new_image - # Add text to the image - if self.shared.show_vacuum_state: - text_editor = StatusText(self.shared) - img_text = await text_editor.get_status_text(new_image) - Drawable.status_text( - new_image, - img_text[1], - self.shared.user_colors[8], - img_text[0], - self.shared.vacuum_status_font, - self.shared.vacuum_status_position, - ) - # Convert to binary (PNG bytes) if requested - if bytes_format: - self.shared.binary_image = pil_to_png_bytes(new_image) - else: - self.shared.binary_image = pil_to_png_bytes(self.shared.last_image) - # Update the timestamp with current datetime - self.shared.image_last_updated = datetime.datetime.fromtimestamp(time()) - LOGGER.debug("%s: Frame Completed.", self.file_name) - data = {} - if bytes_format: - data = self.shared.to_dict() - return new_image, data - else: - LOGGER.warning( - "%s: Failed to generate image from JSON data", self.file_name - ) - return ( - self.shared.last_image - if hasattr(self.shared, "last_image") - else None - 
), self.shared.to_dict() + # Process and store the new image + return await self._process_new_image(new_image, destinations, bytes_format) - except Exception as e: + except (ValueError, TypeError, AttributeError, KeyError, RuntimeError) as e: LOGGER.warning( "%s: Error in async_get_image: %s", self.file_name, @@ -191,24 +129,111 @@ async def async_get_image( exc_info=True, ) return ( - self.shared.last_image if hasattr(self.shared, "last_image") else None + self.shared.last_image if hasattr(self.shared, "last_image") else None, + {}, ) + def _backup_last_image(self): + """Backup current image to last_image before processing new one.""" + if hasattr(self.shared, "new_image") and self.shared.new_image is not None: + # Close old last_image to free memory before replacing it + if ( + hasattr(self.shared, "last_image") + and self.shared.last_image is not None + ): + try: + self.shared.last_image.close() + except (OSError, AttributeError, RuntimeError): + pass # Ignore errors if image is already closed + self.shared.last_image = self.shared.new_image + + async def _generate_new_image( + self, m_json: dict | None, destinations: Destinations | None + ) -> PilPNG | None: + """Generate new image based on handler type.""" + if hasattr(self, "get_image_from_rrm"): + # This is a Rand256 handler + return await self.get_image_from_rrm( + m_json=m_json, + destinations=destinations, + ) + + if hasattr(self, "async_get_image_from_json"): + # This is a Hypfer handler + self.json_data = await HyperMapData.async_from_valetudo_json(m_json) + return await self.async_get_image_from_json(m_json=m_json) + + LOGGER.warning( + "%s: Handler type not recognized for async_get_image", + self.file_name, + ) + return None + + def _handle_failed_image_generation(self) -> Tuple[PilPNG | None, dict]: + """Handle case when image generation fails.""" + LOGGER.warning("%s: Failed to generate image from JSON data", self.file_name) + return ( + self.shared.last_image if hasattr(self.shared, "last_image") 
else None + ), self.shared.to_dict() + + async def _process_new_image( + self, new_image: PilPNG, destinations: Destinations | None, bytes_format: bool + ) -> Tuple[PilPNG, dict]: + """Process and store the new image with text and binary conversion.""" + # Update shared data + await self._async_update_shared_data(destinations) + self.shared.new_image = new_image + + # Add text to the image + if self.shared.show_vacuum_state: + await self._add_status_text(new_image) + + # Convert to binary (PNG bytes) if requested + self._convert_to_binary(new_image, bytes_format) + + # Update the timestamp with current datetime + self.shared.image_last_updated = datetime.datetime.fromtimestamp(time()) + LOGGER.debug("%s: Frame Completed.", self.file_name) + + data = self.shared.to_dict() if bytes_format else {} + return new_image, data + + async def _add_status_text(self, new_image: PilPNG): + """Add status text to the image.""" + text_editor = StatusText(self.shared) + img_text = await text_editor.get_status_text(new_image) + Drawable.status_text( + new_image, + img_text[1], + self.shared.user_colors[8], + img_text[0], + self.shared.vacuum_status_font, + self.shared.vacuum_status_position, + ) + + def _convert_to_binary(self, new_image: PilPNG, bytes_format: bool): + """Convert image to binary PNG bytes.""" + if bytes_format: + self.shared.binary_image = pil_to_png_bytes(new_image) + else: + self.shared.binary_image = pil_to_png_bytes(self.shared.last_image) + async def _async_update_shared_data(self, destinations: Destinations | None = None): """Update the shared data with the latest information.""" if hasattr(self, "get_rooms_attributes") and ( self.shared.map_rooms is None and destinations is not None ): + # pylint: disable=no-member self.shared.map_rooms = await self.get_rooms_attributes(destinations) if self.shared.map_rooms: LOGGER.debug("%s: Rand256 attributes rooms updated", self.file_name) - if hasattr(self, "async_get_rooms_attributes") and ( self.shared.map_rooms is 
None ): if self.shared.map_rooms is None: + # pylint: disable=no-member self.shared.map_rooms = await self.async_get_rooms_attributes() if self.shared.map_rooms: LOGGER.debug("%s: Hyper attributes rooms updated", self.file_name) @@ -217,6 +242,7 @@ async def _async_update_shared_data(self, destinations: Destinations | None = No hasattr(self, "get_calibration_data") and self.shared.attr_calibration_points is None ): + # pylint: disable=no-member self.shared.attr_calibration_points = self.get_calibration_data( self.shared.image_rotate ) @@ -247,6 +273,10 @@ def prepare_resize_params( is_rand=rand, ) + def update_trims(self) -> None: + """Update the trims.""" + self.shared.trims = TrimsData.from_list(self.crop_area) + def get_charger_position(self) -> ChargerPosition | None: """Return the charger position.""" return self.charger_pos @@ -472,7 +502,8 @@ async def calculate_array_hash( return hashlib.sha256(data_json.encode()).hexdigest() return None - async def async_copy_array(self, original_array: NumpyArray) -> NumpyArray: + @staticmethod + async def async_copy_array(original_array: NumpyArray) -> NumpyArray: """Copy the array using AsyncNumPy to yield control to the event loop.""" return await AsyncNumPy.async_copy(original_array) diff --git a/SCR/valetudo_map_parser/const.py b/SCR/valetudo_map_parser/const.py index 10d3173..e86c317 100644 --- a/SCR/valetudo_map_parser/const.py +++ b/SCR/valetudo_map_parser/const.py @@ -1,3 +1,4 @@ +"""Constants for the Valetudo Map Parser library.""" CAMERA_STORAGE = "valetudo_camera" ATTR_IMAGE_LAST_UPDATED = "image_last_updated" @@ -280,7 +281,7 @@ ATTR_OBSTACLES = "obstacles" ATTR_CAMERA_MODE = "camera_mode" -# Status text cost +# Status text constants charge_level = "\u03de" # unicode Koppa symbol charging = "\u2211" # unicode Charging symbol dot = " \u00b7 " # unicode middle dot diff --git a/SCR/valetudo_map_parser/hypfer_draw.py b/SCR/valetudo_map_parser/hypfer_draw.py index fb74262..183b60a 100755 --- 
a/SCR/valetudo_map_parser/hypfer_draw.py +++ b/SCR/valetudo_map_parser/hypfer_draw.py @@ -376,170 +376,141 @@ def _check_active_zone_and_set_zooming(self) -> None: else: self.img_h.zooming = False + def _create_robot_position_dict( + self, robot_x: int, robot_y: int, angle: float, room_name: str | None + ) -> RobotPosition: + """Create a robot position dictionary.""" + return { + "x": robot_x, + "y": robot_y, + "angle": angle, + "in_room": room_name, + } + + def _check_cached_room_outline( + self, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is still in cached room using outline.""" + if "outline" in self.img_h.robot_in_room: + outline = self.img_h.robot_in_room["outline"] + if point_in_polygon(int(robot_x), int(robot_y), outline): + self._check_active_zone_and_set_zooming() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.img_h.robot_in_room["room"] + ) + return None + + def _check_cached_room_bbox( + self, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is still in cached room using bounding box.""" + if all(k in self.img_h.robot_in_room for k in ["left", "right", "up", "down"]): + if ( + (self.img_h.robot_in_room["right"] >= int(robot_x)) + and (self.img_h.robot_in_room["left"] <= int(robot_x)) + ) and ( + (self.img_h.robot_in_room["down"] >= int(robot_y)) + and (self.img_h.robot_in_room["up"] <= int(robot_y)) + ): + self._check_active_zone_and_set_zooming() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.img_h.robot_in_room["room"] + ) + return None + + def _check_room_with_outline( + self, room: dict, room_count: int, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is in room using outline polygon.""" + outline = room["outline"] + if point_in_polygon(int(robot_x), int(robot_y), outline): + self.img_h.robot_in_room = { + "id": room.get("id", room_count), + "room": str(room["name"]), 
+ "outline": outline, + } + self._check_active_zone_and_set_zooming() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.img_h.robot_in_room["room"] + ) + return None + + def _check_room_with_corners( + self, room: dict, room_count: int, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is in room using corner bounding box.""" + corners = room["corners"] + self.img_h.robot_in_room = { + "id": room.get("id", room_count), + "left": int(corners[0][0]), + "right": int(corners[2][0]), + "up": int(corners[0][1]), + "down": int(corners[2][1]), + "room": str(room["name"]), + } + if ( + (self.img_h.robot_in_room["right"] >= int(robot_x)) + and (self.img_h.robot_in_room["left"] <= int(robot_x)) + ) and ( + (self.img_h.robot_in_room["down"] >= int(robot_y)) + and (self.img_h.robot_in_room["up"] <= int(robot_y)) + ): + self._check_active_zone_and_set_zooming() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.img_h.robot_in_room["room"] + ) + return None + async def async_get_robot_in_room( self, robot_y: int = 0, robot_x: int = 0, angle: float = 0.0 ) -> RobotPosition: """Get the robot position and return in what room is.""" - # First check if we already have a cached room and if the robot is still in it - if self.img_h.robot_in_room: - # If we have outline data, use point_in_polygon for accurate detection - if "outline" in self.img_h.robot_in_room: - outline = self.img_h.robot_in_room["outline"] - if point_in_polygon(int(robot_x), int(robot_y), outline): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], - } - # Handle active zones - self._check_active_zone_and_set_zooming() - return temp - # Fallback to bounding box check if no outline data - elif all( - k in self.img_h.robot_in_room for k in ["left", "right", "up", "down"] - ): - if ( - (self.img_h.robot_in_room["right"] >= int(robot_x)) - and (self.img_h.robot_in_room["left"] <= 
int(robot_x)) - ) and ( - (self.img_h.robot_in_room["down"] >= int(robot_y)) - and (self.img_h.robot_in_room["up"] <= int(robot_y)) - ): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], - } - # Handle active zones - self._check_active_zone_and_set_zooming() - return temp - - # If we don't have a cached room or the robot is not in it, search all rooms - last_room = None - room_count = 0 + # Check cached room first if self.img_h.robot_in_room: - last_room = self.img_h.robot_in_room - - # Check if the robot is far outside the normal map boundaries - # This helps prevent false positives for points very far from any room - map_boundary = 20000 # Typical map size is around 5000-10000 units - if abs(robot_x) > map_boundary or abs(robot_y) > map_boundary: - self.img_h.robot_in_room = last_room - self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp - - # Search through all rooms to find which one contains the robot - if self.img_h.rooms_pos is None: + result = self._check_cached_room_outline(robot_x, robot_y, angle) + if result: + return result + result = self._check_cached_room_bbox(robot_x, robot_y, angle) + if result: + return result + + # Prepare for room search + last_room = self.img_h.robot_in_room + map_boundary = 20000 + + # Check boundary conditions or missing room data + if ( + abs(robot_x) > map_boundary + or abs(robot_y) > map_boundary + or self.img_h.rooms_pos is None + ): self.img_h.robot_in_room = last_room self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp + return self._create_robot_position_dict( + robot_x, robot_y, angle, last_room["room"] if last_room else None + ) - for room in self.img_h.rooms_pos: - # Check if the room has an outline (polygon points) + # Search through all rooms 
+ for room_count, room in enumerate(self.img_h.rooms_pos): if "outline" in room: - outline = room["outline"] - # Use point_in_polygon for accurate detection with complex shapes - if point_in_polygon(int(robot_x), int(robot_y), outline): - # Robot is in this room - self.img_h.robot_in_room = { - "id": room.get( - "id", room_count - ), # Use actual segment ID if available - "room": str(room["name"]), - "outline": outline, - } - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], - } - - # Handle active zones - Map segment ID to active_zones position - if self.img_h.active_zones: - segment_id = str(self.img_h.robot_in_room["id"]) - room_store = RoomStore(self.file_name) - room_keys = list(room_store.get_rooms().keys()) - - if segment_id in room_keys: - position = room_keys.index(segment_id) - if position < len(self.img_h.active_zones): - self.img_h.zooming = bool( - self.img_h.active_zones[position] - ) - else: - self.img_h.zooming = False - else: - _LOGGER.warning( - "%s: Segment ID %s not found in room_keys %s", - self.file_name, - segment_id, - room_keys, - ) - self.img_h.zooming = False - else: - self.img_h.zooming = False - - return temp - # Fallback to bounding box if no outline is available + result = self._check_room_with_outline( + room, room_count, robot_x, robot_y, angle + ) + if result: + return result elif "corners" in room: - corners = room["corners"] - # Create a bounding box from the corners - self.img_h.robot_in_room = { - "id": room.get( - "id", room_count - ), # Use actual segment ID if available - "left": int(corners[0][0]), - "right": int(corners[2][0]), - "up": int(corners[0][1]), - "down": int(corners[2][1]), - "room": str(room["name"]), - } - # Check if the robot is inside the bounding box - if ( - (self.img_h.robot_in_room["right"] >= int(robot_x)) - and (self.img_h.robot_in_room["left"] <= int(robot_x)) - ) and ( - (self.img_h.robot_in_room["down"] >= int(robot_y)) - and 
(self.img_h.robot_in_room["up"] <= int(robot_y)) - ): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], - } - - # Handle active zones - self._check_active_zone_and_set_zooming() - - return temp - room_count += 1 + result = self._check_room_with_corners( + room, room_count, robot_x, robot_y, angle + ) + if result: + return result # Robot not found in any room self.img_h.robot_in_room = last_room self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp + return self._create_robot_position_dict( + robot_x, robot_y, angle, last_room["room"] if last_room else None + ) async def async_get_robot_position(self, entity_dict: dict) -> tuple | None: """Get the robot position from the entity data.""" diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 9fd5167..df310a7 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -29,6 +29,7 @@ BaseHandler, initialize_drawing_config, ) +from .const import COLORS from .hypfer_draw import ImageDraw as ImDraw from .map_data import ImageData from .rooms_handler import RoomsHandler @@ -88,6 +89,189 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: self.rooms_pos = None return room_properties + def _identify_disabled_rooms(self) -> set: + """Identify which rooms are disabled in the drawing configuration.""" + disabled_rooms = set() + room_id = 0 + for layer_type, _ in self.json_data.layers.items(): + if layer_type == "segment": + current_room_id = room_id + 1 + if 1 <= current_room_id <= 15: + room_element = getattr( + DrawableElement, f"ROOM_{current_room_id}", None + ) + if room_element and not self.drawing_config.is_enabled( + room_element + ): + disabled_rooms.add(room_id) + room_id = (room_id + 1) % 16 + return disabled_rooms + + async def 
_draw_layer_if_enabled( + self, + img_np_array, + layer_type, + compressed_pixels_list, + colors, + pixel_size, + disabled_rooms, + room_id, + ): + """Draw a layer if it's enabled in the drawing configuration.""" + is_room_layer = layer_type == "segment" + + if is_room_layer: + current_room_id = room_id + 1 + if 1 <= current_room_id <= 15: + room_element = getattr(DrawableElement, f"ROOM_{current_room_id}", None) + if not self.drawing_config.is_enabled(room_element): + return room_id + 1, img_np_array # Skip this room + + is_wall_layer = layer_type == "wall" + if is_wall_layer and not self.drawing_config.is_enabled(DrawableElement.WALL): + return room_id, img_np_array # Skip walls + + # Draw the layer + room_id, img_np_array = await self.imd.async_draw_base_layer( + img_np_array, + compressed_pixels_list, + layer_type, + colors["wall"], + colors["zone_clean"], + pixel_size, + disabled_rooms if layer_type == "wall" else None, + ) + return room_id, img_np_array + + async def _draw_base_layers(self, img_np_array, colors, pixel_size): + """Draw all base layers (rooms, walls, floors).""" + disabled_rooms = self._identify_disabled_rooms() + room_id = 0 + + for layer_type, compressed_pixels_list in self.json_data.layers.items(): + room_id, img_np_array = await self._draw_layer_if_enabled( + img_np_array, + layer_type, + compressed_pixels_list, + colors, + pixel_size, + disabled_rooms, + room_id, + ) + + return img_np_array, room_id + + async def _draw_additional_elements( + self, img_np_array, m_json, entity_dict, colors + ): + """Draw additional elements like walls, charger, and obstacles.""" + if self.drawing_config.is_enabled(DrawableElement.VIRTUAL_WALL): + img_np_array = await self.imd.async_draw_virtual_walls( + m_json, img_np_array, colors["no_go"] + ) + + if self.drawing_config.is_enabled(DrawableElement.CHARGER): + img_np_array = await self.imd.async_draw_charger( + img_np_array, entity_dict, colors["charger"] + ) + + if 
self.drawing_config.is_enabled(DrawableElement.OBSTACLE): + self.shared.obstacles_pos = self.data.get_obstacles(entity_dict) + if self.shared.obstacles_pos: + img_np_array = await self.imd.async_draw_obstacle( + img_np_array, self.shared.obstacles_pos, colors["no_go"] + ) + + return img_np_array + + async def _setup_room_and_robot_data( + self, room_id, robot_position, robot_position_angle + ): + """Setup room properties and robot position data.""" + if (room_id > 0) and not self.room_propriety: + self.room_propriety = await self.async_extract_room_properties( + self.json_data.json_data + ) + + if not self.rooms_pos and not self.room_propriety: + self.room_propriety = await self.async_extract_room_properties( + self.json_data.json_data + ) + + if self.rooms_pos and robot_position and robot_position_angle: + self.robot_pos = await self.imd.async_get_robot_in_room( + robot_x=(robot_position[0]), + robot_y=(robot_position[1]), + angle=robot_position_angle, + ) + + async def _prepare_data_tasks(self, m_json, entity_dict): + """Prepare and execute data extraction tasks in parallel.""" + data_tasks = [] + + if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): + data_tasks.append(self._prepare_zone_data(m_json)) + + if self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): + data_tasks.append(self._prepare_goto_data(entity_dict)) + + path_enabled = self.drawing_config.is_enabled(DrawableElement.PATH) + LOGGER.info("%s: PATH element enabled: %s", self.file_name, path_enabled) + if path_enabled: + LOGGER.info("%s: Drawing path", self.file_name) + data_tasks.append(self._prepare_path_data(m_json)) + + if data_tasks: + await asyncio.gather(*data_tasks) + + return path_enabled + + async def _draw_dynamic_elements( + self, img_np_array, m_json, entity_dict, colors, path_enabled + ): + """Draw dynamic elements like zones, paths, and go-to targets.""" + if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): + img_np_array = await 
self.imd.async_draw_zones( + m_json, img_np_array, colors["zone_clean"], colors["no_go"] + ) + + if self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): + img_np_array = await self.imd.draw_go_to_flag( + img_np_array, entity_dict, colors["go_to"] + ) + + if path_enabled: + img_np_array = await self.imd.async_draw_paths( + img_np_array, m_json, colors["move"], self.color_grey + ) + else: + LOGGER.info("%s: Skipping path drawing", self.file_name) + + return img_np_array + + async def _draw_robot_if_enabled( + self, img_np_array, robot_pos, robot_position, robot_position_angle, colors + ): + """Draw the robot on the map if enabled.""" + if self.shared.vacuum_state == "docked": + robot_position_angle -= 180 + + if robot_pos and self.drawing_config.is_enabled(DrawableElement.ROBOT): + robot_color = self.drawing_config.get_property( + DrawableElement.ROBOT, "color", colors["robot"] + ) + img_np_array = await self.draw.robot( + layers=img_np_array, + x=robot_position[0], + y=robot_position[1], + angle=robot_position_angle, + fill=robot_color, + radius=self.shared.robot_size, + robot_state=self.shared.vacuum_state, + ) + + return img_np_array + # noinspection PyUnresolvedReferences,PyUnboundLocalVariable async def async_get_image_from_json( self, @@ -132,126 +316,21 @@ async def async_get_image_from_json( img_np_array = await self.draw.create_empty_image( self.img_size["x"], self.img_size["y"], colors["background"] ) - # Draw layers and segments if enabled room_id = 0 - # Keep track of disabled rooms to skip their walls later - disabled_rooms = set() if self.drawing_config.is_enabled(DrawableElement.FLOOR): - # First pass: identify disabled rooms - for ( - layer_type, - compressed_pixels_list, - ) in self.json_data.layers.items(): - # Check if this is a room layer - if layer_type == "segment": - # The room_id is the current room being processed (0-based index) - # We need to check if ROOM_{room_id+1} is enabled (1-based in DrawableElement) - current_room_id = 
room_id + 1 - if 1 <= current_room_id <= 15: - room_element = getattr( - DrawableElement, f"ROOM_{current_room_id}", None - ) - if ( - room_element - and not self.drawing_config.is_enabled( - room_element - ) - ): - # Add this room to the disabled rooms set - disabled_rooms.add(room_id) - room_id = ( - room_id + 1 - ) % 16 # Cycle room_id back to 0 after 15 - - # Reset room_id for the actual drawing pass - room_id = 0 - - # Second pass: draw enabled rooms and walls - for ( - layer_type, - compressed_pixels_list, - ) in self.json_data.layers.items(): - # Check if this is a room layer - is_room_layer = layer_type == "segment" - - # If it's a room layer, check if the specific room is enabled - if is_room_layer: - # The room_id is the current room being processed (0-based index) - # We need to check if ROOM_{room_id+1} is enabled (1-based in DrawableElement) - current_room_id = room_id + 1 - if 1 <= current_room_id <= 15: - room_element = getattr( - DrawableElement, f"ROOM_{current_room_id}", None - ) - - # Skip this room if it's disabled - if not self.drawing_config.is_enabled(room_element): - room_id = ( - room_id + 1 - ) % 16 # Increment room_id even if we skip - continue - - # Draw the layer ONLY if enabled - is_wall_layer = layer_type == "wall" - if is_wall_layer: - # Skip walls entirely if disabled - if not self.drawing_config.is_enabled( - DrawableElement.WALL - ): - continue - # Draw the layer - ( - room_id, - img_np_array, - ) = await self.imd.async_draw_base_layer( - img_np_array, - compressed_pixels_list, - layer_type, - colors["wall"], - colors["zone_clean"], - pixel_size, - disabled_rooms if layer_type == "wall" else None, - ) - - # Draw the virtual walls if enabled - if self.drawing_config.is_enabled(DrawableElement.VIRTUAL_WALL): - img_np_array = await self.imd.async_draw_virtual_walls( - m_json, img_np_array, colors["no_go"] + img_np_array, room_id = await self._draw_base_layers( + img_np_array, colors, pixel_size ) - # Draw charger if enabled - if 
self.drawing_config.is_enabled(DrawableElement.CHARGER): - img_np_array = await self.imd.async_draw_charger( - img_np_array, entity_dict, colors["charger"] - ) - - # Draw obstacles if enabled - if self.drawing_config.is_enabled(DrawableElement.OBSTACLE): - self.shared.obstacles_pos = self.data.get_obstacles(entity_dict) - if self.shared.obstacles_pos: - img_np_array = await self.imd.async_draw_obstacle( - img_np_array, self.shared.obstacles_pos, colors["no_go"] - ) - # Robot and rooms position - if (room_id > 0) and not self.room_propriety: - self.room_propriety = await self.async_extract_room_properties( - self.json_data.json_data - ) + img_np_array = await self._draw_additional_elements( + img_np_array, m_json, entity_dict, colors + ) - # Ensure room data is available for robot room detection (even if not extracted above) - if not self.rooms_pos and not self.room_propriety: - self.room_propriety = await self.async_extract_room_properties( - self.json_data.json_data - ) + await self._setup_room_and_robot_data( + room_id, robot_position, robot_position_angle + ) - # Always check robot position for zooming (moved outside the condition) - if self.rooms_pos and robot_position and robot_position_angle: - self.robot_pos = await self.imd.async_get_robot_in_room( - robot_x=(robot_position[0]), - robot_y=(robot_position[1]), - angle=robot_position_angle, - ) LOGGER.info("%s: Completed base Layers", self.file_name) # Copy the new array in base layer. 
# Delete old base layer before creating new one to free memory @@ -282,73 +361,22 @@ async def async_get_image_from_json( np.copyto(self.img_work_layer, self.img_base_layer) img_np_array = self.img_work_layer - # Prepare parallel data extraction tasks - data_tasks = [] - - # Prepare zone data extraction - if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): - data_tasks.append(self._prepare_zone_data(m_json)) + # Prepare and execute data extraction tasks + path_enabled = await self._prepare_data_tasks(m_json, entity_dict) - # Prepare go_to flag data extraction - if self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): - data_tasks.append(self._prepare_goto_data(entity_dict)) - - # Prepare path data extraction - path_enabled = self.drawing_config.is_enabled(DrawableElement.PATH) - LOGGER.info( - "%s: PATH element enabled: %s", self.file_name, path_enabled + # Draw dynamic elements + img_np_array = await self._draw_dynamic_elements( + img_np_array, m_json, entity_dict, colors, path_enabled ) - if path_enabled: - LOGGER.info("%s: Drawing path", self.file_name) - data_tasks.append(self._prepare_path_data(m_json)) - - # Await all data preparation tasks if any were created - if data_tasks: - await asyncio.gather(*data_tasks) - - # Process drawing operations sequentially (since they modify the same array) - # Draw zones if enabled - if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): - img_np_array = await self.imd.async_draw_zones( - m_json, img_np_array, colors["zone_clean"], colors["no_go"] - ) - - # Draw the go_to target flag if enabled - if self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): - img_np_array = await self.imd.draw_go_to_flag( - img_np_array, entity_dict, colors["go_to"] - ) - # Draw paths if enabled - if path_enabled: - img_np_array = await self.imd.async_draw_paths( - img_np_array, m_json, colors["move"], self.color_grey - ) - else: - LOGGER.info("%s: Skipping path drawing", self.file_name) - - # Check 
if the robot is docked. - if self.shared.vacuum_state == "docked": - # Adjust the robot angle. - robot_position_angle -= 180 - - # Draw the robot if enabled - if robot_pos and self.drawing_config.is_enabled(DrawableElement.ROBOT): - # Get robot color (allows for customization) - robot_color = self.drawing_config.get_property( - DrawableElement.ROBOT, "color", colors["robot"] - ) - - # Draw the robot - img_np_array = await self.draw.robot( - layers=img_np_array, - x=robot_position[0], - y=robot_position[1], - angle=robot_position_angle, - fill=robot_color, - radius=self.shared.robot_size, - robot_state=self.shared.vacuum_state, - ) + # Draw robot + img_np_array = await self._draw_robot_if_enabled( + img_np_array, + robot_pos, + robot_position, + robot_position_angle, + colors, + ) # Synchronize zooming state from ImageDraw to handler before auto-crop self.zooming = self.imd.img_h.zooming @@ -376,11 +404,11 @@ async def async_get_image_from_json( # Return PIL Image return resized_image - else: - # Return PIL Image (convert from NumPy) - pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") - del img_np_array - return pil_img + + # Return PIL Image (convert from NumPy) + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") + del img_np_array + return pil_img except (RuntimeError, RuntimeWarning) as e: LOGGER.warning( "%s: Error %s during image creation.", diff --git a/SCR/valetudo_map_parser/map_data.py b/SCR/valetudo_map_parser/map_data.py index 2be9a9e..3c74d58 100755 --- a/SCR/valetudo_map_parser/map_data.py +++ b/SCR/valetudo_map_parser/map_data.py @@ -30,6 +30,8 @@ class RangeStats(TypedDict): + """Statistics for a range of values (min, max, mid, avg).""" + min: int max: int mid: int @@ -37,6 +39,8 @@ class RangeStats(TypedDict): class Dimensions(TypedDict): + """Dimensions with x/y range statistics and pixel count.""" + x: RangeStats y: RangeStats pixelCount: int @@ -46,10 +50,14 @@ class Dimensions(TypedDict): class 
FloorWallMeta(TypedDict, total=False): + """Metadata for floor and wall layers.""" + area: int class SegmentMeta(TypedDict, total=False): + """Metadata for segment layers including segment ID and active state.""" + segmentId: str active: bool source: str @@ -57,6 +65,8 @@ class SegmentMeta(TypedDict, total=False): class MapLayerBase(TypedDict): + """Base structure for map layers with pixels and dimensions.""" + __class__: Literal["MapLayer"] type: str pixels: list[int] @@ -65,11 +75,15 @@ class MapLayerBase(TypedDict): class FloorWallLayer(MapLayerBase): + """Map layer representing floor or wall areas.""" + metaData: FloorWallMeta type: Literal["floor", "wall"] class SegmentLayer(MapLayerBase): + """Map layer representing a room segment.""" + metaData: SegmentMeta type: Literal["segment"] @@ -78,12 +92,16 @@ class SegmentLayer(MapLayerBase): class PointMeta(TypedDict, total=False): + """Metadata for point entities including angle, label, and ID.""" + angle: float label: str id: str class PointMapEntity(TypedDict): + """Point-based map entity (robot, charger, obstacle, etc.).""" + __class__: Literal["PointMapEntity"] type: str points: list[int] @@ -91,6 +109,8 @@ class PointMapEntity(TypedDict): class PathMapEntity(TypedDict): + """Path-based map entity representing robot movement paths.""" + __class__: Literal["PathMapEntity"] type: str points: list[int] @@ -103,16 +123,22 @@ class PathMapEntity(TypedDict): class MapMeta(TypedDict, total=False): + """Metadata for the Valetudo map including version and total area.""" + version: int totalLayerArea: int class Size(TypedDict): + """Map size with x and y dimensions.""" + x: int y: int class ValetudoMap(TypedDict): + """Complete Valetudo map structure with layers and entities.""" + __class__: Literal["ValetudoMap"] metaData: MapMeta size: Size diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 75a83d9..67c0585 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py 
+++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -31,6 +31,7 @@ initialize_drawing_config, point_in_polygon, ) +from .const import COLORS, DEFAULT_IMAGE_SIZE, DEFAULT_PIXEL_SIZE from .map_data import RandImageData from .reimg_draw import ImageDraw from .rooms_handler import RandRoomsHandler @@ -48,6 +49,8 @@ def __init__(self, shared_data): AutoCrop.__init__(self, self) self.auto_crop = None # Auto crop flag self.segment_data = None # Segment data + self.element_map = None # Element map for tracking drawable elements + self.robot_position = None # Robot position for zoom functionality self.outlines = None # Outlines data self.calibration_data = None # Calibration data self.data = RandImageData # Image Data @@ -84,7 +87,6 @@ async def extract_room_properties( json_data, size_x, size_y, top, left, True ) - dest_json = destinations if destinations else {} zones_data = dest_json.get("zones", []) points_data = dest_json.get("spots", []) @@ -96,7 +98,7 @@ async def extract_room_properties( # Update self.rooms_pos from room_properties for compatibility with other methods self.rooms_pos = [] - for room_id, room_data in room_properties.items(): + for _, room_data in room_properties.items(): self.rooms_pos.append( {"name": room_data["name"], "outline": room_data["outline"]} ) @@ -195,6 +197,94 @@ async def get_image_from_rrm( # If we reach here without returning, return None return None + async def _initialize_base_layer( + self, + m_json, + size_x, + size_y, + colors, + destinations, + robot_position, + robot_position_angle, + ): + """Initialize the base layer on first frame.""" + self.element_map = np.zeros((size_y, size_x), dtype=np.int32) + self.element_map[:] = DrawableElement.FLOOR + + if self.drawing_config.is_enabled(DrawableElement.FLOOR): + room_id, img_np_array = await self.imd.async_draw_base_layer( + m_json, + size_x, + size_y, + colors["wall"], + colors["zone_clean"], + colors["background"], + DEFAULT_PIXEL_SIZE, + ) + LOGGER.info("%s: Completed base 
Layers", self.file_name) + + if room_id > 0 and not self.room_propriety: + self.room_propriety = await self.get_rooms_attributes(destinations) + + if not self.rooms_pos and not self.room_propriety: + self.room_propriety = await self.get_rooms_attributes(destinations) + + if ( + self.rooms_pos + and robot_position + and (self.robot_pos is None or "in_room" not in self.robot_pos) + ): + self.robot_pos = await self.async_get_robot_in_room( + (robot_position[0] * 10), + (robot_position[1] * 10), + robot_position_angle, + ) + else: + background_color = self.drawing_config.get_property( + DrawableElement.FLOOR, "color", colors["background"] + ) + img_np_array = await self.draw.create_empty_image( + size_x, size_y, background_color + ) + + if self.img_base_layer is not None: + del self.img_base_layer + self.img_base_layer = await self.async_copy_array(img_np_array) + del img_np_array + + async def _check_zoom_conditions(self, m_json, robot_position, destinations): + """Check and set zoom conditions based on active zones.""" + if not ( + self.shared.image_auto_zoom + and self.shared.vacuum_state == "cleaning" + and robot_position + and destinations + ): + return + + try: + temp_room_properties = ( + await self.rooms_handler.async_extract_room_properties( + m_json, destinations + ) + ) + if temp_room_properties: + temp_rooms_pos = [] + for _, room_data in temp_room_properties.items(): + temp_rooms_pos.append( + {"name": room_data["name"], "outline": room_data["outline"]} + ) + original_rooms_pos = self.rooms_pos + self.rooms_pos = temp_rooms_pos + self.rooms_pos = original_rooms_pos + except (ValueError, KeyError, TypeError): + if ( + self.shared.image_auto_zoom + and self.shared.vacuum_state == "cleaning" + and robot_position + ): + self.zooming = True + async def _setup_robot_and_image( self, m_json, size_x, size_y, colors, destinations ): @@ -206,100 +296,17 @@ async def _setup_robot_and_image( ) = await self.imd.async_get_robot_position(m_json) if self.frame_number == 
0: - # Create element map for tracking what's drawn where - self.element_map = np.zeros((size_y, size_x), dtype=np.int32) - self.element_map[:] = DrawableElement.FLOOR - - # Draw base layer if floor is enabled - if self.drawing_config.is_enabled(DrawableElement.FLOOR): - room_id, img_np_array = await self.imd.async_draw_base_layer( - m_json, - size_x, - size_y, - colors["wall"], - colors["zone_clean"], - colors["background"], - DEFAULT_PIXEL_SIZE, - ) - LOGGER.info("%s: Completed base Layers", self.file_name) - - if room_id > 0 and not self.room_propriety: - self.room_propriety = await self.get_rooms_attributes(destinations) - - # Ensure room data is available for robot room detection (even if not extracted above) - if not self.rooms_pos and not self.room_propriety: - self.room_propriety = await self.get_rooms_attributes(destinations) + await self._initialize_base_layer( + m_json, + size_x, + size_y, + colors, + destinations, + robot_position, + robot_position_angle, + ) - # Always check robot position for zooming (update if room info is missing) - if ( - self.rooms_pos - and robot_position - and (self.robot_pos is None or "in_room" not in self.robot_pos) - ): - self.robot_pos = await self.async_get_robot_in_room( - (robot_position[0] * 10), - (robot_position[1] * 10), - robot_position_angle, - ) - # Delete old base layer before creating new one to free memory - if self.img_base_layer is not None: - del self.img_base_layer - self.img_base_layer = await self.async_copy_array(img_np_array) - # Delete source array after copying to free memory - del img_np_array - else: - # If floor is disabled, create an empty image - background_color = self.drawing_config.get_property( - DrawableElement.FLOOR, "color", colors["background"] - ) - img_np_array = await self.draw.create_empty_image( - size_x, size_y, background_color - ) - # Delete old base layer before creating new one to free memory - if self.img_base_layer is not None: - del self.img_base_layer - self.img_base_layer = 
await self.async_copy_array(img_np_array) - # Delete source array after copying to free memory - del img_np_array - - # Check active zones BEFORE auto-crop to enable proper zoom functionality - # This needs to run on every frame, not just frame 0 - if ( - self.shared.image_auto_zoom - and self.shared.vacuum_state == "cleaning" - and robot_position - and destinations # Check if we have destinations data for room extraction - ): - # Extract room data early if we have destinations - try: - temp_room_properties = ( - await self.rooms_handler.async_extract_room_properties( - m_json, destinations - ) - ) - if temp_room_properties: - # Create temporary rooms_pos for robot room detection - temp_rooms_pos = [] - for room_id, room_data in temp_room_properties.items(): - temp_rooms_pos.append( - {"name": room_data["name"], "outline": room_data["outline"]} - ) - - # Store original rooms_pos and temporarily use the new one - original_rooms_pos = self.rooms_pos - self.rooms_pos = temp_rooms_pos - - # Restore original rooms_pos - self.rooms_pos = original_rooms_pos - - except (ValueError, KeyError, TypeError): - # Fallback to robot-position-based zoom if room extraction fails - if ( - self.shared.image_auto_zoom - and self.shared.vacuum_state == "cleaning" - and robot_position - ): - self.zooming = True + await self._check_zoom_conditions(m_json, robot_position, destinations) return self.img_base_layer, robot_position, robot_position_angle @@ -384,11 +391,6 @@ async def _finalize_image(self, pil_img): if self.check_zoom_and_aspect_ratio(): resize_params = self.prepare_resize_params(pil_img, True) pil_img = await self.async_resize_images(resize_params) - else: - LOGGER.warning( - "%s: Invalid image dimensions. 
Returning original image.", - self.file_name, - ) return pil_img async def get_rooms_attributes( @@ -401,139 +403,124 @@ async def get_rooms_attributes( ) return self.room_propriety + def _create_robot_position_dict( + self, robot_x: int, robot_y: int, angle: float, room_name: str + ) -> RobotPosition: + """Create a robot position dictionary.""" + return { + "x": robot_x, + "y": robot_y, + "angle": angle, + "in_room": room_name, + } + + def _set_zooming_from_active_zones(self) -> None: + """Set zooming based on active zones.""" + self.active_zones = self.shared.rand256_active_zone + self.zooming = False + if self.active_zones and ( + self.robot_in_room["id"] in range(len(self.active_zones)) + ): + self.zooming = bool(self.active_zones[self.robot_in_room["id"]]) + + def _check_cached_room_outline_rand( + self, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is still in cached room using outline.""" + if "outline" in self.robot_in_room: + outline = self.robot_in_room["outline"] + if point_in_polygon(int(robot_x), int(robot_y), outline): + self._set_zooming_from_active_zones() + LOGGER.debug( + "%s: Robot is in %s room (polygon detection). 
%s", + self.file_name, + self.robot_in_room["room"], + self.active_zones, + ) + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.robot_in_room["room"] + ) + return None + + def _check_cached_room_bbox_rand( + self, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is still in cached room using bounding box.""" + if all(k in self.robot_in_room for k in ["left", "right", "up", "down"]): + if ( + self.robot_in_room["right"] + <= int(robot_x) + <= self.robot_in_room["left"] + ) and ( + self.robot_in_room["up"] <= int(robot_y) <= self.robot_in_room["down"] + ): + self._set_zooming_from_active_zones() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.robot_in_room["room"] + ) + return None + + def _check_room_with_outline_rand( + self, room: dict, room_count: int, robot_x: int, robot_y: int, angle: float + ) -> RobotPosition | None: + """Check if robot is in room using outline polygon.""" + outline = room["outline"] + if point_in_polygon(int(robot_x), int(robot_y), outline): + self.robot_in_room = { + "id": room_count, + "room": str(room["name"]), + "outline": outline, + } + self._set_zooming_from_active_zones() + return self._create_robot_position_dict( + robot_x, robot_y, angle, self.robot_in_room["room"] + ) + return None + async def async_get_robot_in_room( self, robot_x: int, robot_y: int, angle: float ) -> RobotPosition: """Get the robot position and return in what room is.""" - # First check if we already have a cached room and if the robot is still in it + # Check cached room first if self.robot_in_room: - # If we have outline data, use point_in_polygon for accurate detection - if "outline" in self.robot_in_room: - outline = self.robot_in_room["outline"] - if point_in_polygon(int(robot_x), int(robot_y), outline): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.robot_in_room["room"], - } - # Handle active zones - self.active_zones = 
self.shared.rand256_active_zone - LOGGER.debug( - "%s: Robot is in %s room (polygon detection). %s", - self.file_name, - self.robot_in_room["room"], - self.active_zones, - ) - self.zooming = False - if self.active_zones and ( - self.robot_in_room["id"] in range(len(self.active_zones)) - ): - self.zooming = bool(self.active_zones[self.robot_in_room["id"]]) - else: - self.zooming = False - return temp - # Fallback to bounding box check if no outline data - elif all(k in self.robot_in_room for k in ["left", "right", "up", "down"]): - if ( - self.robot_in_room["right"] - <= int(robot_x) - <= self.robot_in_room["left"] - ) and ( - self.robot_in_room["up"] - <= int(robot_y) - <= self.robot_in_room["down"] - ): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.robot_in_room["room"], - } - # Handle active zones - self.active_zones = self.shared.rand256_active_zone - self.zooming = False - if self.active_zones and ( - self.robot_in_room["id"] in range(len(self.active_zones)) - ): - self.zooming = bool(self.active_zones[self.robot_in_room["id"]]) - else: - self.zooming = False - return temp - - # If we don't have a cached room or the robot is not in it, search all rooms - last_room = None - room_count = 0 - if self.robot_in_room: - last_room = self.robot_in_room - - # Check if the robot is far outside the normal map boundaries - # This helps prevent false positives for points very far from any room - map_boundary = 50000 # Typical map size is around 25000-30000 units for Rand25 - if abs(robot_x) > map_boundary or abs(robot_y) > map_boundary: - self.robot_in_room = last_room - self.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else "unknown", - } - return temp - - # Search through all rooms to find which one contains the robot - if not self.rooms_pos: + result = self._check_cached_room_outline_rand(robot_x, robot_y, angle) + if result: + return result + result = 
self._check_cached_room_bbox_rand(robot_x, robot_y, angle) + if result: + return result + + # Prepare for room search + last_room = self.robot_in_room + map_boundary = 50000 + + # Check boundary conditions or missing room data + if ( + abs(robot_x) > map_boundary + or abs(robot_y) > map_boundary + or not self.rooms_pos + ): self.robot_in_room = last_room self.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else "unknown", - } - return temp + return self._create_robot_position_dict( + robot_x, robot_y, angle, last_room["room"] if last_room else "unknown" + ) - for room in self.rooms_pos: - # Check if the room has an outline (polygon points) + # Search through all rooms + for room_count, room in enumerate(self.rooms_pos): if "outline" in room: - outline = room["outline"] - # Use point_in_polygon for accurate detection with complex shapes - if point_in_polygon(int(robot_x), int(robot_y), outline): - # Robot is in this room - self.robot_in_room = { - "id": room_count, - "room": str(room["name"]), - "outline": outline, - } - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.robot_in_room["room"], - } - - # Handle active zones - Set zooming based on active zones - self.active_zones = self.shared.rand256_active_zone - if self.active_zones and ( - self.robot_in_room["id"] in range(len(self.active_zones)) - ): - self.zooming = bool(self.active_zones[self.robot_in_room["id"]]) - else: - self.zooming = False - - return temp - room_count += 1 + result = self._check_room_with_outline_rand( + room, room_count, robot_x, robot_y, angle + ) + if result: + return result # Robot not found in any room self.robot_in_room = last_room self.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else "unknown", - } - return temp + return self._create_robot_position_dict( + robot_x, robot_y, angle, last_room["room"] if 
last_room else "unknown" + ) def get_calibration_data(self, rotation_angle: int = 0) -> Any: """Return the map calibration data.""" diff --git a/SCR/valetudo_map_parser/rooms_handler.py b/SCR/valetudo_map_parser/rooms_handler.py index 8affc6b..56a55ae 100644 --- a/SCR/valetudo_map_parser/rooms_handler.py +++ b/SCR/valetudo_map_parser/rooms_handler.py @@ -11,7 +11,7 @@ import numpy as np from scipy.ndimage import binary_dilation, binary_erosion -from scipy.spatial import ConvexHull +from scipy.spatial import ConvexHull # pylint: disable=no-name-in-module from .config.drawable_elements import DrawableElement, DrawingConfig from .config.types import LOGGER, RoomsProperties @@ -83,7 +83,7 @@ async def _process_room_layer( """ meta_data = layer.get("metaData", {}) segment_id = meta_data.get("segmentId") - name = meta_data.get("name", "Room {}".format(segment_id)) + name = meta_data.get("name", f"Room {segment_id}") compressed_pixels = layer.get("compressedPixels", []) pixels = self.sublist(compressed_pixels, 3) @@ -296,8 +296,8 @@ def convex_hull_outline(points: List[Tuple[int, int]]) -> List[Tuple[int, int]]: return hull_points - except Exception as e: - LOGGER.warning(f"Error calculating convex hull: {e}") + except (ValueError, RuntimeError) as e: + LOGGER.warning("Error calculating convex hull: %s", e) # Fallback to bounding box if convex hull fails x_min, y_min = np.min(points_array, axis=0) @@ -342,7 +342,6 @@ async def _process_segment_data( except (ValueError, TypeError): # If segment_id is not a valid integer, we can't map it to a room element # In this case, we'll include the room (fail open) - pass LOGGER.debug( "Could not convert segment_id %s to room element", segment_id ) diff --git a/pyproject.toml b/pyproject.toml index 5487ce7..5c93bb1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.11" +version = "0.1.12" description = "A Python library to parse Valetudo map data returning a PIL 
Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0"