In [5]:
'''This code trains an agent (which is a DNN) to mix a set of active particles 
to mix a binary system of passive particles. '''

'This code trains an agent (which is a DNN) to mix a set of active particles \nto mix a binary system of passive particles. '

In [6]:
import numpy as np
import os
from math import pi, ceil, sqrt
import time
import pandas as pd
import pathlib
import ctypes
from IPython.display import clear_output

if not os.path.exists("particle_data"):os.makedirs("particle_data")

Parameters

In [7]:
INT_RAD = 1.0 # Radius of interior particle
DOMAIN_TO_RAD = 15 # Ratio of domain radius to interior particle radius
INT_DIA = 2.0*INT_RAD  # Diameter of interior particle
DOMAIN_RAD = DOMAIN_TO_RAD * INT_RAD  #  Radius of the domain

In [8]:
part_rad = INT_RAD # Radius of a particle
cell_width = 4*part_rad # single cell width for force calculation
nn_cell_width = 4*part_rad # single cell width for mixing index calculation
sq_cutoff_rad = (nn_cell_width)**2 # for the search radius r_c

Declare C function prototypes for "mixing.c": 
To communicate with Particle Dynamic Subroutine

In [9]:
# Load the shared library to communicate with Particle Dynamic Sub-routine
path = os.getcwd()
mixing_lib = ctypes.CDLL(os.path.join(path,'mixing.so')) # '.so' is the compiled C file name

# Define the C function signature
process = mixing_lib.mix_state
process.argtypes = [ctypes.POINTER(ctypes.c_double), ctypes.POINTER(ctypes.c_double),\
                    ctypes.c_int, ctypes.c_int, ctypes.c_double, ctypes.c_double,\
                    ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_double]#, ctypes.c_double, ctypes.c_double]
#state, thetaRL, rows, cols, P_rad, D_rad, N_wall, N_passive, N_active, delta_t, Time
process.restype = ctypes.POINTER(ctypes.c_double)

# To free the dynamic array in C code
dealloc_mem = mixing_lib.free_array
dealloc_mem.argtypes = [ctypes.POINTER(ctypes.c_double)]

Function call for mixing process

In [10]:
# Calling Particle Dynamic Sub-routine
def call_mix_function(state_array, theta_array, del_t):
    rows = state_array.shape[0] # No of rows 
    cols = state_array.shape[1]
    
    # Convert the state to a flat C-style array
    state_array = state_array.astype(np.float64)
    state_array_flat = state_array.flatten(order='C')
    cstyle_state = state_array_flat.ctypes.data_as(ctypes.POINTER(ctypes.c_double))
    
    # Convert theta to a C-style array
    theta_array = theta_array.astype(np.float64)
    cstyle_theta = theta_array.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

    #Passing arguements to C function.................................................
    new_state = process(cstyle_state, cstyle_theta, rows, cols, INT_RAD, DOMAIN_RAD,\
                        N_WALL, N_PASSIVE, N_ACTIVE, del_t)#,Time)
    #...................................................................................
    
    # Convert the pointer to a Python array
    python_state = np.ctypeslib.as_array(new_state, shape=(rows * cols,))
    
    # convert from 1D to 2D Python array
    mod_state = python_state.reshape(rows,cols).copy()
    
    # Free the allocated memory in C
    dealloc_mem(new_state)
    
    return mod_state

Mixing Index Calculator

