Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ jobs:
build-and-publish-pypi:
name: Build and publish release to PyPI
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
environment:
name: valetudo_map_parser_pypi

steps:
# Step 1: Checkout the repository
- uses: actions/checkout@v5
Expand Down Expand Up @@ -46,9 +52,8 @@ jobs:
run: poetry build --no-interaction

# Step 8: Publish to PyPI
- name: Publish to PyPi
env:
PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
run: |
poetry config pypi-token.pypi "${PYPI_TOKEN}"
poetry publish --no-interaction
- name: Publish to PyPI (Trusted Publishing)
uses: pypa/gh-action-pypi-publish@release/v1
with:
skip-existing: true
packages-dir: dist/
169 changes: 80 additions & 89 deletions SCR/valetudo_map_parser/config/shared.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Class Camera Shared.
Keep the data between the modules.
Version: v0.1.10
Version: v0.1.12
"""

import asyncio
Expand Down Expand Up @@ -54,129 +54,112 @@ class CameraShared:
"""

def __init__(self, file_name):
self.camera_mode: str = CameraModes.MAP_VIEW # Camera mode
self.frame_number: int = 0 # camera Frame number
self.destinations: list = [] # MQTT rand destinations
self.rand256_active_zone: list = [] # Active zone for rand256
self.rand256_zone_coordinates: list = [] # Active zone coordinates for rand256
self.is_rand: bool = False # MQTT rand data
self._new_mqtt_message = False # New MQTT message
# Initialize last_image with default gray image (250x150 minimum)
self.last_image = Image.new(
"RGBA", (250, 150), (128, 128, 128, 255)
) # Gray default image
self.new_image: PilPNG | None = None # New image received
self.binary_image: bytes | None = None # Current image in binary format
self.image_last_updated: float = 0.0 # Last image update time
self.image_format = "image/pil" # Image format
self.image_size = None # Image size
self.robot_size = None # Robot size
self.image_auto_zoom: bool = False # Auto zoom image
self.image_zoom_lock_ratio: bool = True # Zoom lock ratio
self.image_ref_height: int = 0 # Image reference height
self.image_ref_width: int = 0 # Image reference width
self.image_aspect_ratio: str = "None" # Change Image aspect ratio
self.image_grab = True # Grab image from MQTT
self.image_rotate: int = 0 # Rotate image
self.drawing_limit: float = 0.0 # Drawing CPU limit
self.current_room = None # Current room of rhe vacuum
self.user_colors = Colors # User base colors
self.rooms_colors = Colors # Rooms colors
self.vacuum_battery = 0 # Vacuum battery state
self.vacuum_connection = False # Vacuum connection state
self.vacuum_state = None # Vacuum state
self.charger_position = None # Vacuum Charger position
self.show_vacuum_state = None # Show vacuum state on the map
self.camera_mode: str = CameraModes.MAP_VIEW
self.frame_number: int = 0
self.destinations: list = []
self.rand256_active_zone: list = []
self.rand256_zone_coordinates: list = []
self.is_rand: bool = False
self._new_mqtt_message = False
self.last_image = Image.new("RGBA", (250, 150), (128, 128, 128, 255))
self.new_image: PilPNG | None = None
self.binary_image: bytes | None = None
self.image_last_updated: float = 0.0
self.image_format = "image/pil"
self.image_size = None
self.robot_size = None
self.image_auto_zoom: bool = False
self.image_zoom_lock_ratio: bool = True
self.image_ref_height: int = 0
self.image_ref_width: int = 0
self.image_aspect_ratio: str = "None"
self.image_grab = True
self.image_rotate: int = 0
self.drawing_limit: float = 0.0
self.current_room = None
self.user_colors = Colors
self.rooms_colors = Colors
self.vacuum_battery = 0
self.vacuum_connection = False
self.vacuum_state = None
self.charger_position = None
self.show_vacuum_state = None
self.vacuum_status_font: str = (
"custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf" # Font
"custom_components/mqtt_vacuum_camera/utils/fonts/FiraSans.ttf"
)
self.vacuum_status_size: int = 50 # Vacuum status size
self.vacuum_status_position: bool = True # Vacuum status text image top
self.snapshot_take = False # Take snapshot
self.vacuum_error = None # Vacuum error
self.vacuum_api = None # Vacuum API
self.vacuum_ips = None # Vacuum IPs
self.vac_json_id = None # Vacuum json id
self.margins = "100" # Image margins
self.obstacles_data = None # Obstacles data
self.obstacles_pos = None # Obstacles position
self.offset_top = 0 # Image offset top
self.offset_down = 0 # Image offset down
self.offset_left = 0 # Image offset left
self.offset_right = 0 # Image offset right
self.export_svg = False # Export SVG
self.svg_path = None # SVG Export path
self.enable_snapshots = False # Enable snapshots
self.file_name = file_name # vacuum friendly name as File name
self.attr_calibration_points = None # Calibration points of the image
self.map_rooms = None # Rooms data from the vacuum
self.map_pred_zones = None # Predefined zones data
self.map_pred_points = None # Predefined points data
self.map_new_path = None # New path data
self.map_old_path = None # Old path data
self.user_language = None # User language
self.vacuum_status_size: int = 50
self.vacuum_status_position: bool = True
self.snapshot_take = False
self.vacuum_error = None
self.vacuum_api = None
self.vacuum_ips = None
self.vac_json_id = None
self.margins = "100"
self.obstacles_data = None
self.obstacles_pos = None
self.offset_top = 0
self.offset_down = 0
self.offset_left = 0
self.offset_right = 0
self.export_svg = False
self.svg_path = None
self.enable_snapshots = False
self.file_name = file_name
self.attr_calibration_points = None
self.map_rooms = None
self.map_pred_zones = None
self.map_pred_points = None
self.map_new_path = None
self.map_old_path = None
self.user_language = None
self.trim_crop_data = None
self.trims = TrimsData.from_dict(DEFAULT_VALUES["trims_data"]) # Trims data
self.trims = TrimsData.from_dict(DEFAULT_VALUES["trims_data"])
self.skip_room_ids: List[str] = []
self.device_info = None # Store the device_info
self.device_info = None

