Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 57 additions & 237 deletions SCR/valetudo_map_parser/config/drawable.py

Large diffs are not rendered by default.

21 changes: 7 additions & 14 deletions SCR/valetudo_map_parser/config/rand256_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""New Rand256 Map Parser - Based on Xiaomi/Roborock implementation with precise binary parsing."""
"""New Rand256 Map Parser -
Based on Xiaomi/Roborock implementation with precise binary parsing."""

import math
import struct
Expand Down Expand Up @@ -78,6 +79,7 @@ def _get_int32_signed(data: bytes, address: int) -> int:

@staticmethod
def _parse_carpet_map(data: bytes) -> set[int]:
"""Parse carpet map using Xiaomi method."""
carpet_map = set()

for i, v in enumerate(data):
Expand All @@ -87,6 +89,7 @@ def _parse_carpet_map(data: bytes) -> set[int]:

@staticmethod
def _parse_area(header: bytes, data: bytes) -> list:
"""Parse area using Xiaomi method."""
area_pairs = RRMapParser._get_int16(header, 0x08)
areas = []
for area_start in range(0, area_pairs * 16, 16):
Expand Down Expand Up @@ -114,6 +117,7 @@ def _parse_area(header: bytes, data: bytes) -> list:

@staticmethod
def _parse_zones(data: bytes, header: bytes) -> list:
"""Parse zones using Xiaomi method."""
zone_pairs = RRMapParser._get_int16(header, 0x08)
zones = []
for zone_start in range(0, zone_pairs * 8, 8):
Expand Down Expand Up @@ -146,21 +150,9 @@ def _parse_object_position(block_data_length: int, data: bytes) -> Dict[str, Any
angle = raw_angle
return {"position": [x, y], "angle": angle}


@staticmethod
def _parse_walls(data: bytes, header: bytes) -> list:
wall_pairs = RRMapParser._get_int16(header, 0x08)
walls = []
for wall_start in range(0, wall_pairs * 8, 8):
x0 = RRMapParser._get_int16(data, wall_start + 0)
y0 = RRMapParser._get_int16(data, wall_start + 2)
x1 = RRMapParser._get_int16(data, wall_start + 4)
y1 = RRMapParser._get_int16(data, wall_start + 6)
walls.append([x0, RRMapParser.Tools.DIMENSION_MM - y0, x1, RRMapParser.Tools.DIMENSION_MM - y1])
return walls

@staticmethod
def _parse_walls(data: bytes, header: bytes) -> list:
"""Parse walls using Xiaomi method."""
wall_pairs = RRMapParser._get_int16(header, 0x08)
walls = []
for wall_start in range(0, wall_pairs * 8, 8):
Expand Down Expand Up @@ -223,6 +215,7 @@ def parse(self, map_buf: bytes) -> Dict[str, Any]:
return {}

def parse_blocks(self, raw: bytes, pixels: bool = True) -> Dict[int, Any]:
"""Parse all blocks using Xiaomi method."""
blocks = {}
map_header_length = self._get_int16(raw, 0x02)
block_start_position = map_header_length
Expand Down
4 changes: 3 additions & 1 deletion SCR/valetudo_map_parser/config/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from PIL import Image

from .utils import pil_size_rotation
from .types import (
ATTR_CALIBRATION_POINTS,
ATTR_CAMERA_MODE,
Expand Down Expand Up @@ -210,11 +211,12 @@ def generate_attributes(self) -> dict:

def to_dict(self) -> dict:
"""Return a dictionary with image and attributes data."""

return {
"image": {
"binary": self.binary_image,
"pil_image": self.new_image,
"size": self.new_image.size if self.new_image else (10, 10),
"size": pil_size_rotation(self.image_rotate, self.new_image),
},
"attributes": self.generate_attributes(),
}
Expand Down
3 changes: 1 addition & 2 deletions SCR/valetudo_map_parser/config/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ class Room(TypedDict):
id: int


# list[dict[str, str | list[int]]] | list[dict[str, str | list[list[int]]]] | list[dict[str, str | int]] | int]'
class Destinations(TypedDict, total=False):
spots: NotRequired[Optional[List[Spot]]]
zones: NotRequired[Optional[List[Zone]]]
rooms: NotRequired[Optional[List[Room]]]
updated: NotRequired[Optional[float]]
updated: NotRequired[Optional[float | int]]


class RoomProperty(TypedDict):
Expand Down
131 changes: 19 additions & 112 deletions SCR/valetudo_map_parser/config/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ async def async_get_image(
try:
# Backup current image to last_image before processing new one
if hasattr(self.shared, "new_image") and self.shared.new_image is not None:
# Close old last_image to free memory before replacing it
if hasattr(self.shared, "last_image") and self.shared.last_image is not None:
try:
self.shared.last_image.close()
except Exception:
pass # Ignore errors if image is already closed
self.shared.last_image = self.shared.new_image

# Call the appropriate handler method based on handler type
Expand Down Expand Up @@ -231,15 +237,8 @@ def prepare_resize_params(
self, pil_img: PilPNG, rand: bool = False
) -> ResizeParams:
"""Prepare resize parameters for image resizing."""
if self.shared.image_rotate in [0, 180]:
width, height = pil_img.size
else:
height, width = pil_img.size
LOGGER.debug(
"Shared PIL image size: %s x %s",
self.shared.image_ref_width,
self.shared.image_ref_height,
)
width, height = pil_size_rotation(self.shared.image_rotate, pil_img)

return ResizeParams(
pil_img=pil_img,
width=width,
Expand Down Expand Up @@ -660,9 +659,6 @@ def get_corners(

async def async_resize_image(params: ResizeParams):
"""Resize the image to the given dimensions and aspect ratio."""
LOGGER.debug("Resizing image to aspect ratio: %s", params.aspect_ratio)
LOGGER.debug("Original image size: %s x %s", params.width, params.height)
LOGGER.debug("Image crop size: %s", params.crop_size)
if params.aspect_ratio == "None":
return params.pil_img
if params.aspect_ratio != "None":
Expand Down Expand Up @@ -699,6 +695,17 @@ async def async_resize_image(params: ResizeParams):
return params.pil_img


def pil_size_rotation(image_rotate, pil_img):
"""Return the size of the image."""
if not pil_img:
return 0, 0
if image_rotate in [0, 180]:
width, height = pil_img.size
else:
height, width = pil_img.size
return width, height


def initialize_drawing_config(handler):
"""
Initialize drawing configuration from device_info.
Expand All @@ -725,93 +732,6 @@ def initialize_drawing_config(handler):
return drawing_config, draw