In [12]:
# Function to find the Mixing Index
def nn_fn(data_passive):
    '''Data passive holds the x,y poitions for passive particles'''
    nn_list = np.zeros(N_PASSIVE,dtype=int)
    posx_nn = data_passive[:,0]
    posy_nn = data_passive[:,1]
    head_nn = np.ones(N_cells_nn)*(-1)
    cell_x_nn = (np.divide(np.subtract(posx_nn,x_low_nn),nn_cell_width)).astype(int) # Putting agent into respective xcell
    cell_y_nn = (np.divide(np.subtract(posy_nn,y_low_nn),nn_cell_width)).astype(int); # Putting agent into respective ycell
    cell_pos_nn = (cell_x_nn*Nc_y_nn + cell_y_nn).astype(int); # Cell number calculated vertically
    
    #assigning particles' (identity) in head2 & nn_list for easily accessing
    for i in range (N_PASSIVE):
        nn_list[i] = head_nn[cell_pos_nn[i]]
        head_nn[cell_pos_nn[i]] = i

    # To save the number of type
    pass_type = np.zeros((N_PASSIVE,2)) 

    for i in range (Nc_x_nn):
        for j in range (Nc_y_nn):
            nn_cell_n = i*Nc_y_nn + j
            iat = int(head_nn[nn_cell_n])
            while (iat != -1): # -1 implies no particles in this head/cell 
                for neigh_cell_x in [i-1,i,i+1]:
                    for neigh_cell_y in [j-1,j,j+1]:
                        neigh_cell_n = neigh_cell_x*Nc_y_nn + neigh_cell_y
                        jat = int(head_nn[neigh_cell_n])
                        while (jat != -1): # -1 implies no particles in this head/cell 
                            if(iat != jat):
                                # Distance calculation of neighboring particles
                                dx = posx_nn[jat] - posx_nn[iat]
                                dy = posy_nn[jat] - posy_nn[iat]
                                sq_distance = dx*dx + dy*dy
                                
                                if (sq_distance <=sq_cutoff_rad):
                                    if(jat<PASSIVE_HALF):
                                        pass_type[iat][0] += 1 # TYPE 1 PASSIVE
                                    if(jat>=PASSIVE_HALF):
                                        pass_type[iat][1] += 1 # TYPE 2 PASSIVE

                            jat = int(nn_list[jat])

                iat = nn_list[iat]
                
    ratio_1 = np.sum(2*np.nan_to_num(np.divide(pass_type[:PASSIVE_HALF,1],(pass_type[:PASSIVE_HALF,0]+pass_type[:PASSIVE_HALF,1]))),axis=0)
    ratio_2 = np.sum(2*np.nan_to_num(np.divide(pass_type[-PASSIVE_HALF:,0],(pass_type[-PASSIVE_HALF:,0]+pass_type[-PASSIVE_HALF:,1]))),axis=0)
    mix_index = (ratio_1+ratio_2)/N_PASSIVE #Mixing Index

    return mix_index

Functions in class Simulate

In [13]:
class Simulate:

    """To read initial particle positions and assign values to the arrays and variables used"""
    @staticmethod
    def interiorParticlesRead():
        global theta, rad, vel, part_type,  N_WALL, N_PASSIVE, N_ACTIVE,\
            INT_PARTICLES, N_TOTAL, PASSIVE_HALF, x_low_nn, y_low_nn, x_max_nn,\
            y_max_nn, Nc_x_nn, Nc_y_nn, N_cells_nn
        part = pd.read_csv("ini_state_q1.csv", header = 0, delimiter="\t", names=["x", "y",\
                        "radius", "part_type"])#Read values and assigned under these headers
        pos_x = np.asarray(part['x']) #Assigns the values under x to an array named pos_x
        pos_y = np.asarray(part['y']) #Assigns the values under y to an array named pos_y
        rad = np.asarray(part['radius']).reshape(-1,1) # radius of all particles used
        
        '''Type of particle : 1:active,-1:passive, 0:boundary'''
        part_type = np.asarray(part['part_type']).reshape(-1,1)

        '''values needed to define neighbouring cells to search for neighbours 
           - helps to search for neighbours faster. used in mixing index calculation'''        
        x_low_nn = np.amin(pos_x)-nn_cell_width
        y_low_nn = np.amin(pos_y)-nn_cell_width
        x_max_nn = np.amax(pos_x)+nn_cell_width
        y_max_nn = np.amax(pos_y)+nn_cell_width
        Nc_x_nn = int(np.ceil((x_max_nn-x_low_nn)/nn_cell_width))
        Nc_y_nn = int(np.ceil((y_max_nn-y_low_nn)/nn_cell_width))
        N_cells_nn = int(Nc_x_nn*Nc_y_nn) # Total no of cells
        
        N_WALL = len((part_type[part_type==0]))
        N_PASSIVE = len((part_type[part_type==-1]))
        N_ACTIVE = len((part_type[part_type==1]))
        INT_PARTICLES = N_ACTIVE + N_PASSIVE
        N_TOTAL = INT_PARTICLES + N_WALL
        PASSIVE_HALF = int(N_PASSIVE/2)
        del part # Memory freed
    
    '''Initial state : (x, y) of passive & active particles'''
    def readInitialState():
        """
        Ensure all data is entered through a single file in the format given below.
        Ensure the data is in order of wall data, passive data and active data 
        from top to bottom.The initial input file contained u,v velocities from which 
        orientation (theta) is calculated.
        """
        #active, passive, and wall data : x, y, rad, vel, theta, type : indices :0 1 2 3 4 5
        global wall_state
        part = pd.read_csv("ini_state_q1.csv", delimiter="\t",header=0) 
        initial_state = part.to_numpy(dtype=float)
        int_state = initial_state[:INT_PARTICLES, :2] # Storing initial position of active particles of active particles
        wall_state = initial_state[INT_PARTICLES:, :2] # Storing wall particle x,y data
        del part
        return int_state # Returns initial state : (x, y) of passive & active particles   
    
    '''Receives a state, does dynamics through a series of sub functions and returns new state'''
    @staticmethod
    def systemState(interior_state, theta_RL, delta_t): # receives (x, y), theta(RL) data - for active+passive separate
        global wall_state, rad, part_type
        state = np.concatenate((interior_state,wall_state), axis=0) # active+passive+wall - x,y data
        theta_up = np.concatenate((theta_RL, np.zeros(N_PASSIVE) , np.zeros(N_WALL)),axis=0).reshape(-1,1)
        c_state = np.hstack((state, rad, part_type)) # x,y,rad,part_type for all particles

        #......................................................
        up_state = call_mix_function(c_state,theta_up,delta_t) # CALLING FUNCTION FOR MIXING
        '''up_state is the updated state of the system after delta_t time'''
        #......................................................
         
        '''rew is the reward or mixing index(MI) based on current position of passive particles'''
        rew = nn_fn(up_state[N_ACTIVE:INT_PARTICLES,:2]) #passing passive positions to find MI
    
        obs_state = up_state[:INT_PARTICLES, :2] # x,y positions of both active + passsive
        return obs_state, rew # Returning passive, active states separately along 
                            # with reward(MI) after delta_t time - delta_t/dt iterations

