# **Environment: Water Level Control of a Tank**

In [1]:
# Building the custom environment for OpenAi Gym

from gym import Env 
# Env is a placeholder class that allows us to build our environment

from gym.spaces import Box
# Superclass that is used to define observation and action spaces
#`Box` is for continuing-space, `Discrete` for discrete-space and `Dict` for multiple input

import numpy as np
import random

# (This removes np.float32 warnings, but can be solved with np.float64 at "box" definition):
# gym.logger.set_level(40) 

class Model(Env):
# By passing Env to the class Model we defined, we inherit the methods and properties of OpenAI Gym

    def __init__(self):

        # Actions we can take, we only have the valve opening at each timestep (we consider a lower/upper bound of |0.01|)
        self.action_space = Box(low=np.array([-0.01]), high=np.array([0.01]), dtype=np.float64)
        # The function "Box" can support a tensor

        # Valve opening observation array (maximum physically possible opening of 1.0)
        self.valve_opening  = Box(low=np.array([0.00]), high=np.array([1.00]), dtype=np.float64)

        # Water Height observation array (assuming Maximum Height of Tank of 1.0 m)
        self.water_height = Box(low=np.array([0.00]), high=np.array([1.00]), dtype=np.float64)

        # Inflow observation array (assuming Maximum Possible Flow of 150 m³/h) / remember: Inflow is not a state!
        self.inflow = Box(low=np.array([0.00]), high=np.array([150.00]), dtype=np.float64)

        ## Assuming Maximum Possible Flow of 150 m³/h

        ## Area of 3 m²
        ## Diameter of 1.95441 m
        self.tank_diameter = 1.95441

        ##  Calculate Volume of Water inside Tank
        ##  Maximum Possible Volume of 3.0 m³
        self.current_water_volume = (((self.tank_diameter / 2) ** 2) * np.pi) * self.water_height

        ## Valve coefficient (Cv) is given
        self.valve_coefficient = 282.84

        ## Setpoint in terms of the Usual Operation Height of 0.5 m 
        self.setpoint =  0.5

        # Set observation space (reminder: flow isn't measured/ isn't a state)
        self.observation_space = (self.inflow , self.valve_opening, self.water_height)

        # Set initial states (we can instantiate them a bit randomly):
        self.current_inflow = 100 + random.uniform(-1.00, 1.00)
        self.current_valve_opening = 0.5 + random.uniform(-0.10, 0.10)
        self.current_water_height = 0.5 + random.uniform(-0.10, 0.10)
        
        self.state = (self.current_inflow , self.current_valve_opening, self.current_water_height, self.setpoint, self.error())

        # Set episode length
        ## Timestep = 0.01h
        ## Episode total time = 2h (200 timesteps)
        ## Total number of episodes = 2000
        self.time_per_episode = 200

        # Initialize time counter
        self.time_step = 0

    ## Calculates the non-linear outflow rate of water from the tank
    def outflow(self):
        return self.valve_coefficient * self.valve_opening * np.sqrt(self.water_height)

    ## Provides current water height from the most up to date water volume
    def update_water_height(self):
        return self.current_water_volume / (((self.tank_diameter / 2) ** 2) * np.pi)

    ## Error of water height from current set point
    def error(self):
        error=(self.update_water_height - self.setpoint)
        return error
        
    def step(self, action):

        # Flow rate of water + disturbances
        self.current_inflow = self.current_inflow + disturbance(self.time_step)

        ## Current water volume in the tank
        self.current_water_volume = self.current_water_volume + self.current_inflow - self.outflow()

        # Apply action (valve opening)
        # Continuous: [-0.01, 0.01] at each timestep
        self.current_valve_opening += action

        # Add 1 Timestep = 0.01h
        self.time_step += 1
        
        # Calculate reward
        ## Reward: minus the square of height error -(m)^2
        ## Our objective is to minimize this error (or negative reward)
        reward = -((error)**2)

        ## Determine whether it is a terminal state
        terminal = self.is_terminal(self.update_water_height)

        # Set placeholder for info
        info = {}

        # Return step information
        return self.retrieve_observation(), reward, terminal, info

    ## The terminal state is reached if time step reaches more than 200 or if water level is at 2 extremes
    def is_terminal(self, water_h):
        if self.time_step >= self.time_per_episode-1 or update_water_height == 0 or update_water_height == 1:
            return True
        else:
            return False

    ## Disturbances on flow rate:
    def disturbance(self):
        if self.time_step == 10: #0.1 h
            return 20
        elif self.time_step == 100: #1.1 h
            return -20
        else:
            return 0

    ## Retrieve current state
    def retrieve_observation(self):

        self.state = (
            self.current_inflow , self.current_valve_opening, self.current_water_height, self.setpoint, self.error()
        )
        return self.state

    def render(self):
        # Implement visualization for a game environment for example
        pass
    
    ## Reset the current state of the water tank. This involves time_step, water volume, input flow rate of water and error
    def reset(self):

        ## Set point remains fixed:
        self.setpoint = 0.5

        ## Reset time counter and other variables (we can instantiate them as before, a bit randomly)
        self.time_step = 0
        self.current_inflow = 100 + random.uniform(-1.00, 1.00)
        self.current_valve_opening = 0.5 + random.uniform(-0.10, 0.10)
        self.current_water_height = 0.5 + random.uniform(-0.10, 0.10)

        return self.retrieve_observation()

    ## Reset the current state of the water tank. This involves time_step, water volume, input flow rate of water and error
    def reset(self):

        ## Set point remains fixed and water volume in the tank is adjusted randomly at a level of 10–90%:
        self.setpoint = (((self.tank_diameter / 2) ** 2) * np.pi) * 0.5
        
        self.time_step = 0
        self.current_inflow = 100 + random.uniform(-1.00, 1.00)
        self.current_valve_opening = 0.5 + random.uniform(-0.10, 0.10)
        self.current_water_height = 0.5 + random.uniform(-0.10, 0.10)
        
        return self.retrieve_observation()

In [2]:
env = Model()

TypeError: ignored

In [None]:
# This allows us to see numpy arrays with more precision
# (print with a higher number of digits of precision for floating point output)

np.set_printoptions(precision=4) 

In [None]:
# Just to confirm that the observation space (of valve_opening) is a continuum between [0, 1]

env.observation_space.sample()

array([73.0842], dtype=float32)

In [None]:
# Just to confirm that the action space (of valve_opening) is a continuum between [-0.01, 0.01]

env.action_space.sample()

array([0.0022])