Skip to content

Commit

Permalink
Missing files
Browse files Browse the repository at this point in the history
  • Loading branch information
kylegordon committed May 13, 2024
1 parent a42ec85 commit 71a1e48
Show file tree
Hide file tree
Showing 13 changed files with 1,048 additions and 0 deletions.
22 changes: 22 additions & 0 deletions custom_components/octopus_energy/api_client/greenness_forecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from datetime import datetime

class GreennessForecast:
start: datetime
end: datetime
greenness_score: int
greenness_index: str
highlight_flag: bool

def __init__(
self,
start: datetime,
end: datetime,
greenness_score: int,
greenness_index: str,
highlight_flag: bool
):
self.start = start
self.end = end
self.greenness_score = greenness_score
self.greenness_index = greenness_index
self.highlight_flag = highlight_flag
29 changes: 29 additions & 0 deletions custom_components/octopus_energy/api_client/intelligent_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class IntelligentDevice:
krakenflexDeviceId: str
provider: str
vehicleMake:str
vehicleModel: str
vehicleBatterySizeInKwh: float | None
chargePointMake: str
chargePointModel: str
chargePointPowerInKw: float | None

def __init__(
self,
krakenflexDeviceId: str,
provider: str,
vehicleMake:str,
vehicleModel: str,
vehicleBatterySizeInKwh: float | None,
chargePointMake: str,
chargePointModel: str,
chargePointPowerInKw: float | None
):
self.krakenflexDeviceId = krakenflexDeviceId
self.provider = provider
self.vehicleMake = vehicleMake
self.vehicleModel = vehicleModel
self.vehicleBatterySizeInKwh = vehicleBatterySizeInKwh
self.chargePointMake = chargePointMake
self.chargePointModel = chargePointModel
self.chargePointPowerInKw = chargePointPowerInKw
11 changes: 11 additions & 0 deletions custom_components/octopus_energy/api_client/octoplus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class RedeemOctoplusPointsResponse:
is_successful: bool
errors: list[str]

def __init__(
self,
is_successful: bool,
errors: list[str]
):
self.is_successful = is_successful
self.errors = errors
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import logging
from datetime import datetime, timedelta

from homeassistant.util.dt import (now)
from homeassistant.helpers.update_coordinator import (
DataUpdateCoordinator
)

from ..const import (
COORDINATOR_REFRESH_IN_SECONDS,
DATA_GREENNESS_FORECAST_COORDINATOR,
DOMAIN,
DATA_CLIENT,
DATA_GREENNESS_FORECAST,
REFRESH_RATE_IN_MINUTES_GREENNESS_FORECAST,
)

from ..api_client import ApiException, OctopusEnergyApiClient
from . import BaseCoordinatorResult
from ..api_client.greenness_forecast import GreennessForecast

_LOGGER = logging.getLogger(__name__)

class GreennessForecastCoordinatorResult(BaseCoordinatorResult):
last_retrieved: datetime
forecast: list[GreennessForecast]

def __init__(self, last_retrieved: datetime, request_attempts: int, forecast: list[GreennessForecast]):
super().__init__(last_retrieved, request_attempts, REFRESH_RATE_IN_MINUTES_GREENNESS_FORECAST)
self.forecast = forecast

async def async_refresh_greenness_forecast(
current: datetime,
client: OctopusEnergyApiClient,
existing_result: GreennessForecastCoordinatorResult
) -> GreennessForecastCoordinatorResult:
if existing_result is None or current >= existing_result.next_refresh:
try:
result = await client.async_get_greenness_forecast()

return GreennessForecastCoordinatorResult(current, 1, result)
except Exception as e:
if isinstance(e, ApiException) == False:
raise