In [14]:
'''To initiate the variables and arrays used'''
Simulate.interiorParticlesRead()

# RL Implementation

In [15]:
import gym
import tqdm
import rich
from gym import spaces
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import tensorflow as tf
import datetime, os



In [16]:
# Create directory particle_data for storing data
if not os.path.exists("RL_particle_data"):
  os.makedirs("RL_particle_data")

Directories for Saving

In [17]:
models_dir = "models/PPO"
log_dir =  "logs"

if not os.path.exists("RL_particle_data"):  # Create directory particle_data for storing data
  os.makedirs("RL_particle_data")

if not os.path.exists(models_dir):  # Create directory particle_data for storing data
  os.makedirs(models_dir)
if not os.path.exists(log_dir):  # Create directory particle_data for storing data
  os.makedirs(log_dir)

In [18]:
sim_time = 50000.0 # Total Simulation Time

Environment

In [20]:
'''t_sim determines the time duration for an episode'''
'''delta_t is the interval with in which RL interacts with C subroutine'''

"""ParticleMixer is the Custom Environment that follows gym interface"""
class ParticleMixer(gym.Env):

    '''To initiate the variables and arrays in the environment, a constructor is used'''
    def __init__(self, n_dim=2, t_sim=sim_time, delta_t=20.0, rr=0):  # self is a constructor
        super(ParticleMixer, self).__init__()
        self.n_obs = INT_PARTICLES  # Observing particle count
        self.n_active = N_ACTIVE # No of active particles
        
        # Define the pool of angles
        self.angle_pool = np.arange(0, 2.0*pi, pi/2) 
        '''pool of angles is in steps of pi/2 - can be replaced with the angle of our choice'''

        # Setting up the action space                                                                                                            
        self.action_space = gym.spaces.MultiDiscrete([len(self.angle_pool)] * self.n_active)

        '''Defining the observation space that contains x,y for active and passive
           Defined as a 1-D array'''
        self.observation_space = spaces.Box(low=0, high=2*DOMAIN_RAD,
                                            shape=((self.n_obs)*n_dim, ), dtype=np.float64)  # Observation space size: [n_passive+n_active] x 2 : (x,y)                                    

        self.obs_state = np.zeros((self.n_obs,2), dtype=np.float64) # 2 columns (x,y)
        self.time_statistics = np.zeros((5000,2), dtype=float) # To save episode no and length
        self.t_sim = t_sim # Maximum simulation time for 1 episode
        self.n_time = 0.0 # Current iteration
        self.del_t = delta_t # Duration between agent-environment interaction
        self.t = 0.0 # Current time
        self.row=rr # To point to the row in time_statistics
        self.prev_action = self.action_space.sample() # creates a sample action space
        self.done = False # To determine if the episode is over. False:Not Over, True:Over

    def step(self, action): #This function is called by alg after reset function to perform the action
                            # This function is linked to the system dynamics equation
        self.action_val = [self.angle_pool[i] for i in action]  # Sampled angles for active particles
        #print(self.prev_action)
        
        """ To find & assign velocity when angle "d_theta" is the action_space """

        #self.active_state[:,2] = self.prev_action  # theta assigned

        self.obs_state, index = Simulate.systemState(self.obs_state, self.action_val, self.del_t) # sending x, y, theta
        """ Send current [position, theta] and returns new [position, theta] after implementing Langevin code for delta_t time"""
        
        '''We take the -ve of index as reward. As mixing is better, index value decreases
        If we take the negative, the value can be seen increasing with mixing (in the -ve range towards 0)
        As the algorithm tends to maximise reward, the cumulative reward should be closer to 0'''
        reward = index 
        observation = self.obs_state.flatten() # self.observation_space.sample()
        info = {}
        self.t += self.del_t  # Total Time Completed
        self.n_time += self.del_t/0.01  # Total Timesteps completed
        if ((self.t > self.t_sim)|(reward>=0.99)): 
            self.done = True
            self.time_statistics[self.row,0] = self.t
            self.time_statistics[self.row,1] = reward
            self.row = self.row+1
        return observation, reward, self.done, info

    def reset(self): #While running the RL algorithm, this function is called first. This sets initial observation space
        self.t = 0.0
        self.n_time = 0.0
        self.done=False     
        self.obs_state = Simulate.readInitialState() # Resetting the initial state before an episode during first case : (x, y, theta(from i/p file))
        observation = self.obs_state.flatten() # 1-D initial state
