In [221]:
import typing as t
import numpy as np
import tensorflow as tf
from abc import ABC, abstractmethod
from datetime import datetime, time as Time, timedelta

# Length of the state/action vectors used by the environment and the DQN agent.
MAX_STATES = 6
# Two to the power of MAX_STATES; presumably the number of on/off combinations
# of the tracked states (not referenced elsewhere in the visible cells).
N_OF_STATES = 2**MAX_STATES

# Positions inside an episode tuple, which is laid out as
# (state_is, state_should_be).
STATE_IS = 0
STATE_SHOULD_BE = 1

# Learning rate passed to the Adam optimizers created by the agents below.
learning_rate = 0.01

def float_to_bool(action: np.ndarray):
    """Threshold an array element-wise: entries of at least 0.5 become True."""
    threshold = 0.5
    return action >= threshold

def bool_to_float(state: np.ndarray):
    """Return a copy of ``state`` cast to float (True -> 1.0, False -> 0.0)."""
    return state.astype(np.float64)

def reward_function(n: int, lambda_: float = 1) -> float:
    """Evaluate ``(lambda_ * e) ** (-lambda_ * n)``.

    With the default ``lambda_`` of 1 this reduces to ``e ** -n``.

    Args:
        n: value the exponent is scaled by.
        lambda_: factor applied to both the base and the exponent.

    Returns:
        The value of the expression above.
    """
    base = lambda_ * np.e
    exponent = -lambda_ * n
    return base ** exponent

In [222]:
class HomeAssistantTrainingEnvironment:
    """Replay environment over a fixed sequence of recorded episodes.

    Each entry of ``episodes`` is a pair ``(state_is, state_should_be)``.
    The environment walks through the entries one ``step`` at a time and
    scores actions against the ``state_should_be`` half of the current entry.
    """

    state: np.ndarray
    timestep: int

    def __init__(self, episodes: t.List[t.Tuple[np.ndarray, np.ndarray]]):
        self.episodes = episodes
        self.reset()

    def reset(self):
        """Rewind to the first episode and return an all-zero state vector."""
        self.timestep = 0
        # NOTE(review): ``self.state`` is only ever assigned here; ``step``
        # does not update it.
        self.state = np.zeros(shape=(MAX_STATES, ), dtype=float)
        return self.state

    def reward(self, action: np.ndarray):
        """Return the negative squared error between ``action`` and the target.

        The target is the second element of the episode at the current
        timestep; a perfect match therefore yields a reward of zero.
        """
        _unused, desired = self.episodes[self.timestep]
        error = action - desired
        return -np.sum(np.square(error))

    def step(self, action: np.ndarray):
        """Score ``action`` and advance to the next recorded episode.

        args:
            action should be a binary tensor with ones in places where the action should be flipped

        returns:
            next state - ``STATE_IS`` entry of the next episode; once the
                episodes are exhausted, that of the last episode
            reward - value of ``reward(action)`` for the episode just scored
            done - True once the episode list has been exhausted
        """
        reward = self.reward(action)

        self.timestep += 1
        try:
            observation = self.episodes[self.timestep][STATE_IS]
            done = False
        except IndexError:
            # Ran past the last episode: repeat its observation and signal done.
            observation = self.episodes[self.timestep - 1][STATE_IS]
            done = True
        return observation, reward, done

In [223]:

