diff --git a/.github/workflows/code_quality.yaml b/.github/workflows/code_quality.yaml index 35cfd1f..3976398 100644 --- a/.github/workflows/code_quality.yaml +++ b/.github/workflows/code_quality.yaml @@ -20,7 +20,7 @@ jobs: # name: Check code with pylint steps: - name: Checkout the repository - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Set up Python 3 uses: actions/setup-python@v5 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 328dd83..e32edf5 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest steps: # Step 1: Checkout the repository - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 # Step 2: Set up Python - name: Set up Python 3.12 diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index 48893d8..d9a8560 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -5,7 +5,6 @@ from .config.drawable import Drawable from .config.drawable_elements import DrawableElement, DrawingConfig from .config.enhanced_drawable import EnhancedDrawable -from .config.utils import webp_bytes_to_pil from .config.rand256_parser import RRMapParser from .config.shared import CameraShared, CameraSharedManager from .config.types import ( @@ -15,7 +14,10 @@ SnapshotStore, TrimCropData, UserLanguageStore, - WebPBytes, + JsonType, + PilPNG, + NumpyArray, + ImageSize, ) from .hypfer_handler import HypferMapImageHandler from .rand256_handler import ReImageHandler @@ -41,6 +43,8 @@ "RoomsProperties", "TrimCropData", "CameraModes", - "WebPBytes", - "webp_bytes_to_pil", + "JsonType", + "PilPNG", + "NumpyArray", + "ImageSize", ] diff --git a/SCR/valetudo_map_parser/config/async_utils.py b/SCR/valetudo_map_parser/config/async_utils.py new file mode 100644 index 0000000..b8ef7b6 --- /dev/null +++ b/SCR/valetudo_map_parser/config/async_utils.py @@ -0,0 +1,93 @@ +"""Async utility functions for making NumPy and PIL operations truly async.""" + +import asyncio +import io +from typing import Any, Callable + +import numpy as np +from numpy import rot90 +from PIL import Image + + +async def make_async(func: Callable, *args, **kwargs) -> Any: + """Convert a synchronous function to async by yielding control to the event loop.""" + return await asyncio.to_thread(func, *args, **kwargs) + + +class AsyncNumPy: + """Async wrappers for NumPy operations that yield control to the event loop.""" + + @staticmethod + async def async_copy(array: np.ndarray) -> np.ndarray: + """Async array copying.""" + return await make_async(np.copy, array) + + @staticmethod + async def async_full( + shape: tuple, fill_value: Any, dtype: np.dtype = None + ) -> np.ndarray: + """Async array creation with fill value.""" + return await make_async(np.full, shape, fill_value, dtype=dtype) + + @staticmethod + async def async_rot90(array: np.ndarray, k: int = 1) -> np.ndarray: + """Async array rotation.""" + return await make_async(rot90, array, k) + + +class AsyncPIL: + """Async wrappers for PIL operations that yield control to the event loop.""" + + @staticmethod + async def async_fromarray(array: np.ndarray, mode: str = "RGBA") -> Image.Image: + """Async PIL Image creation from NumPy array.""" + return await make_async(Image.fromarray, array, mode) + + @staticmethod + async def async_resize( + image: Image.Image, size: tuple, resample: int = None + ) -> Image.Image: + """Async image resizing.""" + if resample is None: + resample = Image.LANCZOS + return 
await make_async(image.resize, size, resample) + + @staticmethod + async def async_save_to_bytes( + image: Image.Image, format_type: str = "WEBP", **kwargs + ) -> bytes: + """Async image saving to bytes.""" + + def save_to_bytes(): + buffer = io.BytesIO() + image.save(buffer, format=format_type, **kwargs) + return buffer.getvalue() + + return await make_async(save_to_bytes) + + +class AsyncParallel: + """Helper functions for parallel processing with asyncio.gather().""" + + @staticmethod + async def parallel_data_preparation(*tasks): + """Execute multiple data preparation tasks in parallel.""" + return await asyncio.gather(*tasks, return_exceptions=True) + + @staticmethod + async def parallel_array_operations(base_array: np.ndarray, operations: list): + """Execute multiple array operations in parallel on copies of the base array.""" + + # Create tasks for parallel execution + tasks = [] + for operation_func, *args in operations: + # Each operation works on a copy of the base array + array_copy = await AsyncNumPy.async_copy(base_array) + tasks.append(operation_func(array_copy, *args)) + + # Execute all operations in parallel + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Filter out exceptions and return successful results + successful_results = [r for r in results if not isinstance(r, Exception)] + return successful_results diff --git a/SCR/valetudo_map_parser/config/auto_crop.py b/SCR/valetudo_map_parser/config/auto_crop.py index 811f064..5fdb542 100644 --- a/SCR/valetudo_map_parser/config/auto_crop.py +++ b/SCR/valetudo_map_parser/config/auto_crop.py @@ -9,6 +9,7 @@ from numpy import rot90 from scipy import ndimage +from .async_utils import AsyncNumPy, make_async from .types import Color, NumpyArray, TrimCropData, TrimsData from .utils import BaseHandler @@ -364,7 +365,7 @@ async def async_rotate_the_image( ) -> NumpyArray: """Rotate the image and return the new array.""" if rotate == 90: - rotated = rot90(trimmed) + rotated = await AsyncNumPy.async_rot90(trimmed) self.crop_area = [ self.trim_left, self.trim_up, @@ -372,10 +373,10 @@ async def async_rotate_the_image( self.trim_down, ] elif rotate == 180: - rotated = rot90(trimmed, 2) + rotated = await AsyncNumPy.async_rot90(trimmed, 2) self.crop_area = self.auto_crop elif rotate == 270: - rotated = rot90(trimmed, 3) + rotated = await AsyncNumPy.async_rot90(trimmed, 3) self.crop_area = [ self.trim_left, self.trim_up, diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index 80c1037..d73c0d6 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -14,7 +14,7 @@ import math import numpy as np -from PIL import ImageDraw, ImageFont +from PIL import Image, ImageDraw, ImageFont from .color_utils import get_blended_color from .colors import ColorsManagement @@ -243,63 +243,62 @@ def point_inside(x: int, y: int, points: list[Tuple[int, int]]) -> bool: @staticmethod def _line( - layer: NumpyArray, + layer: np.ndarray, x1: int, y1: int, x2: int, y2: int, color: Color, width: int = 3, - ) -> NumpyArray: + ) -> np.ndarray: """ - Draw a line on a NumPy array (layer) from point A to B using vectorized operations. + Draw a line on a NumPy array (layer) from point A to B using Bresenham's algorithm. 
Args: - layer: The numpy array to draw on + layer: The numpy array to draw on (H, W, C) x1, y1: Start point coordinates x2, y2: End point coordinates - color: Color to draw with - width: Width of the line + color: Color to draw with (tuple or array) + width: Width of the line in pixels """ - # Ensure coordinates are integers x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) - # Get blended color for the line blended_color = get_blended_color(x1, y1, x2, y2, layer, color) - # Calculate line length - length = max(abs(x2 - x1), abs(y2 - y1)) - if length == 0: # Handle case of a single point - # Draw a dot with the specified width - for i in range(-width // 2, (width + 1) // 2): - for j in range(-width // 2, (width + 1) // 2): - if 0 <= x1 + i < layer.shape[1] and 0 <= y1 + j < layer.shape[0]: - layer[y1 + j, x1 + i] = blended_color - return layer - - # Create parametric points along the line - t = np.linspace(0, 1, length * 2) # Double the points for smoother lines - x_coords = np.round(x1 * (1 - t) + x2 * t).astype(int) - y_coords = np.round(y1 * (1 - t) + y2 * t).astype(int) - - # Draw the line with the specified width - if width == 1: - # Fast path for width=1 - for x, y in zip(x_coords, y_coords): - if 0 <= x < layer.shape[1] and 0 <= y < layer.shape[0]: - layer[y, x] = blended_color - else: - # For thicker lines, draw a rectangle at each point - half_width = width // 2 - for x, y in zip(x_coords, y_coords): - for i in range(-half_width, half_width + 1): - for j in range(-half_width, half_width + 1): - if ( - i * i + j * j <= half_width * half_width # Make it round - and 0 <= x + i < layer.shape[1] - and 0 <= y + j < layer.shape[0] - ): - layer[y + j, x + i] = blended_color + dx = abs(x2 - x1) + dy = abs(y2 - y1) + sx = 1 if x1 < x2 else -1 + sy = 1 if y1 < y2 else -1 + err = dx - dy + + half_w = width // 2 + h, w = layer.shape[:2] + + while True: + # Draw a filled circle for thickness + yy, xx = np.ogrid[-half_w : half_w + 1, -half_w : half_w + 1] + mask = xx**2 + yy**2 <= half_w**2 + y_min = max(0, y1 - half_w) + y_max = min(h, y1 + half_w + 1) + x_min = max(0, x1 - half_w) + x_max = min(w, x1 + half_w + 1) + + submask = mask[ + (y_min - (y1 - half_w)) : (y_max - (y1 - half_w)), + (x_min - (x1 - half_w)) : (x_max - (x1 - half_w)), + ] + layer[y_min:y_max, x_min:x_max][submask] = blended_color + + if x1 == x2 and y1 == y2: + break + + e2 = 2 * err + if e2 > -dy: + err -= dy + x1 += sx + if e2 < dx: + err += dx + y1 += sy return layer @@ -483,57 +482,58 @@ def _polygon_outline( @staticmethod async def zones(layers: NumpyArray, coordinates, color: Color) -> NumpyArray: """ - Draw the zones on the input layer with color blending. - Optimized with NumPy vectorized operations for better performance. + Draw zones as solid filled polygons with alpha blending using a per-zone mask. + Keeps API the same; no dotted rendering. 
""" - dot_radius = 1 # Number of pixels for the dot - dot_spacing = 4 # Space between dots + if not coordinates: + return layers + + height, width = layers.shape[:2] + # Precompute color and alpha + r, g, b, a = color + alpha = a / 255.0 + inv_alpha = 1.0 - alpha + color_rgb = np.array([r, g, b], dtype=np.float32) for zone in coordinates: - points = zone["points"] - min_x = max(0, min(points[::2])) - max_x = min(layers.shape[1] - 1, max(points[::2])) - min_y = max(0, min(points[1::2])) - max_y = min(layers.shape[0] - 1, max(points[1::2])) + try: + pts = zone["points"] + except (KeyError, TypeError): + continue + if not pts or len(pts) < 6: + continue - # Skip if zone is outside the image + # Compute bounding box and clamp + min_x = max(0, int(min(pts[::2]))) + max_x = min(width - 1, int(max(pts[::2]))) + min_y = max(0, int(min(pts[1::2]))) + max_y = min(height - 1, int(max(pts[1::2]))) if min_x >= max_x or min_y >= max_y: continue - # Sample a point from the zone to get the background color - # Use the center of the zone for sampling - sample_x = (min_x + max_x) // 2 - sample_y = (min_y + max_y) // 2 - - # Blend the color with the background color at the sample point - if 0 <= sample_y < layers.shape[0] and 0 <= sample_x < layers.shape[1]: - blended_color = ColorsManagement.sample_and_blend_color( - layers, sample_x, sample_y, color - ) - else: - blended_color = color - - # Create a grid of dot centers - x_centers = np.arange(min_x, max_x, dot_spacing) - y_centers = np.arange(min_y, max_y, dot_spacing) - - # Draw dots at each grid point - for y in y_centers: - for x in x_centers: - # Create a small mask for the dot - y_min = max(0, y - dot_radius) - y_max = min(layers.shape[0], y + dot_radius + 1) - x_min = max(0, x - dot_radius) - x_max = min(layers.shape[1], x + dot_radius + 1) - - # Create coordinate arrays for the dot - y_indices, x_indices = np.ogrid[y_min:y_max, x_min:x_max] - - # Create a circular mask - mask = (y_indices - y) ** 2 + (x_indices - x) ** 2 <= dot_radius**2 + # Adjust polygon points to local bbox coordinates + poly_xy = [ + (int(pts[i] - min_x), int(pts[i + 1] - min_y)) + for i in range(0, len(pts), 2) + ] + box_w = max_x - min_x + 1 + box_h = max_y - min_y + 1 + + # Build mask via PIL polygon fill (fast, C-impl) + mask_img = Image.new("L", (box_w, box_h), 0) + draw = ImageDraw.Draw(mask_img) + draw.polygon(poly_xy, fill=255) + zone_mask = np.array(mask_img, dtype=bool) + if not np.any(zone_mask): + continue - # Apply the color to the masked region - layers[y_min:y_max, x_min:x_max][mask] = blended_color + # Vectorized alpha blend on RGB channels only + region = layers[min_y : max_y + 1, min_x : max_x + 1] + rgb = region[..., :3].astype(np.float32) + mask3 = zone_mask[:, :, None] + blended_rgb = np.where(mask3, rgb * inv_alpha + color_rgb * alpha, rgb) + region[..., :3] = blended_rgb.astype(np.uint8) + # Leave alpha channel unchanged to avoid stacking transparency return layers @@ -815,60 +815,60 @@ async def async_draw_obstacles( image: np.ndarray, obstacle_info_list, color: Color ) -> np.ndarray: """ - Optimized async version of draw_obstacles using batch processing. - Includes color blending for better visual integration. + Optimized async version of draw_obstacles using a precomputed mask + and minimal Python overhead. Handles hundreds of obstacles efficiently. 
""" if not obstacle_info_list: return image - # Extract alpha from color + h, w = image.shape[:2] alpha = color[3] if len(color) == 4 else 255 need_blending = alpha < 255 - # Extract obstacle centers and prepare for batch processing + # Precompute circular mask for radius + radius = 6 + diameter = radius * 2 + 1 + yy, xx = np.ogrid[-radius : radius + 1, -radius : radius + 1] + circle_mask = (xx**2 + yy**2) <= radius**2 + + # Collect valid obstacles centers = [] for obs in obstacle_info_list: try: x = obs["points"]["x"] y = obs["points"]["y"] - # Skip if coordinates are out of bounds - if not (0 <= x < image.shape[1] and 0 <= y < image.shape[0]): + if not (0 <= x < w and 0 <= y < h): continue - # Apply color blending if needed - obstacle_color = color if need_blending: - obstacle_color = ColorsManagement.sample_and_blend_color( + obs_color = ColorsManagement.sample_and_blend_color( image, x, y, color ) + else: + obs_color = color - # Add to centers list with radius - centers.append({"center": (x, y), "radius": 6, "color": obstacle_color}) + centers.append((x, y, obs_color)) except (KeyError, TypeError): continue - # Draw each obstacle with its blended color - if centers: - for obstacle in centers: - cx, cy = obstacle["center"] - radius = obstacle["radius"] - obs_color = obstacle["color"] - - # Create a small mask for the obstacle - min_y = max(0, cy - radius) - max_y = min(image.shape[0], cy + radius + 1) - min_x = max(0, cx - radius) - max_x = min(image.shape[1], cx + radius + 1) + # Draw all obstacles + for cx, cy, obs_color in centers: + min_y = max(0, cy - radius) + max_y = min(h, cy + radius + 1) + min_x = max(0, cx - radius) + max_x = min(w, cx + radius + 1) - # Create coordinate arrays for the circle - y_indices, x_indices = np.ogrid[min_y:max_y, min_x:max_x] + # Slice mask to fit image edges + mask_y_start = min_y - (cy - radius) + mask_y_end = mask_y_start + (max_y - min_y) + mask_x_start = min_x - (cx - radius) + mask_x_end = mask_x_start + (max_x - min_x) - # Create a circular mask - mask = (y_indices - cy) ** 2 + (x_indices - cx) ** 2 <= radius**2 + mask = circle_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end] - # Apply the color to the masked region - image[min_y:max_y, min_x:max_x][mask] = obs_color + # Apply color in one vectorized step + image[min_y:max_y, min_x:max_x][mask] = obs_color return image diff --git a/SCR/valetudo_map_parser/config/rand25_parser.py b/SCR/valetudo_map_parser/config/rand25_parser.py deleted file mode 100644 index c94ad9a..0000000 --- a/SCR/valetudo_map_parser/config/rand25_parser.py +++ /dev/null @@ -1,412 +0,0 @@ -""" -Version: v2024.08.2 -- This parser is the python version of @rand256 valetudo_mapper. -- This class is extracting the vacuum binary map_data. -- Additional functions are to get in our image_handler the images datas. 
-""" - -import math -import struct -from enum import Enum -from typing import Any, Callable, Dict, List, Optional, TypeVar - - -_CallableT = TypeVar("_CallableT", bound=Callable[..., Any]) - - -def callback(func: _CallableT) -> _CallableT: - """Annotation to mark method as safe to call from within the event loop.""" - setattr(func, "_hass_callback", True) # Attach a custom attribute to the function - return func # Return the function without modifying its behavior - - -# noinspection PyTypeChecker -class RRMapParser: - """Parse the map data from the Rand256 vacuum.""" - - def __init__(self): - self.map_data = None - - class Tools: - """Tools for the RRMapParser.""" - - DIMENSION_PIXELS = 1024 - DIMENSION_MM = 50 * 1024 - - class Types(Enum): - """Types of blocks in the RRMapParser.""" - - CHARGER_LOCATION = 1 - IMAGE = 2 - PATH = 3 - GOTO_PATH = 4 - GOTO_PREDICTED_PATH = 5 - CURRENTLY_CLEANED_ZONES = 6 - GOTO_TARGET = 7 - ROBOT_POSITION = 8 - FORBIDDEN_ZONES = 9 - VIRTUAL_WALLS = 10 - CURRENTLY_CLEANED_BLOCKS = 11 - FORBIDDEN_MOP_ZONES = 12 - DIGEST = 1024 - - @staticmethod - def parse_block( - buf: bytes, - offset: int, - result: Optional[Dict[int, Any]] = None, - pixels: bool = False, - ) -> Dict[int, Any]: - """Parse a block of data from the map data.""" - result = result or {} - if len(buf) <= offset: - return result - - type_ = struct.unpack("= 12 - else 0 - ), - } - elif type_ == RRMapParser.Types.IMAGE.value: - RRMapParser._parse_image_block(buf, offset, length, hlength, result, pixels) - elif type_ in ( - RRMapParser.Types.PATH.value, - RRMapParser.Types.GOTO_PATH.value, - RRMapParser.Types.GOTO_PREDICTED_PATH.value, - ): - result[type_] = RRMapParser._parse_path_block(buf, offset, length) - elif type_ == RRMapParser.Types.GOTO_TARGET.value: - result[type_] = { - "position": [ - struct.unpack(" None: - """Parse the image block of the map data.""" - g3offset = 4 if hlength > 24 else 0 - parameters = { - "segments": { - "count": ( - struct.unpack(" 0 - and parameters["dimensions"]["width"] > 0 - ): - for i in range(length): - segment_type = ( - struct.unpack( - "> 3 - ) - if s == 0 and pixels: - parameters["pixels"]["floor"].append(i) - elif s != 0: - if s not in parameters["segments"]["id"]: - parameters["segments"]["id"].append(s) - parameters["segments"]["pixels_seg_" + str(s)] = [] - if pixels: - parameters["segments"]["pixels_seg_" + str(s)].append(i) - result[RRMapParser.Types.IMAGE.value] = parameters - - @staticmethod - def _parse_path_block(buf: bytes, offset: int, length: int) -> Dict[str, Any]: - """Parse a path block of the map data.""" - points = [ - [ - struct.unpack(" List[List[int]]: - """Parse the cleaned zones block of the map data.""" - zone_count = struct.unpack(" 0 - else [] - ) - - @staticmethod - def _parse_forbidden_zones(buf: bytes, offset: int, length: int) -> List[List[int]]: - """Parse the forbidden zones block of the map data.""" - zone_count = struct.unpack(" 0 - else [] - ) - - @callback - def parse(self, map_buf: bytes) -> Dict[str, Any]: - """Parse the map data.""" - if map_buf[0:2] == b"rr": - return { - "header_length": struct.unpack(" Optional[Dict[str, Any]]: - """Parse the complete map data.""" - if not self.parse(map_buf).get("map_index"): - return None - - parsed_map_data = {} - blocks = self.parse_block(map_buf, 0x14, None, pixels) - - self._parse_image_data(parsed_map_data, blocks) - self._parse_charger_data(parsed_map_data, blocks) - self._parse_robot_data(parsed_map_data, blocks) - self._parse_zones_data(parsed_map_data, blocks) - 
self._parse_virtual_walls_data(parsed_map_data, blocks) - self._parse_misc_data(parsed_map_data, blocks) - - return parsed_map_data - - @staticmethod - def _parse_image_data(parsed_map_data: Dict[str, Any], blocks: Dict[int, Any]): - """Parse image-related data.""" - if RRMapParser.Types.IMAGE.value in blocks: - parsed_map_data["image"] = blocks[RRMapParser.Types.IMAGE.value] - for item in [ - {"type": RRMapParser.Types.PATH.value, "path": "path"}, - { - "type": RRMapParser.Types.GOTO_PREDICTED_PATH.value, - "path": "goto_predicted_path", - }, - ]: - if item["type"] in blocks: - parsed_map_data[item["path"]] = blocks[item["type"]] - parsed_map_data[item["path"]]["points"] = [ - [point[0], RRMapParser.Tools.DIMENSION_MM - point[1]] - for point in parsed_map_data[item["path"]]["points"] - ] - if len(parsed_map_data[item["path"]]["points"]) >= 2: - parsed_map_data[item["path"]]["current_angle"] = math.degrees( - math.atan2( - parsed_map_data[item["path"]]["points"][-1][1] - - parsed_map_data[item["path"]]["points"][-2][1], - parsed_map_data[item["path"]]["points"][-1][0] - - parsed_map_data[item["path"]]["points"][-2][0], - ) - ) - - @staticmethod - def _parse_charger_data(parsed_map_data: Dict[str, Any], blocks: Dict[int, Any]): - """Parse charger location data.""" - if RRMapParser.Types.CHARGER_LOCATION.value in blocks: - charger = blocks[RRMapParser.Types.CHARGER_LOCATION.value]["position"] - parsed_map_data["charger"] = charger - - @staticmethod - def _parse_robot_data(parsed_map_data: Dict[str, Any], blocks: Dict[int, Any]): - """Parse robot position data.""" - if RRMapParser.Types.ROBOT_POSITION.value in blocks: - robot = blocks[RRMapParser.Types.ROBOT_POSITION.value]["position"] - rob_angle = blocks[RRMapParser.Types.ROBOT_POSITION.value]["angle"] - parsed_map_data["robot"] = robot - parsed_map_data["robot_angle"] = rob_angle - - @staticmethod - def _parse_zones_data(parsed_map_data: Dict[str, Any], blocks: Dict[int, Any]): - """Parse zones and forbidden zones data.""" - if RRMapParser.Types.CURRENTLY_CLEANED_ZONES.value in blocks: - parsed_map_data["currently_cleaned_zones"] = [ - [ - zone[0], - RRMapParser.Tools.DIMENSION_MM - zone[1], - zone[2], - RRMapParser.Tools.DIMENSION_MM - zone[3], - ] - for zone in blocks[RRMapParser.Types.CURRENTLY_CLEANED_ZONES.value] - ] - - if RRMapParser.Types.FORBIDDEN_ZONES.value in blocks: - parsed_map_data["forbidden_zones"] = [ - [ - zone[0], - RRMapParser.Tools.DIMENSION_MM - zone[1], - zone[2], - RRMapParser.Tools.DIMENSION_MM - zone[3], - zone[4], - RRMapParser.Tools.DIMENSION_MM - zone[5], - zone[6], - RRMapParser.Tools.DIMENSION_MM - zone[7], - ] - for zone in blocks[RRMapParser.Types.FORBIDDEN_ZONES.value] - ] - - @staticmethod - def _parse_virtual_walls_data( - parsed_map_data: Dict[str, Any], blocks: Dict[int, Any] - ): - """Parse virtual walls data.""" - if RRMapParser.Types.VIRTUAL_WALLS.value in blocks: - parsed_map_data["virtual_walls"] = [ - [ - wall[0], - RRMapParser.Tools.DIMENSION_MM - wall[1], - wall[2], - RRMapParser.Tools.DIMENSION_MM - wall[3], - ] - for wall in blocks[RRMapParser.Types.VIRTUAL_WALLS.value] - ] - - @staticmethod - def _parse_misc_data(parsed_map_data: Dict[str, Any], blocks: Dict[int, Any]): - """Parse miscellaneous data like cleaned blocks and mop zones.""" - if RRMapParser.Types.CURRENTLY_CLEANED_BLOCKS.value in blocks: - parsed_map_data["currently_cleaned_blocks"] = blocks[ - RRMapParser.Types.CURRENTLY_CLEANED_BLOCKS.value - ] - - if RRMapParser.Types.FORBIDDEN_MOP_ZONES.value in blocks: - 
parsed_map_data["forbidden_mop_zones"] = [ - [ - zone[0], - RRMapParser.Tools.DIMENSION_MM - zone[1], - zone[2], - RRMapParser.Tools.DIMENSION_MM - zone[3], - zone[4], - RRMapParser.Tools.DIMENSION_MM - zone[5], - zone[6], - RRMapParser.Tools.DIMENSION_MM - zone[7], - ] - for zone in blocks[RRMapParser.Types.FORBIDDEN_MOP_ZONES.value] - ] - - if RRMapParser.Types.GOTO_TARGET.value in blocks: - parsed_map_data["goto_target"] = blocks[ - RRMapParser.Types.GOTO_TARGET.value - ]["position"] - - def parse_data( - self, payload: Optional[bytes] = None, pixels: bool = False - ) -> Optional[Dict[str, Any]]: - """Get the map data from MQTT and return the json.""" - if payload: - self.map_data = self.parse(payload) - self.map_data.update(self.parse_rrm_data(payload, pixels) or {}) - return self.map_data - - def get_image(self) -> Dict[str, Any]: - """Get the image data from the map data.""" - return self.map_data.get("image", {}) - - @staticmethod - def get_int32(data: bytes, address: int) -> int: - """Get a 32-bit integer from the data.""" - return struct.unpack_from(" bool: + """Check if the vacuum is charging.""" + return (self.vacuum_state == "docked") and (int(self.vacuum_battery) < 100) + @staticmethod def _compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list | None: """ @@ -186,6 +197,7 @@ def generate_attributes(self) -> dict: attrs = { ATTR_CAMERA_MODE: self.camera_mode, ATTR_VACUUM_BATTERY: f"{self.vacuum_battery}%", + ATTR_VACUUM_CHARGING: self.vacuum_bat_charged, ATTR_VACUUM_POSITION: self.current_room, ATTR_VACUUM_STATUS: self.vacuum_state, ATTR_VACUUM_JSON_ID: self.vac_json_id, @@ -220,12 +232,13 @@ def generate_attributes(self) -> dict: class CameraSharedManager: """Camera Shared Manager class.""" - def __init__(self, file_name, device_info): + def __init__(self, file_name: str, device_info: dict = None): self._instances = {} self._lock = asyncio.Lock() self.file_name = file_name - self.device_info = device_info - self.update_shared_data(device_info) + if device_info: + self.device_info = device_info + self.update_shared_data(device_info) # Automatically initialize shared data for the instance # self._init_shared_data(device_info) diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index 019f8e3..5fde73f 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -568,7 +568,8 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None: """ Constants for the attribute keys """ ATTR_FRIENDLY_NAME = "friendly_name" -ATTR_VACUUM_BATTERY = "vacuum_battery" +ATTR_VACUUM_BATTERY = "battery" +ATTR_VACUUM_CHARGING = "charging" ATTR_VACUUM_POSITION = "vacuum_position" ATTR_VACUUM_TOPIC = "vacuum_topic" ATTR_VACUUM_STATUS = "vacuum_status" diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 20d22e8..e7cda32 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -1,5 +1,7 @@ """Utility code for the valetudo map parser.""" +import datetime +from time import time import hashlib import json from dataclasses import dataclass @@ -10,7 +12,7 @@ from PIL import Image, ImageOps from .drawable import Drawable -from .drawable_elements import DrawableElement, DrawingConfig +from .drawable_elements import DrawingConfig from .enhanced_drawable import EnhancedDrawable from .types import ( LOGGER, @@ -79,6 +81,97 @@ def get_robot_position(self) -> RobotPosition | None: """Return the robot 
position.""" return self.robot_pos + async def async_get_image( + self, + m_json: dict | None, + destinations: list | None = None, + bytes_format: bool = False, + ) -> PilPNG | None: + """ + Unified async function to get PIL image from JSON data for both Hypfer and Rand256 handlers. + + This function: + 1. Calls the appropriate async_get_image_from_json method + 2. Stores the processed data in shared.new_image + 3. Backs up previous data to shared.last_image + 4. Updates shared.image_last_updated with current datetime + + @param m_json: The JSON data to use to draw the image + @param destinations: MQTT destinations for labels (used by Rand256) + @param bytes_format: If True, also convert to PNG bytes and store in shared.binary_image + @return: PIL Image or None + """ + try: + # Backup current image to last_image before processing new one + if hasattr(self.shared, "new_image") and self.shared.new_image is not None: + self.shared.last_image = self.shared.new_image + + # Call the appropriate handler method based on handler type + if hasattr(self, "get_image_from_rrm"): + # This is a Rand256 handler + new_image = await self.get_image_from_rrm( + m_json=m_json, + destinations=destinations, + return_webp=False, # Always return PIL Image + ) + elif hasattr(self, "async_get_image_from_json"): + # This is a Hypfer handler + new_image = await self.async_get_image_from_json( + m_json=m_json, + return_webp=False, # Always return PIL Image + ) + else: + LOGGER.warning( + "%s: Handler type not recognized for async_get_image", + self.file_name, + ) + return ( + self.shared.last_image + if hasattr(self.shared, "last_image") + else None + ) + + # Store the new image in shared data + if new_image is not None: + self.shared.new_image = new_image + + # Convert to binary (PNG bytes) if requested + if bytes_format: + with io.BytesIO() as buf: + new_image.save(buf, format="PNG", compress_level=1) + self.shared.binary_image = buf.getvalue() + LOGGER.debug( + "%s: Binary image conversion completed", self.file_name + ) + else: + self.shared.binary_image = None + # Update the timestamp with current datetime + self.shared.image_last_updated = datetime.datetime.fromtimestamp(time()) + LOGGER.debug( + "%s: Image processed and stored in shared data", self.file_name + ) + return new_image + else: + LOGGER.warning( + "%s: Failed to generate image from JSON data", self.file_name + ) + return ( + self.shared.last_image + if hasattr(self.shared, "last_image") + else None + ) + + except Exception as e: + LOGGER.error( + "%s: Error in async_get_image: %s", + self.file_name, + str(e), + exc_info=True, + ) + return ( + self.shared.last_image if hasattr(self.shared, "last_image") else None + ) + def get_charger_position(self) -> ChargerPosition | None: """Return the charger position.""" return self.charger_pos diff --git a/SCR/valetudo_map_parser/hypfer_draw.py b/SCR/valetudo_map_parser/hypfer_draw.py index 7458f79..264e1b6 100755 --- a/SCR/valetudo_map_parser/hypfer_draw.py +++ b/SCR/valetudo_map_parser/hypfer_draw.py @@ -1,7 +1,7 @@ """ Image Draw Class for Valetudo Hypfer Image Handling. This class is used to simplify the ImageHandler class. 
-Version: 2024.07.2 +Version: 0.1.9 """ from __future__ import annotations @@ -9,9 +9,9 @@ import logging from .config.drawable_elements import DrawableElement -from .config.types import Color, JsonType, NumpyArray, RobotPosition - +from .config.types import Color, JsonType, NumpyArray, RobotPosition, RoomStore +MAP_BOUNDARY = 20000 # typical map extent ~5000–10000 units _LOGGER = logging.getLogger(__name__) @@ -84,87 +84,93 @@ async def draw_go_to_flag( async def async_draw_base_layer( self, - img_np_array, - compressed_pixels_list, - layer_type, - color_wall, - color_zone_clean, - pixel_size, - disabled_rooms=None, - ): - """Draw the base layer of the map. - - Args: - img_np_array: The image array to draw on - compressed_pixels_list: The list of compressed pixels to draw - layer_type: The type of layer to draw (segment, floor, wall) - color_wall: The color to use for walls - color_zone_clean: The color to use for clean zones - pixel_size: The size of each pixel - disabled_rooms: A set of room IDs that are disabled - - Returns: - A tuple of (room_id, img_np_array) + img_np_array: NumpyArray, + compressed_pixels_list: list[list[int]], + layer_type: str, + color_wall: Color, + color_zone_clean: Color, + pixel_size: int, + disabled_rooms: set[int] | None = None, + ) -> tuple[int, NumpyArray]: """ + Draw the base layer of the map. + + Sequential for rooms/segments to maintain room_id state. Returns (last_room_id, image_array). + """ + if not compressed_pixels_list: + return 0, img_np_array + room_id = 0 - + SEGMENT_LAYERS = ("segment", "floor") + for compressed_pixels in compressed_pixels_list: pixels = self.img_h.data.sublist(compressed_pixels, 3) - - if layer_type in ["segment", "floor"]: - img_np_array, room_id = await self._process_room_layer( - img_np_array, - pixels, - layer_type, - room_id, - pixel_size, - color_zone_clean, - ) - elif layer_type == "wall": - img_np_array = await self._process_wall_layer( - img_np_array, pixels, pixel_size, color_wall, disabled_rooms - ) - + + try: + if layer_type in SEGMENT_LAYERS: + img_np_array, room_id = await self._process_room_layer( + img_np_array, pixels, layer_type, room_id, pixel_size, color_zone_clean + ) + elif layer_type == "wall": + img_np_array = await self._process_wall_layer( + img_np_array, pixels, pixel_size, color_wall, disabled_rooms + ) + except Exception as e: + _LOGGER.warning("%s: Failed processing %s layer: %s", self.file_name, layer_type, e) + return room_id, img_np_array async def _process_room_layer( - self, img_np_array, pixels, layer_type, room_id, pixel_size, color_zone_clean - ): - """Process a room layer (segment or floor).""" - # Check if this room should be drawn + self, + img_np_array: NumpyArray, + pixels: list[tuple[int, int, int]], + layer_type: str, + room_id: int, + pixel_size: int, + color_zone_clean: Color + ) -> tuple[NumpyArray, int]: + """ + Draw a room layer (segment or floor) onto the image array. + + Returns a tuple of (updated image array, next room_id). 
+        """
         draw_room = True
-        if layer_type == "segment" and hasattr(self.img_h, "drawing_config"):
-            # The room_id is 0-based, but DrawableElement.ROOM_x is 1-based
-            current_room_id = room_id + 1
+        drawing_config = getattr(self.img_h, "drawing_config", None)
+
+        # Segment-specific enable check
+        if layer_type == "segment" and drawing_config:
+            current_room_id = room_id + 1  # 1-based for DrawableElement
             if 1 <= current_room_id <= 15:
-                # Use the DrawableElement imported at the top of the file
-                room_element = getattr(DrawableElement, f"ROOM_{current_room_id}", None)
-                if room_element and hasattr(self.img_h.drawing_config, "is_enabled"):
-                    draw_room = self.img_h.drawing_config.is_enabled(room_element)
-
-        # Get the room color
-        room_color = self.img_h.shared.rooms_colors[room_id]
-
+                room_element = getattr(DrawableElement, f"ROOM_{current_room_id}", None)
+                if room_element and hasattr(drawing_config, "is_enabled"):
+                    draw_room = drawing_config.is_enabled(room_element)
+
         try:
-            if layer_type == "segment":
-                room_color = self._get_active_room_color(
-                    room_id, room_color, color_zone_clean
-                )
-
-            # Only draw the room if it's enabled
-            if draw_room:
+            room_color = self.img_h.shared.rooms_colors[room_id]
+        except IndexError:
+            _LOGGER.warning(
+                "%s: Invalid room_id %d for layer_type '%s'",
+                self.file_name, room_id, layer_type
+            )
+            # Keep counting so later rooms stay aligned with their colors
+            return img_np_array, (room_id + 1) % 16
+
+        if layer_type == "segment":
+            room_color = self._get_active_room_color(room_id, room_color, color_zone_clean)
+
+        # Draw only if enabled
+        if draw_room:
+            try:
                 img_np_array = await self.img_h.draw.from_json_to_image(
                     img_np_array, pixels, pixel_size, room_color
                 )
-
-            # Always increment the room_id, even if the room is not drawn
-            room_id = (room_id + 1) % 16  # Cycle room_id back to 0 after 15
-
-        except IndexError as e:
-            _LOGGER.warning("%s: Image Draw Error: %s", self.file_name, str(e))
-
-        return img_np_array, room_id
+            except IndexError as e:
+                _LOGGER.warning(
+                    "%s: Image draw error for room_id %d in '%s': %s",
+                    self.file_name, room_id, layer_type, e
+                )
+
+        # Cycle room_id back to 0 after 15
+        return img_np_array, (room_id + 1) % 16
 
     def _get_active_room_color(self, room_id, room_color, color_zone_clean):
         """Adjust the room color if the room is active."""
@@ -176,92 +182,42 @@ def _get_active_room_color(self, room_id, room_color, color_zone_clean):
         return room_color
 
     async def _process_wall_layer(
-        self, img_np_array, pixels, pixel_size, color_wall, disabled_rooms=None
-    ):
-        """Process a wall layer.
- - Args: - img_np_array: The image array to draw on - pixels: The pixels to draw - pixel_size: The size of each pixel - color_wall: The color to use for the walls - disabled_rooms: A set of room IDs that are disabled - - Returns: - The updated image array - """ - # Log the wall color to verify alpha is being passed correctly + self, + img_np_array: NumpyArray, + pixels: list[tuple[int, int, int]], + pixel_size: int, + color_wall: Color, + disabled_rooms: set[int] | None = None, + ) -> NumpyArray: + """Draw a wall layer, optionally filtering out walls near disabled rooms.""" _LOGGER.debug("%s: Drawing walls with color %s", self.file_name, color_wall) - - # If there are no disabled rooms, draw all walls + if not disabled_rooms: return await self.img_h.draw.from_json_to_image( img_np_array, pixels, pixel_size, color_wall ) - - # If there are disabled rooms, we need to check each wall pixel - # to see if it belongs to a disabled room - _LOGGER.debug( - "%s: Filtering walls for disabled rooms: %s", self.file_name, disabled_rooms - ) - - # Get the element map if available + element_map = getattr(self.img_h, "element_map", None) if element_map is None: - _LOGGER.warning( - "%s: Element map not available, drawing all walls", self.file_name - ) + _LOGGER.warning("%s: Element map not available, drawing all walls", self.file_name) return await self.img_h.draw.from_json_to_image( img_np_array, pixels, pixel_size, color_wall ) - - # Filter out walls that belong to disabled rooms + + shape_y, shape_x = element_map.shape filtered_pixels = [] + offsets = [(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if not (dx == dy == 0)] + for x, y, z in pixels: - # Check if this wall pixel is adjacent to a disabled room - # by checking the surrounding pixels in the element map - is_disabled_room_wall = False - - # Check the element map at this position and surrounding positions - # to see if this wall is adjacent to a disabled room - for dx in range(-1, 2): - for dy in range(-1, 2): - # Skip the center pixel - if dx == 0 and dy == 0: - continue - - # Calculate the position to check - check_x = x + dx - check_y = y + dy - - # Make sure the position is within bounds - if ( - check_x < 0 - or check_y < 0 - or check_x >= element_map.shape[1] - or check_y >= element_map.shape[0] - ): - continue - - # Get the element at this position - element = element_map[check_y, check_x] - - # Check if this element is a disabled room - # Room elements are in the range 101-115 (ROOM_1 to ROOM_15) - if 101 <= element <= 115: - room_id = element - 101 # Convert to 0-based index - if room_id in disabled_rooms: - is_disabled_room_wall = True - break - - if is_disabled_room_wall: - break - - # If this wall is not adjacent to a disabled room, add it to the filtered pixels - if not is_disabled_room_wall: + for dx, dy in offsets: + cx, cy = x + dx, y + dy + if 0 <= cx < shape_x and 0 <= cy < shape_y: + elem = element_map[cy, cx] + if 101 <= elem <= 115 and (elem - 101) in disabled_rooms: + break + else: filtered_pixels.append((x, y, z)) - - # Draw the filtered walls + _LOGGER.debug( "%s: Drawing %d of %d wall pixels after filtering", self.file_name, @@ -272,7 +228,6 @@ async def _process_wall_layer( return await self.img_h.draw.from_json_to_image( img_np_array, filtered_pixels, pixel_size, color_wall ) - return img_np_array async def async_draw_obstacle( @@ -325,59 +280,46 @@ async def async_draw_zones( color_zone_clean: Color, color_no_go: Color, ) -> NumpyArray: - """Get the zone clean from the JSON data.""" + """Draw active, no-go, and 
no-mop zones in-place on the map array."""
         try:
-            zone_clean = self.img_h.data.find_zone_entities(m_json)
-        except (ValueError, KeyError):
-            zone_clean = None
-        else:
-            _LOGGER.info("%s: Got zones.", self.file_name)
-        if zone_clean:
-            try:
-                zones_active = zone_clean.get("active_zone")
-            except KeyError:
-                zones_active = None
-            if zones_active:
-                np_array = await self.img_h.draw.zones(
-                    np_array, zones_active, color_zone_clean
-                )
-            try:
-                no_go_zones = zone_clean.get("no_go_area")
-            except KeyError:
-                no_go_zones = None
-
-            if no_go_zones:
-                np_array = await self.img_h.draw.zones(
-                    np_array, no_go_zones, color_no_go
-                )
-
-            try:
-                no_mop_zones = zone_clean.get("no_mop_area")
-            except KeyError:
-                no_mop_zones = None
-
-            if no_mop_zones:
-                np_array = await self.img_h.draw.zones(
-                    np_array, no_mop_zones, color_no_go
-                )
+            zones = self.img_h.data.find_zone_entities(m_json) or {}
+        except (ValueError, KeyError) as e:
+            _LOGGER.debug("%s: No zones found (%s)", self.file_name, e)
+            return np_array
+
+        _LOGGER.info("%s: Got zones.", self.file_name)
+
+        for key, color in [
+            ("active_zone", color_zone_clean),
+            ("no_go_area", color_no_go),
+            ("no_mop_area", color_no_go),
+        ]:
+            if key in zones:
+                np_array = await self.img_h.draw.zones(np_array, zones[key], color)
+
         return np_array
 
+    def _prepare_xy_sequences(self, points: list[int]) -> list[list[int]]:
+        """Convert a flat [x1, y1, x2, y2, ...] list into (x, y) pairs."""
+        return self.img_h.data.sublist(points, 2)
+
     async def async_draw_virtual_walls(
         self, m_json: JsonType, np_array: NumpyArray, color_no_go: Color
     ) -> NumpyArray:
-        """Get the virtual walls from the JSON data."""
+        """Draw any configured virtual walls on the map."""
         try:
             virtual_walls = self.img_h.data.find_virtual_walls(m_json)
-        except (ValueError, KeyError):
-            virtual_walls = None
-        else:
-            _LOGGER.info("%s: Got virtual walls.", self.file_name)
-        if virtual_walls:
-            np_array = await self.img_h.draw.draw_virtual_walls(
-                np_array, virtual_walls, color_no_go
-            )
-        return np_array
-
+        except (ValueError, KeyError) as e:
+            _LOGGER.debug("%s: No virtual walls found (%s)", self.file_name, e)
+            return np_array
+
+        if not virtual_walls:
+            return np_array
+
+        _LOGGER.info("%s: Drawing %d virtual wall(s).", self.file_name, len(virtual_walls))
+        return await self.img_h.draw.draw_virtual_walls(np_array, virtual_walls, color_no_go)
+
+
     async def async_draw_paths(
         self,
         np_array: NumpyArray,
@@ -385,36 +327,27 @@ async def async_draw_paths(
         color_move: Color,
         color_gray: Color,
     ) -> NumpyArray:
-        """Get the paths from the JSON data."""
-        # Initialize the variables
-        path_pixels = None
-        predicted_path = None
-        # Extract the paths data from the JSON data.
+        """Draw predicted and actual paths from the JSON data."""
         try:
             paths_data = self.img_h.data.find_paths_entities(m_json)
-            predicted_path = paths_data.get("predicted_path", [])
-            path_pixels = paths_data.get("path", [])
         except KeyError as e:
-            _LOGGER.warning("%s: Error extracting paths data:", str(e))
-
-        if predicted_path:
-            predicted_path = predicted_path[0]["points"]
-            predicted_path = self.img_h.data.sublist(predicted_path, 2)
-            predicted_pat2 = self.img_h.data.sublist_join(predicted_path, 2)
+            _LOGGER.warning("%s: Error extracting paths data: %s", self.file_name, e)
+            return np_array
+
+        predicted_path = paths_data.get("predicted_path") or []
+        if predicted_path and "points" in predicted_path[0]:
+            coords = self._prepare_xy_sequences(predicted_path[0]["points"])
+            joined = self.img_h.data.sublist_join(coords, 2)
+            np_array = await self.img_h.draw.lines(np_array, joined, 2, color_gray)
+
+        path_pixels = paths_data.get("path") or []
+        for path in path_pixels:
+            coords = self._prepare_xy_sequences(path.get("points", []))
+            self.img_h.shared.map_new_path = self.img_h.data.sublist_join(coords, 2)
             np_array = await self.img_h.draw.lines(
-                np_array, predicted_pat2, 2, color_gray
+                np_array, self.img_h.shared.map_new_path, 5, color_move
             )
-        if path_pixels:
-            for path in path_pixels:
-                # Get the points from the current path and extend multiple paths.
-                points = path.get("points", [])
-                sublist = self.img_h.data.sublist(points, 2)
-                self.img_h.shared.map_new_path = self.img_h.data.sublist_join(
-                    sublist, 2
-                )
-                np_array = await self.img_h.draw.lines(
-                    np_array, self.img_h.shared.map_new_path, 5, color_move
-                )
+
         return np_array
 
     async def async_get_entity_data(self, m_json: JsonType) -> dict or None:
@@ -427,288 +360,135 @@ async def async_get_entity_data(self, m_json: JsonType) -> dict or None:
         return entity_dict
 
     def _check_active_zone_and_set_zooming(self) -> None:
-        """Helper function to check active zones and set zooming state."""
+        """Check active zones and update zooming state accordingly."""
+        zooming = False
+
         if self.img_h.active_zones and self.img_h.robot_in_room:
-            from .config.types import RoomStore
-
-            segment_id = str(self.img_h.robot_in_room["id"])
-            room_store = RoomStore(self.file_name)
-            room_keys = list(room_store.get_rooms().keys())
-
+            segment_id = str(self.img_h.robot_in_room.get("id"))
+            room_keys = list(RoomStore(self.file_name).get_rooms().keys())
+
             _LOGGER.debug(
                 "%s: Active zones debug - segment_id: %s, room_keys: %s, active_zones: %s",
-                self.file_name,
-                segment_id,
-                room_keys,
-                self.img_h.active_zones,
+                self.file_name, segment_id, room_keys, self.img_h.active_zones
             )
-
+
             if segment_id in room_keys:
-                position = room_keys.index(segment_id)
+                pos = room_keys.index(segment_id)
+                in_bounds = pos < len(self.img_h.active_zones)
+
                 _LOGGER.debug(
                     "%s: Segment ID %s found at position %s, active_zones[%s] = %s",
-                    self.file_name,
-                    segment_id,
-                    position,
-                    position,
-                    self.img_h.active_zones[position]
-                    if position < len(self.img_h.active_zones)
-                    else "OUT_OF_BOUNDS",
+                    self.file_name, segment_id, pos, pos,
+                    self.img_h.active_zones[pos] if in_bounds else "OUT_OF_BOUNDS"
                 )
-                if position < len(self.img_h.active_zones):
-                    self.img_h.zooming = bool(self.img_h.active_zones[position])
-                else:
-                    self.img_h.zooming = False
+
+                if in_bounds:
+                    zooming = bool(self.img_h.active_zones[pos])
             else:
                 _LOGGER.warning(
                     "%s: Segment ID %s not found in room_keys %s",
-                    self.file_name,
-                    segment_id,
-                    room_keys,
+                    self.file_name, segment_id, room_keys
                 )
-                self.img_h.zooming = False
-            else:
-                
self.img_h.zooming = False
+
+        self.img_h.zooming = zooming
+
     @staticmethod
-    def point_in_polygon(x: int, y: int, polygon: list) -> bool:
+    def point_in_polygon(x: float, y: float, polygon: list[tuple[float, float]]) -> bool:
         """
-        Check if a point is inside a polygon using ray casting algorithm.
-
+        Determine if (x, y) lies inside a polygon using ray casting.
+
         Args:
-            x: X coordinate of the point
-            y: Y coordinate of the point
-            polygon: List of (x, y) tuples forming the polygon
-
+            x, y: Coordinates of the point to test.
+            polygon: List of (x, y) vertices; first and last may be the same but need not be.
+
         Returns:
-            True if the point is inside the polygon, False otherwise
+            True if the point is inside the polygon, False otherwise.
         """
         n = len(polygon)
+        if n < 3:
+            return False
+
         inside = False
         p1x, p1y = polygon[0]
-        xinters = None  # Initialize with default value
         for i in range(1, n + 1):
             p2x, p2y = polygon[i % n]
-            if y > min(p1y, p2y):
-                if y <= max(p1y, p2y):
-                    if x <= max(p1x, p2x):
-                        if p1y != p2y:
-                            xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
-                        if p1x == p2x or (xinters is not None and x <= xinters):
-                            inside = not inside
+            if (p1y > y) != (p2y > y):  # edge crosses the horizontal ray
+                xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
+                if x <= xinters:
+                    inside = not inside
             p1x, p1y = p2x, p2y
-
         return inside
-
+
     async def async_get_robot_in_room(
-        self, robot_y: int = 0, robot_x: int = 0, angle: float = 0.0
+        self,
+        robot_y: int = 0,
+        robot_x: int = 0,
+        angle: float = 0.0
     ) -> RobotPosition:
-        """Get the robot position and return in what room is."""
-        # First check if we already have a cached room and if the robot is still in it
-        if self.img_h.robot_in_room:
-            # If we have outline data, use point_in_polygon for accurate detection
-            if "outline" in self.img_h.robot_in_room:
-                outline = self.img_h.robot_in_room["outline"]
-                if self.point_in_polygon(int(robot_x), int(robot_y), outline):
-                    temp = {
-                        "x": robot_x,
-                        "y": robot_y,
-                        "angle": angle,
-                        "in_room": self.img_h.robot_in_room["room"],
-                    }
-                    # Handle active zones
-                    self._check_active_zone_and_set_zooming()
-                    return temp
-            # Fallback to bounding box check if no outline data
-            elif all(
-                k in self.img_h.robot_in_room for k in ["left", "right", "up", "down"]
-            ):
-                if (
-                    (self.img_h.robot_in_room["right"] >= int(robot_x))
-                    and (self.img_h.robot_in_room["left"] <= int(robot_x))
-                ) and (
-                    (self.img_h.robot_in_room["down"] >= int(robot_y))
-                    and (self.img_h.robot_in_room["up"] <= int(robot_y))
-                ):
-                    temp = {
-                        "x": robot_x,
-                        "y": robot_y,
-                        "angle": angle,
-                        "in_room": self.img_h.robot_in_room["room"],
-                    }
-                    # Handle active zones
+        """Return robot position and the room it's currently in."""
+        def make_pos(room_name: str | None) -> RobotPosition:
+            return {"x": robot_x, "y": robot_y, "angle": angle, "in_room": room_name}
+
+        last_room = self.img_h.robot_in_room or None
+
+        # Reuse cached polygon or bbox
+        if last_room:
+            if "outline" in last_room and self.point_in_polygon(
+                int(robot_x), int(robot_y), last_room["outline"]
+            ):
+                self._check_active_zone_and_set_zooming()
+                return make_pos(last_room["room"])
+            if all(k in last_room for k in ("left", "right", "up", "down")):
+                if (last_room["left"] <= robot_x <= last_room["right"] and
+                        last_room["up"] <= robot_y <= last_room["down"]):
                     self._check_active_zone_and_set_zooming()
-                    return temp
-
-        # If we don't have a cached room or the robot is not in it, search all rooms
-        last_room = None
-        room_count = 0
-        if self.img_h.robot_in_room:
-            last_room = 
self.img_h.robot_in_room - - # Check if the robot is far outside the normal map boundaries - # This helps prevent false positives for points very far from any room - map_boundary = 20000 # Typical map size is around 5000-10000 units - if abs(robot_x) > map_boundary or abs(robot_y) > map_boundary: - _LOGGER.debug( - "%s robot position (%s, %s) is far outside map boundaries.", - self.file_name, - robot_x, - robot_y, - ) + return make_pos(last_room["room"]) + + # Reject obviously off‑map points + if abs(robot_x) > MAP_BOUNDARY or abs(robot_y) > MAP_BOUNDARY: + _LOGGER.debug("%s: position (%s, %s) outside map bounds", self.file_name, robot_x, robot_y) self.img_h.robot_in_room = last_room self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp - - # Search through all rooms to find which one contains the robot + return make_pos(last_room["room"] if last_room else None) + + # No room geometry? bail early if self.img_h.rooms_pos is None: - _LOGGER.debug( - "%s: No rooms data available for robot position detection.", - self.file_name, - ) + _LOGGER.debug("%s: No rooms data for position detection", self.file_name) self.img_h.robot_in_room = last_room self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp - - for room in self.img_h.rooms_pos: - # Check if the room has an outline (polygon points) + return make_pos(last_room["room"] if last_room else None) + + # Search all rooms + for idx, room in enumerate(self.img_h.rooms_pos): if "outline" in room: - outline = room["outline"] - # Use point_in_polygon for accurate detection with complex shapes - if self.point_in_polygon(int(robot_x), int(robot_y), outline): - # Robot is in this room + if self.point_in_polygon(int(robot_x), int(robot_y), room["outline"]): self.img_h.robot_in_room = { - "id": room.get( - "id", room_count - ), # Use actual segment ID if available + "id": room.get("id", idx), "room": str(room["name"]), - "outline": outline, - } - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], + "outline": room["outline"], } - - # Handle active zones - Map segment ID to active_zones position - if self.img_h.active_zones: - from .config.types import RoomStore - - segment_id = str(self.img_h.robot_in_room["id"]) - room_store = RoomStore(self.file_name) - room_keys = list(room_store.get_rooms().keys()) - - _LOGGER.debug( - "%s: Active zones debug - segment_id: %s, room_keys: %s, active_zones: %s", - self.file_name, - segment_id, - room_keys, - self.img_h.active_zones, - ) - - if segment_id in room_keys: - position = room_keys.index(segment_id) - _LOGGER.debug( - "%s: Segment ID %s found at position %s, active_zones[%s] = %s", - self.file_name, - segment_id, - position, - position, - self.img_h.active_zones[position] - if position < len(self.img_h.active_zones) - else "OUT_OF_BOUNDS", - ) - if position < len(self.img_h.active_zones): - self.img_h.zooming = bool( - self.img_h.active_zones[position] - ) - else: - self.img_h.zooming = False - else: - _LOGGER.warning( - "%s: Segment ID %s not found in room_keys %s", - self.file_name, - segment_id, - room_keys, - ) - self.img_h.zooming = False - else: - self.img_h.zooming = False - - _LOGGER.debug( - "%s is in %s room (polygon detection).", - self.file_name, - self.img_h.robot_in_room["room"], - ) - return temp - # Fallback to bounding box if no 
outline is available + self._check_active_zone_and_set_zooming() + _LOGGER.debug("%s is in %s room (polygon)", self.file_name, room["name"]) + return make_pos(room["name"]) elif "corners" in room: - corners = room["corners"] - # Create a bounding box from the corners - self.img_h.robot_in_room = { - "id": room.get( - "id", room_count - ), # Use actual segment ID if available - "left": int(corners[0][0]), - "right": int(corners[2][0]), - "up": int(corners[0][1]), - "down": int(corners[2][1]), - "room": str(room["name"]), - } - # Check if the robot is inside the bounding box - if ( - (self.img_h.robot_in_room["right"] >= int(robot_x)) - and (self.img_h.robot_in_room["left"] <= int(robot_x)) - ) and ( - (self.img_h.robot_in_room["down"] >= int(robot_y)) - and (self.img_h.robot_in_room["up"] <= int(robot_y)) - ): - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": self.img_h.robot_in_room["room"], + left, up = map(int, room["corners"][0]) + right, down = map(int, room["corners"][2]) + if left <= robot_x <= right and up <= robot_y <= down: + self.img_h.robot_in_room = { + "id": room.get("id", idx), + "left": left, "right": right, + "up": up, "down": down, + "room": str(room["name"]), } - - # Handle active zones self._check_active_zone_and_set_zooming() - - _LOGGER.debug( - "%s is in %s room (bounding box detection).", - self.file_name, - self.img_h.robot_in_room["room"], - ) - return temp - room_count += 1 - - # Robot not found in any room - _LOGGER.debug( - "%s not located within any room coordinates.", - self.file_name, - ) + _LOGGER.debug("%s is in %s room (bbox)", self.file_name, room["name"]) + return make_pos(room["name"]) + + # Not found + _LOGGER.debug("%s not located in any room", self.file_name) self.img_h.robot_in_room = last_room self.img_h.zooming = False - temp = { - "x": robot_x, - "y": robot_y, - "angle": angle, - "in_room": last_room["room"] if last_room else None, - } - return temp + return make_pos(last_room["room"] if last_room else None) + async def async_get_robot_position(self, entity_dict: dict) -> tuple | None: """Get the robot position from the entity data.""" diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 50b8951..28bc7f0 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -7,13 +7,16 @@ from __future__ import annotations -import json +import asyncio +import numpy as np from PIL import Image +from .config.async_utils import AsyncNumPy, AsyncPIL from .config.auto_crop import AutoCrop from .config.drawable_elements import DrawableElement from .config.shared import CameraShared +from .config.utils import pil_to_webp_bytes from .config.types import ( COLORS, LOGGER, @@ -22,6 +25,7 @@ RoomsProperties, RoomStore, WebPBytes, + JsonType, ) from .config.utils import ( BaseHandler, @@ -55,6 +59,9 @@ def __init__(self, shared_data: CameraShared): self.go_to = None # vacuum go to data self.img_hash = None # hash of the image calculated to check differences. self.img_base_layer = None # numpy array store the map base layer. + self.img_work_layer = ( + None # persistent working buffer to avoid per-frame allocations + ) self.active_zones = None # vacuum active zones. self.svg_wait = False # SVG image creation wait. self.imd = ImDraw(self) # Image Draw class. 
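For reference, a minimal usage sketch of the wrappers this patch adds in config/async_utils.py (the import path is assumed from the SCR/ package layout): each call funnels the blocking NumPy/PIL work through asyncio.to_thread, so the event loop keeps servicing other tasks while the array or image operation runs.

    import asyncio

    import numpy as np

    from valetudo_map_parser.config.async_utils import AsyncNumPy, AsyncPIL

    async def demo() -> None:
        layer = np.zeros((64, 64, 4), dtype=np.uint8)  # RGBA map layer
        work = await AsyncNumPy.async_copy(layer)      # np.copy in a worker thread
        image = await AsyncPIL.async_fromarray(work)   # PIL Image, mode="RGBA"
        png = await AsyncPIL.async_save_to_bytes(image, format_type="PNG")
        print(len(png), "bytes")

    asyncio.run(demo())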
@@ -97,7 +104,7 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: # noinspection PyUnresolvedReferences,PyUnboundLocalVariable async def async_get_image_from_json( self, - m_json: json | None, + m_json: JsonType | None, return_webp: bool = False, ) -> WebPBytes | Image.Image | None: """Get the image from the JSON data. @@ -207,14 +214,14 @@ async def async_get_image_from_json( ) % 16 # Increment room_id even if we skip continue - # Check if this is a wall layer and if walls are enabled + # Draw the layer ONLY if enabled is_wall_layer = layer_type == "wall" if is_wall_layer: + # Skip walls entirely if disabled if not self.drawing_config.is_enabled( DrawableElement.WALL ): - pass - + continue # Draw the layer ( room_id, @@ -229,13 +236,6 @@ async def async_get_image_from_json( disabled_rooms if layer_type == "wall" else None, ) - # Update element map for this layer - if is_room_layer and 0 < room_id <= 15: - # Mark the room in the element map - room_element = getattr( - DrawableElement, f"ROOM_{room_id}", None - ) - # Draw the virtual walls if enabled if self.drawing_config.is_enabled(DrawableElement.VIRTUAL_WALL): img_np_array = await self.imd.async_draw_virtual_walls( @@ -277,6 +277,7 @@ async def async_get_image_from_json( LOGGER.info("%s: Completed base Layers", self.file_name) # Copy the new array in base layer. self.img_base_layer = await self.async_copy_array(img_np_array) + self.shared.frame_number = self.frame_number self.frame_number += 1 if (self.frame_number >= self.max_frames) or ( @@ -289,16 +290,47 @@ async def async_get_image_from_json( str(self.json_id), str(self.frame_number), ) - # Copy the base layer to the new image. - img_np_array = await self.async_copy_array(self.img_base_layer) - # All below will be drawn at each frame. 
- # Draw zones if any and if enabled + # Ensure persistent working buffer exists and matches base (allocate only when needed) + if ( + self.img_work_layer is None + or self.img_work_layer.shape != self.img_base_layer.shape + or self.img_work_layer.dtype != self.img_base_layer.dtype + ): + self.img_work_layer = np.empty_like(self.img_base_layer) + + # Copy the base layer into the persistent working buffer (no new allocation per frame) + np.copyto(self.img_work_layer, self.img_base_layer) + img_np_array = self.img_work_layer + + # Prepare parallel data extraction tasks + data_tasks = [] + + # Prepare zone data extraction + if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): + data_tasks.append(self._prepare_zone_data(m_json)) + + # Prepare go_to flag data extraction + if self.drawing_config.is_enabled(DrawableElement.GO_TO_TARGET): + data_tasks.append(self._prepare_goto_data(entity_dict)) + + # Prepare path data extraction + path_enabled = self.drawing_config.is_enabled(DrawableElement.PATH) + LOGGER.info( + "%s: PATH element enabled: %s", self.file_name, path_enabled + ) + if path_enabled: + LOGGER.info("%s: Drawing path", self.file_name) + data_tasks.append(self._prepare_path_data(m_json)) + + # Await all data preparation tasks if any were created + if data_tasks: + await asyncio.gather(*data_tasks) + + # Process drawing operations sequentially (since they modify the same array) + # Draw zones if enabled if self.drawing_config.is_enabled(DrawableElement.RESTRICTED_AREA): img_np_array = await self.imd.async_draw_zones( - m_json, - img_np_array, - colors["zone_clean"], - colors["no_go"], + m_json, img_np_array, colors["zone_clean"], colors["no_go"] ) # Draw the go_to target flag if enabled @@ -307,13 +339,8 @@ async def async_get_image_from_json( img_np_array, entity_dict, colors["go_to"] ) - # Draw path prediction and paths if enabled - path_enabled = self.drawing_config.is_enabled(DrawableElement.PATH) - LOGGER.info( - "%s: PATH element enabled: %s", self.file_name, path_enabled - ) + # Draw paths if enabled if path_enabled: - LOGGER.info("%s: Drawing path", self.file_name) img_np_array = await self.imd.async_draw_paths( img_np_array, m_json, colors["move"], self.color_grey ) @@ -371,15 +398,13 @@ async def async_get_image_from_json( # Handle resizing if needed, then return based on format preference if self.check_zoom_and_aspect_ratio(): # Convert to PIL for resizing - pil_img = Image.fromarray(img_np_array, mode="RGBA") + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") del img_np_array resize_params = prepare_resize_params(self, pil_img, False) resized_image = await self.async_resize_images(resize_params) # Return WebP bytes or PIL Image based on parameter if return_webp: - from .config.utils import pil_to_webp_bytes - webp_bytes = await pil_to_webp_bytes(resized_image) return webp_bytes else: @@ -394,7 +419,7 @@ async def async_get_image_from_json( return webp_bytes else: # Convert to PIL Image (original behavior) - pil_img = Image.fromarray(img_np_array, mode="RGBA") + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") del img_np_array LOGGER.debug("%s: Frame Completed.", self.file_name) return pil_img @@ -474,4 +499,27 @@ def set_element_property( @staticmethod async def async_copy_array(original_array): """Copy the array.""" - return original_array.copy() + return await AsyncNumPy.async_copy(original_array) + + async def _prepare_zone_data(self, m_json): + """Prepare zone data for parallel processing.""" + await asyncio.sleep(0) # 
Yield control + try: + return self.data.find_zone_entities(m_json) + except (ValueError, KeyError): + return None + + @staticmethod + async def _prepare_goto_data(entity_dict): + """Prepare go-to flag data for parallel processing.""" + await asyncio.sleep(0) # Yield control + # Extract go-to target data from entity_dict + return entity_dict.get("go_to_target", None) + + async def _prepare_path_data(self, m_json): + """Prepare path data for parallel processing.""" + await asyncio.sleep(0) # Yield control + try: + return self.data.find_paths_entities(m_json) + except (ValueError, KeyError): + return None diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 71741e2..6cb69e0 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -12,8 +12,8 @@ from typing import Any import numpy as np -from PIL import Image +from .config.async_utils import AsyncNumPy, AsyncPIL from .config.auto_crop import AutoCrop from .config.drawable_elements import DrawableElement from .config.types import ( @@ -145,7 +145,7 @@ async def get_image_from_rrm( m_json: JsonType, # json data destinations: None = None, # MQTT destinations for labels return_webp: bool = False, - ) -> WebPBytes | Image.Image | None: + ) -> WebPBytes | PilPNG | None: """Generate Images from the json data. @param m_json: The JSON data to use to draw the image. @param destinations: MQTT destinations for labels (unused). @@ -195,8 +195,8 @@ async def get_image_from_rrm( del img_np_array # free memory return webp_bytes else: - # Convert to PIL Image (original behavior) - pil_img = Image.fromarray(img_np_array, mode="RGBA") + # Convert to PIL Image using async utilities + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") del img_np_array # free memory return await self._finalize_image(pil_img) @@ -297,11 +297,6 @@ async def _setup_robot_and_image( original_rooms_pos = self.rooms_pos self.rooms_pos = temp_rooms_pos - # Perform robot room detection to check active zones - robot_room_result = await self.async_get_robot_in_room( - robot_position[0], robot_position[1], robot_position_angle - ) - # Restore original rooms_pos self.rooms_pos = original_rooms_pos @@ -673,3 +668,7 @@ def set_element_property( property_name=property_name, value=value, ) + + async def async_copy_array(self, original_array): + """Copy the array using async utilities.""" + return await AsyncNumPy.async_copy(original_array) diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index f4bd877..4bc4e56 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -9,6 +9,7 @@ import logging from .config.drawable import Drawable +from .config.drawable_elements import DrawableElement from .config.types import Color, JsonType, NumpyArray from .map_data import ImageData, RandImageData @@ -107,16 +108,18 @@ async def async_draw_base_layer( color_wall, color_zone_clean, ) - img_np_array = await self._draw_walls( - img_np_array, - walls_data, - size_x, - size_y, - pos_top, - pos_left, - pixel_size, - color_wall, - ) + # Draw walls only if enabled in drawing config + if self.img_h.drawing_config.is_enabled(DrawableElement.WALL): + img_np_array = await self._draw_walls( + img_np_array, + walls_data, + size_x, + size_y, + pos_top, + pos_left, + pixel_size, + color_wall, + ) return room_id, img_np_array async def _draw_floor( diff --git a/pyproject.toml b/pyproject.toml index 17e8907..bafaebf 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.9b60" +version = "0.1.9" description = "A Python library to parse Valetudo map data returning a PIL Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0" diff --git a/tests/test.py b/tests/test.py index 1f9a7b0..a8b656e 100644 --- a/tests/test.py +++ b/tests/test.py @@ -7,6 +7,11 @@ import sys import cProfile import pstats +import tracemalloc +import psutil +import gc +import time +from typing import Dict, List, Tuple # Add the project root directory to the Python path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) @@ -16,7 +21,6 @@ from SCR.valetudo_map_parser.config.shared import CameraSharedManager from SCR.valetudo_map_parser.config.types import RoomStore from SCR.valetudo_map_parser.hypfer_handler import HypferMapImageHandler -from SCR.valetudo_map_parser.hypfer_draw import ImageDraw # Configure logging logging.basicConfig( @@ -26,21 +30,210 @@ _LOGGER = logging.getLogger(__name__) +# ----- Test Configuration ----- +FRAME_COUNT = 1 # Set to 1/10/25/50/100 as needed +ENABLE_PROFILER = False # Master switch for profiler usage +ENABLE_CPU_TIMING = True # Lightweight per-frame CPU timing (process CPU time) +ENABLE_MEMORY_PROFILING = True # Use tracemalloc snapshots +SNAPSHOT_EVERY_FRAME = False # If False, snapshot only first and last frame +ENABLE_LEGACY_CPROFILE = False # Legacy cProfile around the whole run +# ------------------------------ + + +class PerformanceProfiler: + """Comprehensive profiling for memory and CPU usage analysis.""" + + def __init__(self, enable_memory_profiling: bool = ENABLE_MEMORY_PROFILING, enable_cpu_profiling: bool = ENABLE_CPU_TIMING): + self.enable_memory_profiling = enable_memory_profiling + self.enable_cpu_profiling = enable_cpu_profiling + self.memory_snapshots: List[Tuple[str, tracemalloc.Snapshot]] = [] + self.cpu_profiles: List[Tuple[str, cProfile.Profile]] = [] + self.memory_stats: List[Dict] = [] + self.timing_stats: List[Dict] = [] + + if self.enable_memory_profiling: + tracemalloc.start() + _LOGGER.info("πŸ” Memory profiling enabled") + + if self.enable_cpu_profiling: + _LOGGER.info("⚑ CPU profiling enabled") + + def take_memory_snapshot(self, label: str) -> None: + """Take a memory snapshot with a descriptive label.""" + if not self.enable_memory_profiling: + return + + snapshot = tracemalloc.take_snapshot() + self.memory_snapshots.append((label, snapshot)) + + # Get current memory usage + process = psutil.Process() + memory_info = process.memory_info() + + self.memory_stats.append({ + 'label': label, + 'timestamp': time.time(), + 'rss_mb': memory_info.rss / 1024 / 1024, # Resident Set Size in MB + 'vms_mb': memory_info.vms / 1024 / 1024, # Virtual Memory Size in MB + 'percent': process.memory_percent(), + }) + + _LOGGER.debug(f"πŸ“Š Memory snapshot '{label}': RSS={memory_info.rss / 1024 / 1024:.1f}MB") + + def start_cpu_profile(self, label: str) -> cProfile.Profile: + """Start CPU profiling for a specific operation.""" + if not self.enable_cpu_profiling: + return None + + profiler = cProfile.Profile() + profiler.enable() + self.cpu_profiles.append((label, profiler)) + return profiler + + def stop_cpu_profile(self, profiler: cProfile.Profile) -> None: + """Stop CPU profiling.""" + if profiler: + profiler.disable() + + def time_operation(self, label: str, start_time: float, end_time: float) -> None: + """Record timing information for an operation.""" + duration = end_time - 
start_time + self.timing_stats.append({ + 'label': label, + 'duration_ms': duration * 1000, + 'timestamp': start_time + }) + _LOGGER.info(f"⏱️ {label}: {duration * 1000:.1f}ms") + + def analyze_memory_usage(self) -> None: + """Analyze memory usage patterns and print detailed report.""" + if not self.enable_memory_profiling or len(self.memory_snapshots) < 2: + return + + print("\n" + "="*80) + print("πŸ“Š MEMORY USAGE ANALYSIS") + print("="*80) + + # Memory usage over time + print("\nπŸ” Memory Usage Timeline:") + for i, stats in enumerate(self.memory_stats): + print(f" {i+1:2d}. {stats['label']:30s} | RSS: {stats['rss_mb']:6.1f}MB | VMS: {stats['vms_mb']:6.1f}MB | {stats['percent']:4.1f}%") + + # Memory growth analysis + if len(self.memory_stats) >= 2: + start_rss = self.memory_stats[0]['rss_mb'] + peak_rss = max(stats['rss_mb'] for stats in self.memory_stats) + end_rss = self.memory_stats[-1]['rss_mb'] + + print(f"\nπŸ“ˆ Memory Growth Analysis:") + print(f" Start RSS: {start_rss:.1f}MB") + print(f" Peak RSS: {peak_rss:.1f}MB (+{peak_rss - start_rss:.1f}MB)") + print(f" End RSS: {end_rss:.1f}MB ({'+' if end_rss > start_rss else ''}{end_rss - start_rss:.1f}MB from start)") + + # Top memory allocations + if len(self.memory_snapshots) >= 2: + print(f"\nπŸ”₯ Top Memory Allocations (comparing first vs last snapshot):") + first_snapshot = self.memory_snapshots[0][1] + last_snapshot = self.memory_snapshots[-1][1] + + top_stats = last_snapshot.compare_to(first_snapshot, 'lineno')[:10] + for index, stat in enumerate(top_stats): + print(f" {index+1:2d}. {stat}") + + def analyze_cpu_usage(self) -> None: + """Analyze CPU usage patterns and print detailed report.""" + if not self.enable_cpu_profiling or not self.cpu_profiles: + return + + print("\n" + "="*80) + print("⚑ CPU USAGE ANALYSIS") + print("="*80) + + for label, profiler in self.cpu_profiles: + print(f"\nπŸ” CPU Profile: {label}") + print("-" * 50) + + # Create stats object + stats = pstats.Stats(profiler) + stats.sort_stats('cumulative') + + # Print top 15 functions by cumulative time + print("Top 15 functions by cumulative time:") + stats.print_stats(15) + + def analyze_timing_patterns(self) -> None: + """Analyze timing patterns across operations.""" + if not self.timing_stats: + return + + print("\n" + "="*80) + print("⏱️ TIMING ANALYSIS") + print("="*80) + + # Group by operation type + timing_groups = {} + for stat in self.timing_stats: + operation = stat['label'].split(' ')[0] # Get first word as operation type + if operation not in timing_groups: + timing_groups[operation] = [] + timing_groups[operation].append(stat['duration_ms']) + + print("\nπŸ“Š Timing Summary by Operation:") + for operation, durations in timing_groups.items(): + avg_duration = sum(durations) / len(durations) + min_duration = min(durations) + max_duration = max(durations) + print(f" {operation:20s} | Avg: {avg_duration:6.1f}ms | Min: {min_duration:6.1f}ms | Max: {max_duration:6.1f}ms | Count: {len(durations)}") + + def generate_report(self) -> None: + """Generate comprehensive performance report.""" + print("\n" + "="*80) + print("🎯 HYPFER COMPREHENSIVE PERFORMANCE REPORT") + print("="*80) + + self.analyze_memory_usage() + self.analyze_cpu_usage() + self.analyze_timing_patterns() + + # Garbage collection stats + print(f"\nπŸ—‘οΈ Garbage Collection Stats:") + gc_stats = gc.get_stats() + for i, stats in enumerate(gc_stats): + print(f" Generation {i}: Collections={stats['collections']}, Collected={stats['collected']}, Uncollectable={stats['uncollectable']}") + + 
print("\n" + "="*80) + class TestImageHandler: - def __init__(self): + def __init__(self, enable_profiling: bool = True): self.test_data = None self.image = None - def setUp(self): + # Initialize profiler + self.profiler = PerformanceProfiler( + enable_memory_profiling=ENABLE_MEMORY_PROFILING and ENABLE_PROFILER, + enable_cpu_profiling=ENABLE_CPU_TIMING and ENABLE_PROFILER, + ) if ENABLE_PROFILER else None + # Always-on lightweight accumulators for per-frame stats (work even if profiler is off) + self.wall_times_ms: list[float] = [] + self.cpu_times_ms: list[float] = [] + + def set_up(self): + """Set up test data with profiling.""" + if self.profiler: + self.profiler.take_memory_snapshot("Test Setup Start") + # Load the test.json file - test_file_path = os.path.join(os.path.dirname(__file__), "test.json") + test_file_path = os.path.join(os.path.dirname(__file__), "test.json") #glossyhardtofindnarwhal _LOGGER.info(f"Loading test data from {test_file_path}") with open(test_file_path, "r") as file: self.test_data = json.load(file) _LOGGER.debug("Test data loaded.") + if self.profiler: + self.profiler.take_memory_snapshot("Test Setup Complete") + async def test_image_handler(self): _LOGGER.info("Starting test_image_handler...") @@ -62,8 +255,8 @@ async def test_image_handler(self): 'alpha_text': 255.0, 'alpha_wall': 150.0, # Testing with a lower alpha value 'alpha_zone_clean': 125.0, - 'aspect_ratio': '16, 9', - 'auto_zoom': True, + 'aspect_ratio': '1, 1', + 'auto_zoom': False, 'zoom_lock_ratio': True, 'color_background': [0, 125, 255], 'color_charger': [255, 128, 0], @@ -110,7 +303,7 @@ async def test_image_handler(self): 'offset_bottom': 0, 'offset_left': 10, 'offset_right': 0, - 'rotate_image': '180', + 'rotate_image': '90', 'margins': '100', 'show_vac_status': False, 'vac_status_font': 'custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf', @@ -119,6 +312,7 @@ async def test_image_handler(self): 'enable_www_snapshots': False, 'get_svg_file': False, 'trims_data': { + "floor": "1", 'trim_left': 1980, 'trim_up': 1650, 'trim_right': 3974, @@ -152,9 +346,10 @@ async def test_image_handler(self): 'disable_room_15': False } - shared_data = CameraSharedManager("test_vacuum", device_info) + shared_data = CameraSharedManager("test_rand256", device_info) shared = shared_data.get_instance() - shared.vacuum_state = "cleaning" + shared.vacuum_state = "docked" + shared.vacuum_battery = 100 shared.vacuum_ips = "192.168.8.1" # Active zones will be populated from the JSON data automatically @@ -172,14 +367,75 @@ async def test_image_handler(self): # ImageDraw is already initialized in the handler constructor # Active zones will be populated from the JSON data during image processing - # Get the image with elements disabled from device_info - self.image = await handler.async_get_image_from_json(self.test_data) - if self.image is None: - _LOGGER.error("Failed to generate image from JSON data") - return + # Lightweight per-frame CPU timing process handle + proc = psutil.Process(os.getpid()) if ENABLE_CPU_TIMING else None + + # Run image generation N times to observe handler performance + for i in range(FRAME_COUNT): + iteration = i + 1 + + # Optional sparse memory snapshot before frame + if self.profiler and ENABLE_MEMORY_PROFILING: + if SNAPSHOT_EVERY_FRAME or iteration in (1, FRAME_COUNT): + self.profiler.take_memory_snapshot(f"Before Image Generation #{iteration}") + + # Start timing (wall + CPU) + cpu0 = proc.cpu_times() if proc else None + start_time = time.time() + + # Get the image (PIL format) 
+ self.image = await handler.async_get_image(self.test_data, bytes_format=False) + if shared.binary_image is None: + _LOGGER.warning("❌ Binary image is None") + else: + _LOGGER.info(f"Image size: {len(shared.binary_image)} bytes") + + # End timing + end_time = time.time() + cpu1 = proc.cpu_times() if proc else None + + # Record timings (always capture into local accumulators) + wall_ms = (end_time - start_time) * 1000.0 + self.wall_times_ms.append(wall_ms) + + cpu_used = None + if proc and cpu0 and cpu1: + cpu_used = (cpu1.user + cpu1.system) - (cpu0.user + cpu0.system) + wall = max(end_time - start_time, 1e-9) + core_util = (cpu_used / wall) * 100.0 + cpu_ms = cpu_used * 1000.0 + self.cpu_times_ms.append(cpu_ms) + _LOGGER.info(f"CPU/frame: {cpu_used:.3f}s | core-util: {core_util:.1f}% of one core") + + # Optional profiler bookkeeping + if self.profiler: + if ENABLE_MEMORY_PROFILING and (SNAPSHOT_EVERY_FRAME or iteration in (1, FRAME_COUNT)): + self.profiler.take_memory_snapshot(f"After Image Generation #{iteration}") + self.profiler.time_operation(f"Image #{iteration}", start_time, end_time) + if cpu_used is not None: + self.profiler.timing_stats.append({'label': f"CPU #{iteration}", 'duration_ms': cpu_ms, 'timestamp': start_time}) + + # After loop: measure data sampling times separately + t0=time.time(); calibration_data = handler.get_calibration_data(); t1=time.time() + if self.profiler: self.profiler.time_operation("Calib", t0, t1) + _LOGGER.info(f"Calibration_data: {calibration_data}") + + _LOGGER.info(f"PIL image size: {self.image.size}") + store = RoomStore("test_vacuum") + t2=time.time(); rooms_data = await handler.async_get_rooms_attributes(); t3=time.time() + if self.profiler: self.profiler.time_operation("Rooms", t2, t3) + _LOGGER.info(f"Room Properties: {rooms_data}") + + # Robot in room timing (using existing robot_pos) + rp = handler.robot_pos or {} + rx, ry, ra = rp.get("x"), rp.get("y"), rp.get("angle") + t4=time.time(); _ = await handler.imd.async_get_robot_in_room(robot_y=ry, robot_x=rx, angle=ra); t5=time.time() + if self.profiler: self.profiler.time_operation("RobotRoom", t4, t5) + + _LOGGER.info(f"Calibration_data: {handler.get_calibration_data()}") - _LOGGER.info(f"Image size: {self.image.size}") + _LOGGER.info(f"PIL image size: {self.image.size}") store = RoomStore("test_vacuum") rooms_data = await handler.async_get_rooms_attributes() _LOGGER.info(f"Room Properties: {rooms_data}") @@ -274,73 +530,70 @@ async def test_image_handler(self): _LOGGER.info("No obstacles positions found in the map data") _LOGGER.info(f"Obstacles data: {shared.obstacles_data}") - #_LOGGER.info("Testing robot detection in each room...") - success_count = 0 - - # # Set the rooms_pos attribute in the handler.imd object - # # Convert the rooms dictionary to a list of room objects - # rooms_list = [] - # for room_id, props in store.get_rooms().items(): - # room_obj = { - # 'name': props['name'], - # 'outline': props['outline'] - # } - # rooms_list.append(room_obj) - # - # # Set the rooms_pos attribute - # handler.imd.img_h.rooms_pos = rooms_list - # - # for room_id, props in store.get_rooms().items(): - # # Use the room's center coordinates as the robot position - # robot_x = props['x'] - # robot_y = props['y'] - # - # # Verify that the point is actually inside the polygon using our algorithm - # is_inside = handler.imd.point_in_polygon(robot_x, robot_y, props['outline']) - # if not is_inside: - # _LOGGER.warning(f"⚠️ Center point ({robot_x}, {robot_y}) is not inside room {room_id}: 
{props['name']}") - # # Try to find a better test point by averaging some points from the outline - # points = props['outline'] - # if len(points) >= 3: - # # Use the average of the first 3 points as an alternative test point - # alt_x = sum(p[0] for p in points[:3]) // 3 - # alt_y = sum(p[1] for p in points[:3]) // 3 - # if handler.imd.point_in_polygon(alt_x, alt_y, props['outline']): - # _LOGGER.info(f" Using alternative point ({alt_x}, {alt_y}) for testing") - # robot_x, robot_y = alt_x, alt_y - # - # # Call the function to detect which room the robot is in - # #result = await handler.imd.async_get_robot_in_room(robot_y=robot_y, robot_x=robot_x) - # #_LOGGER.info(f"Robot in room: {result}") - - - - self.image.show() def __main__(): - test = TestImageHandler() - test.setUp() + # Enable comprehensive profiling (disable CPU profiling to avoid conflicts with main cProfile) + test = TestImageHandler(enable_profiling=True) + # Disable CPU profiling in the custom profiler to avoid conflicts + if test.profiler: + test.profiler.enable_cpu_profiling = False + + test.set_up() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - profiler = cProfile.Profile() - profiler.enable() + # Legacy cProfile for compatibility + profiler = cProfile.Profile() if ENABLE_LEGACY_CPROFILE else None + if profiler: + profiler.enable() try: + if test.profiler: + test.profiler.take_memory_snapshot("Test Start") + loop.run_until_complete(test.test_image_handler()) + + if test.profiler: + test.profiler.take_memory_snapshot("Test Complete") + finally: - profiler.disable() + if profiler: + profiler.disable() loop.close() + if test.image: + test.image.show() + + if profiler: + # Save profiling data + profile_output = "hypfer_profile_output.prof" + profiler.dump_stats(profile_output) + + # Print legacy profiling results + print("\n" + "="*80) + print("πŸ“Š LEGACY CPROFILE RESULTS (Top 50 functions)") + print("="*80) + stats = pstats.Stats(profile_output) + stats.strip_dirs().sort_stats("cumulative").print_stats(50) + + # Generate comprehensive profiling report + # Summarize adjusted stats (remove warm-up frame for steady-state) + def _avg(values: list[float]) -> float: + return sum(values) / max(len(values), 1) + + if hasattr(test, 'wall_times_ms') and test.wall_times_ms: + steady_wall = test.wall_times_ms[1:] if len(test.wall_times_ms) > 1 else test.wall_times_ms + steady_cpu = test.cpu_times_ms[1:] if len(test.cpu_times_ms) > 1 else test.cpu_times_ms + print("\n=== ADJUSTED (steady-state, excl. 
warm-up) ===") + if steady_wall: + print(f"Image avg (ms): {_avg(steady_wall):.1f} | min: {min(steady_wall):.1f} | max: {max(steady_wall):.1f} | n={len(steady_wall)}") + if steady_cpu: + print(f"CPU avg (ms): {_avg(steady_cpu):.1f} | min: {min(steady_cpu):.1f} | max: {max(steady_cpu):.1f} | n={len(steady_cpu)}") + + if test.profiler: + test.profiler.generate_report() - # Save profiling data - profile_output = "hypfer_profile_output.prof" - profiler.dump_stats(profile_output) - - # Print profiling summary - stats = pstats.Stats(profile_output) - stats.strip_dirs().sort_stats("cumulative").print_stats(50) # Show top 50 functions if __name__ == "__main__": diff --git a/tests/test_rand.py b/tests/test_rand.py index 1e0224f..955ddb2 100644 --- a/tests/test_rand.py +++ b/tests/test_rand.py @@ -5,6 +5,11 @@ import logging import cProfile import pstats +import tracemalloc +import psutil +import gc +import time +from typing import Dict, List, Tuple import sys import os @@ -14,7 +19,9 @@ from SCR.valetudo_map_parser.config.colors import ColorsManagement from SCR.valetudo_map_parser.config.shared import CameraSharedManager -from SCR.valetudo_map_parser.rand25_handler import ReImageHandler +from SCR.valetudo_map_parser.rand256_handler import ReImageHandler +from SCR.valetudo_map_parser.config.rand256_parser import RRMapParser as Rand256Parser + # Configure logging logging.basicConfig( @@ -25,11 +32,181 @@ _LOGGER = logging.getLogger(__name__) +class PerformanceProfiler: + """Comprehensive profiling for memory and CPU usage analysis.""" + + def __init__(self, enable_memory_profiling: bool = True, enable_cpu_profiling: bool = True): + self.enable_memory_profiling = enable_memory_profiling + self.enable_cpu_profiling = enable_cpu_profiling + self.memory_snapshots: List[Tuple[str, tracemalloc.Snapshot]] = [] + self.cpu_profiles: List[Tuple[str, cProfile.Profile]] = [] + self.memory_stats: List[Dict] = [] + self.timing_stats: List[Dict] = [] + + if self.enable_memory_profiling: + tracemalloc.start() + _LOGGER.info("πŸ” Memory profiling enabled") + + if self.enable_cpu_profiling: + _LOGGER.info("⚑ CPU profiling enabled") + + def take_memory_snapshot(self, label: str) -> None: + """Take a memory snapshot with a descriptive label.""" + if not self.enable_memory_profiling: + return + + snapshot = tracemalloc.take_snapshot() + self.memory_snapshots.append((label, snapshot)) + + # Get current memory usage + process = psutil.Process() + memory_info = process.memory_info() + + self.memory_stats.append({ + 'label': label, + 'timestamp': time.time(), + 'rss_mb': memory_info.rss / 1024 / 1024, # Resident Set Size in MB + 'vms_mb': memory_info.vms / 1024 / 1024, # Virtual Memory Size in MB + 'percent': process.memory_percent(), + }) + + _LOGGER.debug(f"πŸ“Š Memory snapshot '{label}': RSS={memory_info.rss / 1024 / 1024:.1f}MB") + + def start_cpu_profile(self, label: str) -> cProfile.Profile: + """Start CPU profiling for a specific operation.""" + if not self.enable_cpu_profiling: + return None + + profiler = cProfile.Profile() + profiler.enable() + self.cpu_profiles.append((label, profiler)) + return profiler + + def stop_cpu_profile(self, profiler: cProfile.Profile) -> None: + """Stop CPU profiling.""" + if profiler: + profiler.disable() + + def time_operation(self, label: str, start_time: float, end_time: float) -> None: + """Record timing information for an operation.""" + duration = end_time - start_time + self.timing_stats.append({ + 'label': label, + 'duration_ms': duration * 1000, + 'timestamp': start_time + }) 
+ _LOGGER.info(f"⏱️ {label}: {duration * 1000:.1f}ms") + + def analyze_memory_usage(self) -> None: + """Analyze memory usage patterns and print detailed report.""" + if not self.enable_memory_profiling or len(self.memory_snapshots) < 2: + return + + print("\n" + "="*80) + print("πŸ“Š MEMORY USAGE ANALYSIS") + print("="*80) + + # Memory usage over time + print("\nπŸ” Memory Usage Timeline:") + for i, stats in enumerate(self.memory_stats): + print(f" {i+1:2d}. {stats['label']:30s} | RSS: {stats['rss_mb']:6.1f}MB | VMS: {stats['vms_mb']:6.1f}MB | {stats['percent']:4.1f}%") + + # Memory growth analysis + if len(self.memory_stats) >= 2: + start_rss = self.memory_stats[0]['rss_mb'] + peak_rss = max(stats['rss_mb'] for stats in self.memory_stats) + end_rss = self.memory_stats[-1]['rss_mb'] + + print(f"\nπŸ“ˆ Memory Growth Analysis:") + print(f" Start RSS: {start_rss:.1f}MB") + print(f" Peak RSS: {peak_rss:.1f}MB (+{peak_rss - start_rss:.1f}MB)") + print(f" End RSS: {end_rss:.1f}MB ({'+' if end_rss > start_rss else ''}{end_rss - start_rss:.1f}MB from start)") + + # Top memory allocations + if len(self.memory_snapshots) >= 2: + print(f"\nπŸ”₯ Top Memory Allocations (comparing first vs last snapshot):") + first_snapshot = self.memory_snapshots[0][1] + last_snapshot = self.memory_snapshots[-1][1] + + top_stats = last_snapshot.compare_to(first_snapshot, 'lineno')[:10] + for index, stat in enumerate(top_stats): + print(f" {index+1:2d}. {stat}") + + def analyze_cpu_usage(self) -> None: + """Analyze CPU usage patterns and print detailed report.""" + if not self.enable_cpu_profiling or not self.cpu_profiles: + return + + print("\n" + "="*80) + print("⚑ CPU USAGE ANALYSIS") + print("="*80) + + for label, profiler in self.cpu_profiles: + print(f"\nπŸ” CPU Profile: {label}") + print("-" * 50) + + # Create stats object + stats = pstats.Stats(profiler) + stats.sort_stats('cumulative') + + # Print top 15 functions by cumulative time + print("Top 15 functions by cumulative time:") + stats.print_stats(15) + + def analyze_timing_patterns(self) -> None: + """Analyze timing patterns across operations.""" + if not self.timing_stats: + return + + print("\n" + "="*80) + print("⏱️ TIMING ANALYSIS") + print("="*80) + + # Group by operation type + timing_groups = {} + for stat in self.timing_stats: + operation = stat['label'].split(' ')[0] # Get first word as operation type + if operation not in timing_groups: + timing_groups[operation] = [] + timing_groups[operation].append(stat['duration_ms']) + + print("\nπŸ“Š Timing Summary by Operation:") + for operation, durations in timing_groups.items(): + avg_duration = sum(durations) / len(durations) + min_duration = min(durations) + max_duration = max(durations) + print(f" {operation:20s} | Avg: {avg_duration:6.1f}ms | Min: {min_duration:6.1f}ms | Max: {max_duration:6.1f}ms | Count: {len(durations)}") + + def generate_report(self) -> None: + """Generate comprehensive performance report.""" + print("\n" + "="*80) + print("🎯 COMPREHENSIVE PERFORMANCE REPORT") + print("="*80) + + self.analyze_memory_usage() + self.analyze_cpu_usage() + self.analyze_timing_patterns() + + # Garbage collection stats + print(f"\nπŸ—‘οΈ Garbage Collection Stats:") + gc_stats = gc.get_stats() + for i, stats in enumerate(gc_stats): + print(f" Generation {i}: Collections={stats['collections']}, Collected={stats['collected']}, Uncollectable={stats['uncollectable']}") + + print("\n" + "="*80) + + class TestRandImageHandler: - def __init__(self): + def __init__(self, enable_profiling: bool = True): 
self.test_data = None self.image = None + # Initialize profiler + self.profiler = PerformanceProfiler( + enable_memory_profiling=enable_profiling, + enable_cpu_profiling=enable_profiling + ) if enable_profiling else None + def setUp(self): # Load test data from the rand.json file test_file_path = os.path.join(os.path.dirname(__file__), "rand.json") @@ -43,6 +220,13 @@ def setUp(self): async def test_image_handler(self): _LOGGER.info("Starting test_rand_image_handler...") + # Start profiling for this image generation + start_time = time.time() + cpu_profiler = None + if self.profiler: + self.profiler.take_memory_snapshot(f"Before Image Gen - {self.current_file}") + cpu_profiler = self.profiler.start_cpu_profile(f"Image Generation - {self.current_file}") + device_info = { 'platform': 'mqtt_vacuum_camera', 'unique_id': 'rockrobo_camera', @@ -231,19 +415,40 @@ async def test_image_handler(self): # Show the image self.image.show() + # End profiling for this image generation + end_time = time.time() + if self.profiler: + if cpu_profiler: + self.profiler.stop_cpu_profile(cpu_profiler) + self.profiler.take_memory_snapshot(f"After Image Gen - {self.current_file}") + self.profiler.time_operation(f"Image Generation - {self.current_file}", start_time, end_time) + def __main__(): - test = TestRandImageHandler() + # Enable comprehensive profiling (disable CPU profiling to avoid conflicts with main cProfile) + test = TestRandImageHandler(enable_profiling=True) + # Disable CPU profiling in the custom profiler to avoid conflicts + if test.profiler: + test.profiler.enable_cpu_profiling = False + test.setUp() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + # Legacy cProfile for compatibility profiler = cProfile.Profile() profiler.enable() try: + if test.profiler: + test.profiler.take_memory_snapshot("Test Start") + loop.run_until_complete(test.test_image_handler()) + + if test.profiler: + test.profiler.take_memory_snapshot("Test Complete") + finally: profiler.disable() loop.close() @@ -252,9 +457,16 @@ def __main__(): profile_output = "profile_output_rand.prof" profiler.dump_stats(profile_output) - # Print profiling summary + # Print legacy profiling results + print("\n" + "="*80) + print("πŸ“Š LEGACY CPROFILE RESULTS (Top 50 functions)") + print("="*80) stats = pstats.Stats(profile_output) - stats.strip_dirs().sort_stats("cumulative").print_stats(50) # Show top 50 functions + stats.strip_dirs().sort_stats("cumulative").print_stats(50) + + # Generate comprehensive profiling report + if test.profiler: + test.profiler.generate_report() if __name__ == "__main__":
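Closing note (not part of the patch): the PerformanceProfiler added in both test files follows a snapshot / run / snapshot / report pattern. A minimal usage sketch of that flow, using only methods defined above; render_one_frame is a hypothetical stand-in for the real handler call (e.g. handler.async_get_image):

# Illustrative usage sketch for the PerformanceProfiler added above.
# "render_one_frame" is a placeholder coroutine, not a library API.
import asyncio
import time

async def render_one_frame() -> None:
    await asyncio.sleep(0.01)  # stands in for real image generation

async def profiled_run(profiler, frames: int = 3) -> None:
    # Snapshot memory before the run, time each frame, then report.
    profiler.take_memory_snapshot("Run Start")
    for i in range(1, frames + 1):
        start = time.time()
        await render_one_frame()
        profiler.time_operation(f"Frame #{i}", start, time.time())
    profiler.take_memory_snapshot("Run Complete")
    profiler.generate_report()

# Typical invocation, mirroring the tests (CPU profiling off to avoid
# conflicts with an outer cProfile run):
# profiler = PerformanceProfiler(enable_memory_profiling=True,
#                                enable_cpu_profiling=False)
# asyncio.run(profiled_run(profiler))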