result = None
if (existing_result is not None):
result = GreennessForecastCoordinatorResult(existing_result.last_retrieved, existing_result.request_attempts + 1, existing_result.forecast)
_LOGGER.warning(f'Failed to retrieve greenness forecast - using cached data. Next attempt at {result.next_refresh}')
else:
result = GreennessForecastCoordinatorResult(
# We want to force into our fallback mode
current - timedelta(minutes=REFRESH_RATE_IN_MINUTES_GREENNESS_FORECAST),
2,
None
)
_LOGGER.warning(f'Failed to retrieve greenness forecast. Next attempt at {result.next_refresh}')

return result

return existing_result

async def async_setup_greenness_forecast_coordinator(hass, account_id: str):
async def async_update_data():
"""Fetch data from API endpoint."""
current = now()
client: OctopusEnergyApiClient = hass.data[DOMAIN][account_id][DATA_CLIENT]

hass.data[DOMAIN][account_id][DATA_GREENNESS_FORECAST] = await async_refresh_greenness_forecast(
current,
client,
hass.data[DOMAIN][account_id][DATA_GREENNESS_FORECAST] if DATA_GREENNESS_FORECAST in hass.data[DOMAIN][account_id] else None
)

return hass.data[DOMAIN][account_id][DATA_GREENNESS_FORECAST]

hass.data[DOMAIN][account_id][DATA_GREENNESS_FORECAST_COORDINATOR] = DataUpdateCoordinator(
hass,
_LOGGER,
name=f"{account_id}_greenness_forecast",
update_method=async_update_data,
# Because of how we're using the data, we'll update every minute, but we will only actually retrieve
# data every 30 minutes
update_interval=timedelta(seconds=COORDINATOR_REFRESH_IN_SECONDS),
always_update=True
)

return hass.data[DOMAIN][account_id][DATA_GREENNESS_FORECAST_COORDINATOR]
224 changes: 224 additions & 0 deletions custom_components/octopus_energy/cost_tracker/cost_tracker_month.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
from datetime import datetime
import logging

from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.util.dt import (now)

from homeassistant.components.sensor import (
RestoreSensor,
SensorDeviceClass,
SensorStateClass,
)

from homeassistant.helpers.event import (
EventStateChangedData,
async_track_state_change_event,
async_track_entity_registry_updated_event,
)

from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)

from ..const import (
CONFIG_COST_MONTH_DAY_RESET,
CONFIG_COST_NAME,
DOMAIN,
)

from . import accumulate_cost

from ..utils.attributes import dict_to_typed_dict

_LOGGER = logging.getLogger(__name__)

class OctopusEnergyCostTrackerMonthSensor(RestoreSensor):
"""Sensor for calculating the cost for a given sensor over the course of a month."""

def __init__(self, hass: HomeAssistant, config_entry, config, tracked_entity_id: str, peak_type = None):
"""Init sensor."""
# Pass coordinator to base class

self._state = None
self._config = config
self._attributes = self._config.copy()
self._attributes["total_consumption"] = 0
self._attributes["accumulated_data"] = []
self._last_reset = None
self._tracked_entity_id = tracked_entity_id
self._config_entry = config_entry
self._peak_type = peak_type

self._hass = hass
self.entity_id = generate_entity_id("sensor.{}", self.unique_id, hass=hass)

@property
def entity_registry_enabled_default(self) -> bool:
"""Return if the entity should be enabled when first added.
This only applies when fist added to the entity registry.
"""
return self._peak_type is None

@property
def unique_id(self):
"""The id of the sensor."""
base_name = f"octopus_energy_cost_tracker_{self._config[CONFIG_COST_NAME]}_month"
if self._peak_type is not None:
return f"{base_name}_{self._peak_type}"

return base_name

@property
def name(self):
"""Name of the sensor."""
base_name = f"Octopus Energy Cost Tracker {self._config[CONFIG_COST_NAME]} Month"
if self._peak_type is not None:
return f"{base_name} ({self._peak_type})"

return base_name


@property
def device_class(self):
"""The type of sensor"""
return SensorDeviceClass.MONETARY

