diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index c5d0efa..c304492 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -6,27 +6,26 @@ from .config.colors import ColorsManagement from .config.drawable import Drawable from .config.drawable_elements import DrawableElement, DrawingConfig -from .config.enhanced_drawable import EnhancedDrawable from .config.rand256_parser import RRMapParser from .config.shared import CameraShared, CameraSharedManager +from .config.status_text.status_text import StatusText +from .config.status_text.translations import translations as STATUS_TEXT_TRANSLATIONS from .config.types import ( CameraModes, + ImageSize, + JsonType, + NumpyArray, + PilPNG, RoomsProperties, RoomStore, SnapshotStore, TrimCropData, UserLanguageStore, - JsonType, - PilPNG, - NumpyArray, - ImageSize, ) -from .config.status_text.status_text import StatusText -from .config.status_text.translations import translations as STATUS_TEXT_TRANSLATIONS from .hypfer_handler import HypferMapImageHandler -from .rand256_handler import ReImageHandler -from .rooms_handler import RoomsHandler, RandRoomsHandler from .map_data import HyperMapData +from .rand256_handler import ReImageHandler +from .rooms_handler import RandRoomsHandler, RoomsHandler def get_default_font_path() -> str: @@ -51,7 +50,6 @@ def get_default_font_path() -> str: "Drawable", "DrawableElement", "DrawingConfig", - "EnhancedDrawable", "SnapshotStore", "UserLanguageStore", "RoomStore", diff --git a/SCR/valetudo_map_parser/config/color_utils.py b/SCR/valetudo_map_parser/config/color_utils.py index 94e22e8..80d1297 100644 --- a/SCR/valetudo_map_parser/config/color_utils.py +++ b/SCR/valetudo_map_parser/config/color_utils.py @@ -1,8 +1,7 @@ """Utility functions for color operations in the map parser.""" -from typing import Optional, Tuple +from typing import Optional -from .colors import ColorsManagement from .types import Color, 
NumpyArray @@ -36,8 +35,8 @@ def get_blended_color( # Sample background at midpoint mid_x, mid_y = (x0 + x1) // 2, (y0 + y1) // 2 if 0 <= mid_y < arr.shape[0] and 0 <= mid_x < arr.shape[1]: - return tuple(arr[mid_y, mid_x]) - return (0, 0, 0, 0) # Default if out of bounds + return tuple(arr[mid_y, mid_x]) + return (0, 0, 0, 0) # Default if out of bounds # Calculate direction vector for offset sampling dx = x1 - x0 diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index 919c785..d963c7a 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -14,10 +14,10 @@ from pathlib import Path import numpy as np +from mvcrender.blend import get_blended_color, sample_and_blend_color +from mvcrender.draw import circle_u8, line_u8 from PIL import Image, ImageDraw, ImageFont -from .color_utils import get_blended_color -from .colors import ColorsManagement from .types import Color, NumpyArray, PilPNG, Point, Tuple, Union
async def go_to_flag( # Blend pole color if needed if pole_alpha < 255: - pole_color = ColorsManagement.sample_and_blend_color( - layer, x, y, pole_color - ) + pole_color = sample_and_blend_color(layer, x, y, pole_color) flag_size = 50 pole_width = 6 @@ -246,62 +240,19 @@ def point_inside(x: int, y: int, points: list[Tuple[int, int]]) -> bool: @staticmethod def _line( - layer: np.ndarray, + layer: NumpyArray, x1: int, y1: int, x2: int, y2: int, color: Color, width: int = 3, - ) -> np.ndarray: - """Draw a line on a NumPy array (layer) from point A to B using Bresenham's algorithm. - - Args: - layer: The numpy array to draw on (H, W, C) - x1, y1: Start point coordinates - x2, y2: End point coordinates - color: Color to draw with (tuple or array) - width: Width of the line in pixels - """ - x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) - - blended_color = get_blended_color(x1, y1, x2, y2, layer, color) - - dx = abs(x2 - x1) - dy = abs(y2 - y1) - sx = 1 if x1 < x2 else -1 - sy = 1 if y1 < y2 else -1 - err = dx - dy - - half_w = width // 2 - h, w = layer.shape[:2] - - while True: - # Draw a filled circle for thickness - yy, xx = np.ogrid[-half_w : half_w + 1, -half_w : half_w + 1] - mask = xx**2 + yy**2 <= half_w**2 - y_min = max(0, y1 - half_w) - y_max = min(h, y1 + half_w + 1) - x_min = max(0, x1 - half_w) - x_max = min(w, x1 + half_w + 1) - - sub_mask = mask[ - (y_min - (y1 - half_w)) : (y_max - (y1 - half_w)), - (x_min - (x1 - half_w)) : (x_max - (x1 - half_w)), - ] - layer[y_min:y_max, x_min:x_max][sub_mask] = blended_color - - if x1 == x2 and y1 == y2: - break - - e2 = 2 * err - if e2 > -dy: - err -= dy - x1 += sx - if e2 < dx: - err += dx - y1 += sy - + ) -> NumpyArray: + """Segment-aware preblend, then stamp a solid line.""" + width = int(max(1, width)) + # Preblend once for this segment + seg = get_blended_color(int(x1), int(y1), int(x2), int(y2), layer, color) + line_u8(layer, int(x1), int(y1), int(x2), int(y2), seg, width) return layer @staticmethod @@ 
-337,11 +288,8 @@ async def lines( if x0 == x1 and y0 == y1: continue - # Get blended color for this line segment - blended_color = get_blended_color(x0, y0, x1, y1, arr, color) - # Use the optimized line drawing method - arr = Drawable._line(arr, x0, y0, x1, y1, blended_color, width) + arr = Drawable._line(arr, x0, y0, x1, y1, color, width) return arr @@ -355,35 +303,31 @@ def _filled_circle( outline_width: int = 0, ) -> NumpyArray: """ - Draw a filled circle on the image using NumPy. - Optimized to only process the bounding box of the circle. + Draw a filled circle and optional outline using mvcrender.draw.circle_u8. + If alpha<255, preblend once at the center and stamp solid. """ - y, x = center - height, width = image.shape[:2] - - # Calculate the bounding box of the circle - min_y = max(0, y - radius - outline_width) - max_y = min(height, y + radius + outline_width + 1) - min_x = max(0, x - radius - outline_width) - max_x = min(width, x + radius + outline_width + 1) - - # Create coordinate arrays for the bounding box - y_indices, x_indices = np.ogrid[min_y:max_y, min_x:max_x] - - # Calculate distances from center - dist_sq = (y_indices - y) ** 2 + (x_indices - x) ** 2 + cy, cx = ( + int(center[0]), + int(center[1]), + ) # incoming Point is (y,x) in your codebase + h, w = image.shape[:2] + if not (0 <= cx < w and 0 <= cy < h): + return image - # Create masks for the circle and outline - circle_mask = dist_sq <= radius**2 + fill_rgba = color + if fill_rgba[3] < 255: + fill_rgba = sample_and_blend_color(image, cx, cy, fill_rgba) - # Apply the fill color - image[min_y:max_y, min_x:max_x][circle_mask] = color + circle_u8(image, int(cx), int(cy), int(radius), fill_rgba, -1) - # Draw the outline if needed - if outline_width > 0 and outline_color is not None: - outer_mask = dist_sq <= (radius + outline_width) ** 2 - outline_mask = outer_mask & ~circle_mask - image[min_y:max_y, min_x:max_x][outline_mask] = outline_color + if outline_color is not None and outline_width 
> 0: + out_rgba = outline_color + if out_rgba[3] < 255: + out_rgba = sample_and_blend_color(image, cx, cy, out_rgba) + # outlined stroke thickness = outline_width + circle_u8( + image, int(cx), int(cy), int(radius), out_rgba, int(outline_width) + ) return image @@ -835,9 +779,7 @@ async def async_draw_obstacles( continue if need_blending: - obs_color = ColorsManagement.sample_and_blend_color( - image, x, y, color - ) + obs_color = sample_and_blend_color(image, x, y, color) else: obs_color = color diff --git a/SCR/valetudo_map_parser/config/drawable_elements.py b/SCR/valetudo_map_parser/config/drawable_elements.py index ed7be98..f15dbc2 100644 --- a/SCR/valetudo_map_parser/config/drawable_elements.py +++ b/SCR/valetudo_map_parser/config/drawable_elements.py @@ -9,8 +9,6 @@ from enum import IntEnum from typing import Dict, List, Tuple, Union -import numpy as np - from .colors import DefaultColors, SupportedColor from .types import LOGGER diff --git a/SCR/valetudo_map_parser/config/enhanced_drawable.py b/SCR/valetudo_map_parser/config/enhanced_drawable.py deleted file mode 100644 index 549d39e..0000000 --- a/SCR/valetudo_map_parser/config/enhanced_drawable.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -Enhanced Drawable Class. -Provides drawing utilities with element selection support. 
-Version: 0.1.9 -""" - -from __future__ import annotations - -import logging - -# math is not used in this file -from typing import Optional, Tuple - -import numpy as np - -from .colors import ColorsManagement -from .drawable import Drawable -from .drawable_elements import ( - DrawableElement, - DrawingConfig, -) - - -# Type aliases -NumpyArray = np.ndarray -Color = Tuple[int, int, int, int] - -_LOGGER = logging.getLogger(__name__) - - -class EnhancedDrawable(Drawable): - """Enhanced drawing utilities with element selection support.""" - - def __init__(self, drawing_config: Optional[DrawingConfig] = None): - """Initialize with optional drawing configuration.""" - super().__init__() - self.drawing_config = drawing_config or DrawingConfig() - - # Color blending methods have been moved to ColorsManagement class in colors.py - - # Pixel blending methods have been moved to ColorsManagement class in colors.py - - async def draw_map( - self, map_data: dict, base_array: Optional[NumpyArray] = None - ) -> NumpyArray: - """ - Draw the map with selected elements. 
- - Args: - map_data: The map data dictionary - base_array: Optional base array to draw on - - Returns: - The image array with all elements drawn - """ - # Get map dimensions - size_x = map_data.get("size", {}).get("x", 1024) - size_y = map_data.get("size", {}).get("y", 1024) - - # Create empty image if none provided - if base_array is None: - background_color = self.drawing_config.get_property( - DrawableElement.FLOOR, "color", (200, 200, 200, 255) - ) - base_array = await self.create_empty_image(size_x, size_y, background_color) - - # Draw elements in order of z-index - for element in self.drawing_config.get_drawing_order(): - if element == DrawableElement.FLOOR: - base_array = await self._draw_floor(map_data, base_array) - elif element == DrawableElement.WALL: - base_array = await self._draw_walls(map_data, base_array) - elif element == DrawableElement.ROBOT: - base_array = await self._draw_robot(map_data, base_array) - elif element == DrawableElement.CHARGER: - base_array = await self._draw_charger(map_data, base_array) - elif element == DrawableElement.VIRTUAL_WALL: - base_array = await self._draw_virtual_walls(map_data, base_array) - elif element == DrawableElement.RESTRICTED_AREA: - base_array = await self._draw_restricted_areas(map_data, base_array) - elif element == DrawableElement.NO_MOP_AREA: - base_array = await self._draw_no_mop_areas(map_data, base_array) - elif element == DrawableElement.PATH: - base_array = await self._draw_path(map_data, base_array) - elif element == DrawableElement.PREDICTED_PATH: - base_array = await self._draw_predicted_path(map_data, base_array) - elif element == DrawableElement.GO_TO_TARGET: - base_array = await self._draw_go_to_target(map_data, base_array) - elif DrawableElement.ROOM_1 <= element <= DrawableElement.ROOM_15: - room_id = element - DrawableElement.ROOM_1 + 1 - base_array = await self._draw_room(map_data, room_id, base_array) - - return base_array - - async def _draw_floor(self, map_data: dict, array: NumpyArray) 
-> NumpyArray: - """Draw the floor layer.""" - if not self.drawing_config.is_enabled(DrawableElement.FLOOR): - return array - - # Implementation depends on the map data format - # This is a placeholder - actual implementation would use map_data to draw floor - - return array - - async def _draw_walls(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the walls.""" - if not self.drawing_config.is_enabled(DrawableElement.WALL): - return array - - # Get wall color from drawing config - wall_color = self.drawing_config.get_property( - DrawableElement.WALL, "color", (255, 255, 0, 255) - ) - - # Implementation depends on the map data format - # For Valetudo maps, we would look at the layers with type "wall" - # This is a simplified example - in a real implementation, we would extract the actual wall pixels - - # Find wall data in map_data - wall_pixels = [] - for layer in map_data.get("layers", []): - if layer.get("type") == "wall": - # Extract wall pixels from the layer - # This is a placeholder - actual implementation would depend on the map data format - wall_pixels = layer.get("pixels", []) - break - - # Draw wall pixels with color blending - for x, y in wall_pixels: - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, x, y, wall_color - ) - if 0 <= y < array.shape[0] and 0 <= x < array.shape[1]: - array[y, x] = blended_color - - return array - - async def _draw_robot(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the robot.""" - if not self.drawing_config.is_enabled(DrawableElement.ROBOT): - return array - - # Get robot color from drawing config - robot_color = self.drawing_config.get_property( - DrawableElement.ROBOT, "color", (255, 255, 204, 255) - ) - - # Extract robot position and angle from map_data - robot_position = map_data.get("robot", {}).get("position", None) - robot_angle = map_data.get("robot", {}).get("angle", 0) - - if robot_position: - x, y = 
robot_position.get("x", 0), robot_position.get("y", 0) - - # Draw robot with color blending - # Create a circle around the robot position - radius = 25 # Same as in the robot drawing method - for dy in range(-radius, radius + 1): - for dx in range(-radius, radius + 1): - if dx * dx + dy * dy <= radius * radius: - map_x, map_y = int(x + dx), int(y + dy) - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, map_x, map_y, robot_color - ) - if 0 <= map_y < array.shape[0] and 0 <= map_x < array.shape[1]: - array[map_y, map_x] = blended_color - return array - - async def _draw_charger(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the charger.""" - if not self.drawing_config.is_enabled(DrawableElement.CHARGER): - return array - - # Get charger color from drawing config - charger_color = self.drawing_config.get_property( - DrawableElement.CHARGER, "color", (255, 128, 0, 255) - ) - - # Implementation depends on the map data format - # This would extract charger data from map_data and draw it - - return array - - async def _draw_virtual_walls( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw virtual walls.""" - if not self.drawing_config.is_enabled(DrawableElement.VIRTUAL_WALL): - return array - - # Get virtual wall color from drawing config - wall_color = self.drawing_config.get_property( - DrawableElement.VIRTUAL_WALL, "color", (255, 0, 0, 255) - ) - - # Implementation depends on the map data format - # This would extract virtual wall data from map_data and draw it - - return array - - async def _draw_restricted_areas( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw restricted areas.""" - if not self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): - return array - - # Get restricted area color from drawing config - area_color = self.drawing_config.get_property( - DrawableElement.RESTRICTED_AREA, "color", (255, 0, 0, 125) - ) - - # 
Implementation depends on the map data format - # This would extract restricted area data from map_data and draw it - - return array - - async def _draw_no_mop_areas(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw no-mop areas.""" - if not self.drawing_config.is_enabled(DrawableElement.NO_MOP_AREA): - return array - - # Get no-mop area color from drawing config - area_color = self.drawing_config.get_property( - DrawableElement.NO_MOP_AREA, "color", (0, 0, 255, 125) - ) - - # Implementation depends on the map data format - # This would extract no-mop area data from map_data and draw it - - return array - - async def _draw_path(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the robot's path.""" - if not self.drawing_config.is_enabled(DrawableElement.PATH): - return array - - # Get path color from drawing config - path_color = self.drawing_config.get_property( - DrawableElement.PATH, "color", (238, 247, 255, 255) - ) - - # Implementation depends on the map data format - # This would extract path data from map_data and draw it - - return array - - async def _draw_predicted_path( - self, map_data: dict, array: NumpyArray - ) -> NumpyArray: - """Draw the predicted path.""" - if not self.drawing_config.is_enabled(DrawableElement.PREDICTED_PATH): - return array - - # Get predicted path color from drawing config - path_color = self.drawing_config.get_property( - DrawableElement.PREDICTED_PATH, "color", (238, 247, 255, 125) - ) - - # Implementation depends on the map data format - # This would extract predicted path data from map_data and draw it - - return array - - async def _draw_go_to_target(self, map_data: dict, array: NumpyArray) -> NumpyArray: - """Draw the go-to target.""" - if not self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): - return array - - # Get go-to target color from drawing config - target_color = self.drawing_config.get_property( - DrawableElement.GO_TO_TARGET, "color", (0, 255, 0, 255) - ) - - # 
Implementation depends on the map data format - # This would extract go-to target data from map_data and draw it - - return array - - async def _draw_room( - self, map_data: dict, room_id: int, array: NumpyArray - ) -> NumpyArray: - """Draw a specific room.""" - element = getattr(DrawableElement, f"ROOM_{room_id}") - if not self.drawing_config.is_enabled(element): - return array - - # Get room color from drawing config - room_color = self.drawing_config.get_property( - element, - "color", - (135, 206, 250, 255), # Default light blue - ) - - # Implementation depends on the map data format - # For Valetudo maps, we would look at the layers with type "segment" - # This is a simplified example - in a real implementation, we would extract the actual room pixels - - # Find room data in map_data - room_pixels = [] - for layer in map_data.get("layers", []): - if layer.get("type") == "segment" and str( - layer.get("metaData", {}).get("segmentId") - ) == str(room_id): - # Extract room pixels from the layer - # This is a placeholder - actual implementation would depend on the map data format - # For example, it might use compressed pixels or other data structures - - # For demonstration, let's assume we have a list of (x, y) coordinates - room_pixels = layer.get("pixels", []) - break - - # Draw room pixels with color blending - for x, y in room_pixels: - # Use sample_and_blend_color from ColorsManagement - blended_color = ColorsManagement.sample_and_blend_color( - array, x, y, room_color - ) - if 0 <= y < array.shape[0] and 0 <= x < array.shape[1]: - array[y, x] = blended_color - - return array diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index b6b618d..c1bb0f3 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -1,7 +1,7 @@ """New Rand256 Map Parser - Based on Xiaomi/Roborock implementation with precise binary parsing.""" -import struct 
import math +import struct from enum import Enum from typing import Any, Dict, List, Optional @@ -218,29 +218,53 @@ def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: match block_type: case self.Types.DIGEST.value: self.is_valid = True - case self.Types.ROBOT_POSITION.value | self.Types.CHARGER_LOCATION.value: - blocks[block_type] = self._parse_object_position(block_data_length, data) + case ( + self.Types.ROBOT_POSITION.value + | self.Types.CHARGER_LOCATION.value + ): + blocks[block_type] = self._parse_object_position( + block_data_length, data + ) case self.Types.PATH.value | self.Types.GOTO_PREDICTED_PATH.value: - blocks[block_type] = self._parse_path_block(raw, block_start_position, block_data_length) + blocks[block_type] = self._parse_path_block( + raw, block_start_position, block_data_length + ) case self.Types.CURRENTLY_CLEANED_ZONES.value: blocks[block_type] = {"zones": self._parse_zones(data, header)} case self.Types.FORBIDDEN_ZONES.value: - blocks[block_type] = {"forbidden_zones":
self._parse_area(header, data) + } case self.Types.FORBIDDEN_MOP_ZONES.value: - blocks[block_type] = {"forbidden_mop_zones": self._parse_area(header, data)} + blocks[block_type] = { + "forbidden_mop_zones": self._parse_area(header, data) + } case self.Types.GOTO_TARGET.value: blocks[block_type] = {"position": self._parse_goto_target(data)} case self.Types.VIRTUAL_WALLS.value: - blocks[block_type] = {"virtual_walls": self._parse_walls(data, header)} + blocks[block_type] = { + "virtual_walls": self._parse_walls(data, header) + } case self.Types.CARPET_MAP.value: - data = RRMapParser._get_bytes(raw, block_data_start, block_data_length) - blocks[block_type] = {"carpet_map": self._parse_carpet_map(data)} + data = RRMapParser._get_bytes( + raw, block_data_start, block_data_length + ) + blocks[block_type] = { + "carpet_map": self._parse_carpet_map(data) + } case self.Types.IMAGE.value: header_length = self._get_int8(header, 2) blocks[block_type] = self._parse_image_block( - raw, block_start_position, block_data_length, header_length, pixels) - - block_start_position = block_start_position + block_data_length + self._get_int8(header, 2) + raw, + block_start_position, + block_data_length, + header_length, + pixels, + ) + + block_start_position = ( + block_start_position + block_data_length + self._get_int8(header, 2) + ) except (struct.error, IndexError): break return blocks diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index 8ecd4ae..bffdec4 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -7,18 +7,19 @@ import asyncio import logging from typing import List + from PIL import Image from .types import ( ATTR_CALIBRATION_POINTS, ATTR_CAMERA_MODE, ATTR_CONTENT_TYPE, + ATTR_IMAGE_LAST_UPDATED, ATTR_MARGINS, ATTR_OBSTACLES, ATTR_POINTS, ATTR_ROOMS, ATTR_ROTATE, - ATTR_IMAGE_LAST_UPDATED, ATTR_VACUUM_BATTERY, ATTR_VACUUM_CHARGING, ATTR_VACUUM_JSON_ID, @@ -40,8 +41,8 @@ 
DEFAULT_VALUES, CameraModes, Colors, - TrimsData, PilPNG, + TrimsData, ) diff --git a/SCR/valetudo_map_parser/config/status_text/status_text.py b/SCR/valetudo_map_parser/config/status_text/status_text.py index 720ec2f..7e7942d 100644 --- a/SCR/valetudo_map_parser/config/status_text/status_text.py +++ b/SCR/valetudo_map_parser/config/status_text/status_text.py @@ -9,6 +9,7 @@ from ..types import LOGGER, PilPNG from .translations import translations + LOGGER.propagate = True diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index 6be8f0c..c6740bd 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -8,7 +8,7 @@ import logging import threading from dataclasses import asdict, dataclass -from typing import Any, Dict, Optional, Tuple, TypedDict, Union, List, NotRequired +from typing import Any, Dict, List, NotRequired, Optional, Tuple, TypedDict, Union import numpy as np from PIL import Image @@ -222,7 +222,9 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None: Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]] Colors = Dict[str, Color] CalibrationPoints = list[dict[str, Any]] -RobotPosition = Optional[dict[str, Union[int | float]]] +RobotPosition: type[tuple[Any, Any, dict[str, int | float] | None]] = tuple[ + Any, Any, dict[str, int | float] | None +] ChargerPosition = dict[str, Any] RoomsProperties = dict[str, RoomProperty] ImageSize = dict[str, int | list[int]] diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 21a2473..56bf974 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -1,32 +1,30 @@ """Utility code for the valetudo map parser.""" import datetime -from time import time import hashlib +import io import json from dataclasses import dataclass +from time import time from typing import Callable, List, Optional, Tuple -import io 
import numpy as np from PIL import Image, ImageOps +from ..map_data import HyperMapData +from .async_utils import AsyncNumPy from .drawable import Drawable from .drawable_elements import DrawingConfig -from .enhanced_drawable import EnhancedDrawable from .status_text.status_text import StatusText - from .types import ( LOGGER, ChargerPosition, - Size, + Destinations, NumpyArray, PilPNG, RobotPosition, - Destinations, + Size, ) -from ..map_data import HyperMapData -from .async_utils import AsyncNumPy @dataclass @@ -79,7 +77,6 @@ def __init__(self): # Drawing components are initialized by initialize_drawing_config in handlers self.drawing_config: Optional[DrawingConfig] = None self.draw: Optional[Drawable] = None - self.enhanced_draw: Optional[EnhancedDrawable] = None def get_frame_number(self) -> int: """Return the frame number of the image.""" @@ -199,10 +196,11 @@ async def _async_update_shared_data(self, destinations: Destinations | None = No if hasattr(self, "get_rooms_attributes") and ( self.shared.map_rooms is None and destinations is not None ): - (self.shared.map_rooms,) = await self.get_rooms_attributes(destinations) + self.shared.map_rooms = await self.get_rooms_attributes(destinations) if self.shared.map_rooms: LOGGER.debug("%s: Rand256 attributes rooms updated", self.file_name) + if hasattr(self, "async_get_rooms_attributes") and ( self.shared.map_rooms is None ): @@ -709,7 +707,7 @@ def initialize_drawing_config(handler): handler: The handler instance with shared data and file_name attributes Returns: - Tuple of (DrawingConfig, Drawable, EnhancedDrawable) + Tuple of (DrawingConfig, Drawable) """ # Initialize drawing configuration @@ -721,11 +719,10 @@ def initialize_drawing_config(handler): ): drawing_config.update_from_device_info(handler.shared.device_info) - # Initialize both drawable systems for backward compatibility - draw = Drawable() # Legacy drawing utilities - enhanced_draw = EnhancedDrawable(drawing_config) # New enhanced drawing system + # 
Initialize drawing utilities + draw = Drawable() - return drawing_config, draw, enhanced_draw + return drawing_config, draw def blend_colors(base_color, overlay_color): diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 4b62699..05a00de 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -8,24 +8,22 @@ from __future__ import annotations import asyncio -import numpy as np +import numpy as np +from mvcrender.autocrop import AutoCrop from PIL import Image from .config.async_utils import AsyncPIL - -from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.shared import CameraShared - from .config.types import ( COLORS, LOGGER, CalibrationPoints, Colors, + JsonType, RoomsProperties, RoomStore, - JsonType, ) from .config.utils import ( BaseHandler, @@ -48,9 +46,7 @@ def __init__(self, shared_data: CameraShared): self.calibration_data = None # camera shared data. self.data = ImageData # imported Image Data Module. # Initialize drawing configuration using the shared utility function - self.drawing_config, self.draw, self.enhanced_draw = initialize_drawing_config( - self - ) + self.drawing_config, self.draw = initialize_drawing_config(self) self.go_to = None # vacuum go to data self.img_hash = None # hash of the image calculated to check differences. 
@@ -77,7 +73,7 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: json_data ) if room_properties: - rooms = RoomStore(self.file_name, room_properties) + _ = RoomStore(self.file_name, room_properties) # Convert room_properties to the format expected by async_get_robot_in_room self.rooms_pos = [] for room_id, room_data in room_properties.items(): @@ -346,16 +342,6 @@ async def async_get_image_from_json( robot_state=self.shared.vacuum_state, ) - # Update element map for robot position - if ( - hasattr(self.shared, "element_map") - and self.shared.element_map is not None - ): - update_element_map_with_robot( - self.shared.element_map, - robot_position, - DrawableElement.ROBOT, - ) # Synchronize zooming state from ImageDraw to handler before auto-crop self.zooming = self.imd.img_h.zooming diff --git a/SCR/valetudo_map_parser/map_data.py b/SCR/valetudo_map_parser/map_data.py index 07bd753..c7119ae 100755 --- a/SCR/valetudo_map_parser/map_data.py +++ b/SCR/valetudo_map_parser/map_data.py @@ -8,22 +8,22 @@ from __future__ import annotations -import numpy as np +from dataclasses import asdict, dataclass, field from typing import ( - List, - Sequence, - TypeVar, Any, - TypedDict, - NotRequired, Literal, + NotRequired, Optional, + Sequence, + TypedDict, + TypeVar, ) -from dataclasses import dataclass, field, asdict +import numpy as np from .config.types import ImageSize, JsonType + T = TypeVar("T") # --- Common Nested Structures --- @@ -373,6 +373,11 @@ async def async_get_rooms_coordinates( Else: (min_x_mm, min_y_mm, max_x_mm, max_y_mm) """ + + def to_mm(coord): + """Convert pixel coordinates to millimeters.""" + return round(coord * pixel_size * 10) + if not pixels: raise ValueError("Pixels list cannot be empty.") @@ -393,7 +398,6 @@ async def async_get_rooms_coordinates( min_y = min(min_y, y) if rand: - to_mm = lambda v: v * pixel_size * 10 return (to_mm(max_x), to_mm(max_y)), (to_mm(min_x), to_mm(min_y)) return ( @@ -548,8 +552,9 @@ def 
get_rrm_currently_cleaned_zones(json_data: JsonType) -> list[dict[str, Any]] @staticmethod def get_rrm_forbidden_zones(json_data: JsonType) -> list[dict[str, Any]]: """Get the forbidden zones from the json.""" - re_zones = json_data.get("forbidden_zones", []) - re_zones.extend(json_data.get("forbidden_mop_zones", [])) + re_zones = json_data.get("forbidden_zones", []) + json_data.get( + "forbidden_mop_zones", [] + ) formatted_zones = RandImageData._rrm_valetudo_format_zone(re_zones) return formatted_zones diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index f5e6f65..56f5fc9 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -2,7 +2,7 @@ Image Handler Module for Valetudo Re Vacuums. It returns the PIL PNG image frame relative to the Map Data extrapolated from the vacuum json. It also returns calibration, rooms data to the card and other images information to the camera. -Version: 0.1.9.a6 +Version: 0.1.10 """ from __future__ import annotations @@ -11,10 +11,9 @@ from typing import Any import numpy as np +from mvcrender.autocrop import AutoCrop from .config.async_utils import AsyncPIL - -from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.types import ( COLORS, @@ -55,9 +54,7 @@ def __init__(self, shared_data): self.data = RandImageData # Image Data # Initialize drawing configuration using the shared utility function - self.drawing_config, self.draw, self.enhanced_draw = initialize_drawing_config( - self - ) + self.drawing_config, self.draw = initialize_drawing_config(self) self.go_to = None # Go to position data self.img_base_layer = None # Base image layer self.img_rotate = shared_data.image_rotate # Image rotation @@ -96,17 +93,13 @@ async def extract_room_properties( # Update self.rooms_pos from room_properties for compatibility with other methods self.rooms_pos = [] - room_ids = [] # Collect room IDs 
for shared.map_rooms for room_id, room_data in room_properties.items(): self.rooms_pos.append( {"name": room_data["name"], "outline": room_data["outline"]} ) - # Store the room number (segment ID) for MQTT active zone mapping - room_ids.append(room_data["number"]) - - # Update shared.map_rooms with the room IDs for MQTT active zone mapping - self.shared.map_rooms = room_ids + # Update shared.map_rooms with the full room properties (consistent with Hypfer) + self.shared.map_rooms = room_properties # get the zones and points data self.shared.map_pred_zones = await self.async_zone_propriety(zones_data) # get the points data @@ -115,7 +108,7 @@ async def extract_room_properties( if not (room_properties or self.shared.map_pred_zones): self.rooms_pos = None - rooms = RoomStore(self.file_name, room_properties) + _ = RoomStore(self.file_name, room_properties) return room_properties except (RuntimeError, ValueError) as e: LOGGER.warning( @@ -123,7 +116,7 @@ async def extract_room_properties( e, exc_info=True, ) - return None, None, None + return None async def get_image_from_rrm( self, @@ -188,6 +181,7 @@ async def get_image_from_rrm( async def _setup_robot_and_image( self, m_json, size_x, size_y, colors, destinations ): + """Set up the elements of the map and the image.""" ( _, robot_position, @@ -212,12 +206,6 @@ async def _setup_robot_and_image( ) LOGGER.info("%s: Completed base Layers", self.file_name) - # Update element map for rooms - if 0 < room_id <= 15: - # This is a simplification - in a real implementation we would - # need to identify the exact pixels that belong to each room - pass - if room_id > 0 and not self.room_propriety: self.room_propriety = await self.get_rooms_attributes(destinations) @@ -225,8 +213,10 @@ async def _setup_robot_and_image( if not self.rooms_pos and not self.room_propriety: self.room_propriety = await self.get_rooms_attributes(destinations) - # Always check robot position for zooming (fallback) - if self.rooms_pos and robot_position and 
not hasattr(self, "robot_pos"): + # Always check robot position for zooming (update if room info is missing) + if self.rooms_pos and robot_position and ( + self.robot_pos is None or "in_room" not in self.robot_pos + ): self.robot_pos = await self.async_get_robot_in_room( (robot_position[0] * 10), (robot_position[1] * 10), @@ -273,7 +263,7 @@ async def _setup_robot_and_image( # Restore original rooms_pos self.rooms_pos = original_rooms_pos - except Exception as e: + except (ValueError, KeyError, TypeError): # Fallback to robot-position-based zoom if room extraction fails if ( self.shared.image_auto_zoom @@ -287,6 +277,7 @@ async def _setup_robot_and_image( async def _draw_map_elements( self, img_np_array, m_json, colors, robot_position, robot_position_angle ): + """Draw map elements on the image.""" # Draw charger if enabled if self.drawing_config.is_enabled(DrawableElement.CHARGER): img_np_array, self.charger_pos = await self.imd.async_draw_charger( @@ -357,22 +348,24 @@ async def _draw_map_elements( return img_np_array async def _finalize_image(self, pil_img): - if not self.shared.image_ref_width or not self.shared.image_ref_height: - LOGGER.warning( - "Image finalization failed: Invalid image dimensions. Returning original image." - ) - return pil_img + """Finalize the image by resizing if needed.""" + if pil_img is None: + LOGGER.warning("%s: Image is None. Returning None.", self.file_name) + return None if self.check_zoom_and_aspect_ratio(): resize_params = self.prepare_resize_params(pil_img, True) pil_img = await self.async_resize_images(resize_params) + else: + LOGGER.warning( + "%s: Invalid image dimensions. 
Returning original image.", + self.file_name, + ) return pil_img async def get_rooms_attributes( self, destinations: JsonType = None ) -> tuple[RoomsProperties, Any, Any]: """Return the rooms attributes.""" - if self.room_propriety: - return self.room_propriety if self.json_data and destinations: self.room_propriety = await self.extract_room_properties( self.json_data, destinations @@ -397,6 +390,12 @@ async def async_get_robot_in_room( } # Handle active zones self.active_zones = self.shared.rand256_active_zone + LOGGER.debug( + "%s: Robot is in %s room (polygon detection). %s", + self.file_name, + self.robot_in_room["room"], + self.active_zones, + ) self.zooming = False if self.active_zones and ( self.robot_in_room["id"] in range(len(self.active_zones)) diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index 7ec6649..63c1604 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -8,7 +8,7 @@ from .config.drawable import Drawable from .config.drawable_elements import DrawableElement -from .config.types import Color, JsonType, NumpyArray, LOGGER +from .config.types import LOGGER, Color, JsonType, NumpyArray from .map_data import ImageData, RandImageData diff --git a/SCR/valetudo_map_parser/rooms_handler.py b/SCR/valetudo_map_parser/rooms_handler.py index 08ad391..a1f5e48 100644 --- a/SCR/valetudo_map_parser/rooms_handler.py +++ b/SCR/valetudo_map_parser/rooms_handler.py @@ -7,7 +7,6 @@ from __future__ import annotations -import time from typing import Any, Dict, List, Optional, Tuple import numpy as np @@ -16,8 +15,7 @@ from .config.drawable_elements import DrawableElement, DrawingConfig from .config.types import LOGGER, RoomsProperties - -from .map_data import RandImageData, ImageData +from .map_data import RandImageData class RoomsHandler: @@ -204,7 +202,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: Returns: Dictionary of room properties """ - 
start_total = time.time() room_properties = {} pixel_size = json_data.get("pixelSize", 5) height = json_data["size"]["y"] @@ -217,9 +214,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: ) if room_id is not None and room_data is not None: room_properties[room_id] = room_data - - # Log timing information (kept internal, no debug output) - total_time = time.time() - start_total return room_properties @@ -395,7 +389,6 @@ async def async_extract_room_properties( Returns: Dictionary of room properties """ - start_total = time.time() room_properties = {} # Get basic map information @@ -463,6 +456,4 @@ async def async_extract_room_properties( room_properties[room_id] = room_data - # Log timing information (kept internal, no debug output) - total_time = time.time() - start_total return room_properties diff --git a/pyproject.toml b/pyproject.toml index 6b506c0..04d6ac5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.10rc7" +version = "0.1.10" description = "A Python library to parse Valetudo map data returning a PIL Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0" @@ -18,7 +18,7 @@ python = ">=3.13" numpy = ">=1.26.4" Pillow = ">=10.3.0" scipy = ">=1.12.0" -mvcrender = ">=0.0.4" +mvcrender = ">=0.0.5" [tool.poetry.group.dev.dependencies] ruff = "*"