/
client.py
116 lines (98 loc) · 3.91 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Baby buddy client class"""
from __future__ import annotations
import logging
from asyncio import TimeoutError
from datetime import datetime, time
from http import HTTPStatus
from typing import Any
import async_timeout
import homeassistant.util.dt as dt_util
from aiohttp.client import ClientSession
from aiohttp.client_exceptions import ClientError, ClientResponseError
from .errors import AuthorizationError, ConnectError, ValidationError
_LOGGER = logging.getLogger(__name__)
class BabyBuddyClient:
    """Class for babybuddy API interface.

    Wraps an aiohttp ``ClientSession`` and sends every request with a
    token ``Authorization`` header. ``async_connect`` must succeed before
    any endpoint name can be resolved, because it fills ``self.endpoints``
    from the API root.
    """

    # Seconds allowed for a single request, including reading its body.
    _REQUEST_TIMEOUT = 10

    def __init__(
        self, host: str, port: int, api_key: str, session: ClientSession
    ) -> None:
        """Initialize the client.

        Args:
            host: Base address of the server; joined with ``port`` as
                ``"{host}:{port}"``.
            port: Port the server listens on.
            api_key: Token sent in the ``Authorization`` header.
            session: Session used to perform all requests.
        """
        self.headers = {"Authorization": f"Token {api_key}"}
        self.url = f"{host}:{port}"
        self.session = session
        # Endpoint name -> URL; empty until async_connect() has run.
        self.endpoints: dict[str, str] = {}

    async def async_get(
        self, endpoint: str | None = None, entry: str | None = None
    ) -> Any:
        """GET request to babybuddy API.

        Args:
            endpoint: Key into ``self.endpoints``. When omitted, the API
                root ``"{url}/api/"`` is requested instead.
            entry: Optional suffix appended verbatim to the resolved URL.

        Returns:
            The decoded JSON body of the response.

        Raises:
            KeyError: If ``endpoint`` is not present in ``self.endpoints``.
            TimeoutError: If the request exceeds the request timeout.
            ClientError: On transport failures or an HTTP error status
                (``raise_for_status`` is enabled for this call).
        """
        url = f"{self.url}/api/"
        if endpoint:
            url = self.endpoints[endpoint]
        if entry:
            url = f"{url}{entry}"
        async with async_timeout.timeout(self._REQUEST_TIMEOUT):
            # The context manager releases the connection once the body is read.
            async with self.session.get(
                url=url,
                headers=self.headers,
                raise_for_status=True,
            ) as resp:
                return await resp.json()

    async def async_post(self, endpoint: str, data: dict[str, Any]) -> None:
        """POST request to babybuddy API.

        Failures (timeout, client error, or a status other than 201) are
        logged, not raised.

        Args:
            endpoint: Key into ``self.endpoints``.
            data: Form payload sent as the request body.

        Raises:
            KeyError: If ``endpoint`` is not present in ``self.endpoints``.
        """
        _LOGGER.debug("POST data: %s", data)
        await self._async_send(
            self.session.post,
            self.endpoints[endpoint],
            HTTPStatus.CREATED,
            f"create {endpoint}",
            data=data,
        )

    async def async_patch(
        self, endpoint: str, entry: str, data: dict[str, str]
    ) -> None:
        """PATCH request to babybuddy API.

        Failures (timeout, client error, or a status other than 200) are
        logged, not raised.

        Args:
            endpoint: Key into ``self.endpoints``.
            entry: Identifier appended to the endpoint URL, followed by ``/``.
            data: Form payload holding the fields to update.

        Raises:
            KeyError: If ``endpoint`` is not present in ``self.endpoints``.
        """
        await self._async_send(
            self.session.patch,
            f"{self.endpoints[endpoint]}{entry}/",
            HTTPStatus.OK,
            f"update {endpoint}/{entry}",
            data=data,
        )

    async def async_delete(self, endpoint: str, entry: str) -> None:
        """DELETE request to babybuddy API.

        Failures (timeout, client error, or a status other than 204) are
        logged, not raised.

        Args:
            endpoint: Key into ``self.endpoints``.
            entry: Identifier appended to the endpoint URL, followed by ``/``.

        Raises:
            KeyError: If ``endpoint`` is not present in ``self.endpoints``.
        """
        await self._async_send(
            self.session.delete,
            f"{self.endpoints[endpoint]}{entry}/",
            HTTPStatus.NO_CONTENT,
            f"delete {endpoint}/{entry}",
        )

    async def async_connect(self) -> None:
        """Check connection to babybuddy API.

        Requests the API root and stores the result in ``self.endpoints``.

        Raises:
            AuthorizationError: If the server answers 401 or 403.
            ConnectError: On timeout, transport failure, or any other HTTP
                error status.
        """
        try:
            self.endpoints = await self.async_get()
        except ClientResponseError as err:
            # Only 401/403 mean the token was rejected; a 404 or 5xx points
            # at the address or the server, not at the credentials.
            if err.status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
                raise AuthorizationError from err
            raise ConnectError(err) from err
        except (TimeoutError, ClientError) as err:
            raise ConnectError(err) from err

    async def _async_send(
        self,
        send: Any,
        url: str,
        expected: HTTPStatus,
        action: str,
        **kwargs: Any,
    ) -> None:
        """Issue a write request and log, rather than raise, any failure.

        Args:
            send: Bound session method to call (``post``/``patch``/``delete``).
            url: Fully resolved request URL.
            expected: Status code that signals success.
            action: Description used in the failure log message, for
                example ``"create feedings"``.
            **kwargs: Extra keyword arguments forwarded to ``send``.
        """
        try:
            async with async_timeout.timeout(self._REQUEST_TIMEOUT):
                async with send(url, headers=self.headers, **kwargs) as resp:
                    if resp.status == expected:
                        return
                    error = await self._async_error_detail(resp)
        except (TimeoutError, ClientError) as err:
            # No response exists on this path, so there is no status to check.
            _LOGGER.error(err)
            return
        _LOGGER.error("Could not %s. error: %s", action, error)

    @staticmethod
    async def _async_error_detail(resp: Any) -> Any:
        """Return the error payload of ``resp``: JSON if possible, else text."""
        try:
            return await resp.json()
        except (ClientError, ValueError):
            # Not a JSON body (e.g. an HTML error page); fall back to raw text.
            return await resp.text()
def get_datetime_from_time(value: datetime | time) -> datetime:
    """Return datetime for start/end/time service fields.

    Args:
        value: Either a time of day, which is combined with today's date in
            the default time zone, or a datetime. A naive datetime is
            taken to be in the default time zone; an aware datetime is
            converted to it, keeping the same instant.

    Returns:
        A timezone-aware datetime in ``dt_util.DEFAULT_TIME_ZONE``.

    Raises:
        ValidationError: If the resulting datetime is later than now.
    """
    # Take one snapshot so the date used for combining and the "future"
    # check below refer to the same moment.
    now = dt_util.now()
    if isinstance(value, time):
        value = datetime.combine(now.date(), value, dt_util.DEFAULT_TIME_ZONE)
    if isinstance(value, datetime):
        if value.utcoffset() is None:
            # Naive input: attach the default zone without shifting the clock.
            value = value.replace(tzinfo=dt_util.DEFAULT_TIME_ZONE)
        else:
            # Aware input: replace() would relabel the zone and move the
            # instant, so convert instead.
            value = value.astimezone(dt_util.DEFAULT_TIME_ZONE)
    if value > now:
        raise ValidationError("Time cannot be in the future.")
    return value