def blend_colors(base_color, overlay_color):
"""
Blend two RGBA colors using alpha compositing.

Args:
base_color: Base RGBA color tuple (r, g, b, a)
overlay_color: Overlay RGBA color tuple (r, g, b, a)

Returns:
Blended RGBA color tuple (r, g, b, a)
"""
r1, g1, b1, a1 = base_color
r2, g2, b2, a2 = overlay_color

# Convert alpha to 0-1 range
a1 = a1 / 255.0
a2 = a2 / 255.0

# Calculate resulting alpha
a_out = a1 + a2 * (1 - a1)

# Avoid division by zero
if a_out < 0.0001:
return [0, 0, 0, 0]

# Calculate blended RGB components
r_out = (r1 * a1 + r2 * a2 * (1 - a1)) / a_out
g_out = (g1 * a1 + g2 * a2 * (1 - a1)) / a_out
b_out = (b1 * a1 + b2 * a2 * (1 - a1)) / a_out

# Convert back to 0-255 range and return as tuple
return (
int(max(0, min(255, r_out))),
int(max(0, min(255, g_out))),
int(max(0, min(255, b_out))),
int(max(0, min(255, a_out * 255))),
)


def blend_pixel(array, x, y, color, element, element_map=None, drawing_config=None):
"""
Blend a pixel color with the existing color at the specified position.
Also updates the element map if the new element has higher z-index.

Args:
array: The image array to modify
x: X coordinate
y: Y coordinate
color: RGBA color tuple to blend
element: Element code for the pixel
element_map: Optional element map to update
drawing_config: Optional drawing configuration for z-index lookup

Returns:
None
"""
# Check bounds
if not (0 <= y < array.shape[0] and 0 <= x < array.shape[1]):
return

# Get current element at this position
current_element = None
if element_map is not None:
current_element = element_map[y, x]

# Get z-index values for comparison
current_z = 0
new_z = 0

if drawing_config is not None:
current_z = (
drawing_config.get_property(current_element, "z_index", 0)
if current_element
else 0
)
new_z = drawing_config.get_property(element, "z_index", 0)

# Update element map if new element has higher z-index
if element_map is not None and new_z >= current_z:
element_map[y, x] = element

# Blend colors
base_color = array[y, x]
blended_color = blend_colors(base_color, color)
array[y, x] = blended_color


def manage_drawable_elements(
handler,
action,
Expand Down Expand Up @@ -993,12 +913,6 @@ async def async_extract_room_outline(

# If we found too few boundary points, use the rectangular outline
if len(boundary_points) < 8: # Need at least 8 points for a meaningful shape
LOGGER.debug(
"%s: Room %s has too few boundary points (%d), using rectangular outline",
file_name,
str(room_id_int),
len(boundary_points),
)
return rect_outline

# Use a more sophisticated algorithm to create a coherent outline
Expand Down Expand Up @@ -1034,13 +948,6 @@ def calculate_angle(point):
# Convert NumPy int64 values to regular Python integers
simplified_outline = [(int(x), int(y)) for x, y in simplified_outline]

LOGGER.debug(
"%s: Room %s outline has %d points",
file_name,
str(room_id_int),
len(simplified_outline),
)

return simplified_outline

except (ValueError, IndexError, TypeError, ArithmeticError) as e:
Expand Down
2 changes: 0 additions & 2 deletions SCR/valetudo_map_parser/hypfer_draw.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ async def async_draw_zones(
zone_clean = self.img_h.data.find_zone_entities(m_json)
except (ValueError, KeyError):
zone_clean = None
else:
_LOGGER.info("%s: Got zones.", self.file_name)

if zone_clean:
# Process zones sequentially to avoid memory-intensive array copies
Expand Down
8 changes: 8 additions & 0 deletions SCR/valetudo_map_parser/hypfer_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ async def async_get_image_from_json(
)
LOGGER.info("%s: Completed base Layers", self.file_name)
# Copy the new array in base layer.
# Delete old base layer before creating new one to free memory
if self.img_base_layer is not None:
del self.img_base_layer
self.img_base_layer = await self.async_copy_array(img_np_array)
# Delete source array after copying to free memory
del img_np_array

self.shared.frame_number = self.frame_number
self.frame_number += 1
Expand All @@ -268,6 +273,9 @@ async def async_get_image_from_json(
or self.img_work_layer.shape != self.img_base_layer.shape
or self.img_work_layer.dtype != self.img_base_layer.dtype
):
# Delete old buffer before creating new one to free memory
if self.img_work_layer is not None:
del self.img_work_layer
self.img_work_layer = np.empty_like(self.img_base_layer)

# Copy the base layer into the persistent working buffer (no new allocation per frame)
Expand Down
Loading
Loading