#         print("1 reset")
        return observation  # reward, done, info can't be included
    def render(self):
        pass
    def close (self):
        pass

Assigning environment to variable env

In [21]:
env = ParticleMixer() # env is the environment now.

Check the custom environment for errors

In [22]:
check_env(env); 

Algorithm and Policy

In [23]:
from stable_baselines3 import PPO
import torch as th

In [24]:
# Custom MLP policy with Relu activation function
policy_kwargs = dict(activation_fn=th.nn.ReLU, net_arch=[512,256,64])

In [25]:
new_hyperparameters = {
    'learning_rate': 0.000001,
    # Add other hyperparameters and their new values here separeted by commas
}

In [26]:
models_dir = "models/PPO" # Directory for saving the models
log_dir =  "logs" # Directory for saving the data for tensor board view

In [27]:
model_ppo = PPO('MlpPolicy', env, verbose=1, policy_kwargs=policy_kwargs, **new_hyperparameters, tensorboard_log=log_dir)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


Learning happens here

In [28]:
'''Intermediate models are saved after every 4096 agent-env communications'''
learn_start = time.time()
for i in range(300):# repeat the total_timesteps 300 times
    model_ppo.learn(total_timesteps=4096, reset_num_timesteps=False, tb_log_name="PPO")
    model_ppo.save(f"{models_dir}/{i+1}") # Saving the trained model
    if (i+1)%100==0:
        outfile = "maxtime_vs_maxMI_statistics.csv"
        df = pd.DataFrame(env.time_statistics,columns=['max_time','max_MI'])
        df.to_csv(outfile, index=False, sep='\t')
print("learn_time = ",time.time()-learn_start) # Printing time taken to learn

'''
the policy is updated after every "n_steps" iterations - 2048 default (agent-env communications)
the model is saved after every "total_timesteps" iterations and 
continue to iterate until for loop is over -->final time_steps = 300*4096
'''

Logging to logs/PPO_0
-----------------------------
| time/              |      |
|    fps             | 4    |
|    iterations      | 1    |
|    time_elapsed    | 413  |
|    total_timesteps | 2048 |
-----------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 2.5e+03      |
|    ep_rew_mean          | 1.84e+03     |
| time/                   |              |
|    fps                  | 4            |
|    iterations           | 2            |
|    time_elapsed         | 821          |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0023084946 |
|    clip_fraction        | 0.000195     |
|    clip_range           | 0.2          |
|    entropy_loss         | -4.15        |
|    explained_variance   | -0.165       |
|    learning_rate        | 1e-06        |
|    loss                 | 0.762        |
|    n_updates            | 10       

KeyboardInterrupt: 

In [None]:
'''Save the episode length for each episode along with MI'''
outfile = "maxtime_vs_maxMI_statistics_final.csv"
df = pd.DataFrame(env.time_statistics,columns=['max_time','max_MI'])
df.to_csv(outfile, index=False, sep='\t')