In [31]:
import os
import math
import random
import argparse
import pickle as pkl
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

import taichi as ti
import gymnasium as gym
from gymnasium import spaces

# from collections import namedtuple, deque
# from itertools import count

# import torch
# import torch.nn as nn
# import torch.optim as optim
# import torch.nn.functional as F

GRAV = 1
DRAG = 1e6
ACOUSTIC_PRESSURE = 10e6
CIRCLE_RADIUS = 10

In [15]:
def diffReward(state: list) -> int: 
    #try to set reward for circle of radius 10
    aggregate_reward = 0
    
    for point in state: 
        dist = math.sqrt(point[0]**2 + point[1]**2) #distance from origin (center of circle)
        dist -= CIRCLE_RADIUS #gives distance to border 
        aggregate_reward += (1 / (1 - dist)) if (dist < 0) else (0.4 / (1 + dist)) #punish outside of border more heavily

    #may need normalization 
    return aggregate_reward

In [53]:
@ti.data_oriented
class AcousticEnv():
    def __init__(self, particles: int):
        """ti.init(arch=ti.cpu) #initialization arch ti.cpu/ti.gpu"""
        self.res = 512 #resolution
        self.paused = ti.field(ti.i32, ()) # a scalar i32
        self.time_step = 1e-5 
        self.substepping = 10 # the number of sub-iterations within a time step

        self.num_particles = particles
        self.max_mass = 5.0 
        self.galaxy_size = 0.6 #???
        self.max_radius = 10.0 / float(self.res) # particle radius for rendering
        self.init_vel = 100.0 # inital veclocity
        self.particle_radius = ti.field(ti.f32, particles)
        self.particle_m = ti.field(ti.f32, particles)
        self.particle_color = ti.Vector.field(3, ti.f32, particles)
        
        #declare fields (pos, vel, force of the planets)
        # 2d problem
        self.pos = ti.Vector.field(2, ti.f32, particles)
        self.pos_1 = ti.Vector.field(2, ti.f32, 1) #???
        self.vel = ti.Vector.field(2, ti.f32, particles)
        self.vel_p1 = ti.Vector.field(2, ti.f32, particles)
        self.force = ti.Vector.field(2, ti.f32, particles)
        self.energy = ti.field(ti.f32, shape = 2) # [1] current energy [0] inital energy
        
        self.is_haled = ti.field(ti.i32, particles) #???
        # Acoustic properties
        # po = 10e6 # acoustic pressure level 1 
        # pxy = [1,0.2]
        # k = [3,3]

        ###########
        ax = np.array([1.0, 0.5]) #change with actions 
        ay = np.array([0.2, 0.3])
        kx = np.array([3, 4])
        ky = np.array([2, 1])

        # convert arrays into Taichi fields 
        self.ax_field = ti.field(dtype=ti.f32, shape=ax.shape)
        self.ay_field = ti.field(dtype=ti.f32, shape=ay.shape)
        self.kx_field = ti.field(dtype=ti.f32, shape=kx.shape)
        self.ky_field = ti.field(dtype=ti.f32, shape=ky.shape)

        self.ax_field.from_numpy(ax)
        self.ay_field.from_numpy(ay)
        self.kx_field.from_numpy(kx)
        self.ky_field.from_numpy(ky)

        self.num_waves_x = ti.field(dtype=ti.i32, shape=())
        self.num_waves_y = ti.field(dtype=ti.i32, shape=())
        self.num_waves_x[None] = len(ax)
        self.num_waves_y[None] = len(ay)

        self.limit = 100
        
        # Actions: (frequency, amplitude) = (a, b)
        self.action_space = spaces.Box(-self.limit, self.limit, shape=(2,2), dtype=int)
        
        ###########
        # bin by 0.1
        # set for different particles rather list 
        
        
        # # Observation space is the location of all particles
        # self.observation_space = spaces.Dict(
        #     {
        #        1: space
        #     }
        # )
        
        # Set simulation length
        
        self.gui = ti.GUI('N-body problem', (self.res, self.res)) # create a window of resolution 512*512
        
    def render(self): 
        for e in self.gui.get_events(ti.GUI.PRESS): #event processing
            if e.key == 'e':  # 'Esc'
                self.gui.close()
            elif e.key == 'r':  # 'r'
                self.reset()
            elif e.key == ti.GUI.SPACE:  # 'space'
                self.paused[None] = not self.paused[None]
                
        self.gui.clear(0x112F41) # Hex code of the color: 0x000000 = black, 0xffffff = white

        for i in range(self.num_particles):
            self.pos_1[0] = self.pos[i]
            self.gui.circles(self.pos_1.to_numpy(), \
                color = int(ti.rgb_to_hex((self.particle_color[i][0],self.particle_color[i][1],self.particle_color[i][2])) ), \
                radius = self.particle_radius[i] * float(self.res))
            
        # relative position is ranging from (0.0, 0.0) lower left corner to (1.0, 1.0) upper right coner
        self.gui.fps_limit = 30
        self.gui.show()
    
    
    def step(self, action):
        #start the simulation
        if not self.paused[None]:
            ax = np.array(action[0]) #change with actions 
            ay = np.array(action[1])
            kx = np.array(action[2])
            ky = np.array(action[3])
            self.ax_field.from_numpy(np.array(ax))
            self.ay_field.from_numpy(np.array(ay))
            self.kx_field.from_numpy(np.array(kx))
            self.ky_field.from_numpy(np.array(ky))
            for i in range(self.substepping): # run substepping times for each time step
                self.compute_force()
                self.update()
                self.vel_p1.copy_from(self.vel)
                self.collision_update()
                self.vel.copy_from(self.vel_p1)
                self.compute_energy()
        
        # self.sim_length -= 1 
        
        # # Calculate reward
        # pass
        
        # # Check if shower is done
        # done = (self.sim_length <= 0)
        
        # # Set placeholder for info
        # info = {}
        
        # # Return step information
        # return self.state, reward, done, info
    
    @ti.kernel
    def compute_force(self):
        
        for i in range(self.num_particles): 
            self.force[i] = ti.Vector([0.0, 0.0]) #reset force
        
        #compute acoustic force
        for i in range(self.num_particles):
            # f = po * (ti.sin(2*PI*kx*pos[i][0])) 
            # force[i][0] += f # acoustic force on planet i
            # f = po * (ti.sin(2*PI*pos[i]*k))*pxy 
            f_x = 0.0
            f_y = 0.0
            
            for wave in range(self.num_waves_x[None]):
                f_x += self.ax_field[wave] * ti.sin(2 * math.pi * self.pos[i][0] * self.kx_field[wave])
            for wave in range(self.num_waves_y[None]):
                f_y += self.ay_field[wave] * ti.sin(2 * math.pi * self.pos[i][1] * self.ky_field[wave])            
            
            # Compute total force for this position
            f_vector = ti.Vector([f_x, f_y]) * ACOUSTIC_PRESSURE
            self.force[i] += f_vector  

        # force due to drag
        for i in range(self.num_particles):
            drag_force = -DRAG * self.particle_radius[i] * self.vel[i]
            self.force[i] += drag_force
            
    @ti.kernel
    def update(self):  # update each particles's vel and pos based on gravity 
        step = self.time_step / self.substepping # time step 
        for i in range(self.num_particles):
            
            self.vel[i] += step * self.force[i] / self.particle_m[i]
            self.pos[i] += step * self.vel[i]
            # collision detection at edges, flip the velocity
            if self.pos[i][0] < 0.0 + self.particle_radius[i] or self.pos[i][0] > 1.0 - self.particle_radius[i]:
                self.vel[i][0] *= -1
            if self.pos[i][1] < 0.0 + self.particle_radius[i] or self.pos[i][1] > 1.0 - self.particle_radius[i]:
                self.vel[i][1] *= -1

    @ti.kernel
    def collision_update(self): # 1: brute force
        for i in range(self.num_particles):
            for j in range(self.num_particles):
                
                if i != j:
                    diff  = self.pos[i] - self.pos[j]
                    r = diff.norm(1e-4) #norm of Vector diff and minimum value is 1e-5 (clamp to 1e-5)
                    
                    if r <= (self.particle_radius[i] + self.particle_radius[j]):
                        vel_diff = self.vel[i] - self.vel[j]
                        dot_vx = min(diff[0] * vel_diff[0] + diff[1] * vel_diff[1],-1e-2)
                        self.vel_p1[i] = self.vel_p1[i] - 2*self.particle_m[j]/(self.particle_m[i]+self.particle_m[j])*dot_vx/r**2*diff * (self.energy[0] / self.energy[1])

    @ti.kernel  
    def compute_energy(self): 
        self.energy[1] = 0.0
        for i in range(self.num_particles):
            self.energy[1] += 0.5 * self.particle_m[i] * (self.vel[i][0]**2 + self.vel[i][1]**2)
    
    @ti.kernel  
    def reset(self):
        center = ti.Vector([0.5,0.5])
        
        for i in range(self.num_particles):
            
            theta = ti.random() * 2 * math.pi  # theta = (0, 2 pi)
            r = (ti.sqrt(ti.random()) * 0.7 + 0.3) * self.galaxy_size # r = (0.3 1)*galaxy_size
            offset = r * ti.Vector([ti.cos(theta), ti.sin(theta)]) #
            
            self.pos[i] = center + offset
            self.vel[i] = [-offset.y, offset.x] # vel direction is perpendicular to its offset
            self.vel[i] *= self.init_vel

            self.particle_radius[i] = max(0.4,ti.random()) * self.max_radius
            self.particle_m[i] = (self.particle_radius[i] / self.max_radius)**2 * self.max_mass

            self.energy[0] += 0.5 * self.particle_m[i] * (self.vel[i][0]**2 + self.vel[i][1]**2)
            self.energy[1] += 0.5 * self.particle_m[i] * (self.vel[i][0]**2 + self.vel[i][1]**2)
            
            self.particle_color[i][0] = 1 - self.particle_m[i] / self.max_mass
            self.particle_color[i][1] = 1 - self.particle_m[i] / self.max_mass
            self.particle_color[i][2] = 1 - self.particle_m[i] / self.max_mass
        

In [52]:
action_space = spaces.Box(0, 100, shape=(4,2), dtype=int)
action_space.sample()

array([[69,  1],
       [85, 27],
       [69, 47],
       [16, 32]])

In [55]:
ti.init(arch=ti.cpu) #initialization arch ti.cpu/ti.gpu
acoustic_env = AcousticEnv(100)
acoustic_env.reset()
while acoustic_env.gui.running: 
    # action = [[random.uniform(0, 6), random.uniform(0,6)],[random.uniform(0,6), random.uniform(0,6)],[random.uniform(0,6), random.uniform(0,6)],[random.uniform(0,6), random.uniform(0,6)]]
    action = [2, 2, random.uniform(0,6), random.uniform(0.6)]
    acoustic_env.step(action)
    acoustic_env.render()
    

[Taichi] Starting on arch=x64


AttributeError: 'NoneType' object has no attribute 'has_key_event'

In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [None]:
class DQN(nn.Module):

    #n_actions = 6^8
    
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

In [None]:
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())