-
Notifications
You must be signed in to change notification settings - Fork 0
/
services.py
80 lines (67 loc) · 2.7 KB
/
services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Monzo-specific services"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
from json import dumps
from socket import gethostname
from typing import Any
from helpers import HAExceptionCatcher, get_secret
from requests import get
from requests.exceptions import ConnectionError as RequestsConnectionError
from wg_utilities.clients import MonzoClient
# Pick the credentials cache location based on the machine running the module.
if gethostname() == "homeassistant":
    # NOTE(review): nothing in this branch defines `log`, `task`, `sensor` or
    # `service`, yet they are used further down - presumably the host runtime
    # injects them; confirm.
    CACHE_PATH = "/config/.credentials/monzo_api_creds.json"
else:
    # Any other host: bind stand-ins for those names from the local helpers.
    # pylint: disable=ungrouped-imports
    from helpers import local_setup

    CACHE_PATH = None

    log, async_mock, sync_mock, decorator, _ = local_setup()
    task = async_mock
    sensor = sync_mock
    sensor.ipv4_address_eth0 = "0.0.0.0"
    service: Callable[..., Callable[..., Any]] = decorator

# Used as the `module` for secret lookups and as the exception-catcher label.
MODULE_NAME = "monzo"

# Shared Monzo API client; the redirect URI points at port 5001 on this host.
MONZO = MonzoClient(
    client_id=get_secret("client_id", module=MODULE_NAME),
    client_secret=get_secret("client_secret", module=MODULE_NAME),
    redirect_uri=f"http://{sensor.ipv4_address_eth0}:5001/get_auth_code",
    creds_cache_path=CACHE_PATH,
)
@service
def top_up_credit_card_pot(top_up_amount: float) -> None:
    """Top up the Monzo credit card pot with a chosen amount

    Args:
        top_up_amount (float): the amount of money to put into the pot, in GBP

    Raises:
        Exception: if something goes wrong in the entire process. An attempt is
            made to kill the temp server if this occurs, after which the
            original exception is re-raised
    """

    # The deposit call takes pence; keep the GBP argument untouched and work
    # with a separate local so the two units are never confused.
    amount_pence = round(top_up_amount * 100)

    with HAExceptionCatcher(MODULE_NAME, "top_up_credit_card_pot"):
        try:
            credit_card_pot = task.executor(MONZO.get_pot_by_name, "credit cards")

            log.info("DEPOSITING %f INTO %s", amount_pence, str(credit_card_pot))
            log.info(type(credit_card_pot))

            task.executor(
                MONZO.deposit_into_pot,
                credit_card_pot,
                amount_pence=amount_pence,
                # Amount plus a second-resolution timestamp, so an identical
                # request within the same second carries the same dedupe id.
                dedupe_id=f"hass-{amount_pence}-"
                + datetime.today().strftime("%Y%m%d%H%M%S"),
            )
        except Exception as exc:
            # `requests` exceptions always expose `.response`, but it is None
            # when no reply was received (connection errors, timeouts), and the
            # body is not guaranteed to be JSON. Logging it is best-effort: a
            # failure here must not skip the kill attempt below or replace the
            # exception being handled.
            response = getattr(exc, "response", None)
            if response is not None:
                try:
                    log.info(dumps(response.json(), default=str))
                except (AttributeError, TypeError, ValueError) as parse_exc:
                    log.warning(
                        "UNABLE TO LOG ERROR RESPONSE: %s - %s",
                        type(parse_exc).__name__,
                        str(parse_exc),
                    )

            log.error("KILLING SERVER")
            try:
                task.executor(get, f"http://{sensor.ipv4_address_eth0}:5001/kill")
                log.info("KILLED")
            except (ConnectionError, RequestsConnectionError) as connection_exc:
                # Server unreachable (possibly not running): note it and carry on.
                log.warning(
                    "UNABLE TO KILL SERVER: %s - %s",
                    type(connection_exc).__name__,
                    str(connection_exc),
                )

            # Propagate the original failure to the surrounding catcher.
            raise