In [1]:
import serial
import time
import random
random.seed(1)
import pandas as pd
import numpy as np

import plotly.express as px
import plotly.io as pio

In [2]:
import serial_interface as si

### Environment Class

In [3]:
def load_and_prep_data(data_path):
    raw_df = pd.read_csv(data_path)
    data_df = raw_df.copy()
    # Make current positive
    data_df = data_df.drop_duplicates(subset=['motor_1_position','motor_2_position'])
    data_df['I_ivp_1'] = data_df['I_ivp_1'].abs()
    data_df['power'] = data_df['I_ivp_1'] * data_df['V_ivp_1']
    return data_df

In [4]:
def convert_motor_positions_to_index(position_tuple):
    # position tuple is (m1 position, m2 position)
    return (int(position_tuple[0]//5), int(position_tuple[1]//5))

def convert_index_to_motor_positions(index_tuple):
    return (index_tuple[0]*5, index_tuple[1]*5)

In [5]:
def arg_max_array_index(array):
    # Finds the argmax and randomly selects if multiple
    array_max = -10000
    max_list = []
    for i in range(array.shape[0]):
        for j in range(array.shape[1]):
            if array[i][j] > array_max:
                array_max = array[i][j]
                max_list = [[i,j]]
            elif array[i][j] == array_max:
                max_list.append([i,j])
    if len(max_list) > 1:
        max_index = random.choice(max_list)
    else:
        max_index = max_list[0]
    return max_index

def random_array_index(array):
    x_index = range(array.shape[0])
    y_index = range(array.shape[1])
    return (random.choice(x_index), random.choice(y_index))

In [25]:
class SolarEnv:
    def __init__(self, reward_data_path, shape=(37,37)):
        self.shape = shape
        self.reward_array = np.zeros(shape)
        # load in reward data
        rewards = load_and_prep_data(reward_data_path)
        for index, row in rewards.iterrows():
            motor_1_index = int(row['motor_1_position'].item()//5)
            motor_2_index = int(row['motor_2_position'].item()//5)
            position_reward = row['power'].item()
            self.reward_array[motor_1_index][motor_2_index] = position_reward
    
    # For debugging
    def get_reward_array(self):
        return self.reward_array
    
    def get_env_shape(self):
        return self.reward_array.shape
                                          
    # Not needed right now
    def env_init(self):
        """
        Setup for the environment called when the experiment first starts.
        """
        pass
    
    # Not needed right now
    def env_start(self):
        """
        The first method called when the experiment starts, called before the
        agent starts.

        Returns:
            The first state from the environment.
        """
        pass
    
    def env_step(self, action):
        """A step taken by the environment.

        Args:
            action: The action taken by the agent, a tuple of motor positions

        Returns:
            (float, state): a tuple of the reward, state
        """
        index_tuples = convert_motor_positions_to_index(action)
        return self.reward_array[index_tuples[0]][index_tuples[1]], convert_index_to_motor_positions(index_tuples)

Visualizing the reward array

### Agent Class

Agent will start out as a TD(0) agent with epsilon-greedy policy
* V(st) = V(st) + step_size * [Rewardt+1 + discount_factor * V(st+1) - V(st)]

In [29]:
class SolarAgent:
    def __init__(self, step_size, epsilon, discount_factor, initialization_value, env):
        self.step_size = step_size
        self.epsilon = epsilon
        self.discount_factor = discount_factor
        self.env_shape = env.get_env_shape()
        self.state_values = np.full(self.env_shape, initialization_value)
        self.last_state = None
        self.state = None
        self.last_reward = None
        self.reward = None
        self.env = env
        self.total_energy = 0
    
    def get_state_value_array(self):
        return self.state_values
    
    def agent_policy(self):
        # if random greedy
        if random.random() <= self.epsilon:
            action = random_array_index(self.state_values)
        # otherwise arg max
        else:
            action = arg_max_array_index(self.state_values)
        return convert_index_to_motor_positions(action)
    
    def agent_start(self):
        """The first method called when the experiment starts, called after
        the environment starts.
        Args:
            state (Numpy array): the state from the
                environment's evn_start function.
        Returns:
            self.last_action [int] : The first action the agent takes.
        """
        self.last_state = (90,90)
        self.last_reward = 0
        self.state, self.reward = env.env_step(self.last_state)
    
    def get_state_values(self, state):
        converted_index = convert_motor_positions_to_index(state)
        return self.state_values[converted_index[0]][converted_index[1]]
    
    def agent_step(self):
        """A step taken by the agent.
        Args:
            reward [float]: the reward received for taking the last action taken
            state [int]: the state from the environment's step, where the agent ended up after the last step
        Returns:
            self.last_action [int] : The action the agent is taking.
        """
        # Make a policy decision
        action = self.agent_policy()
        
        # Interact with the environment
        reward, next_state = self.env.env_step(action)
        
        # TD Update
        new_state_value = self.get_state_values(next_state)
        last_state_value = self.get_state_values(self.last_state)
        error_term = reward + self.discount_factor * new_state_value - last_state_value
        last_state_index = convert_motor_positions_to_index(self.last_state)
        self.state_values[last_state_index[0]][last_state_index[1]] = last_state_value + self.step_size * error_term
        
        # Update internal variables
        self.last_state = next_state
        
        # For tracking
        self.total_energy += reward
    
    def get_agent_energy(self):
        return self.total_energy
    
    def agent_end(self, reward):
        """Run when the agent terminates.
        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        pass

### Testing the Agent

In [30]:
data_path = '../../../rl_agent/simulation_data/data/corrected_motors/run_5_kitchen_no_lights.csv'
env = SolarEnv(reward_data_path=data_path, shape=(37,37))
agent = SolarAgent(step_size=0.1, epsilon=0.05, discount_factor=0.1, initialization_value=10, env=env)

### Resume here with experiment

In [31]:
steps = 10000
array_list = []
agent.agent_start()
for i in range(steps):
    agent.agent_step()
    if i%100 == 0:
        array_list.append(agent.get_state_value_array())
        print('Step: ', i)
        print('Total power: ', agent.get_agent_energy())

Step:  0
Total power:  0.00213
Step:  100
Total power:  0.40814600000000006
Step:  200
Total power:  0.7191800000000003
Step:  300
Total power:  1.0726190000000009
Step:  400
Total power:  1.4569030000000005
Step:  500
Total power:  1.8434200000000007
Step:  600
Total power:  2.170594000000002
Step:  700
Total power:  2.478278000000003
Step:  800
Total power:  2.749764000000004
Step:  900
Total power:  3.068588000000004
Step:  1000
Total power:  3.435232000000005
Step:  1100
Total power:  3.782514000000005
Step:  1200
Total power:  4.109903000000006
Step:  1300
Total power:  4.471809000000007
Step:  1400
Total power:  4.816803000000012
Step:  1500
Total power:  5.159343000000013
Step:  1600
Total power:  5.509080000000017
Step:  1700
Total power:  5.852434000000018
Step:  1800
Total power:  6.22591300000002
Step:  1900
Total power:  6.535545000000019
Step:  2000
Total power:  6.86578200000002
Step:  2100
Total power:  7.233832000000017
Step:  2200
Total power:  7.551704000000019
Step: 

----
----

In [None]:
# Request codes
MOTOR_CONTROL = 1000
STATE_REQUEST = 2000
RESET_CODE = 6666

def scan_space(arduino):
    # Run start
    run_start = time.time()
    data_dict_list = []
    last_motor_interval = 0
    last_measure_interval = -1
    motor_frequency = 2
    measure_frequency = 1
    # Set timeouts
    abort = False
    
    for xy_degree in range(0, 181, 5):
        for yz_degree in range(0, 181, 5):
            si.write_serial_line(arduino, [MOTOR_CONTROL, xy_degree, yz_degree], print_message=False)
            new_message, abort = si.listen_for_serial(arduino)
            if new_message is not None and not abort:
                data_dict_list.append(new_message)
            elif abort:
                break
            else:
                print('Empty message received without abort issue')
            time.sleep(0.1) # Wait for steady state
        if abort:
            break
        print('xy:',xy_degree,'yz:',yz_degree)
    # Write back to start state
    write_serial_line(arduino, [si.MOTOR_CONTROL, 90, 90])

    return pd.DataFrame(data_dict_list)

In [None]:
if __name__ == '__main__':
    print('\nARDUINO CONTROL TESTING')
    print('-------------------------')
    # Initialize serial port
    print('\nIniitalizing device...')
    serial_port = '/dev/cu.usbmodem14101'
    baud_rate = 9600
    timeout = 5
    arduino = si.initialize_serial(serial_port=serial_port, baud_rate=baud_rate, timeout=timeout)
    print('\t - SUCCESS: Device initialized.')
    
    si.write_serial_line(arduino, [MOTOR_CONTROL, 90, 90])

    # Run a loop where motor position incremented every 5 seconds, print out message
    print('\nBeginning loop sequence...')
#     data = scan_space(arduino)
    print('\t - Loop complete.')

    # Add relative time to returned data and print out
#     data['t_relative'] = data['timestamp'] - data['timestamp'].iloc[0]
    print('\nData broadcasted by Arduino:\n')
    
#     data.to_csv('/Users/jackogrady/Git/rl-solar/rl_agent/simulation_data/data/run_6_kitchen_no_lights_swapped_motors.csv', index=False)

In [None]:
print(data)

In [None]:
write_serial_line(arduino, [1000, 180, 90])