diff --git a/.github/workflows/code_quality.yaml b/.github/workflows/code_quality.yaml index c157039..19dbf32 100644 --- a/.github/workflows/code_quality.yaml +++ b/.github/workflows/code_quality.yaml @@ -26,7 +26,7 @@ jobs: uses: actions/setup-python@v6 id: python with: - python-version: "3.12" + python-version: "3.13" - name: Install Poetry run: pip install poetry diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 5069751..e1d1b65 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -13,10 +13,10 @@ jobs: - uses: actions/checkout@v5 # Step 2: Set up Python - - name: Set up Python 3.12 + - name: Set up Python 3.13 uses: actions/setup-python@v6 with: - python-version: "3.12" + python-version: "3.13" # Step 3: Install Poetry - name: Install Poetry diff --git a/SCR/__init__.py b/SCR/__init__.py index b2dca22..2217ee2 100644 --- a/SCR/__init__.py +++ b/SCR/__init__.py @@ -1,2 +1,2 @@ """Valetudo map parser. -Version: 0.1.9""" +Version: 0.1.10""" diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index d9a8560..c5d0efa 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -1,5 +1,7 @@ """Valetudo map parser. -Version: 0.1.9""" +Version: 0.1.10""" + +from pathlib import Path from .config.colors import ColorsManagement from .config.drawable import Drawable @@ -19,14 +21,27 @@ NumpyArray, ImageSize, ) +from .config.status_text.status_text import StatusText +from .config.status_text.translations import translations as STATUS_TEXT_TRANSLATIONS from .hypfer_handler import HypferMapImageHandler from .rand256_handler import ReImageHandler from .rooms_handler import RoomsHandler, RandRoomsHandler +from .map_data import HyperMapData + + +def get_default_font_path() -> str: + """Return the absolute path to the bundled default font directory. + + This returns the path to the fonts folder; the caller can join a specific font file + to avoid hard-coding a particular font here. 
+ """ + return str((Path(__file__).resolve().parent / "config" / "fonts").resolve()) __all__ = [ "RoomsHandler", "RandRoomsHandler", + "HyperMapData", "HypferMapImageHandler", "ReImageHandler", "RRMapParser", @@ -47,4 +62,7 @@ "PilPNG", "NumpyArray", "ImageSize", + "StatusText", + "STATUS_TEXT_TRANSLATIONS", + "get_default_font_path", ] diff --git a/SCR/valetudo_map_parser/config/auto_crop.py b/SCR/valetudo_map_parser/config/auto_crop.py index 5fdb542..5c7b10d 100644 --- a/SCR/valetudo_map_parser/config/auto_crop.py +++ b/SCR/valetudo_map_parser/config/auto_crop.py @@ -6,10 +6,9 @@ import logging import numpy as np -from numpy import rot90 from scipy import ndimage -from .async_utils import AsyncNumPy, make_async +from .async_utils import AsyncNumPy from .types import Color, NumpyArray, TrimCropData, TrimsData from .utils import BaseHandler @@ -91,7 +90,6 @@ def _calculate_trimmed_dimensions(self): async def _async_auto_crop_data(self, tdata: TrimsData): # , tdata=None """Load the auto crop data from the Camera config.""" - _LOGGER.debug("Auto Crop init data: %s, %s", str(tdata), str(self.auto_crop)) if not self.auto_crop: trims_data = TrimCropData.from_dict(dict(tdata.to_dict())).to_list() ( @@ -100,7 +98,6 @@ async def _async_auto_crop_data(self, tdata: TrimsData): # , tdata=None self.trim_right, self.trim_down, ) = trims_data - _LOGGER.debug("Auto Crop trims data: %s", trims_data) if trims_data != [0, 0, 0, 0]: self._calculate_trimmed_dimensions() else: @@ -118,10 +115,6 @@ def auto_crop_offset(self): async def _init_auto_crop(self): """Initialize the auto crop data.""" - _LOGGER.debug("Auto Crop Init data: %s", str(self.auto_crop)) - _LOGGER.debug( - "Auto Crop Init trims data: %r", self.handler.shared.trims.to_dict() - ) if not self.auto_crop: # and self.handler.shared.vacuum_state == "docked": self.auto_crop = await self._async_auto_crop_data(self.handler.shared.trims) if self.auto_crop: @@ -131,7 +124,6 @@ async def _init_auto_crop(self): # Fallback: Ensure auto_crop is valid if not self.auto_crop or any(v < 0 for v in self.auto_crop): - _LOGGER.debug("Auto-crop data unavailable. Scanning full image.") self.auto_crop = None return self.auto_crop @@ -164,14 +156,6 @@ async def async_image_margins( min_y, max_y = y_slice.start, y_slice.stop - 1 min_x, max_x = x_slice.start, x_slice.stop - 1 - _LOGGER.debug( - "%s: Found trims max and min values (y,x) (%s, %s) (%s, %s)...", - self.handler.file_name, - int(max_y), - int(max_x), - int(min_y), - int(min_x), - ) return min_y, min_x, max_x, max_y async def async_get_room_bounding_box( @@ -247,7 +231,7 @@ async def async_get_room_bounding_box( return None except Exception as e: - _LOGGER.error( + _LOGGER.warning( "%s: Error calculating room bounding box for '%s': %s", self.handler.file_name, room_name, @@ -403,7 +387,6 @@ async def async_auto_trim_and_zoom_image( try: self.auto_crop = await self._init_auto_crop() if (self.auto_crop is None) or (self.auto_crop == [0, 0, 0, 0]): - _LOGGER.debug("%s: Calculating auto trim box", self.handler.file_name) # Find the coordinates of the first occurrence of a non-background color min_y, min_x, max_x, max_y = await self.async_image_margins( image_array, detect_colour @@ -456,15 +439,7 @@ async def async_auto_trim_and_zoom_image( # Rotate the cropped image based on the given angle rotated = await self.async_rotate_the_image(trimmed, rotate) del trimmed # Free memory. 
- _LOGGER.debug( - "%s: Auto Trim Box data: %s", self.handler.file_name, self.crop_area - ) self.handler.crop_img_size = [rotated.shape[1], rotated.shape[0]] - _LOGGER.debug( - "%s: Auto Trimmed image size: %s", - self.handler.file_name, - self.handler.crop_img_size, - ) except RuntimeError as e: _LOGGER.warning( diff --git a/SCR/valetudo_map_parser/config/colors.py b/SCR/valetudo_map_parser/config/colors.py index b9c9f9c..50356c2 100644 --- a/SCR/valetudo_map_parser/config/colors.py +++ b/SCR/valetudo_map_parser/config/colors.py @@ -250,7 +250,7 @@ def add_alpha_to_rgb(alpha_channels, rgb_colors): List[Tuple[int, int, int, int]]: List of RGBA colors with alpha channel added. """ if len(alpha_channels) != len(rgb_colors): - LOGGER.error("Input lists must have the same length.") + LOGGER.warning("Input lists must have the same length.") return [] # Fast path for empty lists @@ -357,7 +357,7 @@ def set_initial_colours(self, device_info: dict) -> None: self.color_cache.clear() except (ValueError, IndexError, UnboundLocalError) as e: - LOGGER.error("Error while populating colors: %s", e) + LOGGER.warning("Error while populating colors: %s", e) def initialize_user_colors(self, device_info: dict) -> List[Color]: """ diff --git a/SCR/valetudo_map_parser/config/drawable.py b/SCR/valetudo_map_parser/config/drawable.py index bb452d2..919c785 100644 --- a/SCR/valetudo_map_parser/config/drawable.py +++ b/SCR/valetudo_map_parser/config/drawable.py @@ -11,6 +11,7 @@ from __future__ import annotations import logging +from pathlib import Path import numpy as np from PIL import Image, ImageDraw, ImageFont @@ -874,11 +875,25 @@ def status_text( position: bool, ) -> None: """Draw the status text on the image.""" - path_default_font = ( - "custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf" - ) - default_font = ImageFont.truetype(path_default_font, size) - user_font = ImageFont.truetype(path_font, size) + module_dir = Path(__file__).resolve().parent + default_font_path = module_dir / "fonts" / "FiraSans.ttf" + # Load default font with safety fallback to PIL's built-in if missing + try: + default_font = ImageFont.truetype(str(default_font_path), size) + except OSError: + _LOGGER.warning( + "Default font not found at %s; using PIL default font", + default_font_path, + ) + default_font = ImageFont.load_default() + + # Use provided font directly if available; else fall back to default + user_font = default_font + if path_font: + try: + user_font = ImageFont.truetype(str(path_font), size) + except OSError: + user_font = default_font if position: x, y = 10, 10 else: diff --git a/SCR/valetudo_map_parser/config/fonts/FiraSans.ttf b/SCR/valetudo_map_parser/config/fonts/FiraSans.ttf new file mode 100644 index 0000000..6f80647 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/FiraSans.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/Inter-VF.ttf b/SCR/valetudo_map_parser/config/fonts/Inter-VF.ttf new file mode 100644 index 0000000..e724708 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/Inter-VF.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/Lato-Regular.ttf b/SCR/valetudo_map_parser/config/fonts/Lato-Regular.ttf new file mode 100644 index 0000000..bb2e887 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/Lato-Regular.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/MPLUSRegular.ttf b/SCR/valetudo_map_parser/config/fonts/MPLUSRegular.ttf new file mode 100644 index 0000000..c8a6a55 Binary files /dev/null and 
b/SCR/valetudo_map_parser/config/fonts/MPLUSRegular.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/NotoKufiArabic-VF.ttf b/SCR/valetudo_map_parser/config/fonts/NotoKufiArabic-VF.ttf new file mode 100644 index 0000000..451ca36 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/NotoKufiArabic-VF.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/NotoSansCJKhk-VF.ttf b/SCR/valetudo_map_parser/config/fonts/NotoSansCJKhk-VF.ttf new file mode 100644 index 0000000..30066c0 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/NotoSansCJKhk-VF.ttf differ diff --git a/SCR/valetudo_map_parser/config/fonts/NotoSansKhojki.ttf b/SCR/valetudo_map_parser/config/fonts/NotoSansKhojki.ttf new file mode 100644 index 0000000..64b9823 Binary files /dev/null and b/SCR/valetudo_map_parser/config/fonts/NotoSansKhojki.ttf differ diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index 6503303..839b1e7 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -1,7 +1,7 @@ """ Class Camera Shared. Keep the data between the modules. -Version: v0.1.9 +Version: v0.1.10 """ import asyncio @@ -58,6 +58,7 @@ def __init__(self, file_name): self.frame_number: int = 0 # camera Frame number self.destinations: list = [] # MQTT rand destinations self.rand256_active_zone: list = [] # Active zone for rand256 + self.rand256_zone_coordinates: list = [] # Active zone coordinates for rand256 self.is_rand: bool = False # MQTT rand data self._new_mqtt_message = False # New MQTT message # Initialize last_image with default gray image (250x150 minimum) @@ -69,6 +70,7 @@ def __init__(self, file_name): self.image_last_updated: float = 0.0 # Last image update time self.image_format = "image/pil" # Image format self.image_size = None # Image size + self.robot_size = None # Robot size self.image_auto_zoom: bool = False # Auto zoom image self.image_zoom_lock_ratio: bool = True # Zoom lock ratio self.image_ref_height: int = 0 # Image reference height @@ -81,8 +83,7 @@ def __init__(self, file_name): self.user_colors = Colors # User base colors self.rooms_colors = Colors # Rooms colors self.vacuum_battery = 0 # Vacuum battery state - self.vacuum_bat_charged: bool = True # Vacuum charged and ready - self.vacuum_connection = None # Vacuum connection state + self.vacuum_connection = False # Vacuum connection state self.vacuum_state = None # Vacuum state self.charger_position = None # Vacuum Charger position self.show_vacuum_state = None # Show vacuum state on the map @@ -197,14 +198,13 @@ def generate_attributes(self) -> dict: attrs = { ATTR_CAMERA_MODE: self.camera_mode, ATTR_VACUUM_BATTERY: f"{self.vacuum_battery}%", - ATTR_VACUUM_CHARGING: self.vacuum_bat_charged, + ATTR_VACUUM_CHARGING: self.vacuum_bat_charged(), ATTR_VACUUM_POSITION: self.current_room, ATTR_VACUUM_STATUS: self.vacuum_state, ATTR_VACUUM_JSON_ID: self.vac_json_id, ATTR_CALIBRATION_POINTS: self.attr_calibration_points, } if self.obstacles_pos and self.vacuum_ips: - _LOGGER.debug("Generating obstacle links from: %s", self.obstacles_pos) self.obstacles_data = self._compose_obstacle_links( self.vacuum_ips, self.obstacles_pos ) @@ -302,19 +302,30 @@ def update_shared_data(self, device_info): ) # Ensure trims are updated correctly trim_data = device_info.get("trims_data", DEFAULT_VALUES["trims_data"]) - _LOGGER.debug( - "%s: Updating shared trims with: %s", instance.file_name, trim_data - ) instance.trims = TrimsData.from_dict(trim_data) + # 
Robot size + robot_size = device_info.get("robot_size", 25) + try: + robot_size = int(robot_size) + except (ValueError, TypeError): + robot_size = 25 + # Clamp robot_size to [8, 25] + if robot_size < 8: + robot_size = 8 + elif robot_size > 25: + robot_size = 25 + instance.robot_size = robot_size except TypeError as ex: - _LOGGER.error("Shared data can't be initialized due to a TypeError! %s", ex) + _LOGGER.warning( + "Shared data can't be initialized due to a TypeError! %s", ex + ) except AttributeError as ex: - _LOGGER.error( + _LOGGER.warning( "Shared data can't be initialized due to an AttributeError! %s", ex ) except RuntimeError as ex: - _LOGGER.error( + _LOGGER.warning( "An unexpected error occurred while initializing shared data %s:", ex ) diff --git a/SCR/valetudo_map_parser/config/status_text/status_text.py b/SCR/valetudo_map_parser/config/status_text/status_text.py new file mode 100644 index 0000000..720ec2f --- /dev/null +++ b/SCR/valetudo_map_parser/config/status_text/status_text.py @@ -0,0 +1,95 @@ +""" +Version: 0.1.10 +Status text of the vacuum cleaners. +Class to handle the status text of the vacuum cleaners. +""" + +from __future__ import annotations + +from ..types import LOGGER, PilPNG +from .translations import translations + +LOGGER.propagate = True + + +class StatusText: + """ + Status text of the vacuum cleaners. + """ + + def __init__(self, camera_shared): + self._shared = camera_shared + self.file_name = self._shared.file_name + + @staticmethod + async def get_vacuum_status_translation( + language: str = "en", + ) -> dict[str, str] | None: + """ + Get the vacuum status translation. + @param language: Language code, default 'en'. + @return: Mapping for the given language or None. + """ + return translations.get((language or "en").lower()) + + async def translate_vacuum_status(self) -> str: + """Return the translated status with EN fallback and safe default.""" + status = self._shared.vacuum_state or "unknown" + language = (self._shared.user_language or "en").lower() + translation = await self.get_vacuum_status_translation(language) + if not translation: + translation = translations.get("en", {}) + return translation.get(status, str(status).capitalize()) + + async def get_status_text(self, text_img: PilPNG) -> tuple[list[str], int]: + """ + Compose the image status text. + :param text_img: Image to draw the text on. + :return status_text, text_size: List of the status text and the text size. + """ + status_text = ["If you read me, something really went wrong.."] # default text + text_size_coverage = 1.5 # resize factor for the text + text_size = self._shared.vacuum_status_size # default text size + charge_level = "\u03de" # unicode Koppa symbol + charging = "\u2211" # unicode Charging symbol + vacuum_state = await self.translate_vacuum_status() + if self._shared.show_vacuum_state: + status_text = [f"{self.file_name}: {vacuum_state}"] + language = (self._shared.user_language or "en").lower() + lang_map = translations.get(language) or translations.get("en", {}) + if not self._shared.vacuum_connection: + mqtt_disc = lang_map.get( + "mqtt_disconnected", + translations.get("en", {}).get( + "mqtt_disconnected", "Disconnected from MQTT?" 
+                    ),
+                )
+                status_text = [f"{self.file_name}: {mqtt_disc}"]
+            else:
+                if self._shared.current_room:
+                    in_room = self._shared.current_room.get("in_room")
+                    if in_room:
+                        status_text.append(f" ({in_room})")
+                if self._shared.vacuum_state == "docked":
+                    if self._shared.vacuum_bat_charged():
+                        status_text.append(" \u00b7 ")
+                        status_text.append(f"{charging}{charge_level} ")
+                        status_text.append(f"{self._shared.vacuum_battery}%")
+                    else:
+                        status_text.append(" \u00b7 ")
+                        status_text.append(f"{charge_level} ")
+                        ready_txt = lang_map.get(
+                            "ready",
+                            translations.get("en", {}).get("ready", "Ready."),
+                        )
+                        status_text.append(ready_txt)
+                else:
+                    status_text.append(" \u00b7 ")
+                    status_text.append(f"{charge_level}")
+                    status_text.append(f" {self._shared.vacuum_battery}%")
+        if text_size >= 50 and getattr(text_img, "width", None):
+            text_pixels = max(1, sum(len(text) for text in status_text))
+            text_size = int(
+                (text_size_coverage * text_img.width) // text_pixels
+            )
+        return status_text, text_size
diff --git a/SCR/valetudo_map_parser/config/status_text/translations.py b/SCR/valetudo_map_parser/config/status_text/translations.py
new file mode 100644
index 0000000..25cb009
--- /dev/null
+++ b/SCR/valetudo_map_parser/config/status_text/translations.py
@@ -0,0 +1,280 @@
+"""Translations for vacuum status and status text snippets."""
+
+translations = {
+    "en": {
+        "connected": "Connected",
+        "disconnected": "Disconnected",
+        "charging": "Charging",
+        "cleaning": "Cleaning",
+        "docked": "Docked",
+        "idle": "Idle",
+        "paused": "Paused",
+        "returning": "Returning",
+        "ready": "Ready.",
+        "mqtt_disconnected": "Disconnected from MQTT?",
+    },
+    "de": {
+        "connected": "Verbunden",
+        "disconnected": "Nicht verbunden",
+        "charging": "Laden",
+        "cleaning": "Reinigen",
+        "docked": "Angedockt",
+        "idle": "Inaktiv",
+        "paused": "Pausiert",
+        "returning": "Zurückkehren",
+        "ready": "Bereit.",
+        "mqtt_disconnected": "Von MQTT getrennt?",
+    },
+    "fr": {
+        "connected": "Connecté",
+        "disconnected": "Déconnecté",
+        "charging": "En charge",
+        "cleaning": "Nettoyage",
+        "docked": "Ancré",
+        "idle": "Inactif",
+        "paused": "En pause",
+        "returning": "Retour",
+        "ready": "Prêt.",
+        "mqtt_disconnected": "Déconnecté de MQTT ?",
+    },
+    "it": {
+        "connected": "Connesso",
+        "disconnected": "Disconnesso",
+        "charging": "Caricamento",
+        "cleaning": "Pulizia",
+        "docked": "Ancorato",
+        "idle": "Inattivo",
+        "paused": "In pausa",
+        "returning": "Ritorno",
+        "ready": "Pronto.",
+        "mqtt_disconnected": "Disconnesso da MQTT?",
+    },
+    "pl": {
+        "connected": "Połączony",
+        "disconnected": "Rozłączony",
+        "charging": "Ładowanie",
+        "cleaning": "Czyszczenie",
+        "docked": "Zaparkowany",
+        "idle": "Nieaktywny",
+        "paused": "Wstrzymany",
+        "returning": "Powrót",
+        "ready": "Gotowy.",
+        "mqtt_disconnected": "Odłączono od MQTT?",
+    },
+    "pt": {
+        "connected": "Conectado",
+        "disconnected": "Desconectado",
+        "charging": "Carregando",
+        "cleaning": "Limpando",
+        "docked": "Ancorado",
+        "idle": "Inativo",
+        "paused": "Pausado",
+        "returning": "Retornando",
+        "ready": "Pronto.",
+        "mqtt_disconnected": "Desconectado do MQTT?",
+    },
+    "ru": {
+        "connected": "Подключен",
+        "disconnected": "Отключен",
+        "charging": "Заряжается",
+        "cleaning": "Очищается",
+        "docked": "Закреплен",
+        "idle": "Неактивен",
+        "paused": "Приостановлен",
+        "returning": "Возвращается",
+        "ready": "Готово.",
+        "mqtt_disconnected": "Отключено от MQTT?",
+    },
+    "tr": {
+        "connected": "Bağlandı",
+        "disconnected": "Bağlantı kesildi",
+        "charging": "Şarj ediliyor",
+        "cleaning": "Temizleniyor",
+        "docked": "Yuvada",
+        "idle": "Boşta",
+        "paused": "Duraklatıldı",
+        "returning": "Geri dönüyor",
+        "ready": "Hazır.",
+        "mqtt_disconnected": "MQTT bağlantısı kesildi mi?",
+    },
+    "zh": {
+        "connected": "已连接",
+        "disconnected": "未连接",
+        "charging": "充电中",
+        "cleaning": "清扫中",
+        "docked": "已停靠",
+        "idle": "空闲",
+        "paused": "已暂停",
+        "returning": "返回中",
+        "ready": "已就绪。",
+        "mqtt_disconnected": "MQTT 已断开?",
+    },
+    "ja": {
+        "connected": "接続済み",
+        "disconnected": "切断されました",
+        "charging": "充電中",
+        "cleaning": "掃除中",
+        "docked": "停泊中",
+        "idle": "アイドル",
+        "paused": "一時停止中",
+        "returning": "戻り中",
+        "ready": "準備完了。",
+        "mqtt_disconnected": "MQTT から切断されていますか?",
+    },
+    "ko": {
+        "connected": "연결됨",
+        "disconnected": "연결 해제됨",
+        "charging": "충전 중",
+        "cleaning": "청소 중",
+        "docked": "도킹됨",
+        "idle": "대기 중",
+        "paused": "일시 중지됨",
+        "returning": "돌아오는 중",
+        "ready": "준비 완료.",
+        "mqtt_disconnected": "MQTT에서 연결이 끊겼나요?",
+    },
+    "ar": {
+        "connected": "متصل",
+        "disconnected": "غير متصل",
+        "charging": "جارِ الشحن",
+        "cleaning": "جارِ التنظيف",
+        "docked": "في المحطة",
+        "idle": "خامل",
+        "paused": "متوقف مؤقتًا",
+        "returning": "جارِ العودة",
+        "ready": "جاهز.",
+        "mqtt_disconnected": "هل تم قطع الاتصال بـ MQTT؟",
+    },
+    "hi": {
+        "connected": "कनेक्टेड",
+        "disconnected": "डिस्कनेक्टेड",
+        "charging": "चार्जिंग",
+        "cleaning": "साफ़ कर रहा है",
+        "docked": "डॉक में",
+        "idle": "निष्क्रिय",
+        "paused": "रुका हुआ है",
+        "returning": "वापस आ रहा है",
+        "ready": "तैयार.",
+        "mqtt_disconnected": "MQTT से डिसकनेक्ट?",
+    },
+    "bn": {
+        "connected": "সংযুক্ত",
+        "disconnected": "সংযোগ বিচ্ছিন্ন",
+        "charging": "চার্জিং",
+        "cleaning": "সাফ করা হচ্ছে",
+        "docked": "ডকেড",
+        "idle": "নিষ্ক্রিয়",
+        "paused": "বিরত",
+        "returning": "পুনরায় আসছে",
+        "ready": "প্রস্তুত।",
+        "mqtt_disconnected": "MQTT থেকে সংযোগ বিচ্ছিন্ন?",
+    },
+    "sv": {
+        "connected": "Ansluten",
+        "disconnected": "Frånkopplad",
+        "charging": "Laddar",
+        "cleaning": "Städar",
+        "docked": "Dockad",
+        "idle": "Väntande",
+        "paused": "Paus",
+        "returning": "Återvänder",
+        "ready": "Klar.",
+        "mqtt_disconnected": "Frånkopplad från MQTT?",
+    },
+    "fn": {
+        "connected": "Konektado",
+        "disconnected": "Hindi konektado",
+        "charging": "Kinakarga",
+        "cleaning": "Naglilinis",
+        "docked": "Naka-dock",
+        "idle": "Idle",
+        "paused": "Naka-pause",
+        "returning": "Nagbabalik",
+        "ready": "Handa.",
+        "mqtt_disconnected": "Na-disconnect sa MQTT?",
+    },
+    "no": {
+        "connected": "Tilkoblet",
+        "disconnected": "Frakoblet",
+        "charging": "Lader",
+        "cleaning": "Renser",
+        "docked": "Dokket",
+        "idle": "Inaktiv",
+        "paused": "Pause",
+        "returning": "Returnerer",
+        "ready": "Klar.",
+        "mqtt_disconnected": "Frakoblet fra MQTT?",
+    },
+    "cz": {
+        "connected": "Připojeno",
+        "disconnected": "Odpojeno",
+        "charging": "Nabíjení",
+        "cleaning": "Čištění",
+        "docked": "Zaparkováno",
+        "idle": "Nečinný",
+        "paused": "Pozastaveno",
+        "returning": "Vrací se",
+        "ready": "Připraven.",
+        "mqtt_disconnected": "Odpojeno od MQTT?",
+    },
+    "da": {
+        "connected": "Tilsluttet",
+        "disconnected": "Afbrudt",
+        "charging": "Oplader",
+        "cleaning": "Renser",
+        "docked": "Dokket",
+        "idle": "Inaktiv",
+        "paused": "Pause",
+        "returning": "Returnerer",
+        "ready": "Klar.",
+        "mqtt_disconnected": "Afbrudt fra MQTT?",
+    },
+    "fi": {
+        "connected": "Yhdistetty",
+        "disconnected": "Yhteys katkaistu",
+        "charging": "Lataa",
+        "cleaning": "Siivous",
+        "docked": "Telakassa",
+        "idle": "Toimeton",
+        "paused": "Tauko",
+        "returning": "Palaamassa",
+        "ready": "Valmis.",
+        "mqtt_disconnected": "Yhteys katkennut MQTT:stä?",
+    },
+    "el": {
+        "connected": "Συνδεδεμένος",
+        "disconnected": "Αποσυνδεδεμένος",
+        "charging": "Φορτίζει",
+        "cleaning": "Καθαρισμός",
+        "docked": "Στη βάση",
+        "idle": "Αδρανής",
+        "paused": "Παύση",
+        "returning": "Επιστρέφει",
+        "ready": "Έτοιμο.",
+        "mqtt_disconnected": "Αποσυνδεδεμένο από MQTT;",
+    },
+    "es": {
+        "connected": "Conectado",
+        "disconnected": "Desconectado",
+        "charging": "Cargando",
+        "cleaning": "Limpiando",
+        "docked": "Anclado",
+        "idle": "Inactivo",
+        "paused": "Pausado",
+        "returning": "Regresando",
+        "ready": "Listo.",
+        "mqtt_disconnected": "¿Desconectado de MQTT?",
+    },
+    "nl": {
+        "connected": "Verbonden",
+        "disconnected": "Niet verbonden",
+        "charging": "Laden",
+        "cleaning": "Schoonmaken",
+        "docked": "Gedockt",
+        "idle": "Inactief",
+        "paused": "Gepauzeerd",
+        "returning": "Terugkeren",
+        "ready": "Klaar.",
+        "mqtt_disconnected": "Verbinding met MQTT verbroken?",
+    },
+}
diff --git a/SCR/valetudo_map_parser/config/types.py b/SCR/valetudo_map_parser/config/types.py
index 5fde73f..9a56022 100644
--- a/SCR/valetudo_map_parser/config/types.py
+++ b/SCR/valetudo_map_parser/config/types.py
@@ -81,6 +81,8 @@ def __new__(cls, vacuum_id: str, rooms_data: Optional[dict] = None) -> "RoomStor
             instance = super(RoomStore, cls).__new__(cls)
             instance.vacuum_id = vacuum_id
             instance.vacuums_data = rooms_data or {}
+            instance.rooms_count = instance.get_rooms_count()
+            instance.floor = None
             cls._instances[vacuum_id] = instance
         else:
             if rooms_data is not None:
@@ -126,10 +128,10 @@ async def set_user_language(self, user_id: str, language: str) -> None:
         async with self._lock:
             self.user_languages[user_id] = language

-    async def get_user_language(self, user_id: str) -> str or None:
+    async def get_user_language(self, user_id: str) -> str:
         """Get the user language."""
         async with self._lock:
-            return self.user_languages.get(user_id, None)
+            return self.user_languages.get(user_id, "")

     async def get_all_languages(self):
         """Get all the user languages."""
@@ -197,13 +199,13 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None:
 Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]]
 Colors = Dict[str, Color]
 CalibrationPoints = list[dict[str, Any]]
-RobotPosition = dict[str, int | float]
+RobotPosition = Optional[dict[str, Union[int, float]]]
 ChargerPosition = dict[str, Any]
 RoomsProperties = dict[str, RoomProperty]
 ImageSize = dict[str, int | list[int]]
+Size = dict[str, int]
 JsonType = Any  # json.loads() return type is Any
 PilPNG = Image.Image  # Keep for backward compatibility
-WebPBytes = bytes  # WebP image as bytes
 NumpyArray = np.ndarray
 Point = Tuple[int, int]
@@ -283,7 +285,7 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None:
     "auto_zoom": False,
     "zoom_lock_ratio": True,
     "show_vac_status": False,
-    "vac_status_font": "custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf",
+    "vac_status_font": "SCR/valetudo_map_parser/config/fonts/FiraSans.ttf",
     "vac_status_size": 50,
     "vac_status_position": True,
     "get_svg_file": False,
@@ -443,31 +445,31 @@ async def async_set_vacuum_json(self, vacuum_id: str, json_data: Any) -> None:
 FONTS_AVAILABLE = [
     {
         "label": "Fira Sans",
-        "value": "custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf",
+        "value": "config/fonts/FiraSans.ttf",
     },
     {
         "label": "Inter",
-        "value": "custom_components/mqtt_vacuum_camera/utils/fonts/Inter-VF.ttf",
+        "value": "config/fonts/Inter-VF.ttf",
     },
     {
         "label": "M Plus Regular",
-        "value":
"custom_components/mqtt_vacuum_camera/utils/fonts/MPLUSRegular.ttf", + "value": "config/fonts/MPLUSRegular.ttf", }, { "label": "Noto Sans CJKhk", - "value": "custom_components/mqtt_vacuum_camera/utils/fonts/NotoSansCJKhk-VF.ttf", + "value": "config/fonts/NotoSansCJKhk-VF.ttf", }, { "label": "Noto Kufi Arabic", - "value": "custom_components/mqtt_vacuum_camera/utils/fonts/NotoKufiArabic-VF.ttf", + "value": "config/fonts/NotoKufiArabic-VF.ttf", }, { "label": "Noto Sans Khojki", - "value": "custom_components/mqtt_vacuum_camera/utils/fonts/NotoSansKhojki.ttf", + "value": "config/fonts/NotoSansKhojki.ttf", }, { "label": "Lato Regular", - "value": "custom_components/mqtt_vacuum_camera/utils/fonts/Lato-Regular.ttf", + "value": "config/fonts/Lato-Regular.ttf", }, ] diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index e7cda32..ce479b1 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -14,15 +14,18 @@ from .drawable import Drawable from .drawable_elements import DrawingConfig from .enhanced_drawable import EnhancedDrawable +from .status_text.status_text import StatusText + from .types import ( LOGGER, ChargerPosition, - ImageSize, + Size, NumpyArray, PilPNG, RobotPosition, - WebPBytes, ) +from ..map_data import HyperMapData +from .async_utils import AsyncNumPy @dataclass @@ -69,15 +72,19 @@ def __init__(self): self.crop_img_size = [0, 0] self.offset_x = 0 self.offset_y = 0 - self.crop_area = None + self.crop_area = [0, 0, 0, 0] self.zooming = False self.async_resize_images = async_resize_image + # Drawing components are initialized by initialize_drawing_config in handlers + self.drawing_config: Optional[DrawingConfig] = None + self.draw: Optional[Drawable] = None + self.enhanced_draw: Optional[EnhancedDrawable] = None def get_frame_number(self) -> int: """Return the frame number of the image.""" return self.frame_number - def get_robot_position(self) -> RobotPosition | None: + def get_robot_position(self) -> RobotPosition: """Return the robot position.""" return self.robot_pos @@ -86,7 +93,7 @@ async def async_get_image( m_json: dict | None, destinations: list | None = None, bytes_format: bool = False, - ) -> PilPNG | None: + ) -> PilPNG | bytes: """ Unified async function to get PIL image from JSON data for both Hypfer and Rand256 handlers. 
@@ -99,6 +106,6 @@ async def async_get_image(
         @param m_json: The JSON data to use to draw the image
         @param destinations: MQTT destinations for labels (used by Rand256)
         @param bytes_format: If True, also convert to PNG bytes and store in shared.binary_image
         @return: PIL Image or None
         """
         try:
@@ -112,13 +121,12 @@
                 new_image = await self.get_image_from_rrm(
                     m_json=m_json,
                     destinations=destinations,
-                    return_webp=False,  # Always return PIL Image
                 )
             elif hasattr(self, "async_get_image_from_json"):
                 # This is a Hypfer handler
+                self.json_data = await HyperMapData.async_from_valetudo_json(m_json)
                 new_image = await self.async_get_image_from_json(
                     m_json=m_json,
-                    return_webp=False,  # Always return PIL Image
                 )
             else:
                 LOGGER.warning(
@@ -134,27 +142,31 @@
             # Store the new image in shared data
             if new_image is not None:
                 self.shared.new_image = new_image
-
+                if self.shared.show_vacuum_state:
+                    text_editor = StatusText(self.shared)
+                    img_text = await text_editor.get_status_text(new_image)
+                    Drawable.status_text(
+                        new_image,
+                        img_text[1],
+                        self.shared.user_colors[8],
+                        img_text[0],
+                        self.shared.vacuum_status_font,
+                        self.shared.vacuum_status_position,
+                    )
                 # Convert to binary (PNG bytes) if requested
                 if bytes_format:
-                    with io.BytesIO() as buf:
-                        new_image.save(buf, format="PNG", compress_level=1)
-                        self.shared.binary_image = buf.getvalue()
-                    LOGGER.debug(
-                        "%s: Binary image conversion completed", self.file_name
-                    )
+                    self.shared.binary_image = pil_to_png_bytes(new_image)
                 else:
-                    self.shared.binary_image = None
+                    self.shared.binary_image = pil_to_png_bytes(self.shared.last_image)

                 # Update the timestamp with current datetime
                 self.shared.image_last_updated = datetime.datetime.fromtimestamp(time())
-                LOGGER.debug(
-                    "%s: Image processed and stored in shared data", self.file_name
-                )
                 return new_image
             else:
                 LOGGER.warning(
                     "%s: Failed to generate image from JSON data", self.file_name
                 )
+                if bytes_format and hasattr(self.shared, "last_image"):
+                    return pil_to_png_bytes(self.shared.last_image)
                 return (
                     self.shared.last_image
                     if hasattr(self.shared, "last_image")
@@ -162,7 +174,7 @@
                 )

         except Exception as e:
-            LOGGER.error(
+            LOGGER.warning(
                 "%s: Error in async_get_image: %s",
                 self.file_name,
                 str(e),
@@ -172,11 +184,29 @@
             self.shared.last_image if hasattr(self.shared, "last_image") else None
         )

+    def prepare_resize_params(self, pil_img: PilPNG, rand: bool = False) -> ResizeParams:
+        """Prepare resize parameters for image resizing."""
+        if self.shared.image_rotate in [0, 180]:
+            width, height = pil_img.size
+        else:
+            height, width = pil_img.size
+        LOGGER.debug("Shared reference image size: %s x %s",
+                     self.shared.image_ref_width, self.shared.image_ref_height)
+        return ResizeParams(
+            pil_img=pil_img,
+            width=width,
+            height=height,
+            aspect_ratio=self.shared.image_aspect_ratio,
+            crop_size=self.crop_img_size,
+            offset_func=self.async_map_coordinates_offset,
+            is_rand=rand,
+        )
+
     def get_charger_position(self) -> ChargerPosition | None:
         """Return the charger position."""
         return self.charger_pos

-    def get_img_size(self) -> ImageSize | None:
+    def get_img_size(self) -> Size | None:
         """Return the size of the image."""
         return self.img_size
@@ -194,6 +224,30 @@ def check_zoom_and_aspect_ratio(self) -> bool:
             or self.shared.image_aspect_ratio != "None"
         )

+    # Element selection methods centralized here
+    def 
enable_element(self, element_code): + """Enable drawing of a specific element.""" + if hasattr(self, "drawing_config") and self.drawing_config is not None: + self.drawing_config.enable_element(element_code) + + def disable_element(self, element_code): + """Disable drawing of a specific element.""" + manage_drawable_elements(self, "disable", element_code=element_code) + + def set_elements(self, element_codes: list): + """Enable only the specified elements, disable all others.""" + manage_drawable_elements(self, "set_elements", element_codes=element_codes) + + def set_element_property(self, element_code, property_name: str, value): + """Set a drawing property for an element.""" + manage_drawable_elements( + self, + "set_property", + element_code=element_code, + property_name=property_name, + value=value, + ) + def _set_image_offset_ratio_1_1( self, width: int, height: int, rand256: Optional[bool] = False ) -> None: @@ -214,12 +268,6 @@ def _set_image_offset_ratio_1_1( elif rotation in [90, 270]: self.offset_y = (self.crop_img_size[0] - width) // 2 self.offset_x = self.crop_img_size[1] - height - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. %s", - self.file_name, - self.offset_x, - self.offset_y, - ) def _set_image_offset_ratio_2_1( self, width: int, height: int, rand256: Optional[bool] = False @@ -242,13 +290,6 @@ def _set_image_offset_ratio_2_1( self.offset_x = width - self.crop_img_size[0] self.offset_y = height - self.crop_img_size[1] - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. %s", - self.file_name, - self.offset_x, - self.offset_y, - ) - def _set_image_offset_ratio_3_2( self, width: int, height: int, rand256: Optional[bool] = False ) -> None: @@ -273,13 +314,6 @@ def _set_image_offset_ratio_3_2( self.offset_y = (self.crop_img_size[0] - width) // 2 self.offset_x = self.crop_img_size[1] - height - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. %s", - self.file_name, - self.offset_x, - self.offset_y, - ) - def _set_image_offset_ratio_5_4( self, width: int, height: int, rand256: Optional[bool] = False ) -> None: @@ -305,13 +339,6 @@ def _set_image_offset_ratio_5_4( self.offset_y = (self.crop_img_size[0] - width) // 2 self.offset_x = self.crop_img_size[1] - height - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. %s", - self.file_name, - self.offset_x, - self.offset_y, - ) - def _set_image_offset_ratio_9_16( self, width: int, height: int, rand256: Optional[bool] = False ) -> None: @@ -333,13 +360,6 @@ def _set_image_offset_ratio_9_16( self.offset_x = width - self.crop_img_size[0] self.offset_y = height - self.crop_img_size[1] - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. %s", - self.file_name, - self.offset_x, - self.offset_y, - ) - def _set_image_offset_ratio_16_9( self, width: int, height: int, rand256: Optional[bool] = False ) -> None: @@ -361,13 +381,6 @@ def _set_image_offset_ratio_16_9( self.offset_x = width - self.crop_img_size[0] self.offset_y = height - self.crop_img_size[1] - LOGGER.debug( - "%s Image Coordinates Offsets (x,y): %s. 
%s", - self.file_name, - self.offset_x, - self.offset_y, - ) - async def async_map_coordinates_offset( self, params: OffsetParams ) -> tuple[int, int]: @@ -414,15 +427,21 @@ async def calculate_array_hash( return hashlib.sha256(data_json.encode()).hexdigest() return None - @staticmethod - async def async_copy_array(original_array: NumpyArray) -> NumpyArray: - """Copy the array.""" - return NumpyArray.copy(original_array) + async def async_copy_array(self, original_array: NumpyArray) -> NumpyArray: + """Copy the array using AsyncNumPy to yield control to the event loop.""" + return await AsyncNumPy.async_copy(original_array) def get_map_points( self, ) -> list[dict[str, int] | dict[str, int] | dict[str, int] | dict[str, int]]: """Return the map points.""" + if not self.crop_img_size: + return [ + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + ] return [ {"x": 0, "y": 0}, # Top-left corner 0 {"x": self.crop_img_size[0], "y": 0}, # Top-right corner 1 @@ -435,7 +454,13 @@ def get_map_points( def get_vacuum_points(self, rotation_angle: int) -> list[dict[str, int]]: """Calculate the calibration points based on the rotation angle.""" - + if not self.crop_area: + return [ + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + {"x": 0, "y": 0}, + ] # get_calibration_data vacuum_points = [ { @@ -528,7 +553,8 @@ def re_get_vacuum_points(self, rotation_angle: int) -> list[dict[str, int]]: return vacuum_points - async def async_zone_propriety(self, zones_data) -> dict: + @staticmethod + async def async_zone_propriety(zones_data) -> dict: """Get the zone propriety""" zone_properties = {} id_count = 1 @@ -546,10 +572,11 @@ async def async_zone_propriety(self, zones_data) -> dict: } id_count += 1 if id_count > 1: - LOGGER.debug("%s: Zones Properties updated.", self.file_name) + pass return zone_properties - async def async_points_propriety(self, points_data) -> dict: + @staticmethod + async def async_points_propriety(points_data) -> dict: """Get the point propriety""" point_properties = {} id_count = 1 @@ -567,7 +594,7 @@ async def async_points_propriety(self, points_data) -> dict: } id_count += 1 if id_count > 1: - LOGGER.debug("%s: Point Properties updated.", self.file_name) + pass return point_properties @staticmethod @@ -585,8 +612,14 @@ def get_corners( async def async_resize_image(params: ResizeParams): """Resize the image to the given dimensions and aspect ratio.""" - if params.aspect_ratio: - wsf, hsf = [int(x) for x in params.aspect_ratio.split(",")] + LOGGER.debug("Resizing image to aspect ratio: %s", params.aspect_ratio) + LOGGER.debug("Original image size: %s x %s", params.width, params.height) + LOGGER.debug("Image crop size: %s", params.crop_size) + if params.aspect_ratio == "None": + return params.pil_img + if params.aspect_ratio != "None": + ratio = params.aspect_ratio.replace(",", ":").replace(" ", "") + wsf, hsf = [int(x) for x in ratio.split(":")] if wsf == 0 or hsf == 0 or params.width <= 0 or params.height <= 0: LOGGER.warning( @@ -609,29 +642,13 @@ async def async_resize_image(params: ResizeParams): new_width = params.pil_img.width new_height = int(params.pil_img.width / new_aspect_ratio) - LOGGER.debug("Resizing image to aspect ratio: %s, %s", wsf, hsf) - LOGGER.debug("New image size: %s x %s", new_width, new_height) - if (params.crop_size is not None) and (params.offset_func is not None): offset = OffsetParams(wsf, hsf, new_width, new_height, params.is_rand) params.crop_size[0], params.crop_size[1] = await params.offset_func(offset) - + 
LOGGER.debug("New image size: %r * %r", new_width, new_height) return ImageOps.pad(params.pil_img, (new_width, new_height)) - return ImageOps.pad(params.pil_img, (params.width, params.height)) - - -def prepare_resize_params(handler, pil_img, rand): - """Prepare resize parameters for image resizing.""" - return ResizeParams( - pil_img=pil_img, - width=handler.shared.image_ref_width, - height=handler.shared.image_ref_height, - aspect_ratio=handler.shared.image_aspect_ratio, - crop_size=handler.crop_img_size, - offset_func=handler.async_map_coordinates_offset, - is_rand=rand, - ) + return params.pil_img def initialize_drawing_config(handler): @@ -787,6 +804,51 @@ def manage_drawable_elements( handler.drawing_config.set_property(element_code, property_name, value) +def point_in_polygon(x: int, y: int, polygon: list) -> bool: + """ + Check if a point is inside a polygon using ray casting algorithm. + Enhanced version with better handling of edge cases. + + Args: + x: X coordinate of the point + y: Y coordinate of the point + polygon: List of (x, y) tuples forming the polygon + + Returns: + True if the point is inside the polygon, False otherwise + """ + # Ensure we have a valid polygon with at least 3 points + if len(polygon) < 3: + return False + + # Make sure the polygon is closed (last point equals first point) + if polygon[0] != polygon[-1]: + polygon = polygon + [polygon[0]] + + # Use winding number algorithm for better accuracy + wn = 0 # Winding number counter + + # Loop through all edges of the polygon + for i in range(len(polygon) - 1): # Last vertex is first vertex + p1x, p1y = polygon[i] + p2x, p2y = polygon[i + 1] + + # Test if a point is left/right/on the edge defined by two vertices + if p1y <= y: # Start y <= P.y + if p2y > y: # End y > P.y (upward crossing) + # Point left of edge + if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) > 0: + wn += 1 # Valid up intersect + else: # Start y > P.y + if p2y <= y: # End y <= P.y (downward crossing) + # Point right of edge + if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) < 0: + wn -= 1 # Valid down intersect + + # If winding number is not 0, the point is inside the polygon + return wn != 0 + + def handle_room_outline_error(file_name, room_id, error): """ Handle errors during room outline extraction. @@ -943,83 +1005,14 @@ def calculate_angle(point): return rect_outline -async def numpy_to_webp_bytes( - img_np_array: np.ndarray, quality: int = 85, lossless: bool = False -) -> WebPBytes: - """ - Convert NumPy array directly to WebP bytes. - - Args: - img_np_array: RGBA NumPy array - quality: WebP quality (0-100, ignored if lossless=True) - lossless: Use lossless WebP compression - - Returns: - WebP image as bytes - """ - # Convert NumPy array to PIL Image - pil_img = Image.fromarray(img_np_array, mode="RGBA") - - # Create bytes buffer - webp_buffer = io.BytesIO() - - # Save as WebP - PIL images should use lossless=True for best results - pil_img.save( - webp_buffer, - format="WEBP", - lossless=True, # Always lossless for PIL images - method=1, # Fastest method for lossless - ) - - # Get bytes and cleanup - webp_bytes = webp_buffer.getvalue() - webp_buffer.close() - - return webp_bytes - - -async def pil_to_webp_bytes( - pil_img: Image.Image, quality: int = 85, lossless: bool = False -) -> bytes: - """ - Convert PIL Image to WebP bytes. 
-
-    Args:
-        pil_img: PIL Image object
-        quality: WebP quality (0-100, ignored if lossless=True)
-        lossless: Use lossless WebP compression
-
-    Returns:
-        WebP image as bytes
-    """
-    # Create bytes buffer
-    webp_buffer = io.BytesIO()
-
-    # Save as WebP - PIL images should use lossless=True for best results
-    pil_img.save(
-        webp_buffer,
-        format="WEBP",
-        lossless=True,  # Always lossless for PIL images
-        method=1,  # Fastest method for lossless
-    )
-
-    # Get bytes and cleanup
-    webp_bytes = webp_buffer.getvalue()
-    webp_buffer.close()
-
-    return webp_bytes
+def pil_to_png_bytes(pil_img: Image.Image, compress_level: int = 1) -> bytes:
+    """Convert a PIL Image to PNG bytes."""
+    with io.BytesIO() as buf:
+        pil_img.save(buf, format="PNG", compress_level=compress_level)
+        return buf.getvalue()


-def webp_bytes_to_pil(webp_bytes: bytes) -> Image.Image:
-    """
-    Convert WebP bytes back to PIL Image for display or further processing.
-
-    Args:
-        webp_bytes: WebP image as bytes
-
-    Returns:
-        PIL Image object
-    """
-    webp_buffer = io.BytesIO(webp_bytes)
-    pil_img = Image.open(webp_buffer)
-    return pil_img
+def png_bytes_to_pil(png_bytes: bytes) -> Image.Image:
+    """Convert PNG bytes back to a PIL Image."""
+    png_buffer = io.BytesIO(png_bytes)
+    return Image.open(png_buffer)
diff --git a/SCR/valetudo_map_parser/hypfer_draw.py b/SCR/valetudo_map_parser/hypfer_draw.py
index dee8943..9432e35 100755
--- a/SCR/valetudo_map_parser/hypfer_draw.py
+++ b/SCR/valetudo_map_parser/hypfer_draw.py
@@ -1,7 +1,7 @@
 """
 Image Draw Class for Valetudo Hypfer Image Handling.
 This class is used to simplify the ImageHandler class.
-Version: 0.1.9
+Version: 0.1.10
 """

 from __future__ import annotations
@@ -10,6 +10,7 @@

 from .config.drawable_elements import DrawableElement
 from .config.types import Color, JsonType, NumpyArray, RobotPosition, RoomStore
+from .config.utils import point_in_polygon

 _LOGGER = logging.getLogger(__name__)
@@ -23,51 +24,6 @@ def __init__(self, image_handler):
         self.img_h = image_handler
         self.file_name = self.img_h.shared.file_name

-    @staticmethod
-    def point_in_polygon(x: int, y: int, polygon: list) -> bool:
-        """
-        Check if a point is inside a polygon using ray casting algorithm.
-        Enhanced version with better handling of edge cases.
- - Args: - x: X coordinate of the point - y: Y coordinate of the point - polygon: List of (x, y) tuples forming the polygon - - Returns: - True if the point is inside the polygon, False otherwise - """ - # Ensure we have a valid polygon with at least 3 points - if len(polygon) < 3: - return False - - # Make sure the polygon is closed (last point equals first point) - if polygon[0] != polygon[-1]: - polygon = polygon + [polygon[0]] - - # Use winding number algorithm for better accuracy - wn = 0 # Winding number counter - - # Loop through all edges of the polygon - for i in range(len(polygon) - 1): # Last vertex is first vertex - p1x, p1y = polygon[i] - p2x, p2y = polygon[i + 1] - - # Test if a point is left/right/on the edge defined by two vertices - if p1y <= y: # Start y <= P.y - if p2y > y: # End y > P.y (upward crossing) - # Point left of edge - if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) > 0: - wn += 1 # Valid up intersect - else: # Start y > P.y - if p2y <= y: # End y <= P.y (downward crossing) - # Point right of edge - if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) < 0: - wn -= 1 # Valid down intersect - - # If winding number is not 0, the point is inside the polygon - return wn != 0 - async def draw_go_to_flag( self, np_array: NumpyArray, entity_dict: dict, color_go_to: Color ) -> NumpyArray: @@ -191,8 +147,7 @@ async def _process_wall_layer( Returns: The updated image array """ - # Log the wall color to verify alpha is being passed correctly - _LOGGER.debug("%s: Drawing walls with color %s", self.file_name, color_wall) + # Draw walls # If there are no disabled rooms, draw all walls if not disabled_rooms: @@ -202,9 +157,6 @@ async def _process_wall_layer( # If there are disabled rooms, we need to check each wall pixel # to see if it belongs to a disabled room - _LOGGER.debug( - "%s: Filtering walls for disabled rooms: %s", self.file_name, disabled_rooms - ) # Get the element map if available element_map = getattr(self.img_h, "element_map", None) @@ -247,7 +199,7 @@ async def _process_wall_layer( # Get the element at this position element = element_map[check_y, check_x] - # Check if this element is a disabled room + # Check if this element is a disabled one # Room elements are in the range 101-115 (ROOM_1 to ROOM_15) if 101 <= element <= 115: room_id = element - 101 # Convert to 0-based index @@ -263,12 +215,6 @@ async def _process_wall_layer( filtered_pixels.append((x, y, z)) # Draw the filtered walls - _LOGGER.debug( - "%s: Drawing %d of %d wall pixels after filtering", - self.file_name, - len(filtered_pixels), - len(pixels), - ) if filtered_pixels: return await self.img_h.draw.from_json_to_image( img_np_array, filtered_pixels, pixel_size, color_wall @@ -310,15 +256,6 @@ async def async_draw_charger( return np_array return np_array - async def async_get_json_id(self, my_json: JsonType) -> str | None: - """Return the JSON ID from the image.""" - try: - json_id = my_json["metaData"]["nonce"] - except (ValueError, KeyError) as e: - _LOGGER.debug("%s: No JsonID provided: %s", self.file_name, str(e)) - json_id = None - return json_id - async def async_draw_zones( self, m_json: JsonType, @@ -417,15 +354,6 @@ async def async_draw_paths( ) return np_array - async def async_get_entity_data(self, m_json: JsonType) -> dict or None: - """Get the entity data from the JSON data.""" - try: - entity_dict = self.img_h.data.find_points_entities(m_json) - except (ValueError, KeyError): - return None - _LOGGER.info("%s: Got the points in the json.", self.file_name) - return 
entity_dict - def _check_active_zone_and_set_zooming(self) -> None: """Helper function to check active zones and set zooming state.""" if self.img_h.active_zones and self.img_h.robot_in_room: @@ -433,26 +361,8 @@ def _check_active_zone_and_set_zooming(self) -> None: room_store = RoomStore(self.file_name) room_keys = list(room_store.get_rooms().keys()) - _LOGGER.debug( - "%s: Active zones debug - segment_id: %s, room_keys: %s, active_zones: %s", - self.file_name, - segment_id, - room_keys, - self.img_h.active_zones, - ) - if segment_id in room_keys: position = room_keys.index(segment_id) - _LOGGER.debug( - "%s: Segment ID %s found at position %s, active_zones[%s] = %s", - self.file_name, - segment_id, - position, - position, - self.img_h.active_zones[position] - if position < len(self.img_h.active_zones) - else "OUT_OF_BOUNDS", - ) if position < len(self.img_h.active_zones): self.img_h.zooming = bool(self.img_h.active_zones[position]) else: @@ -468,37 +378,6 @@ def _check_active_zone_and_set_zooming(self) -> None: else: self.img_h.zooming = False - @staticmethod - def point_in_polygon(x: int, y: int, polygon: list) -> bool: - """ - Check if a point is inside a polygon using ray casting algorithm. - - Args: - x: X coordinate of the point - y: Y coordinate of the point - polygon: List of (x, y) tuples forming the polygon - - Returns: - True if the point is inside the polygon, False otherwise - """ - n = len(polygon) - inside = False - - p1x, p1y = polygon[0] - xinters = None # Initialize with default value - for i in range(1, n + 1): - p2x, p2y = polygon[i % n] - if y > min(p1y, p2y): - if y <= max(p1y, p2y): - if x <= max(p1x, p2x): - if p1y != p2y: - xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x - if p1x == p2x or (xinters is not None and x <= xinters): - inside = not inside - p1x, p1y = p2x, p2y - - return inside - async def async_get_robot_in_room( self, robot_y: int = 0, robot_x: int = 0, angle: float = 0.0 ) -> RobotPosition: @@ -508,7 +387,7 @@ async def async_get_robot_in_room( # If we have outline data, use point_in_polygon for accurate detection if "outline" in self.img_h.robot_in_room: outline = self.img_h.robot_in_room["outline"] - if self.point_in_polygon(int(robot_x), int(robot_y), outline): + if point_in_polygon(int(robot_x), int(robot_y), outline): temp = { "x": robot_x, "y": robot_y, @@ -549,12 +428,6 @@ async def async_get_robot_in_room( # This helps prevent false positives for points very far from any room map_boundary = 20000 # Typical map size is around 5000-10000 units if abs(robot_x) > map_boundary or abs(robot_y) > map_boundary: - _LOGGER.debug( - "%s robot position (%s, %s) is far outside map boundaries.", - self.file_name, - robot_x, - robot_y, - ) self.img_h.robot_in_room = last_room self.img_h.zooming = False temp = { @@ -567,10 +440,6 @@ async def async_get_robot_in_room( # Search through all rooms to find which one contains the robot if self.img_h.rooms_pos is None: - _LOGGER.debug( - "%s: No rooms data available for robot position detection.", - self.file_name, - ) self.img_h.robot_in_room = last_room self.img_h.zooming = False temp = { @@ -586,7 +455,7 @@ async def async_get_robot_in_room( if "outline" in room: outline = room["outline"] # Use point_in_polygon for accurate detection with complex shapes - if self.point_in_polygon(int(robot_x), int(robot_y), outline): + if point_in_polygon(int(robot_x), int(robot_y), outline): # Robot is in this room self.img_h.robot_in_room = { "id": room.get( @@ -608,26 +477,8 @@ async def async_get_robot_in_room( 
room_store = RoomStore(self.file_name) room_keys = list(room_store.get_rooms().keys()) - _LOGGER.debug( - "%s: Active zones debug - segment_id: %s, room_keys: %s, active_zones: %s", - self.file_name, - segment_id, - room_keys, - self.img_h.active_zones, - ) - if segment_id in room_keys: position = room_keys.index(segment_id) - _LOGGER.debug( - "%s: Segment ID %s found at position %s, active_zones[%s] = %s", - self.file_name, - segment_id, - position, - position, - self.img_h.active_zones[position] - if position < len(self.img_h.active_zones) - else "OUT_OF_BOUNDS", - ) if position < len(self.img_h.active_zones): self.img_h.zooming = bool( self.img_h.active_zones[position] @@ -645,11 +496,6 @@ async def async_get_robot_in_room( else: self.img_h.zooming = False - _LOGGER.debug( - "%s is in %s room (polygon detection).", - self.file_name, - self.img_h.robot_in_room["room"], - ) return temp # Fallback to bounding box if no outline is available elif "corners" in room: @@ -683,19 +529,10 @@ async def async_get_robot_in_room( # Handle active zones self._check_active_zone_and_set_zooming() - _LOGGER.debug( - "%s is in %s room (bounding box detection).", - self.file_name, - self.img_h.robot_in_room["room"], - ) return temp room_count += 1 # Robot not found in any room - _LOGGER.debug( - "%s not located within any room coordinates.", - self.file_name, - ) self.img_h.robot_in_room = last_room self.img_h.zooming = False temp = { diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 28bc7f0..85d78e4 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -2,7 +2,7 @@ Hypfer Image Handler Class. It returns the PIL PNG image frame relative to the Map Data extrapolated from the vacuum json. It also returns calibration, rooms data to the card and other images information to the camera. -Version: 0.1.9 +Version: 0.1.10 """ from __future__ import annotations @@ -12,11 +12,13 @@ from PIL import Image -from .config.async_utils import AsyncNumPy, AsyncPIL -from .config.auto_crop import AutoCrop +from .config.async_utils import AsyncPIL + +# from .config.auto_crop import AutoCrop +from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.shared import CameraShared -from .config.utils import pil_to_webp_bytes + from .config.types import ( COLORS, LOGGER, @@ -24,15 +26,11 @@ Colors, RoomsProperties, RoomStore, - WebPBytes, JsonType, ) from .config.utils import ( BaseHandler, initialize_drawing_config, - manage_drawable_elements, - numpy_to_webp_bytes, - prepare_resize_params, ) from .hypfer_draw import ImageDraw as ImDraw from .map_data import ImageData @@ -50,7 +48,6 @@ def __init__(self, shared_data: CameraShared): AutoCrop.__init__(self, self) self.calibration_data = None # camera shared data. self.data = ImageData # imported Image Data Module. - # Initialize drawing configuration using the shared utility function self.drawing_config, self.draw, self.enhanced_draw = initialize_drawing_config( self @@ -62,7 +59,7 @@ def __init__(self, shared_data: CameraShared): self.img_work_layer = ( None # persistent working buffer to avoid per-frame allocations ) - self.active_zones = None # vacuum active zones. + self.active_zones = [] # vacuum active zones. self.svg_wait = False # SVG image creation wait. self.imd = ImDraw(self) # Image Draw class. 
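
A minimal construction sketch for the refactored handler; the vacuum name is hypothetical, and a real deployment would first populate `CameraShared` via `update_shared_data(device_info)`:

```python
from valetudo_map_parser import HypferMapImageHandler
from valetudo_map_parser.config.shared import CameraShared

shared = CameraShared("valetudo_vacuum")  # hypothetical camera/file name
shared.vacuum_state = "docked"
handler = HypferMapImageHandler(shared)  # wires AutoCrop, ImDraw, drawing config
```
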
self.color_grey = (128, 128, 128, 255) @@ -83,9 +80,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: ) if room_properties: rooms = RoomStore(self.file_name, room_properties) - LOGGER.debug( - "%s: Rooms data extracted! %s", self.file_name, rooms.get_rooms() - ) # Convert room_properties to the format expected by async_get_robot_in_room self.rooms_pos = [] for room_id, room_data in room_properties.items(): @@ -97,7 +91,6 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: } ) else: - LOGGER.debug("%s: Rooms data not available!", self.file_name) self.rooms_pos = None return room_properties @@ -105,14 +98,12 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: async def async_get_image_from_json( self, m_json: JsonType | None, - return_webp: bool = False, - ) -> WebPBytes | Image.Image | None: + ) -> Image.Image | None: """Get the image from the JSON data. It uses the ImageDraw class to draw some of the elements of the image. The robot itself will be drawn in this function as per some of the values are needed for other tasks. @param m_json: The JSON data to use to draw the image. - @param return_webp: If True, return WebP bytes; if False, return PIL Image (default). - @return WebPBytes | Image.Image: WebP bytes or PIL Image depending on return_webp parameter. + @return Image.Image: PIL Image. """ # Initialize the colors. colors: Colors = { @@ -121,21 +112,12 @@ async def async_get_image_from_json( # Check if the JSON data is not None else process the image. try: if m_json is not None: - LOGGER.debug("%s: Creating Image.", self.file_name) - # buffer json data - self.json_data = m_json # Get the image size from the JSON data - size_x = int(m_json["size"]["x"]) - size_y = int(m_json["size"]["y"]) - self.img_size = { - "x": size_x, - "y": size_y, - "centre": [(size_x // 2), (size_y // 2)], - } + self.img_size = self.json_data.image_size # Get the JSON ID from the JSON data. - self.json_id = await self.imd.async_get_json_id(m_json) + self.json_id = self.json_data.json_id # Check entity data. - entity_dict = await self.imd.async_get_entity_data(m_json) + entity_dict = self.json_data.entity_dict # Update the Robot position. 
( robot_pos, @@ -145,15 +127,16 @@ async def async_get_image_from_json( # Get the pixels size and layers from the JSON data pixel_size = int(m_json["pixelSize"]) - layers, active = self.data.find_layers(m_json["layers"], {}, []) - # Populate active_zones from the JSON data - self.active_zones = active - new_frame_hash = await self.calculate_array_hash(layers, active) + self.active_zones = self.json_data.active_zones + + new_frame_hash = await self.calculate_array_hash( + self.json_data.layers, self.active_zones + ) if self.frame_number == 0: self.img_hash = new_frame_hash # Create empty image img_np_array = await self.draw.create_empty_image( - size_x, size_y, colors["background"] + self.img_size["x"], self.img_size["y"], colors["background"] ) # Draw layers and segments if enabled room_id = 0 @@ -162,7 +145,10 @@ async def async_get_image_from_json( if self.drawing_config.is_enabled(DrawableElement.FLOOR): # First pass: identify disabled rooms - for layer_type, compressed_pixels_list in layers.items(): + for ( + layer_type, + compressed_pixels_list, + ) in self.json_data.layers.items(): # Check if this is a room layer if layer_type == "segment": # The room_id is the current room being processed (0-based index) @@ -180,11 +166,6 @@ async def async_get_image_from_json( ): # Add this room to the disabled rooms set disabled_rooms.add(room_id) - LOGGER.debug( - "%s: Room %d is disabled and will be skipped", - self.file_name, - current_room_id, - ) room_id = ( room_id + 1 ) % 16 # Cycle room_id back to 0 after 15 @@ -193,7 +174,10 @@ async def async_get_image_from_json( room_id = 0 # Second pass: draw enabled rooms and walls - for layer_type, compressed_pixels_list in layers.items(): + for ( + layer_type, + compressed_pixels_list, + ) in self.json_data.layers.items(): # Check if this is a room layer is_room_layer = layer_type == "segment" @@ -258,13 +242,13 @@ async def async_get_image_from_json( # Robot and rooms position if (room_id > 0) and not self.room_propriety: self.room_propriety = await self.async_extract_room_properties( - self.json_data + self.json_data.json_data ) # Ensure room data is available for robot room detection (even if not extracted above) if not self.rooms_pos and not self.room_propriety: self.room_propriety = await self.async_extract_room_properties( - self.json_data + self.json_data.json_data ) # Always check robot position for zooming (moved outside the condition) @@ -284,12 +268,6 @@ async def async_get_image_from_json( new_frame_hash != self.img_hash ): self.frame_number = 0 - LOGGER.debug( - "%s: %s at Frame Number: %s", - self.file_name, - str(self.json_id), - str(self.frame_number), - ) # Ensure persistent working buffer exists and matches base (allocate only when needed) if ( self.img_work_layer is None @@ -366,6 +344,7 @@ async def async_get_image_from_json( y=robot_position[1], angle=robot_position_angle, fill=robot_color, + radius=self.shared.robot_size, robot_state=self.shared.vacuum_state, ) @@ -383,7 +362,7 @@ async def async_get_image_from_json( self.zooming = self.imd.img_h.zooming # Resize the image - img_np_array = await self.async_auto_trim_and_zoom_image( + img_np_array = self.async_auto_trim_and_zoom_image( img_np_array, colors["background"], int(self.shared.margins), @@ -400,29 +379,16 @@ async def async_get_image_from_json( # Convert to PIL for resizing pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") del img_np_array - resize_params = prepare_resize_params(self, pil_img, False) + resize_params = 
self.prepare_resize_params(pil_img) resized_image = await self.async_resize_images(resize_params) - # Return WebP bytes or PIL Image based on parameter - if return_webp: - webp_bytes = await pil_to_webp_bytes(resized_image) - return webp_bytes - else: - return resized_image + # Return PIL Image + return resized_image else: - # Return WebP bytes or PIL Image based on parameter - if return_webp: - # Convert directly from NumPy to WebP for better performance - webp_bytes = await numpy_to_webp_bytes(img_np_array) - del img_np_array - LOGGER.debug("%s: Frame Completed.", self.file_name) - return webp_bytes - else: - # Convert to PIL Image (original behavior) - pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") - del img_np_array - LOGGER.debug("%s: Frame Completed.", self.file_name) - return pil_img + # Return PIL Image (convert from NumPy) + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") + del img_np_array + return pil_img except (RuntimeError, RuntimeWarning) as e: LOGGER.warning( "%s: Error %s during image creation.", @@ -438,12 +404,9 @@ async def async_get_rooms_attributes(self) -> RoomsProperties: if self.room_propriety: return self.room_propriety if self.json_data: - LOGGER.debug("Checking %s Rooms data..", self.file_name) self.room_propriety = await self.async_extract_room_properties( - self.json_data + self.json_data.json_data ) - if self.room_propriety: - LOGGER.debug("Got %s Rooms Attributes.", self.file_name) return self.room_propriety def get_calibration_data(self) -> CalibrationPoints: @@ -465,42 +428,6 @@ def get_calibration_data(self) -> CalibrationPoints: del vacuum_points, map_points, calibration_point, rotation_angle # free memory. return calibration_data - # Element selection methods - def enable_element(self, element_code: DrawableElement) -> None: - """Enable drawing of a specific element.""" - self.drawing_config.enable_element(element_code) - LOGGER.info( - "%s: Enabled element %s, now enabled: %s", - self.file_name, - element_code.name, - self.drawing_config.is_enabled(element_code), - ) - - def disable_element(self, element_code: DrawableElement) -> None: - """Disable drawing of a specific element.""" - manage_drawable_elements(self, "disable", element_code=element_code) - - def set_elements(self, element_codes: list[DrawableElement]) -> None: - """Enable only the specified elements, disable all others.""" - manage_drawable_elements(self, "set_elements", element_codes=element_codes) - - def set_element_property( - self, element_code: DrawableElement, property_name: str, value - ) -> None: - """Set a drawing property for an element.""" - manage_drawable_elements( - self, - "set_property", - element_code=element_code, - property_name=property_name, - value=value, - ) - - @staticmethod - async def async_copy_array(original_array): - """Copy the array.""" - return await AsyncNumPy.async_copy(original_array) - async def _prepare_zone_data(self, m_json): """Prepare zone data for parallel processing.""" await asyncio.sleep(0) # Yield control diff --git a/SCR/valetudo_map_parser/map_data.py b/SCR/valetudo_map_parser/map_data.py index 23f6a28..ef2f3e1 100755 --- a/SCR/valetudo_map_parser/map_data.py +++ b/SCR/valetudo_map_parser/map_data.py @@ -3,26 +3,134 @@ ImageData is part of the Image_Handler used functions to search data in the json provided for the creation of the new camera frame -Version: v0.1.6 +Version: v0.1.10 """ from __future__ import annotations import numpy as np +from typing import ( + List, + Sequence, + TypeVar, + Any, + 
TypedDict, + NotRequired, + Literal, + Optional, +) + +from dataclasses import dataclass, field, asdict from .config.types import ImageSize, JsonType +T = TypeVar("T") + +# --- Common Nested Structures --- + + +class RangeStats(TypedDict): + min: int + max: int + mid: int + avg: int + + +class Dimensions(TypedDict): + x: RangeStats + y: RangeStats + pixelCount: int + + +# --- Layer Types --- + + +class FloorWallMeta(TypedDict, total=False): + area: int + + +class SegmentMeta(TypedDict, total=False): + segmentId: str + active: bool + source: str + area: int + + +class MapLayerBase(TypedDict): + __class__: Literal["MapLayer"] + type: str + pixels: list[int] + compressedPixels: list[int] + dimensions: Dimensions + + +class FloorWallLayer(MapLayerBase): + metaData: FloorWallMeta + type: Literal["floor", "wall"] + + +class SegmentLayer(MapLayerBase): + metaData: SegmentMeta + type: Literal["segment"] + + +# --- Entity Types --- + + +class PointMeta(TypedDict, total=False): + angle: float + label: str + id: str + + +class PointMapEntity(TypedDict): + __class__: Literal["PointMapEntity"] + type: str + points: list[int] + metaData: NotRequired[PointMeta] + + +class PathMapEntity(TypedDict): + __class__: Literal["PathMapEntity"] + type: str + points: list[int] + metaData: dict[str, object] # flexible for now + + +Entity = PointMapEntity | PathMapEntity + +# --- Top-level Map --- + + +class MapMeta(TypedDict, total=False): + version: int + totalLayerArea: int + + +class Size(TypedDict): + x: int + y: int + + +class ValetudoMap(TypedDict): + __class__: Literal["ValetudoMap"] + metaData: MapMeta + size: Size + pixelSize: int + layers: list[FloorWallLayer | SegmentLayer] + entities: list[Entity] + class ImageData: """Class to handle the image data.""" @staticmethod - def sublist(lst, n): + def sublist(lst: Sequence[T], n: int) -> list[Sequence[T]]: """Sub lists of specific n number of elements""" return [lst[i : i + n] for i in range(0, len(lst), n)] @staticmethod - def sublist_join(lst, n): + def sublist_join(lst: Sequence[T], n: int) -> list[list[T]]: """Join the lists in a unique list of n elements""" arr = np.array(lst) num_windows = len(lst) - n + 1 @@ -35,57 +143,130 @@ def sublist_join(lst, n): # Vacuums Json in parallel. @staticmethod - def get_obstacles(entity_dict: dict) -> list: - """Get the obstacles positions from the entity data.""" + def get_image_size(json_data: JsonType) -> dict[str, int | list[int]]: + """Get the image size from the json.""" + if json_data: + size_x = int(json_data["size"]["x"]) + size_y = int(json_data["size"]["y"]) + return { + "x": size_x, + "y": size_y, + "centre": [(size_x // 2), (size_y // 2)], + } + return {"x": 0, "y": 0, "centre": [0, 0]} + + @staticmethod + def get_json_id(json_data: JsonType) -> str | None: + """Get the json id from the json.""" try: - obstacle_data = entity_dict.get("obstacle") - except KeyError: + json_id = json_data["metaData"]["nonce"] + except (ValueError, KeyError): + json_id = None + return json_id + + @staticmethod + def get_obstacles( + entity_dict: dict[str, list[PointMapEntity]], + ) -> list[dict[str, str | int | None]]: + """ + Extract obstacle positions from Valetudo entity data. + + Args: + entity_dict: Parsed JSON-like dict containing obstacle data. + + Returns: + A list of obstacle dicts with keys: + - 'label': obstacle label string + - 'points': dict with 'x' and 'y' coordinates + - 'id': obstacle image/metadata ID (if any) + Returns an empty list if no valid obstacles found. 
+ """ + obstacle_data = entity_dict.get("obstacle") # .get() won't raise KeyError + if not obstacle_data: return [] - obstacle_positions = [] - if obstacle_data: - for obstacle in obstacle_data: - label = obstacle.get("metaData", {}).get("label") - points = obstacle.get("points", []) - image_id = obstacle.get("metaData", {}).get("id") - - if label and points: - obstacle_pos = { + + obstacle_positions: list[dict[str, Any]] = [] + + for obstacle in obstacle_data: + meta = obstacle.get("metaData", {}) or {} + label = meta.get("label") + image_id = meta.get("id") + points = obstacle.get("points") or [] + + # Expecting at least two coordinates for a valid obstacle + if label and len(points) >= 2: + obstacle_positions.append( + { "label": label, "points": {"x": points[0], "y": points[1]}, "id": image_id, } - obstacle_positions.append(obstacle_pos) - return obstacle_positions - return [] + ) + + return obstacle_positions @staticmethod def find_layers( - json_obj: JsonType, layer_dict: dict, active_list: list - ) -> tuple[dict, list]: - """Find the layers in the json object.""" - layer_dict = {} if layer_dict is None else layer_dict - active_list = [] if active_list is None else active_list + json_obj: JsonType, + layer_dict: dict[str, list[Any]] | None, + active_list: list[int] | None, + ) -> tuple[dict[str, list[Any]], list[int]]: + """ + Recursively traverse a JSON-like structure to find MapLayer entries. + + Args: + json_obj: The JSON-like object (dicts/lists) to search. + layer_dict: Optional mapping of layer_type to a list of compressed pixel data. + active_list: Optional list of active segment flags. + + Returns: + A tuple: + - dict mapping layer types to their compressed pixel arrays. + - list of integers marking active segment layers. + """ + if layer_dict is None: + layer_dict = {} + active_list = [] + if isinstance(json_obj, dict): - if "__class" in json_obj and json_obj["__class"] == "MapLayer": + if json_obj.get("__class") == "MapLayer": layer_type = json_obj.get("type") - active_type = json_obj.get("metaData") + meta_data = json_obj.get("metaData") or {} if layer_type: - if layer_type not in layer_dict: - layer_dict[layer_type] = [] - layer_dict[layer_type].append(json_obj.get("compressedPixels", [])) - if layer_type == "segment": - active_list.append(int(active_type["active"])) - - for value in json_obj.items(): + layer_dict.setdefault(layer_type, []).append( + json_obj.get("compressedPixels", []) + ) + # Safely extract "active" flag if present and convertible to int + if layer_type == "segment": + try: + active_list.append(int(meta_data.get("active", 0))) + except (ValueError, TypeError): + pass # skip invalid/missing 'active' values + + # json_obj.items() yields (key, value), so we only want the values + for _, value in json_obj.items(): ImageData.find_layers(value, layer_dict, active_list) + elif isinstance(json_obj, list): for item in json_obj: ImageData.find_layers(item, layer_dict, active_list) + return layer_dict, active_list @staticmethod - def find_points_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: - """Find the points entities in the json object.""" + def find_points_entities( + json_obj: ValetudoMap, entity_dict: dict = None + ) -> dict[str, list[PointMapEntity]]: + """ + Traverse a ValetudoMap and collect PointMapEntity objects by their `type`. + + Args: + json_obj: The full parsed JSON structure of a ValetudoMap. + entity_dict: Optional starting dict to append into. + + Returns: + A dict mapping entity type strings to lists of PointMapEntitys. 
+ """ if entity_dict is None: entity_dict = {} if isinstance(json_obj, dict): @@ -101,7 +282,9 @@ def find_points_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: return entity_dict @staticmethod - def find_paths_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: + def find_paths_entities( + json_obj: JsonType, entity_dict: dict[str, list[Entity]] | None = None + ) -> dict[str, list[Entity]]: """Find the paths entities in the json object.""" if entity_dict is None: @@ -119,7 +302,9 @@ def find_paths_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: return entity_dict @staticmethod - def find_zone_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: + def find_zone_entities( + json_obj: JsonType, entity_dict: dict[str, list[Entity]] | None = None + ) -> dict[str, list[Entity]]: """Find the zone entities in the json object.""" if entity_dict is None: entity_dict = {} @@ -136,61 +321,81 @@ def find_zone_entities(json_obj: JsonType, entity_dict: dict = None) -> dict: return entity_dict @staticmethod - def find_virtual_walls(json_obj: JsonType) -> list: - """Find the virtual walls in the json object.""" - virtual_walls = [] + def find_virtual_walls(json_obj: JsonType) -> list[list[tuple[float, float]]]: + """ + Recursively search a JSON-like structure for virtual wall line entities. + + Args: + json_obj: The JSON-like data (dicts/lists) to search. - def find_virtual_walls_recursive(obj): - """Find the virtual walls in the json object recursively.""" + Returns: + A list of point lists, where each point list belongs to a virtual wall. + """ + virtual_walls: list[list[tuple[float, float]]] = [] + + def _recurse(obj: Any) -> None: if isinstance(obj, dict): - if obj.get("__class") == "LineMapEntity": - entity_type = obj.get("type") - if entity_type == "virtual_wall": - virtual_walls.append(obj["points"]) + if ( + obj.get("__class") == "LineMapEntity" + and obj.get("type") == "virtual_wall" + ): + points = obj.get("points") + if isinstance(points, list): + virtual_walls.append( + points + ) # Type checkers may refine further here + for value in obj.values(): - find_virtual_walls_recursive(value) + _recurse(value) + elif isinstance(obj, list): for item in obj: - find_virtual_walls_recursive(item) + _recurse(item) - find_virtual_walls_recursive(json_obj) + _recurse(json_obj) return virtual_walls @staticmethod async def async_get_rooms_coordinates( - pixels: list, pixel_size: int = 5, rand: bool = False - ) -> tuple: + pixels: Sequence[tuple[int, int, int]], pixel_size: int = 5, rand: bool = False + ) -> tuple[int, int, int, int] | tuple[tuple[int, int], tuple[int, int]]: """ - Extract the room coordinates from the vacuum pixels data. - piexels: dict: The pixels data format [[x,y,z], [x1,y1,z1], [xn,yn,zn]]. - pixel_size: int: The size of the pixel in mm (optional). - rand: bool: Return the coordinates in a rand256 format (optional). + Extract the room bounding box coordinates from vacuum pixel data. + + Args: + pixels: Sequence of (x, y, z) values representing pixels. + pixel_size: Size of each pixel in mm. Defaults to 5. + rand: If True, return coordinates in rand256 format. 
+ + Returns: + If rand is True: + ((max_x_mm, max_y_mm), (min_x_mm, min_y_mm)) + Else: + (min_x_mm, min_y_mm, max_x_mm, max_y_mm) """ - # Initialize variables to store max and min coordinates - max_x, max_y = pixels[0][0], pixels[0][1] - min_x, min_y = pixels[0][0], pixels[0][1] - # Iterate through the data list to find max and min coordinates - for entry in pixels: + if not pixels: + raise ValueError("Pixels list cannot be empty.") + + # Initialise min/max using the first pixel + first_x, first_y, _ = pixels[0] + min_x = max_x = first_x + min_y = max_y = first_y + + for x, y, z in pixels: if rand: - x, y, _ = entry # Extract x and y coordinates - max_x = max(max_x, x) # Update max x coordinate - max_y = max(max_y, y + pixel_size) # Update max y coordinate - min_x = min(min_x, x) # Update min x coordinate - min_y = min(min_y, y) # Update min y coordinate + max_x = max(max_x, x) + max_y = max(max_y, y + pixel_size) else: - x, y, z = entry # Extract x and y coordinates - max_x = max(max_x, x + z) # Update max x coordinate - max_y = max(max_y, y + pixel_size) # Update max y coordinate - min_x = min(min_x, x) # Update min x coordinate - min_y = min(min_y, y) # Update min y coordinate + max_x = max(max_x, x + z) + max_y = max(max_y, y + pixel_size) + + min_x = min(min_x, x) + min_y = min(min_y, y) + if rand: - return ( - (((max_x * pixel_size) * 10), ((max_y * pixel_size) * 10)), - ( - ((min_x * pixel_size) * 10), - ((min_y * pixel_size) * 10), - ), - ) + to_mm = lambda v: v * pixel_size * 10 + return (to_mm(max_x), to_mm(max_y)), (to_mm(min_x), to_mm(min_y)) + return ( min_x * pixel_size, min_y * pixel_size, @@ -279,7 +484,7 @@ def get_rrm_path(json_data: JsonType) -> JsonType: return json_data.get("path", {}) @staticmethod - def get_rrm_goto_predicted_path(json_data: JsonType) -> list or None: + def get_rrm_goto_predicted_path(json_data: JsonType) -> Optional[list]: """Get the predicted path data from the json.""" try: predicted_path = json_data.get("goto_predicted_path", {}) @@ -321,7 +526,7 @@ def get_rrm_robot_angle(json_data: JsonType) -> tuple: return angle, json_data.get("robot_angle", 0) @staticmethod - def get_rrm_goto_target(json_data: JsonType) -> list or None: + def get_rrm_goto_target(json_data: JsonType) -> Any: """Get the goto target from the json.""" try: path_data = json_data.get("goto_target", {}) @@ -348,7 +553,7 @@ def get_rrm_forbidden_zones(json_data: JsonType) -> dict: return formatted_zones @staticmethod - def _rrm_valetudo_format_zone(coordinates: list) -> any: + def _rrm_valetudo_format_zone(coordinates: list) -> list[dict[str, Any]]: """Format the zones from RRM to Valetudo.""" formatted_zones = [] for zone_data in coordinates: @@ -497,3 +702,111 @@ def get_rrm_segments_ids(json_data: JsonType) -> list or None: except KeyError: return None return seg_ids + + +@dataclass +class HyperMapData: + """Class to handle the map data snapshots.""" + + json_data: Any = None + json_id: Optional[str] = None + obstacles: dict[str, list[Any]] = field(default_factory=dict) + paths: dict[str, list[Any]] = field(default_factory=dict) + image_size: dict[str, int | list[int]] = field(default_factory=dict) + areas: dict[str, list[Any]] = field(default_factory=dict) + pixel_size: int = 0 + entity_dict: dict[str, list[Any]] = field(default_factory=dict) + layers: dict[str, list[Any]] = field(default_factory=dict) + active_zones: list[int] = field(default_factory=list) + virtual_walls: list[list[tuple[float, float]]] = field(default_factory=list) + + @classmethod + async def 
async_from_valetudo_json(cls, json_data: Any) -> "HyperMapData":
+        """
+        Build a fully-populated HyperMapData snapshot from raw Valetudo JSON
+        using ImageData's helper functions.
+        """
+
+        # Delegate to the ImageData static helpers refactored above.
+        json_id = ImageData.get_json_id(json_data)
+        paths = ImageData.find_paths_entities(json_data)
+        image_size = ImageData.get_image_size(json_data)
+        areas = ImageData.find_zone_entities(json_data)
+        layers = {}
+        active_zones = []
+        # The obstacles finder is optional; fall back to an empty dict when
+        # the helper is not available on ImageData.
+        obstacles = getattr(ImageData, "find_obstacles_entities", lambda *_: {})(
+            json_data
+        )
+        virtual_walls = ImageData.find_virtual_walls(json_data)
+        pixel_size = int(json_data["pixelSize"])
+        layers, active_zones = ImageData.find_layers(
+            json_data["layers"], layers, active_zones
+        )
+        entity_dict = ImageData.find_points_entities(json_data)
+
+        return cls(
+            json_data=json_data,
+            json_id=json_id,
+            image_size=image_size,
+            obstacles=obstacles,
+            paths=paths,
+            areas=areas,
+            virtual_walls=virtual_walls,
+            entity_dict=entity_dict,
+            pixel_size=pixel_size,
+            layers=layers,
+            active_zones=active_zones,
+        )
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return a dictionary representation of this dataclass."""
+        return asdict(self)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> "HyperMapData":
+        """Construct a HyperMapData from a plain dictionary.
+        Unknown keys are ignored; missing keys use safe defaults.
+        """
+        return cls(
+            json_data=data.get("json_data"),
+            json_id=data.get("json_id") or None,
+            obstacles=data.get("obstacles", {}),
+            paths=data.get("paths", {}),
+            image_size=data.get("image_size", {}),
+            areas=data.get("areas", {}),
+            pixel_size=int(data.get("pixel_size", 0) or 0),
+            entity_dict=data.get("entity_dict", {}),
+            layers=data.get("layers", {}),
+            active_zones=data.get("active_zones", []),
+            virtual_walls=data.get("virtual_walls", []),
+        )
+
+    def update_from_dict(self, updates: dict[str, Any]) -> None:
+        """Update one or more fields in place, preserving the rest.
+        Unknown keys are ignored; pixel_size is coerced to int.
+ """ + if not updates: + return + allowed = { + "json_data", + "json_id", + "obstacles", + "paths", + "image_size", + "areas", + "pixel_size", + "entity_dict", + "layers", + "active_zones", + "virtual_walls", + } + for key, value in updates.items(): + if key not in allowed: + continue + if key == "pixel_size": + try: + value = int(value) + except (TypeError, ValueError): + continue + setattr(self, key, value) diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 6cb69e0..7a342ca 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -13,8 +13,10 @@ import numpy as np -from .config.async_utils import AsyncNumPy, AsyncPIL -from .config.auto_crop import AutoCrop +from .config.async_utils import AsyncPIL + +# from .config.auto_crop import AutoCrop +from mvcrender.autocrop import AutoCrop from .config.drawable_elements import DrawableElement from .config.types import ( COLORS, @@ -26,14 +28,11 @@ RobotPosition, RoomsProperties, RoomStore, - WebPBytes, ) from .config.utils import ( BaseHandler, initialize_drawing_config, - manage_drawable_elements, - numpy_to_webp_bytes, - prepare_resize_params, + point_in_polygon, ) from .map_data import RandImageData from .reimg_draw import ImageDraw @@ -111,29 +110,19 @@ async def extract_room_properties( # Update shared.map_rooms with the room IDs for MQTT active zone mapping self.shared.map_rooms = room_ids - _LOGGER.debug("Updated shared.map_rooms with room IDs: %s", room_ids) # get the zones and points data zone_properties = await self.async_zone_propriety(zones_data) # get the points data point_properties = await self.async_points_propriety(points_data) - if room_properties or zone_properties: - extracted_data = [ - f"{len(room_properties)} Rooms" if room_properties else None, - f"{len(zone_properties)} Zones" if zone_properties else None, - ] - extracted_data = ", ".join(filter(None, extracted_data)) - _LOGGER.debug("Extracted data: %s", extracted_data) - else: + if not (room_properties or zone_properties): self.rooms_pos = None - _LOGGER.debug("%s: Rooms and Zones data not available!", self.file_name) rooms = RoomStore(self.file_name, room_properties) - _LOGGER.debug("Rooms Data: %s", rooms.get_rooms()) return room_properties, zone_properties, point_properties except (RuntimeError, ValueError) as e: - _LOGGER.debug( + _LOGGER.warning( "No rooms Data or Error in extract_room_properties: %s", e, exc_info=True, @@ -144,13 +133,11 @@ async def get_image_from_rrm( self, m_json: JsonType, # json data destinations: None = None, # MQTT destinations for labels - return_webp: bool = False, - ) -> WebPBytes | PilPNG | None: + ) -> PilPNG | None: """Generate Images from the json data. @param m_json: The JSON data to use to draw the image. @param destinations: MQTT destinations for labels (unused). - @param return_webp: If True, return WebP bytes; if False, return PIL Image (default). - @return WebPBytes | Image.Image: WebP bytes or PIL Image depending on return_webp parameter. + @return Image.Image: PIL Image. 
""" colors: Colors = { name: self.shared.user_colors[idx] for idx, name in enumerate(COLORS) @@ -177,9 +164,6 @@ async def get_image_from_rrm( # Increment frame number self.frame_number += 1 img_np_array = await self.async_copy_array(self.img_base_layer) - _LOGGER.debug( - "%s: Frame number %s", self.file_name, str(self.frame_number) - ) if self.frame_number > 5: self.frame_number = 0 @@ -188,17 +172,10 @@ async def get_image_from_rrm( img_np_array, m_json, colors, robot_position, robot_position_angle ) - # Return WebP bytes or PIL Image based on parameter - if return_webp: - # Convert directly to WebP bytes for better performance - webp_bytes = await numpy_to_webp_bytes(img_np_array) - del img_np_array # free memory - return webp_bytes - else: - # Convert to PIL Image using async utilities - pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") - del img_np_array # free memory - return await self._finalize_image(pil_img) + # Return PIL Image using async utilities + pil_img = await AsyncPIL.async_fromarray(img_np_array, mode="RGBA") + del img_np_array # free memory + return await self._finalize_image(pil_img) except (RuntimeError, RuntimeWarning) as e: _LOGGER.warning( @@ -301,11 +278,6 @@ async def _setup_robot_and_image( self.rooms_pos = original_rooms_pos except Exception as e: - _LOGGER.debug( - "%s: Early room extraction failed: %s, falling back to robot-position zoom", - self.file_name, - e, - ) # Fallback to robot-position-based zoom if room extraction fails if ( self.shared.image_auto_zoom @@ -313,10 +285,6 @@ async def _setup_robot_and_image( and robot_position ): self.zooming = True - _LOGGER.debug( - "%s: Enabling fallback robot-position-based zoom", - self.file_name, - ) return self.img_base_layer, robot_position, robot_position_angle @@ -379,19 +347,10 @@ async def _draw_map_elements( active_zones = self.shared.rand256_active_zone if active_zones and any(zone for zone in active_zones): self.zooming = True - _LOGGER.debug( - "%s: Enabling zoom for Rand256 - active zones detected: %s", - self.file_name, - active_zones, - ) else: self.zooming = False - _LOGGER.debug( - "%s: Zoom disabled for Rand256 - no active zones set", - self.file_name, - ) - img_np_array = await self.async_auto_trim_and_zoom_image( + img_np_array = self.async_auto_trim_and_zoom_image( img_np_array, detect_colour=colors["background"], margin_size=int(self.shared.margins), @@ -408,9 +367,8 @@ async def _finalize_image(self, pil_img): ) return pil_img if self.check_zoom_and_aspect_ratio(): - resize_params = prepare_resize_params(self, pil_img, True) + resize_params = self.prepare_resize_params(pil_img, True) pil_img = await self.async_resize_images(resize_params) - _LOGGER.debug("%s: Frame Completed.", self.file_name) return pil_img async def get_rooms_attributes( @@ -420,59 +378,11 @@ async def get_rooms_attributes( if self.room_propriety: return self.room_propriety if self.json_data and destinations: - _LOGGER.debug("Checking for rooms data..") self.room_propriety = await self.extract_room_properties( self.json_data, destinations ) - if self.room_propriety: - _LOGGER.debug("Got Rooms Attributes.") return self.room_propriety - @staticmethod - def point_in_polygon(x: int, y: int, polygon: list) -> bool: - """ - Check if a point is inside a polygon using ray casting algorithm. - Enhanced version with better handling of edge cases. 
- - Args: - x: X coordinate of the point - y: Y coordinate of the point - polygon: List of (x, y) tuples forming the polygon - - Returns: - True if the point is inside the polygon, False otherwise - """ - # Ensure we have a valid polygon with at least 3 points - if len(polygon) < 3: - return False - - # Make sure the polygon is closed (last point equals first point) - if polygon[0] != polygon[-1]: - polygon = polygon + [polygon[0]] - - # Use winding number algorithm for better accuracy - wn = 0 # Winding number counter - - # Loop through all edges of the polygon - for i in range(len(polygon) - 1): # Last vertex is first vertex - p1x, p1y = polygon[i] - p2x, p2y = polygon[i + 1] - - # Test if a point is left/right/on the edge defined by two vertices - if p1y <= y: # Start y <= P.y - if p2y > y: # End y > P.y (upward crossing) - # Point left of edge - if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) > 0: - wn += 1 # Valid up intersect - else: # Start y > P.y - if p2y <= y: # End y <= P.y (downward crossing) - # Point right of edge - if ((p2x - p1x) * (y - p1y) - (x - p1x) * (p2y - p1y)) < 0: - wn -= 1 # Valid down intersect - - # If winding number is not 0, the point is inside the polygon - return wn != 0 - async def async_get_robot_in_room( self, robot_x: int, robot_y: int, angle: float ) -> RobotPosition: @@ -482,7 +392,7 @@ async def async_get_robot_in_room( # If we have outline data, use point_in_polygon for accurate detection if "outline" in self.robot_in_room: outline = self.robot_in_room["outline"] - if self.point_in_polygon(int(robot_x), int(robot_y), outline): + if point_in_polygon(int(robot_x), int(robot_y), outline): temp = { "x": robot_x, "y": robot_y, @@ -537,12 +447,6 @@ async def async_get_robot_in_room( # This helps prevent false positives for points very far from any room map_boundary = 50000 # Typical map size is around 25000-30000 units for Rand25 if abs(robot_x) > map_boundary or abs(robot_y) > map_boundary: - _LOGGER.debug( - "%s robot position (%s, %s) is far outside map boundaries.", - self.file_name, - robot_x, - robot_y, - ) self.robot_in_room = last_room self.zooming = False temp = { @@ -555,10 +459,6 @@ async def async_get_robot_in_room( # Search through all rooms to find which one contains the robot if not self.rooms_pos: - _LOGGER.debug( - "%s: No rooms data available for robot position detection.", - self.file_name, - ) self.robot_in_room = last_room self.zooming = False temp = { @@ -569,13 +469,12 @@ async def async_get_robot_in_room( } return temp - _LOGGER.debug("%s: Searching for robot in rooms...", self.file_name) for room in self.rooms_pos: # Check if the room has an outline (polygon points) if "outline" in room: outline = room["outline"] # Use point_in_polygon for accurate detection with complex shapes - if self.point_in_polygon(int(robot_x), int(robot_y), outline): + if point_in_polygon(int(robot_x), int(robot_y), outline): # Robot is in this room self.robot_in_room = { "id": room_count, @@ -598,19 +497,10 @@ async def async_get_robot_in_room( else: self.zooming = False - _LOGGER.debug( - "%s is in %s room (polygon detection).", - self.file_name, - self.robot_in_room["room"], - ) return temp room_count += 1 # Robot not found in any room - _LOGGER.debug( - "%s not located within any room coordinates.", - self.file_name, - ) self.robot_in_room = last_room self.zooming = False temp = { @@ -643,32 +533,3 @@ def get_calibration_data(self, rotation_angle: int = 0) -> Any: self.calibration_data.append(calibration_point) return self.calibration_data - - # 
Element selection methods - def enable_element(self, element_code: DrawableElement) -> None: - """Enable drawing of a specific element.""" - self.drawing_config.enable_element(element_code) - - def disable_element(self, element_code: DrawableElement) -> None: - """Disable drawing of a specific element.""" - manage_drawable_elements(self, "disable", element_code=element_code) - - def set_elements(self, element_codes: list[DrawableElement]) -> None: - """Enable only the specified elements, disable all others.""" - manage_drawable_elements(self, "set_elements", element_codes=element_codes) - - def set_element_property( - self, element_code: DrawableElement, property_name: str, value - ) -> None: - """Set a drawing property for an element.""" - manage_drawable_elements( - self, - "set_property", - element_code=element_code, - property_name=property_name, - value=value, - ) - - async def async_copy_array(self, original_array): - """Copy the array using async utilities.""" - return await AsyncNumPy.async_copy(original_array) diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index 4bc4e56..bc82dac 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -213,7 +213,6 @@ async def async_draw_charger( except KeyError as e: _LOGGER.warning("%s: No charger position found: %s", self.file_name, e) else: - _LOGGER.debug("Charger position: %s", charger_pos) if charger_pos: charger_pos_dictionary = { "x": (charger_pos[0] * 10), @@ -317,11 +316,6 @@ async def async_get_robot_position(self, m_json: JsonType) -> tuple | None: robot_position_angle = round(angle[0], 0) if robot_pos and robot_position_angle: robot_position = robot_pos - _LOGGER.debug( - "robot position: %s, robot angle: %s", - str(robot_pos), - str(robot_position_angle), - ) if self.img_h.rooms_pos is None: self.img_h.robot_pos = { "x": robot_position[0] * 10, @@ -351,6 +345,7 @@ async def async_draw_robot_on_map( y=robot_pos[1], angle=robot_angle, fill=color_robot, + radius=self.img_h.shared.robot_size, robot_state=self.img_h.shared.vacuum_state, ) return np_array diff --git a/SCR/valetudo_map_parser/rooms_handler.py b/SCR/valetudo_map_parser/rooms_handler.py index f14fc83..08ad391 100644 --- a/SCR/valetudo_map_parser/rooms_handler.py +++ b/SCR/valetudo_map_parser/rooms_handler.py @@ -104,14 +104,11 @@ async def _process_room_layer( is_enabled = self.drawing_config.is_enabled(room_element) if not is_enabled: # Skip this room if it's disabled - LOGGER.debug("Skipping disabled room %s", segment_id) return None, None except (ValueError, TypeError): # If segment_id is not a valid integer, we can't map it to a room element # In this case, we'll include the room (fail open) - LOGGER.debug( - "Could not convert segment_id %s to room element", segment_id - ) + pass # Optimization: Create a smaller mask for just the room area if not pixels: @@ -221,9 +218,8 @@ async def async_extract_room_properties(self, json_data) -> RoomsProperties: if room_id is not None and room_data is not None: room_properties[room_id] = room_data - # Log timing information + # Log timing information (kept internal, no debug output) total_time = time.time() - start_total - LOGGER.debug("Room extraction Total time: %.3fs", total_time) return room_properties @@ -339,11 +335,11 @@ async def _process_segment_data( is_enabled = self.drawing_config.is_enabled(room_element) if not is_enabled: # Skip this room if it's disabled - LOGGER.debug("Skipping disabled room %s", segment_id) return None, None except 
(ValueError, TypeError):
            # If segment_id is not a valid integer, we can't map it to a room element
            # In this case, we'll include the room (fail open)
-            LOGGER.debug(
-                "Could not convert segment_id %s to room element", segment_id
-            )
+            pass
@@ -467,8 +463,6 @@ async def async_extract_room_properties(
                     room_properties[room_id] = room_data
 
-        # Log timing information
+        # Log timing information (kept internal, no debug output)
         total_time = time.time() - start_total
-        LOGGER.debug("Room extraction Total time: %.3fs", total_time)
-
         return room_properties
diff --git a/pyproject.toml b/pyproject.toml
index bafaebf..8a3c16a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "valetudo-map-parser"
-version = "0.1.9"
+version = "0.1.10rc5"
 description = "A Python library to parse Valetudo map data returning a PIL Image object."
 authors = ["Sandro Cantarella "]
 license = "Apache-2.0"
@@ -14,10 +14,11 @@ packages = [{include = "valetudo_map_parser", from = "SCR"}]
 "Changelog" = "https://github.com/sca075/Python-package-valetudo-map-parser/releases"
 
 [tool.poetry.dependencies]
-python = ">=3.12"
+python = ">=3.13"
 numpy = ">=1.26.4"
 Pillow = ">=10.3.0"
 scipy = ">=1.12.0"
+mvcrender = ">=0.0.2"
 
 [tool.poetry.group.dev.dependencies]
 ruff = "*"
diff --git a/tests/test.py b/tests/test.py
index a8b656e..722d357 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -31,10 +31,10 @@
 _LOGGER = logging.getLogger(__name__)
 
 # ----- Test Configuration -----
-FRAME_COUNT = 1  # Set to 1/10/25/50/100 as needed
-ENABLE_PROFILER = False  # Master switch for profiler usage
-ENABLE_CPU_TIMING = True  # Lightweight per-frame CPU timing (process CPU time)
-ENABLE_MEMORY_PROFILING = True  # Use tracemalloc snapshots
+FRAME_COUNT = 10  # Set to 1/10/25/50/100 as needed
+ENABLE_PROFILER = True  # Master switch for profiler usage
+ENABLE_CPU_TIMING = False  # Lightweight per-frame CPU timing (process CPU time)
+ENABLE_MEMORY_PROFILING = False  # Use tracemalloc snapshots
 SNAPSHOT_EVERY_FRAME = False  # If False, snapshot only first and last frame
 ENABLE_LEGACY_CPROFILE = False  # Legacy cProfile around the whole run
 # ------------------------------
@@ -224,7 +224,7 @@ def set_up(self):
             self.profiler.take_memory_snapshot("Test Setup Start")
 
         # Load the test.json file
-        test_file_path = os.path.join(os.path.dirname(__file__), "test.json") #glossyhardtofindnarwhal
+        test_file_path = os.path.join(os.path.dirname(__file__), "glossyhardtofindnarwhal.json") #glossyhardtofindnarwhal
         _LOGGER.info(f"Loading test data from {test_file_path}")
 
         with open(test_file_path, "r") as file:
@@ -236,7 +236,6 @@ def set_up(self):
 
     async def test_image_handler(self):
         _LOGGER.info("Starting test_image_handler...")
-
         device_info = {
             'platform': 'mqtt_vacuum_camera',
             'unique_id': 'rockrobo_camera',
@@ -249,14 +248,14 @@ async def test_image_handler(self):
             'alpha_background': 255.0,
             'alpha_charger': 255.0,
             'alpha_go_to': 255.0,
-            'alpha_move': 150.0,  # Higher alpha for better visibility
+            'alpha_move': 50.0,  # Lowered alpha to test path visibility
             'alpha_no_go': 125.0,
             'alpha_robot': 255.0,
             'alpha_text': 255.0,
             'alpha_wall': 150.0,  # Testing with a lower alpha value
             'alpha_zone_clean': 125.0,
-            'aspect_ratio': '1, 1',
-            'auto_zoom': False,
+            'aspect_ratio': '16, 9',
+            'auto_zoom': True,
             'zoom_lock_ratio': True,
             'color_background': [0, 125, 255],
             'color_charger': [255, 128, 0],
@@ -299,14 +298,15 @@ async def test_image_handler(self):
             'alpha_room_13': 255.0,
             'alpha_room_14': 255.0,
             'alpha_room_15': 255.0,
+            'robot_size': 20,
             'offset_top': 0,
'offset_bottom': 0, 'offset_left': 10, 'offset_right': 0, 'rotate_image': '90', 'margins': '100', - 'show_vac_status': False, - 'vac_status_font': 'custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf', + 'show_vac_status': True, + 'vac_status_font': 'SCR/valetudo_map_parser/config/fonts/FiraSans.ttf', 'vac_status_position': True, 'vac_status_size': 50.0, 'enable_www_snapshots': False, @@ -345,10 +345,11 @@ async def test_image_handler(self): 'disable_room_14': False, 'disable_room_15': False } - - shared_data = CameraSharedManager("test_rand256", device_info) + file_name = "test_hypfer" + shared_data = CameraSharedManager(file_name, device_info) shared = shared_data.get_instance() shared.vacuum_state = "docked" + shared.vacuum_connection = True shared.vacuum_battery = 100 shared.vacuum_ips = "192.168.8.1" @@ -384,7 +385,7 @@ async def test_image_handler(self): start_time = time.time() # Get the image (PIL format) - self.image = await handler.async_get_image(self.test_data, bytes_format=False) + self.image = await handler.async_get_image(self.test_data, bytes_format=True) if shared.binary_image is None: _LOGGER.warning("❌ Binary image is None") else: @@ -421,7 +422,7 @@ async def test_image_handler(self): _LOGGER.info(f"Calibration_data: {calibration_data}") _LOGGER.info(f"PIL image size: {self.image.size}") - store = RoomStore("test_vacuum") + store = RoomStore(file_name) t2=time.time(); rooms_data = await handler.async_get_rooms_attributes(); t3=time.time() if self.profiler: self.profiler.time_operation("Rooms", t2, t3) _LOGGER.info(f"Room Properties: {rooms_data}") @@ -436,7 +437,6 @@ async def test_image_handler(self): _LOGGER.info(f"Calibration_data: {handler.get_calibration_data()}") _LOGGER.info(f"PIL image size: {self.image.size}") - store = RoomStore("test_vacuum") rooms_data = await handler.async_get_rooms_attributes() _LOGGER.info(f"Room Properties: {rooms_data}") count = store.get_rooms_count() @@ -463,37 +463,8 @@ async def test_image_handler(self): else: _LOGGER.info(f"Position {i}: OUT_OF_BOUNDS = active: {bool(active)}") - # Test: Simulate your vacuum's scenario - make "Entrance" active and put robot there _LOGGER.info("=== TESTING YOUR VACUUM SCENARIO ===") - # Find "Entrance" position in room_keys - entrance_position = None - for i, room_id in enumerate(room_keys): - if rooms[room_id]['name'] == 'Entrance': - entrance_position = i - break - - if entrance_position is not None: - # Simulate your vacuum's active zones: [0, 0, 1, 0, 0] where position 2 is active - # But we need to make "Entrance" active instead - test_active_zones = [0] * len(handler.active_zones) - test_active_zones[entrance_position] = 1 # Make Entrance active - - # Override the active zones for testing - handler.active_zones = test_active_zones - _LOGGER.info(f"Test active zones (Entrance active): {test_active_zones}") - _LOGGER.info(f"Entrance is at position {entrance_position} in room_keys") - - # Test robot detection in Entrance - entrance_room_data = rooms[room_keys[entrance_position]] - test_x, test_y = entrance_room_data['x'], entrance_room_data['y'] - - # Test the robot detection function with robot in Entrance - test_result = await handler.imd.async_get_robot_in_room(robot_y=test_y, robot_x=test_x, angle=0.0) - _LOGGER.info(f"Test robot in Entrance: {test_result}") - _LOGGER.info(f"Test zooming enabled: {handler.imd.img_h.zooming}") - else: - _LOGGER.warning("Entrance room not found in test data") _LOGGER.info(f"Trims update: {shared.trims.to_dict()}") calibration_data = 
handler.get_calibration_data()
         _LOGGER.info(f"Calibration Data: {calibration_data}")
@@ -514,7 +485,7 @@ async def test_image_handler(self):
         # Debug: Check the device_info auto_zoom setting
         _LOGGER.info(f"Device info auto_zoom: {device_info.get('auto_zoom')}")
         _LOGGER.info(f"Shared image_auto_zoom: {handler.shared.image_auto_zoom}")
-
+        _LOGGER.info(f"Camera Attributes: {handler.shared.generate_attributes()}")
         # Debug: Check the zooming flags
         _LOGGER.info(f"Handler zooming: {handler.zooming}")
         _LOGGER.info(f"ImageDraw img_h zooming: {handler.imd.img_h.zooming}")
diff --git a/tests/test_rand.py b/tests/test_rand.py
index 955ddb2..58b8d41 100644
--- a/tests/test_rand.py
+++ b/tests/test_rand.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import asyncio
-import json
 import logging
 import cProfile
 import pstats
@@ -208,17 +207,181 @@ def __init__(self, enable_profiling: bool = True):
         ) if enable_profiling else None
 
     def setUp(self):
-        # Load test data from the rand.json file
-        test_file_path = os.path.join(os.path.dirname(__file__), "rand.json")
-        logging.getLogger(__name__).info(f"Loading test data from {test_file_path}")
+        """Set up test data from a sequence of recorded bin payloads."""
+        _LOGGER.debug("Setting up test data from recorded bin files...")
 
-        with open(test_file_path, "r") as file:
-            self.test_data = json.load(file)
+        if self.profiler:
+            self.profiler.take_memory_snapshot("Test Setup Start")
+
+        # Test with a sequence of recorded bin payloads, from first to last
+        # ("map_data_20250729_084141.bin", "LAST FILE - Multi-room with segments, vacuum at dock")
+        self.test_files = [
+            ("map_data_20250728_185945.bin", "FIRST FILE - Single room, vacuum at dock"),
+            ("map_data_20250728_193950.bin", "INTERMEDIATE FILE - recorded payload"),
+            ("map_data_20250728_194519.bin", "INTERMEDIATE FILE - recorded payload"),
+            ("map_data_20250728_204538.bin", "INTERMEDIATE FILE - recorded payload"),
+            ("map_data_20250728_204552.bin", "INTERMEDIATE FILE - recorded payload"),
+            ("map_data_20250729_084141.bin", "LAST FILE - Multi-room with segments, vacuum at dock"),
+        ]
+
+        self.test_data_sets = []
+
+        for filename, description in self.test_files:
+            payload_file = os.path.join(os.path.dirname(__file__), filename)
+
+            if not os.path.exists(payload_file):
+                raise FileNotFoundError(f"Test payload file not found: {payload_file}")
+
+            with open(payload_file, "rb") as f:
+                payload = f.read()
+
+            _LOGGER.debug(f"Loaded {filename}: {len(payload)} bytes")
+
+            # Use only the new rand256 parser for every payload
+            self.new_rand256_parser = Rand256Parser()
+
+            # Measure new_rand256 parser performance
+            start_time = time.time()
+            new_rand256_json = self.new_rand256_parser.parse_data(payload, pixels=True)
+            parse_time = time.time() - start_time
+            parsed_data = new_rand256_json
+            self.test_data_sets.append({
+                'filename': filename,
+                'description': description,
+                'payload': payload,
+                'data': parsed_data,
+                'json': new_rand256_json,
+                'parse_time': parse_time
+            })
+
+            _LOGGER.debug(f"Parsed {filename} in {parse_time:.4f}s")
+
+        # Display parsed data for every payload
+        print("\n" + "="*80)
+        print("NEW_RAND256_PARSER DATA COMPARISON")
+        print("="*80)
+
+        for i, dataset in enumerate(self.test_data_sets):
+            print(f"\n📁 {dataset['description']}")
+            print(f"   File: {dataset['filename']}")
+            print(f"   Size: {len(dataset['payload']):,} bytes")
+            print(f"   Parse time: {dataset['parse_time']:.4f} seconds")
+            print(f"   Parsed top-level keys: {len(dataset['json']):,}")
+
+            data = dataset['data']
+            if data:
+                robot = data.get('robot', [0, 0])
robot_angle = data.get('robot_angle', 0) + charger = data.get('charger', [0, 0]) + path_data = data.get('path', {}) + path_points = len(path_data.get('points', [])) + path_angle = path_data.get('current_angle', 0) + + # Segments info + image_data = data.get('image', {}) + segments = image_data.get('segments', {}) + segment_count = segments.get('count', 0) + segment_ids = segments.get('id', []) + + print(f" 🤖 Robot: {robot}, Angle: {robot_angle}°") + print(f" 🔌 Charger: {charger}") + print(f" 🛤️ Path: {path_points} points, Angle: {path_angle:.1f}°") + print(f" 🏠 Segments: {segment_count} rooms {segment_ids}") + + # Check if robot is at charger (close positions) + if robot and charger: + distance = ((robot[0] - charger[0])**2 + (robot[1] - charger[1])**2)**0.5 + at_dock = distance < 500 # Within 500 units + print(f" 🏠 At dock: {'✅ YES' if at_dock else '❌ NO'} (distance: {distance:.0f})") + else: + print(" ❌ PARSING FAILED") + + print("="*80) - _LOGGER.debug("Test data loaded.") + # Use the first dataset for the image test + self.test_data = self.test_data_sets[0]['data'] + self.current_file = self.test_data_sets[0]['filename'] + + _LOGGER.debug("Test data loaded and compared.") + + async def simulate_ha_background_task(self, task_name: str, duration: float): + """Simulate Home Assistant background tasks like sensors, automations, etc.""" + start_time = asyncio.get_event_loop().time() + while asyncio.get_event_loop().time() - start_time < duration: + # Simulate sensor updates + await asyncio.sleep(0.1) + # Simulate some CPU work + _ = sum(i * i for i in range(1000)) + # Yield control back to event loop + await asyncio.sleep(0) + _LOGGER.debug(f"Background task {task_name} completed after {duration}s") + + def _generate_single_image_sync(self): + """Synchronous wrapper for image generation (for asyncio.to_thread).""" + # This will be called in a thread pool, so we need a new event loop + try: + # Create new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.run_until_complete(self._generate_single_image()) + finally: + loop.close() async def test_image_handler(self): - _LOGGER.info("Starting test_rand_image_handler...") + """Test image generation with Home Assistant environment simulation.""" + _LOGGER.info("Testing with Home Assistant concurrent environment simulation...") + + for i, dataset in enumerate(self.test_data_sets): + print(f"\n{'='*80}") + print(f"TESTING IMAGE GENERATION: {dataset['description']}") + print(f"File: {dataset['filename']}") + print(f"{'='*80}") + + # Set current test data + self.test_data = dataset['data'] + self.current_file = dataset['filename'] + + # Simulate Home Assistant concurrent environment + background_tasks = [] + + # Simulate various HA background operations + background_tasks.append(self.simulate_ha_background_task("sensor_updates", 2.0)) + background_tasks.append(self.simulate_ha_background_task("automation_engine", 1.5)) + background_tasks.append(self.simulate_ha_background_task("state_machine", 1.8)) + background_tasks.append(self.simulate_ha_background_task("websocket_handler", 2.2)) + background_tasks.append(self.simulate_ha_background_task("recorder", 1.3)) + + _LOGGER.info(f"🏠 Starting image generation {i+1}/{len(self.test_data_sets)} with HA simulation...") + + try: + # Use asyncio.to_thread to simulate HA's approach + image_task = asyncio.to_thread(self._generate_single_image_sync) + + # Run image generation concurrently with background tasks + start_time = asyncio.get_event_loop().time() + 
results = await asyncio.gather( + image_task, + *background_tasks, + return_exceptions=True + ) + end_time = asyncio.get_event_loop().time() + + # Check if image generation succeeded + image_result = results[0] + if isinstance(image_result, Exception): + _LOGGER.error(f"❌ Image generation failed: {image_result}") + raise image_result + + _LOGGER.info(f"✅ Image {i+1}/{len(self.test_data_sets)} completed in {end_time - start_time:.3f}s with concurrent load") + + except Exception as e: + _LOGGER.error(f"❌ Test failed for {dataset['filename']}: {e}") + raise + + async def _generate_single_image(self): + """Generate image for the current test data.""" + _LOGGER.info(f"Generating image for {self.current_file}...") # Start profiling for this image generation start_time = time.time() @@ -290,14 +453,15 @@ async def test_image_handler(self): 'alpha_room_13': 255.0, 'alpha_room_14': 255.0, 'alpha_room_15': 255.0, + 'robot_size': 15, 'offset_top': 0, 'offset_bottom': 0, 'offset_left': 10, 'offset_right': 0, 'rotate_image': '90', 'margins': '100', - 'show_vac_status': False, - 'vac_status_font': 'custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf', + 'show_vac_status': True, + 'vac_status_font': 'SCR/valetudo_map_parser/config/fonts/FiraSans.ttf', 'vac_status_position': True, 'vac_status_size': 50.0, 'enable_www_snapshots': False, @@ -339,6 +503,7 @@ async def test_image_handler(self): shared_data = CameraSharedManager("test_vacuum", device_info) shared = shared_data.get_instance() shared.vacuum_state = "cleaning" + shared.user_language = "it" # The room IDs in the test data are 16-20, but the handler uses an internal ID (0-4) # We need to set up the active zones array to match the internal IDs @@ -359,7 +524,8 @@ async def test_image_handler(self): # Try to generate an image from the JSON data try: _LOGGER.info("Attempting to generate image from JSON data...") - self.image = await handler.get_image_from_rrm(self.test_data) + # Test with PNG output (WebP functionality kept in library but not used in test) + self.image = await handler.async_get_image(self.test_data) _LOGGER.info("Successfully generated image from JSON data") if self.image is None: _LOGGER.error("Failed to generate image from JSON data") @@ -367,8 +533,17 @@ async def test_image_handler(self): except Exception as e: _LOGGER.warning(f"Error generating image from JSON: {e}") - # Display image size and other properties - _LOGGER.info(f"Image size: {self.image.size}") + # Check if image generation was successful + if self.image is None: + _LOGGER.error("TEST FAILED: Image generation returned None") + return + else: + _LOGGER.info("TEST PASSED: Image generated successfully") + # Image should be PIL Image (not WebP bytes) + if hasattr(self.image, 'size'): + _LOGGER.info(f"PIL image size: {self.image.size}") + else: + _LOGGER.warning(f"Unexpected image type: {type(self.image)}") _LOGGER.info(f"Trims update: {shared.trims.to_dict()}") _LOGGER.info(f"Calibration_data: {handler.get_calibration_data()}") _LOGGER.info(await handler.get_rooms_attributes({ @@ -412,8 +587,23 @@ async def test_image_handler(self): _LOGGER.info(f"Zoom conditions: zoom={handler.zooming}, vacuum_state={handler.shared.vacuum_state}, image_auto_zoom={handler.shared.image_auto_zoom}") _LOGGER.info(f"Zooming enabled: {handler.zooming}") - # Show the image - self.image.show() + # Show the image if successful + if self.image is not None: + print(f"\n🖼️ PROCESSING IMAGE: {self.current_file}") + + # Display PIL image directly without saving to disk + if 
hasattr(self.image, 'size'):
+                print(f"   📐 Image size: {self.image.size}")
+
+                robot_in_room = getattr(handler, 'robot_in_room', 'Unknown')
+                print(f"   🤖 Robot in room: {robot_in_room}")
+
+                # Display the image directly
+                self.image.show()
+            else:
+                print(f"   ❌ Unexpected image type: {type(self.image)}")
+        else:
+            print(f"\n❌ IMAGE GENERATION FAILED: {self.current_file}")
 
         # End profiling for this image generation
         end_time = time.time()
diff --git a/tests/test_status_text.py b/tests/test_status_text.py
new file mode 100644
index 0000000..aefcbfd
--- /dev/null
+++ b/tests/test_status_text.py
@@ -0,0 +1,139 @@
+"""Tests for status text generation and translations."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import sys
+
+# Ensure the repo root and the SCR source tree are on sys.path before the
+# package imports below, so the tests run without an installed package.
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "SCR")))
+
+from PIL import Image
+from valetudo_map_parser.config.shared import CameraSharedManager
+from valetudo_map_parser.config.status_text.status_text import StatusText
+
+
+async def _build_shared(file_name: str = "test_device"):
+    manager = CameraSharedManager(file_name, device_info={})
+    shared = manager.get_instance()
+    # Defaults
+    shared.show_vacuum_state = True
+    shared.vacuum_status_size = 60  # >=50 triggers dynamic sizing
+    shared.user_language = None
+    shared.vacuum_connection = True
+    shared.vacuum_state = "docked"
+    shared.vacuum_battery = 90
+    shared.current_room = {"in_room": "Kitchen"}
+    return shared
+
+
+def run_async(coro):
+    # asyncio.run() creates and closes a fresh event loop per call, avoiding
+    # the deprecated asyncio.get_event_loop() pattern on Python 3.12+.
+    return asyncio.run(coro)
+
+
+def test_language_fallback_and_ready_text():
+    async def inner():
+        shared = await _build_shared()
+        shared.user_language = None  # fallback to EN
+        st = StatusText(hass=None, camera_shared=shared)
+        img = Image.new("RGBA", (300, 50), (0, 0, 0, 0))
+
+        status_text, size = await st.get_status_text(img)
+
+        # Expect device name + translated status (docked) and ready text or charging indicator
+        assert status_text[0].startswith(shared.file_name + ": ")
+        assert any(t in status_text for t in ["Ready.", "∑Ϟ ", "∑Ϟ"]) or any(
+            "Docked" in t for t in status_text
+        )
+        assert isinstance(size, int) and size > 0
+
+    run_async(inner())
+
+
+def test_docked_battery_branches():
+    async def inner():
+        shared = await _build_shared()
+        st = StatusText(hass=None, camera_shared=shared)
+        img = Image.new("RGBA", (600, 60), (0, 0, 0, 0))
+
+        # Battery < 100 -> should show charging symbol and percentage
+        shared.vacuum_battery = 50
+        status_text, _ = await st.get_status_text(img)
+        assert any("%" in t for t in status_text)
+
+        # Battery 100 -> should show 'Ready.' text branch
+        shared.vacuum_battery = 100
+        status_text, _ = await st.get_status_text(img)
+        assert any("Ready." in t for t in status_text)
+
+    run_async(inner())
+
+
+def test_mqtt_disconnected_uses_translated_key():
+    async def inner():
+        shared = await _build_shared()
+        shared.vacuum_connection = False
+        shared.user_language = "en"
+        st = StatusText(hass=None, camera_shared=shared)
+        img = Image.new("RGBA", (400, 40), (0, 0, 0, 0))
+
+        status_text, _ = await st.get_status_text(img)
+        assert status_text[0] == f"{shared.file_name}: Disconnected from MQTT?"
+
+    run_async(inner())
+
+
+def test_current_room_suffix_appended():
+    async def inner():
+        shared = await _build_shared()
+        st = StatusText(hass=None, camera_shared=shared)
+        img = Image.new("RGBA", (400, 40), (0, 0, 0, 0))
+
+        status_text, _ = await st.get_status_text(img)
+        # Should contain " (Kitchen)"
+        assert any(" (Kitchen)" in t for t in status_text)
+
+    run_async(inner())
+
+
+def test_graceful_when_no_image_passed():
+    async def inner():
+        shared = await _build_shared()
+        st = StatusText(hass=None, camera_shared=shared)
+        status_text, size = await st.get_status_text(None)  # type: ignore[arg-type]
+        assert size == shared.vacuum_status_size
+        assert status_text[0].startswith(shared.file_name + ": ")
+
+    run_async(inner())
+
+
+def test_graceful_when_image_closed():
+    async def inner():
+        shared = await _build_shared()
+        st = StatusText(hass=None, camera_shared=shared)
+        img = Image.new("RGBA", (300, 50), (0, 0, 0, 0))
+        img.close()
+        status_text, size = await st.get_status_text(img)
+        assert isinstance(size, int) and size >= 0
+        assert status_text[0].startswith(shared.file_name + ": ")
+
+    run_async(inner())
+
+
+def test_dynamic_size_uses_image_width_when_large_size():
+    async def inner():
+        shared = await _build_shared()
+        st = StatusText(hass=None, camera_shared=shared)
+
+        # Two images with different widths should yield different computed sizes when size >= 50
+        img_small = Image.new("RGBA", (200, 40), (0, 0, 0, 0))
+        img_large = Image.new("RGBA", (800, 40), (0, 0, 0, 0))
+
+        _, size_small = await st.get_status_text(img_small)
+        _, size_large = await st.get_status_text(img_large)
+
+        assert size_large >= size_small
+
+    run_async(inner())
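
Usage sketch for the new snapshot builder in map_data.py: HyperMapData.async_from_valetudo_json bundles the individual ImageData helpers into one dataclass that the handlers read from instead of re-walking the raw JSON each frame. A minimal sketch, assuming the package is importable as valetudo_map_parser (it is packaged from SCR) and using a hand-built, hypothetical Valetudo payload that sets only the keys the builder reads ("metaData", "size", "pixelSize", "layers", "entities"):

import asyncio

from valetudo_map_parser.map_data import HyperMapData

# Hypothetical minimal payload; real Valetudo maps carry many more fields.
sample_json = {
    "__class": "ValetudoMap",
    "metaData": {"nonce": "example-nonce"},
    "size": {"x": 1024, "y": 1024},
    "pixelSize": 5,
    "layers": [
        {
            "__class": "MapLayer",
            "type": "segment",
            "metaData": {"segmentId": "1", "active": 1},
            "compressedPixels": [100, 100, 3],
        }
    ],
    "entities": [],
}

snapshot = asyncio.run(HyperMapData.async_from_valetudo_json(sample_json))
print(snapshot.json_id)            # example-nonce
print(snapshot.image_size)         # {'x': 1024, 'y': 1024, 'centre': [512, 512]}
print(snapshot.active_zones)       # [1] - the single segment is active
print(snapshot.layers["segment"])  # [[100, 100, 3]]
print(snapshot.pixel_size)         # 5

This is what the hypfer handler changes above rely on: self.json_data.image_size, .layers and .active_zones replace the per-frame size/layer extraction that the old async_get_image_from_json performed inline.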
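The rewritten ImageData.async_get_rooms_coordinates above now rejects empty input and returns either a flat pixel-scaled bounding box or a rand256-style millimetre pair. A short worked example with made-up (x, y, z) pixel triplets, following the docstring's two output formats:

import asyncio

from valetudo_map_parser.map_data import ImageData

pixels = [(10, 10, 2), (20, 15, 3)]  # hypothetical (x, y, z) room pixels

# Default format: (min_x, min_y, max_x, max_y), scaled by pixel_size.
# min stays (10, 10); max_x = 20 + 3 = 23, max_y = 15 + 5 = 20,
# so the result is (50, 50, 115, 100) with pixel_size=5.
box = asyncio.run(ImageData.async_get_rooms_coordinates(pixels, pixel_size=5))
print(box)  # (50, 50, 115, 100)

# rand256 format: ((max_x_mm, max_y_mm), (min_x_mm, min_y_mm)), scaled *10.
# Here z is ignored, so max_x = 20 and max_y = 15 + 5 = 20.
rand_box = asyncio.run(
    ImageData.async_get_rooms_coordinates(pixels, pixel_size=5, rand=True)
)
print(rand_box)  # ((1000, 1000), (500, 500))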
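Both handlers now import point_in_polygon from config.utils instead of carrying per-handler copies. Assuming the shared helper keeps the winding-number semantics of the removed static method (the call sites are unchanged, passing integer coordinates and a list of (x, y) tuples), a quick sanity check looks like this:

from valetudo_map_parser.config.utils import point_in_polygon

# A simple square room outline, as (x, y) tuples.
square = [(0, 0), (100, 0), (100, 100), (0, 100)]

print(point_in_polygon(50, 50, square))   # True  - point inside the room
print(point_in_polygon(150, 50, square))  # False - point outside the room

The expected True/False results hold for any correct point-in-polygon test, so this also serves as a regression check that relocating the helper did not change robot-in-room detection.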