def vacuum_bat_charged(self) -> bool:
"""Check if the vacuum is charging."""
return (self.vacuum_state == "docked") and (int(self.vacuum_battery) < 100)

@staticmethod
def _compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list | None:
"""
Compose JSON with obstacle details including the image link.
"""
"""Compose JSON with obstacle details including the image link."""
obstacle_links = []
if not obstacles or not vacuum_host_ip:
return None

for obstacle in obstacles:
# Extract obstacle details
label = obstacle.get("label", "")
points = obstacle.get("points", {})
image_id = obstacle.get("id", "None")

if label and points and image_id and vacuum_host_ip:
# Append formatted obstacle data
if image_id != "None":
# Compose the link
image_link = (
f"http://{vacuum_host_ip}"
f"/api/v2/robot/capabilities/ObstacleImagesCapability/img/{image_id}"
)
obstacle_links.append(
{
"point": points,
"label": label,
"link": image_link,
}
{"point": points, "label": label, "link": image_link}
)
else:
obstacle_links.append(
{
"point": points,
"label": label,
}
)
obstacle_links.append({"point": points, "label": label})
return obstacle_links

def update_user_colors(self, user_colors):
"""Update the user colors."""
"""Update user colors palette"""
self.user_colors = user_colors

def get_user_colors(self):
"""Get the user colors."""
"""Return user colors"""
return self.user_colors

def update_rooms_colors(self, user_colors):
"""Update the rooms colors."""
self.rooms_colors = user_colors

def get_rooms_colors(self):
"""Get the rooms colors."""
"""Return rooms colors"""
return self.rooms_colors

def reset_trims(self) -> dict:
Expand All @@ -185,7 +168,7 @@ def reset_trims(self) -> dict:
return self.trims

async def batch_update(self, **kwargs):
"""Batch update multiple attributes."""
"""Update the data of Shared in Batch"""
for key, value in kwargs.items():
setattr(self, key, value)

Expand All @@ -210,24 +193,32 @@ def generate_attributes(self) -> dict:
)
attrs[ATTR_OBSTACLES] = self.obstacles_data

if self.enable_snapshots:
attrs[ATTR_SNAPSHOT] = self.snapshot_take
else:
attrs[ATTR_SNAPSHOT] = False
attrs[ATTR_SNAPSHOT] = self.snapshot_take if self.enable_snapshots else False

# Add dynamic shared attributes if they are available
shared_attrs = {
ATTR_ROOMS: self.map_rooms,
ATTR_ZONES: self.map_pred_zones,
ATTR_POINTS: self.map_pred_points,
}

for key, value in shared_attrs.items():
if value is not None:
attrs[key] = value

return attrs

def to_dict(self) -> dict:
"""Return a dictionary with image and attributes data."""
return {
"image": {
"binary": self.binary_image,
"pil_image_size": self.new_image.size,
"size": self.new_image.size if self.new_image else None,
"format": self.image_format,
"updated": self.image_last_updated,
},
"attributes": self.generate_attributes(),
}


class CameraSharedManager:
"""Camera Shared Manager class."""
Expand Down
19 changes: 18 additions & 1 deletion SCR/valetudo_map_parser/config/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
import threading
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional, Tuple, TypedDict, Union
from typing import Any, Dict, Optional, Tuple, TypedDict, Union, List, NotRequired

import numpy as np
from PIL import Image
Expand All @@ -18,6 +18,23 @@

LOGGER = logging.getLogger(__package__)

class Spot(TypedDict):
name: str
coordinates: List[int] # [x, y]

class Zone(TypedDict):
name: str
coordinates: List[List[int]] # [[x1, y1, x2, y2, repeats], ...]

class Room(TypedDict):
name: str
id: int

class Destinations(TypedDict, total=False):
spots: NotRequired[Optional[List[Spot]]]
zones: NotRequired[Optional[List[Zone]]]
rooms: NotRequired[Optional[List[Room]]]
updated: NotRequired[Optional[int]]

class RoomProperty(TypedDict):
number: int
Expand Down
Loading