def create_dnn_network(parameters: t.Sequence[str], values: t.Sequence[str]):
    """Build a small fully connected Keras model with named inputs and outputs.

    Args:
        parameters: names of the scalar inputs; each becomes an ``Input`` of
            shape ``(1,)`` carrying that name.
        values: names of the outputs; each becomes a single-unit ``tanh``
            head on top of the shared hidden layers.

    Returns:
        A ``tf.keras.models.Model`` whose inputs and outputs are dicts keyed
        by the given names.
    """
    inputs = {
        name: tf.keras.layers.Input(shape=(1,), name=name)
        for name in parameters
    }

    # Join the scalar inputs into one feature vector, then apply the shared
    # 128 -> 64 ReLU trunk.
    merged = tf.keras.layers.Concatenate(axis=-1)(list(inputs.values()))
    features = tf.keras.layers.Flatten()(merged)
    hidden = tf.keras.layers.Dense(128, activation='relu')(features)
    hidden = tf.keras.layers.Dense(64, activation='relu')(hidden)

    outputs = {
        name: tf.keras.layers.Dense(1, activation='tanh')(hidden)
        for name in values
    }

    return tf.keras.models.Model(inputs=inputs, outputs=outputs)


In [224]:
class DQNAgent:
    """Q-learning agent wrapping a ``QNetwork`` model.

    NOTE(review): ``QNetwork`` is not defined in the visible notebook cells;
    it is assumed to be a Keras model exposing ``compile``/``fit``/``predict``
    and returning one value per action for an input of ``MAX_STATES`` entries.

    Args:
        num_actions: number of outputs requested from ``QNetwork``.
        gamma: discount factor applied to the bootstrapped next-state value.
        epsilon: initial exploration rate tracked by the agent.
        epsilon_min: floor below which ``epsilon`` is no longer decayed.
        epsilon_decay: multiplicative decay applied after each ``train`` call.
    """

    def __init__(self, num_actions=MAX_STATES, gamma=0.99,
                 epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.num_actions = num_actions
        self.model = QNetwork(num_actions)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(loss='mse', optimizer=self.optimizer)
        self.gamma = gamma

        # ``train`` reads and decays these; they must exist before the first
        # call or it fails with AttributeError.
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

    def select_action(self, state, epsilon):
        """Pick an action vector using an epsilon-greedy rule.

        Args:
            state: current state, reshaped internally to ``(1, MAX_STATES)``.
            epsilon: probability of returning a random action instead of the
                model output.

        Returns:
            Either a random array of shape ``(1, MAX_STATES)`` with values in
            ``[0, 1)`` or the model output for ``state``.
        """
        if np.random.rand() < epsilon:
            # Explore: choose a random action
            return np.random.random(size=(1, MAX_STATES))
        # Exploit: return the model's values for the current state
        return self.model(state.reshape(1, MAX_STATES))

    def train(self, state: np.ndarray, action: np.ndarray, reward: float, next_state: np.ndarray, done: bool):
        """Perform one Q-learning update for a single transition.

        The Bellman target ``reward + gamma * max Q(next_state)`` (or just
        ``reward`` when ``done``) is written into the model's current
        predictions for ``state`` at the entries selected by ``action``, and
        the model is fitted towards that vector for one epoch.

        NOTE(review): entries of ``action`` that are >= 0.5 are treated as
        "selected", following the 0.5 threshold used by ``float_to_bool``;
        confirm this matches the intended action encoding.

        Args:
            state: state the action was taken in.
            action: action vector with one entry per model output.
            reward: reward received for the transition.
            next_state: state observed after the action.
            done: whether the transition ended the episode.
        """
        state_batch = state.reshape(1, MAX_STATES)

        target = reward
        if not done:
            next_q = self.model.predict(next_state.reshape(1, MAX_STATES))
            target = reward + self.gamma * np.max(next_q[0])

        # Start from the model's own estimate for the *current* state so that
        # only the selected entries contribute to the loss.
        target_f = np.array(self.model.predict(state_batch), dtype=float)
        selected = np.asarray(action).reshape(-1) >= 0.5
        target_f[0, selected] = target

        self.model.fit(state_batch, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [1]:
class DNNAgent:
    """Supervised wrapper around the network built by ``create_dnn_network``.

    The model is compiled with Adam (using the module-level
    ``learning_rate``) and a sum-reduced mean squared error loss.
    """

    def __init__(self, parameters: t.Sequence[str], values: t.Sequence[str]):
        network = create_dnn_network(parameters, values)
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        loss = tf.losses.MeanSquaredError(reduction='sum')

        self.model = network
        self.optimizer = optimizer
        self.loss = loss
        self.model.compile(optimizer=self.optimizer, loss=self.loss)

    def train(self, x: t.Dict[str, np.ndarray], y: t.Dict[str, np.ndarray], epochs: int):
        """Fit the model on ``x``/``y`` with batches of 16 and return the fit result."""
        return self.model.fit(x, y, batch_size=16, epochs=epochs, verbose=True)

    def predict(self, x: t.Dict[str, np.ndarray]) -> t.Dict[str, np.ndarray]:
        """Return the model's predictions for ``x`` without progress output."""
        return self.model.predict(x, verbose=False)
        

NameError: name 't' is not defined

In [226]:
T = t.TypeVar('T')


class Convertable(t.Generic[T], ABC):
    """Abstract two-way mapping between values of type ``T`` and floats."""

    @abstractmethod
    def convert_to(self, x: T) -> float:
        """Encode ``x`` as a float."""

    @abstractmethod
    def convert_from(self, x: float) -> T:
        """Decode a float back into a value of type ``T``."""

class AnyConvertable(Convertable[t.Any]):
    """Identity converter: values pass through unchanged in both directions."""

    def convert_to(self, x: t.Any) -> float:
        """Return ``x`` untouched; no conversion or validation is applied."""
        return x

    def convert_from(self, x: float) -> t.Any:
        """Return ``x`` untouched."""
        return x

In [227]:
class TimeConvertable(Convertable[Time]):
    """Encode a time of day as the cosine of its position within the day.

    Midnight maps to ``cos(0) = 1`` and the value decreases towards
    ``cos(pi) = -1`` as the time approaches the end of the day.
    """

    SECONDS_IN_A_DAY = 60 * 60 * 24

    def convert_to(self, x: Time) -> float:
        """Return ``cos(pi * seconds_since_midnight / SECONDS_IN_A_DAY)``.

        Only the hour, minute and second fields of ``x`` are used.
        """
        since_midnight = timedelta(hours=x.hour, minutes=x.minute, seconds=x.second)
        angle = since_midnight.total_seconds() * np.pi / self.SECONDS_IN_A_DAY
        return np.cos(angle)

    def convert_from(self, _: float) -> Time:
        """Unsupported: decoding a float back into a time is not implemented."""
        raise AttributeError('convert_from is not needed')
    

In [228]:
class ModelManager:
    """Train and query one ``DNNAgent`` per output name.

    Every agent receives the same converted inputs; each one predicts a
    single output.

    Attributes are created per instance. Previously ``agents`` was a mutable
    class-level dict that ``__init__`` wrote into, so agents leaked between
    ``ModelManager`` instances.
    """

    agents: t.Dict[str, "DNNAgent"]
    converters: t.Dict[str, "Convertable"]

    def __init__(self, inputs: t.Dict[str, "Convertable"], outputs: t.Sequence[str]):
        """Create one agent per entry of ``outputs``.

        Args:
            inputs: maps each input name to the converter that turns its raw
                value into a float.
            outputs: names of the values to predict; one agent is built for
                each.
        """
        self.converters = inputs
        self.agents = {}

        input_names = list(inputs)
        for name in outputs:
            self.agents[name] = DNNAgent(input_names, [name])

    def _convert_inputs(self, x: t.Sequence[dict]) -> t.Dict[str, np.ndarray]:
        """Turn a sequence of records into one converted array per input name."""
        return {
            key: np.array([converter.convert_to(record[key]) for record in x])
            for key, converter in self.converters.items()
        }

    def fit(self, x: t.Sequence[dict], y: t.Sequence[dict], epochs: int):
        """Train every agent on the converted inputs and its own target column.

        Args:
            x: input records keyed by input name.
            y: target records keyed by output name, aligned with ``x``.
            epochs: number of epochs passed to each agent's ``train``.
        """
        features = self._convert_inputs(x)

        for name, agent in self.agents.items():
            targets = {name: np.array([record[name] for record in y])}
            print(f"training {name}")
            agent.train(features, targets, epochs=epochs)

    def predict(self, x: t.Sequence[dict]):
        """Return the merged predictions of all agents for the records in ``x``."""
        features = self._convert_inputs(x)
        merged = {}
        for agent in self.agents.values():
            merged.update(agent.predict(features))
        return merged

In [233]:
# generate episodes here
np.random.seed(0x1337)

# An earlier layout that stored IS_n / SHOULD_BE_n as boolean vectors in an
# EPISODES list was replaced by the dict-based records below.


def get_random():
    """Return 1.0 or 0.0 depending on whether a uniform draw exceeds 0.5."""
    return float(np.random.random() > 0.5)


def get_time(x):
    """Convert a number of seconds since midnight into a ``Time``."""
    hours, remainder = divmod(x, 3600)
    return Time(hour=hours, minute=remainder // 60, second=x % 60)


# Hand-written samples: the current state (including the time of day) and the
# target value for each output.
IS_1 = {'godzina': Time(10, 30), 'kuchnia': 0.0, 'salon': 0.0, 'swiatlo': 1.0, 'komputer': 0.0, 'wiatrolap': 0.0}
SHOULD_BE_1 = {'kuchnia': 0.0, 'salon': 0.0, 'swiatlo': -1.0, 'komputer': 1.0, 'wiatrolap': 0.0}

IS_2 = {'godzina': Time(14, 30), 'kuchnia': 1.0, 'salon': 0.0, 'swiatlo': 0.0, 'komputer': 0.0, 'wiatrolap': 0.0}
SHOULD_BE_2 = {'kuchnia': -1.0, 'salon': 1.0, 'swiatlo': 1.0, 'komputer': 0.0, 'wiatrolap': 0.0}


# Each iteration adds one all-zero-target sample at a time that advances by
# 250 seconds, followed by the two hand-written samples.
x, y = [], []
for i in range(300):
    x.extend([
        {'godzina': get_time(i * 250), 'kuchnia': 0.0, 'salon': 0.0, 'swiatlo': 1.0, 'komputer': 0.0, 'wiatrolap': 0.0},
        IS_1,
        IS_2,
    ])
    y.extend([
        {'kuchnia': 0.0, 'salon': 0.0, 'swiatlo': 0.0, 'komputer': 0.0, 'wiatrolap': 0.0},
        SHOULD_BE_1,
        SHOULD_BE_2,
    ])

In [234]:
# The time of day goes through the cosine encoding; every other input is
# passed through unchanged. One model is trained per output name.
output_names = ['kuchnia', 'salon', 'swiatlo', 'komputer', 'wiatrolap']
agent = ModelManager(
    {
        'godzina': TimeConvertable(),
        **{name: AnyConvertable() for name in output_names},
    },
    output_names,
)

In [235]:
# Train every per-output model for a single epoch on the generated samples.
agent.fit(x, y, epochs=1)

training kuchnia
training salon
training swiatlo
training komputer
training wiatrolap


In [237]:
# Ask every per-output model for its prediction on the IS_1 sample.
agent.predict([IS_1])

{'kuchnia': array([[-0.00990071]], dtype=float32),
 'salon': array([[0.01189921]], dtype=float32),
 'swiatlo': array([[-0.7892243]], dtype=float32),
 'komputer': array([[0.83891654]], dtype=float32),
 'wiatrolap': array([[-0.00027093]], dtype=float32)}

In [238]:
# Display the target values paired with IS_1, for comparison with a prediction.
SHOULD_BE_1

{'kuchnia': 0.0,
 'salon': 0.0,
 'swiatlo': -1.0,
 'komputer': 1.0,
 'wiatrolap': 0.0}