@property
def state_class(self):
"""The state class of sensor"""
return SensorStateClass.TOTAL

@property
def native_unit_of_measurement(self):
"""The unit of measurement of sensor"""
return "GBP"

@property
def icon(self):
"""Icon of the sensor."""
return "mdi:currency-gbp"

@property
def extra_state_attributes(self):
"""Attributes of the sensor."""
return self._attributes

@property
def native_value(self):
"""Determines the total cost of the tracked entity."""
return self._state

@property
def last_reset(self):
"""Return the time when the sensor was last reset, if any."""
current: datetime = now()
self._reset_if_new_month(current)

return self._last_reset

async def async_added_to_hass(self):
"""Call when entity about to be added to hass."""
# If not None, we got an initial value.
await super().async_added_to_hass()
state = await self.async_get_last_state()

if state is not None and self._state is None:
self._state = None if state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN) else state.state
self._attributes = dict_to_typed_dict(state.attributes)
# Make sure our attributes don't override any changed settings
self._attributes.update(self._config)

_LOGGER.debug(f'Restored {self.unique_id} state: {self._state}')

self.async_on_remove(
async_track_state_change_event(
self.hass, [self._tracked_entity_id], self._async_calculate_cost
)
)

self.async_on_remove(
async_track_entity_registry_updated_event(
self.hass, [self._tracked_entity_id], self._async_update_tracked_entity
)
)

async def _async_update_tracked_entity(self, event) -> None:
data = event.data
if data["action"] != "update":
return

if "entity_id" in data["changes"]:
new_entity_id = data["entity_id"]
_LOGGER.debug(f"Tracked entity for '{self.entity_id}' updated from '{self._tracked_entity_id}' to '{new_entity_id}'. Reloading...")
await self._hass.config_entries.async_reload(self._config_entry.entry_id)

async def _async_calculate_cost(self, event: Event[EventStateChangedData]):
current = now()
self._reset_if_new_month(current)

new_state = event.data["new_state"]
if new_state is None or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
return

_LOGGER.debug(f"Source entity updated for '{self.entity_id}'; Event: {event.data}")

self._recalculate_cost(current, float(new_state.state), float(new_state.attributes["total_consumption"]))

@callback
async def async_reset_cost_tracker(self):
"""Resets the sensor"""
self._state = 0
self._attributes["accumulated_data"] = []
self._attributes["total_consumption"] = 0

self.async_write_ha_state()

@callback
async def async_adjust_accumulative_cost_tracker(self, date, consumption: float, cost: float):
"""Adjusts the sensor"""
selected_date = None
for data in self._attributes["accumulated_data"]:
if data["start"].date() == date:
selected_date = data["start"]

if selected_date is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="cost_tracker_invalid_date",
translation_placeholders={
"min_date": self._attributes["accumulated_data"][0]["start"].date(),
"max_date": self._attributes["accumulated_data"][-1]["start"].date()
},
)

self._recalculate_cost(selected_date, cost, consumption)

def _recalculate_cost(self, current: datetime, new_cost: float, new_consumption: float):
result = accumulate_cost(current, self._attributes["accumulated_data"], new_cost, new_consumption)

self._attributes["total_consumption"] = result.total_consumption
self._attributes["accumulated_data"] = result.accumulative_data
self._state = result.total_cost

self.async_write_ha_state()

def _reset_if_new_month(self, current: datetime):
start_of_day = current.replace(hour=0, minute=0, second=0, microsecond=0)
if self._last_reset is None:
self._last_reset = start_of_day
return True

target_day = self._config[CONFIG_COST_MONTH_DAY_RESET] if CONFIG_COST_MONTH_DAY_RESET in self._config else 1
if self._last_reset.day != current.day and current.day == target_day:
self._state = 0
self._attributes["total_consumption"] = 0
self._attributes["accumulated_data"] = []
self._last_reset = start_of_day

return True

return False
Loading

0 comments on commit 71a1e48

Please sign in to comment.