diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index d963c7a..0b07f06 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -53,49 +53,30 @@ async def from_json_to_image( ) -> NumpyArray: """Draw the layers (rooms) from the vacuum JSON data onto the image array.""" image_array = layer - # Extract alpha from color - alpha = color[3] if len(color) == 4 else 255 - - # Create the full color with alpha - full_color = color if len(color) == 4 else (*color, 255) - - # Check if we need to blend colors (alpha < 255) - need_blending = alpha < 255 + need_blending = color[3] < 255 - # Loop through pixels to find min and max coordinates for x, y, z in pixels: col = x * pixel_size row = y * pixel_size - # Draw pixels as blocks for i in range(z): - # Get the region to update region_slice = ( slice(row, row + pixel_size), slice(col + i * pixel_size, col + (i + 1) * pixel_size), ) if need_blending: - # Sample the center of the region for blending - center_y = row + pixel_size // 2 - center_x = col + i * pixel_size + pixel_size // 2 - - # Only blend if coordinates are valid + cy = row + pixel_size // 2 + cx = col + i * pixel_size + pixel_size // 2 if ( - 0 <= center_y < image_array.shape[0] - and 0 <= center_x < image_array.shape[1] + 0 <= cy < image_array.shape[0] + and 0 <= cx < image_array.shape[1] ): - # Get blended color - blended_color = sample_and_blend_color( - image_array, center_x, center_y, full_color - ) - # Apply blended color to the region - image_array[region_slice] = blended_color + px = sample_and_blend_color(image_array, cx, cy, color) + image_array[region_slice] = px else: - # Use original color if out of bounds - image_array[region_slice] = full_color + image_array[region_slice] = color else: - # No blending needed, use direct assignment - image_array[region_slice] = full_color + image_array[region_slice] = color return image_array @@ -331,36 +312,6 @@ def _filled_circle( return image - @staticmethod - def _filled_circle_optimized( - image: np.ndarray, - center: Tuple[int, int], - radius: int, - color: Color, - outline_color: Color = None, - outline_width: int = 0, - ) -> np.ndarray: - """ - Optimized _filled_circle ensuring dtype compatibility with uint8. - """ - x, y = center - h, w = image.shape[:2] - color_np = np.array(color, dtype=image.dtype) - outline_color_np = ( - np.array(outline_color, dtype=image.dtype) - if outline_color is not None - else None - ) - y_indices, x_indices = np.meshgrid(np.arange(h), np.arange(w), indexing="ij") - dist_sq = (y_indices - y) ** 2 + (x_indices - x) ** 2 - circle_mask = dist_sq <= radius**2 - image[circle_mask] = color_np - if outline_width > 0 and outline_color_np is not None: - outer_mask = dist_sq <= (radius + outline_width) ** 2 - outline_mask = outer_mask & ~circle_mask - image[outline_mask] = outline_color_np - return image - @staticmethod def _ellipse( image: NumpyArray, center: Point, radius: int, color: Color @@ -593,161 +544,6 @@ def overlay_robot( ) return background_image - @staticmethod - def draw_filled_circle( - image: np.ndarray, - centers: Tuple[int, int], - radius: int, - color: Tuple[int, int, int, int], - ) -> np.ndarray: - """ - Draw multiple filled circles at once using a single NumPy mask. - """ - h, w = image.shape[:2] - y_indices, x_indices = np.ogrid[:h, :w] # Precompute coordinate grids - mask = np.zeros((h, w), dtype=bool) - for cx, cy in centers: - mask |= (x_indices - cx) ** 2 + (y_indices - cy) ** 2 <= radius**2 - image[mask] = color - return image - - @staticmethod - def batch_draw_elements( - image: np.ndarray, - elements: list, - element_type: str, - color: Color, - ) -> np.ndarray: - """ - Efficiently draw multiple elements of the same type at once. - - Args: - image: The image array to draw on - elements: List of element data (coordinates, etc.) - element_type: Type of element to draw ('circle', 'line', etc.) - color: Color to use for drawing - - Returns: - Modified image array - """ - if not elements or len(elements) == 0: - return image - - # Get image dimensions - height, width = image.shape[:2] - - if element_type == "circle": - # Extract circle centers and radii - centers = [] - radii = [] - for elem in elements: - if isinstance(elem, dict) and "center" in elem and "radius" in elem: - centers.append(elem["center"]) - radii.append(elem["radius"]) - elif isinstance(elem, (list, tuple)) and len(elem) >= 3: - # Format: (x, y, radius) - centers.append((elem[0], elem[1])) - radii.append(elem[2]) - - # Process circles with the same radius together - for radius in set(radii): - same_radius_centers = [ - centers[i] for i in range(len(centers)) if radii[i] == radius - ] - if same_radius_centers: - # Create a combined mask for all circles with this radius - mask = np.zeros((height, width), dtype=bool) - for cx, cy in same_radius_centers: - if 0 <= cx < width and 0 <= cy < height: - # Calculate circle bounds - min_y = max(0, cy - radius) - max_y = min(height, cy + radius + 1) - min_x = max(0, cx - radius) - max_x = min(width, cx + radius + 1) - - # Create coordinate arrays for the circle - y_indices, x_indices = np.ogrid[min_y:max_y, min_x:max_x] - - # Add this circle to the mask - circle_mask = (y_indices - cy) ** 2 + ( - x_indices - cx - ) ** 2 <= radius**2 - mask[min_y:max_y, min_x:max_x] |= circle_mask - - # Apply color to all circles at once - image[mask] = color - - elif element_type == "line": - # Extract line endpoints - lines = [] - widths = [] - for elem in elements: - if isinstance(elem, dict) and "start" in elem and "end" in elem: - lines.append((elem["start"], elem["end"])) - widths.append(elem.get("width", 1)) - elif isinstance(elem, (list, tuple)) and len(elem) >= 4: - # Format: (x1, y1, x2, y2, [width]) - lines.append(((elem[0], elem[1]), (elem[2], elem[3]))) - widths.append(elem[4] if len(elem) > 4 else 1) - - # Process lines with the same width together - for width in set(widths): - same_width_lines = [ - lines[i] for i in range(len(lines)) if widths[i] == width - ] - if same_width_lines: - # Create a combined mask for all lines with this width - mask = np.zeros((height, width), dtype=bool) - - # Draw all lines into the mask - for start, end in same_width_lines: - x1, y1 = start - x2, y2 = end - - # Skip invalid lines - if not ( - 0 <= x1 < width - and 0 <= y1 < height - and 0 <= x2 < width - and 0 <= y2 < height - ): - continue - - # Use Bresenham's algorithm to get line points - length = max(abs(x2 - x1), abs(y2 - y1)) - if length == 0: - continue - - t = np.linspace(0, 1, length * 2) - x_coordinates = np.round(x1 * (1 - t) + x2 * t).astype(int) - y_coordinates = np.round(y1 * (1 - t) + y2 * t).astype(int) - - # Add line points to mask - for x, y in zip(x_coordinates, y_coordinates): - if width == 1: - mask[y, x] = True - else: - # For thicker lines - half_width = width // 2 - min_y = max(0, y - half_width) - max_y = min(height, y + half_width + 1) - min_x = max(0, x - half_width) - max_x = min(width, x + half_width + 1) - - # Create a circular brush - y_indices, x_indices = np.ogrid[ - min_y:max_y, min_x:max_x - ] - brush = (y_indices - y) ** 2 + ( - x_indices - x - ) ** 2 <= half_width**2 - mask[min_y:max_y, min_x:max_x] |= brush - - # Apply color to all lines at once - image[mask] = color - - return image - @staticmethod async def async_draw_obstacles( image: np.ndarray, obstacle_info_list, color: Color diff --git a/SCR/valetudo_map_parser/config/rand256_parser.py b/SCR/valetudo_map_parser/config/rand256_parser.py index c1bb0f3..11088ca 100644 --- a/SCR/valetudo_map_parser/config/rand256_parser.py +++ b/SCR/valetudo_map_parser/config/rand256_parser.py @@ -1,4 +1,5 @@ -"""New Rand256 Map Parser - Based on Xiaomi/Roborock implementation with precise binary parsing.""" +"""New Rand256 Map Parser - +Based on Xiaomi/Roborock implementation with precise binary parsing.""" import math import struct @@ -78,6 +79,7 @@ def _get_int32_signed(data: bytes, address: int) -> int: @staticmethod def _parse_carpet_map(data: bytes) -> set[int]: + """Parse carpet map using Xiaomi method.""" carpet_map = set() for i, v in enumerate(data): @@ -87,6 +89,7 @@ def _parse_carpet_map(data: bytes) -> set[int]: @staticmethod def _parse_area(header: bytes, data: bytes) -> list: + """Parse area using Xiaomi method.""" area_pairs = RRMapParser._get_int16(header, 0x08) areas = [] for area_start in range(0, area_pairs * 16, 16): @@ -114,6 +117,7 @@ def _parse_area(header: bytes, data: bytes) -> list: @staticmethod def _parse_zones(data: bytes, header: bytes) -> list: + """Parse zones using Xiaomi method.""" zone_pairs = RRMapParser._get_int16(header, 0x08) zones = [] for zone_start in range(0, zone_pairs * 8, 8): @@ -146,21 +150,9 @@ def _parse_object_position(block_data_length: int, data: bytes) -> Dict[str, Any angle = raw_angle return {"position": [x, y], "angle": angle} - - @staticmethod - def _parse_walls(data: bytes, header: bytes) -> list: - wall_pairs = RRMapParser._get_int16(header, 0x08) - walls = [] - for wall_start in range(0, wall_pairs * 8, 8): - x0 = RRMapParser._get_int16(data, wall_start + 0) - y0 = RRMapParser._get_int16(data, wall_start + 2) - x1 = RRMapParser._get_int16(data, wall_start + 4) - y1 = RRMapParser._get_int16(data, wall_start + 6) - walls.append([x0, RRMapParser.Tools.DIMENSION_MM - y0, x1, RRMapParser.Tools.DIMENSION_MM - y1]) - return walls - @staticmethod def _parse_walls(data: bytes, header: bytes) -> list: + """Parse walls using Xiaomi method.""" wall_pairs = RRMapParser._get_int16(header, 0x08) walls = [] for wall_start in range(0, wall_pairs * 8, 8): @@ -223,6 +215,7 @@ def parse(self, map_buf: bytes) -> Dict[str, Any]: return {} def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]: + """Parse all blocks using Xiaomi method.""" blocks = {} map_header_length = self._get_int16(raw, 0x02) block_start_position = map_header_length diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index bffdec4..62c4173 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -10,6 +10,7 @@ from PIL import Image +from .utils import pil_size_rotation from .types import ( ATTR_CALIBRATION_POINTS, ATTR_CAMERA_MODE, @@ -210,11 +211,12 @@ def generate_attributes(self) -> dict: def to_dict(self) -> dict: """Return a dictionary with image and attributes data.""" + return { "image": { "binary": self.binary_image, "pil_image": self.new_image, - "size": self.new_image.size if self.new_image else (10, 10), + "size": pil_size_rotation(self.image_rotate, self.new_image), }, "attributes": self.generate_attributes(), } diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py index c6740bd..26924f5 100644 --- a/SCR/valetudo_map_parser/config/types.py +++ b/SCR/valetudo_map_parser/config/types.py @@ -34,12 +34,11 @@ class Room(TypedDict): id: int -# list[dict[str, str | list[int]]] | list[dict[str, str | list[list[int]]]] | list[dict[str, str | int]] | int]' class Destinations(TypedDict, total=False): spots: NotRequired[Optional[List[Spot]]] zones: NotRequired[Optional[List[Zone]]] rooms: NotRequired[Optional[List[Room]]] - updated: NotRequired[Optional[float]] + updated: NotRequired[Optional[float | int]] class RoomProperty(TypedDict): diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 56bf974..9a3cb43 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -111,6 +111,12 @@ async def async_get_image( try: # Backup current image to last_image before processing new one if hasattr(self.shared, "new_image") and self.shared.new_image is not None: + # Close old last_image to free memory before replacing it + if hasattr(self.shared, "last_image") and self.shared.last_image is not None: + try: + self.shared.last_image.close() + except Exception: + pass # Ignore errors if image is already closed self.shared.last_image = self.shared.new_image # Call the appropriate handler method based on handler type @@ -231,15 +237,8 @@ def prepare_resize_params( self, pil_img: PilPNG, rand: bool = False ) -> ResizeParams: """Prepare resize parameters for image resizing.""" - if self.shared.image_rotate in [0, 180]: - width, height = pil_img.size - else: - height, width = pil_img.size - LOGGER.debug( - "Shared PIL image size: %s x %s", - self.shared.image_ref_width, - self.shared.image_ref_height, - ) + width, height = pil_size_rotation(self.shared.image_rotate, pil_img) + return ResizeParams( pil_img=pil_img, width=width, @@ -660,9 +659,6 @@ def get_corners( async def async_resize_image(params: ResizeParams): """Resize the image to the given dimensions and aspect ratio.""" - LOGGER.debug("Resizing image to aspect ratio: %s", params.aspect_ratio) - LOGGER.debug("Original image size: %s x %s", params.width, params.height) - LOGGER.debug("Image crop size: %s", params.crop_size) if params.aspect_ratio == "None": return params.pil_img if params.aspect_ratio != "None": @@ -699,6 +695,17 @@ async def async_resize_image(params: ResizeParams): return params.pil_img +def pil_size_rotation(image_rotate, pil_img): + """Return the size of the image.""" + if not pil_img: + return 0, 0 + if image_rotate in [0, 180]: + width, height = pil_img.size + else: + height, width = pil_img.size + return width, height + + def initialize_drawing_config(handler): """ Initialize drawing configuration from device_info. @@ -725,93 +732,6 @@ def initialize_drawing_config(handler): return drawing_config, draw -def blend_colors(base_color, overlay_color): - """ - Blend two RGBA colors using alpha compositing. - - Args: - base_color: Base RGBA color tuple (r, g, b, a) - overlay_color: Overlay RGBA color tuple (r, g, b, a) - - Returns: - Blended RGBA color tuple (r, g, b, a) - """ - r1, g1, b1, a1 = base_color - r2, g2, b2, a2 = overlay_color - - # Convert alpha to 0-1 range - a1 = a1 / 255.0 - a2 = a2 / 255.0 - - # Calculate resulting alpha - a_out = a1 + a2 * (1 - a1) - - # Avoid division by zero - if a_out < 0.0001: - return [0, 0, 0, 0] - - # Calculate blended RGB components - r_out = (r1 * a1 + r2 * a2 * (1 - a1)) / a_out - g_out = (g1 * a1 + g2 * a2 * (1 - a1)) / a_out - b_out = (b1 * a1 + b2 * a2 * (1 - a1)) / a_out - - # Convert back to 0-255 range and return as tuple - return ( - int(max(0, min(255, r_out))), - int(max(0, min(255, g_out))), - int(max(0, min(255, b_out))), - int(max(0, min(255, a_out * 255))), - ) - - -def blend_pixel(array, x, y, color, element, element_map=None, drawing_config=None): - """ - Blend a pixel color with the existing color at the specified position. - Also updates the element map if the new element has higher z-index. - - Args: - array: The image array to modify - x: X coordinate - y: Y coordinate - color: RGBA color tuple to blend - element: Element code for the pixel - element_map: Optional element map to update - drawing_config: Optional drawing configuration for z-index lookup - - Returns: - None - """ - # Check bounds - if not (0 <= y < array.shape[0] and 0 <= x < array.shape[1]): - return - - # Get current element at this position - current_element = None - if element_map is not None: - current_element = element_map[y, x] - - # Get z-index values for comparison - current_z = 0 - new_z = 0 - - if drawing_config is not None: - current_z = ( - drawing_config.get_property(current_element, "z_index", 0) - if current_element - else 0 - ) - new_z = drawing_config.get_property(element, "z_index", 0) - - # Update element map if new element has higher z-index - if element_map is not None and new_z >= current_z: - element_map[y, x] = element - - # Blend colors - base_color = array[y, x] - blended_color = blend_colors(base_color, color) - array[y, x] = blended_color - - def manage_drawable_elements( handler, action, @@ -993,12 +913,6 @@ async def async_extract_room_outline( # If we found too few boundary points, use the rectangular outline if len(boundary_points) < 8: # Need at least 8 points for a meaningful shape - LOGGER.debug( - "%s: Room %s has too few boundary points (%d), using rectangular outline", - file_name, - str(room_id_int), - len(boundary_points), - ) return rect_outline # Use a more sophisticated algorithm to create a coherent outline @@ -1034,13 +948,6 @@ def calculate_angle(point): # Convert NumPy int64 values to regular Python integers simplified_outline = [(int(x), int(y)) for x, y in simplified_outline] - LOGGER.debug( - "%s: Room %s outline has %d points", - file_name, - str(room_id_int), - len(simplified_outline), - ) - return simplified_outline except (ValueError, IndexError, TypeError, ArithmeticError) as e: diff --git a/SCR/valetudo_map_parser/hypfer_draw.py b/SCR/valetudo_map_parser/hypfer_draw.py index 9432e35..fb74262 100755 --- a/SCR/valetudo_map_parser/hypfer_draw.py +++ b/SCR/valetudo_map_parser/hypfer_draw.py @@ -269,8 +269,6 @@ async def async_draw_zones( zone_clean = self.img_h.data.find_zone_entities(m_json) except (ValueError, KeyError): zone_clean = None - else: - _LOGGER.info("%s: Got zones.", self.file_name) if zone_clean: # Process zones sequentially to avoid memory-intensive array copies diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 05a00de..c417d8c 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -254,7 +254,12 @@ async def async_get_image_from_json( ) LOGGER.info("%s: Completed base Layers", self.file_name) # Copy the new array in base layer. + # Delete old base layer before creating new one to free memory + if self.img_base_layer is not None: + del self.img_base_layer self.img_base_layer = await self.async_copy_array(img_np_array) + # Delete source array after copying to free memory + del img_np_array self.shared.frame_number = self.frame_number self.frame_number += 1 @@ -268,6 +273,9 @@ async def async_get_image_from_json( or self.img_work_layer.shape != self.img_base_layer.shape or self.img_work_layer.dtype != self.img_base_layer.dtype ): + # Delete old buffer before creating new one to free memory + if self.img_work_layer is not None: + del self.img_work_layer self.img_work_layer = np.empty_like(self.img_base_layer) # Copy the base layer into the persistent working buffer (no new allocation per frame) diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 56f5fc9..2d5dc30 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -19,13 +19,14 @@ COLORS, DEFAULT_IMAGE_SIZE, DEFAULT_PIXEL_SIZE, + LOGGER, Colors, + Destinations, JsonType, PilPNG, RobotPosition, RoomsProperties, RoomStore, - LOGGER, ) from .config.utils import ( BaseHandler, @@ -57,6 +58,7 @@ def __init__(self, shared_data): self.drawing_config, self.draw = initialize_drawing_config(self) self.go_to = None # Go to position data self.img_base_layer = None # Base image layer + self.img_work_layer = None # Persistent working buffer (reused across frames) self.img_rotate = shared_data.image_rotate # Image rotation self.room_propriety = None # Room propriety data self.active_zones = None # Active zones @@ -67,7 +69,9 @@ def __init__(self, shared_data): ) # Room data handler async def extract_room_properties( - self, json_data: JsonType, destinations: JsonType + self, + json_data: JsonType, + destinations: Destinations | None = None, ) -> RoomsProperties: """Extract the room properties.""" # unsorted_id = RandImageData.get_rrm_segments_ids(json_data) @@ -82,9 +86,10 @@ async def extract_room_properties( json_data, size_x, size_y, top, left, True ) - dest_json = destinations - zones_data = dict(dest_json).get("zones", []) - points_data = dict(dest_json).get("spots", []) + + dest_json = destinations if destinations else {} + zones_data = dest_json.get("zones", []) + points_data = dest_json.get("spots", []) # Use the RandRoomsHandler to extract room properties room_properties = await self.rooms_handler.async_extract_room_properties( @@ -121,7 +126,7 @@ async def extract_room_properties( async def get_image_from_rrm( self, m_json: JsonType, # json data - destinations: None = None, # MQTT destinations for labels + destinations: Destinations | None = None, # MQTT destinations for labels ) -> PilPNG | None: """Generate Images from the json data. @param m_json: The JSON data to use to draw the image. @@ -152,10 +157,24 @@ async def get_image_from_rrm( # Increment frame number self.frame_number += 1 - img_np_array = await self.async_copy_array(self.img_base_layer) if self.frame_number > 5: self.frame_number = 0 + # Ensure persistent working buffer exists and matches base (allocate only when needed) + if ( + self.img_work_layer is None + or self.img_work_layer.shape != self.img_base_layer.shape + or self.img_work_layer.dtype != self.img_base_layer.dtype + ): + # Delete old buffer before creating new one to free memory + if self.img_work_layer is not None: + del self.img_work_layer + self.img_work_layer = np.empty_like(self.img_base_layer) + + # Copy the base layer into the persistent working buffer (no new allocation per frame) + np.copyto(self.img_work_layer, self.img_base_layer) + img_np_array = self.img_work_layer + # Draw map elements img_np_array = await self._draw_map_elements( img_np_array, m_json, colors, robot_position, robot_position_angle @@ -163,7 +182,7 @@ async def get_image_from_rrm( # Return PIL Image using async utilities pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") - del img_np_array # free memory + # Note: Don't delete img_np_array here as it's the persistent work buffer return await self._finalize_image(pil_img) except (RuntimeError, RuntimeWarning) as e: @@ -214,15 +233,22 @@ async def _setup_robot_and_image( self.room_propriety = await self.get_rooms_attributes(destinations) # Always check robot position for zooming (update if room info is missing) - if self.rooms_pos and robot_position and ( - self.robot_pos is None or "in_room" not in self.robot_pos + if ( + self.rooms_pos + and robot_position + and (self.robot_pos is None or "in_room" not in self.robot_pos) ): self.robot_pos = await self.async_get_robot_in_room( (robot_position[0] * 10), (robot_position[1] * 10), robot_position_angle, ) + # Delete old base layer before creating new one to free memory + if self.img_base_layer is not None: + del self.img_base_layer self.img_base_layer = await self.async_copy_array(img_np_array) + # Delete source array after copying to free memory + del img_np_array else: # If floor is disabled, create an empty image background_color = self.drawing_config.get_property( @@ -231,7 +257,12 @@ async def _setup_robot_and_image( img_np_array = await self.draw.create_empty_image( size_x, size_y, background_color ) + # Delete old base layer before creating new one to free memory + if self.img_base_layer is not None: + del self.img_base_layer self.img_base_layer = await self.async_copy_array(img_np_array) + # Delete source array after copying to free memory + del img_np_array # Check active zones BEFORE auto-crop to enable proper zoom functionality # This needs to run on every frame, not just frame 0 diff --git a/SCR/valetudo_map_parser/rooms_handler.py b/SCR/valetudo_map_parser/rooms_handler.py index a1f5e48..8affc6b 100644 --- a/SCR/valetudo_map_parser/rooms_handler.py +++ b/SCR/valetudo_map_parser/rooms_handler.py @@ -161,8 +161,17 @@ async def _process_room_layer( np.uint8 ) + # Free intermediate arrays to reduce memory usage + del local_mask + del struct_elem + del eroded + # Extract contour from the mask outline = self.convex_hull_outline(mask) + + # Free mask after extracting outline + del mask + if not outline: return None, None diff --git a/pyproject.toml b/pyproject.toml index 04d6ac5..a7ac2c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ python = ">=3.13" numpy = ">=1.26.4" Pillow = ">=10.3.0" scipy = ">=1.12.0" -mvcrender = ">=0.0.5" +mvcrender = "==0.0.6" [tool.poetry.group.dev.dependencies] ruff = "*" diff --git a/tests/refactored.py b/tests/refactored.py index 697cb78..f5c81be 100644 --- a/tests/refactored.py +++ b/tests/refactored.py @@ -8,13 +8,13 @@ from __future__ import annotations +import asyncio import math import numpy as np -import asyncio from PIL import ImageDraw, ImageFont -from .types import Color, NumpyArray, PilPNG, Point, Union, Tuple +from .types import Color, NumpyArray, PilPNG, Point, Tuple, Union class Drawable: @@ -321,12 +321,12 @@ def _filled_circle( @staticmethod def _filled_circle_optimized( - image: np.ndarray, - center: Tuple[int, int], - radius: int, - color: Color, - outline_color: Color = None, - outline_width: int = 0, + image: np.ndarray, + center: Tuple[int, int], + radius: int, + color: Color, + outline_color: Color = None, + outline_width: int = 0, ) -> np.ndarray: """ Optimized `_filled_circle` ensuring dtype compatibility with uint8. @@ -354,13 +354,13 @@ def _filled_circle_optimized( outline_color_np = None # Create coordinate grids - y_indices, x_indices = np.meshgrid(np.arange(h), np.arange(w), indexing='ij') + y_indices, x_indices = np.meshgrid(np.arange(h), np.arange(w), indexing="ij") # Compute squared distances from center dist_sq = (y_indices - y) ** 2 + (x_indices - x) ** 2 # Mask for filled circle - circle_mask = dist_sq <= radius ** 2 + circle_mask = dist_sq <= radius**2 image[circle_mask] = color_np # Directly modify the image if outline_width > 0 and outline_color_np is not None: @@ -370,7 +370,6 @@ def _filled_circle_optimized( return image - @staticmethod def _ellipse( image: NumpyArray, center: Point, radius: int, color: Color @@ -573,28 +572,34 @@ def draw_obstacles( return image @staticmethod - def draw_filled_circle(image: np.ndarray, centers: Tuple[int, int], radius: int, - color: Tuple[int, int, int, int], - cached_grid: Tuple[np.ndarray, np.ndarray] = None) -> np.ndarray: + def draw_filled_circle( + image: np.ndarray, + centers: Tuple[int, int], + radius: int, + color: Tuple[int, int, int, int], + cached_grid: Tuple[np.ndarray, np.ndarray] = None, + ) -> np.ndarray: """ - Draw multiple filled circles at once using a single NumPy mask. + Draw multiple filled circles at once using a single NumPy mask. - Parameters: - - image: NumPy array representing the image (H, W, 4) for RGBA. - - centers: (N, 2) NumPy array containing the (x, y) coordinates of N circle centers. - - radius: Radius of all circles. - - color: Color as a tuple (R, G, B, A). + Parameters: + - image: NumPy array representing the image (H, W, 4) for RGBA. + - centers: (N, 2) NumPy array containing the (x, y) coordinates of N circle centers. + - radius: Radius of all circles. + - color: Color as a tuple (R, G, B, A). - Returns: - - Modified image with filled circles drawn in one operation. - """ + Returns: + - Modified image with filled circles drawn in one operation. + """ h, w = image.shape[:2] y_indices, x_indices = np.ogrid[:h, :w] # Precompute coordinate grids # Compute mask for all circles at once mask = np.zeros((h, w), dtype=bool) for cx, cy in centers: - mask |= (x_indices - cx) ** 2 + (y_indices - cy) ** 2 <= radius ** 2 # Apply all circles in one pass + mask |= (x_indices - cx) ** 2 + ( + y_indices - cy + ) ** 2 <= radius**2 # Apply all circles in one pass # Apply color where mask is True (broadcasting works for multi-channel) image[mask] = color @@ -602,8 +607,9 @@ def draw_filled_circle(image: np.ndarray, centers: Tuple[int, int], radius: int, return image @staticmethod - async def async_draw_obstacles(image: np.ndarray, obstacle_info_list, - color: Tuple[int, int, int, int]) -> np.ndarray: + async def async_draw_obstacles( + image: np.ndarray, obstacle_info_list, color: Tuple[int, int, int, int] + ) -> np.ndarray: """ Optimized async version of `draw_obstacles` using `asyncio.gather()`. @@ -626,12 +632,19 @@ def extract_centers(obstacle_info_list): Returns: - NumPy array of shape (N, 2) where each row is (x, y). """ - return np.array([[obs["points"]["x"], obs["points"]["y"]] for obs in obstacle_info_list], dtype=np.int32) + return np.array( + [ + [obs["points"]["x"], obs["points"]["y"]] + for obs in obstacle_info_list + ], + dtype=np.int32, + ) - centers = await asyncio.get_running_loop().run_in_executor(None, extract_centers, obstacle_info_list) + centers = await asyncio.get_running_loop().run_in_executor( + None, extract_centers, obstacle_info_list + ) Drawable.draw_filled_circle(image, centers, 6, color) - return image @staticmethod