In [72]:
from typing import Union, Optional, Callable, Any
from typing import Tuple, List, Set, Dict
from typing import NamedTuple

In [3]:
from collections import defaultdict, deque

In [4]:
import os
import sys
import time
import datetime
import random
import math

In [5]:
import numpy as np
import pandas as pd
from PIL import Image

In [6]:
import getfem as gf

initializing ...
numthread = 1


In [7]:
import pyvista as pv
from pyvirtualdisplay.display import Display

In [8]:
import torch
from torch import nn

from torch.nn.modules.loss import _Loss
from torch.optim import Optimizer

# Print the installed PyTorch version.
print(torch.__version__)

1.10.0


In [9]:
# Use the GPU when PyTorch reports one, otherwise fall back to the CPU.
# The variable keeps the name `cuda` even when it ends up holding the CPU device.
cuda = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Current computing device: {cuda}')

Current computing device: cpu


In [10]:
# Data Classes
# A state is a tensor; GridHoleBoardEnv.get_current_state() returns a clone of
# its grid of hole sizes.
State = torch.Tensor
    
# An action is ((x, y), size_change): the grid index of a hole and the amount
# added to that hole's size by GridHoleBoardEnv.step().
Action = Tuple[Tuple[int, int], float] 

class Transition(NamedTuple):
    """One experience record: state, action taken, reward, resulting state.

    NOTE(review): `action` is annotated as `Action`, but Agent.step() pushes
    the integer index into the environment's `action_space` instead of the
    action tuple itself — confirm which representation is intended.
    """
    state: State
    action: Action
    reward: float
    next_state: State

In [11]:
class GridHoleBoardEnv():
    """Board divided into a regular grid with one adjustable hole per cell.

    Attributes:
        size: Board extent along x and y.
        grid_size: Number of cells along x and y.
        cell_size: Extent of a single cell along x and y.
        holes: Tensor of shape ``grid_size`` with the size of every hole,
            initialised uniformly at random in ``[0, 4)``.
        holes_center: Tensor of shape ``(*grid_size, 2)`` with the centre
            coordinates of every cell.
        holes_disabled: Grid indices whose holes get no entry in
            ``action_space``.
        action_space: ``((x, y), size_change)`` entries; a ``+0.5`` and a
            ``-0.5`` step for every hole that is not disabled.
    """

    def __init__(self,
                 size: Tuple[float, float],
                 grid_size: Tuple[int, int],
                 holes_disabled: Optional[Set[Tuple[int, int]]] = None) -> None:
        """Create the board.

        Args:
            size: Board extent along x and y.
            grid_size: Number of cells along x and y.
            holes_disabled: Grid indices to exclude from the action space.
                ``None`` (the default) disables nothing.
        """
        self.size: Tuple[float, float] = size
        self.grid_size: Tuple[int, int] = grid_size
        self.cell_size: Tuple[float, float] = (size[0] / grid_size[0], size[1] / grid_size[1])
        # (x, y) -> size
        self.holes: torch.Tensor = torch.rand(self.grid_size) * 4

        # (x, y) -> (x_coord, y_coord)
        self.holes_center: torch.Tensor = torch.zeros((*self.grid_size, 2))

        # Normalise to a set of our own: the default None would otherwise make
        # the membership test below raise TypeError.
        self.holes_disabled: Set[Tuple[int, int]] = (
            set(holes_disabled) if holes_disabled is not None else set()
        )

        self.action_space: List[Tuple[Tuple[int, int], float]] = list()

        for x in range(self.grid_size[0]):
            for y in range(self.grid_size[1]):
                # Holes sit at the centre of their cell.
                self.holes_center[x, y, 0] = (x + 0.5) * self.cell_size[0]
                self.holes_center[x, y, 1] = (y + 0.5) * self.cell_size[1]
                if (x, y) not in self.holes_disabled:
                    self.action_space.append(((x, y), 0.5))
                    self.action_space.append(((x, y), -0.5))

    def step(self, action: "Action") -> None:
        """Apply ``action`` by changing the size of one hole in place.

        Args:
            action: ``((x, y), size_change)``; ``size_change`` is added to the
                hole at grid index ``(x, y)``.

        The result is clamped to ``[0, min(cell_size) / 2 - 1]``, i.e. half of
        the shorter cell side minus a margin of 1.
        """
        (x, y), size_change = action

        # Never let the upper bound drop below zero on very small cells,
        # which would otherwise force a negative hole size.
        max_size = max(0., min(self.cell_size) / 2 - 1)

        self.holes[x, y] = torch.clamp(self.holes[x, y] + size_change, 0., max_size)

    def get_current_state(self) -> "State":
        """Return a copy of the hole-size grid (safe for the caller to keep)."""
        return self.holes.clone()

In [12]:
class GridHoleBoardThermalSimulator():
    """Builds a GetFEM mesh of a hole board and renders the exported mesh.

    Attributes:
        element_diameter: Target element size handed to the GetFEM mesher.
        mesh: The mesh produced by the last ``generate_fem_mesh`` call, or
            ``None`` before the first call.
    """

    def __init__(self, element_diameter: float = 2) -> None:
        self.element_diameter: float = element_diameter
        self.mesh: gf.Mesh = None

    def generate_fem_mesh(self,
                          hole_board_env: GridHoleBoardEnv,
                          state: Optional[torch.Tensor] = None,
                          export_mesh: bool = False) -> gf.Mesh:
        """Mesh the board rectangle minus its holes and tag boundary regions.

        Args:
            hole_board_env: Provides the board size, grid size, hole centres
                and (when ``state`` is ``None``) the hole sizes.
            state: Optional grid of hole sizes to use instead of
                ``hole_board_env.holes``.
            export_mesh: When true, write the mesh to ``'temp.vtk'`` and print
                a completion message.

        Returns:
            The generated mesh, also stored in ``self.mesh``.

        Region ids set on the mesh:
            1   outer faces inside the board shrunk by 1 on every side
            2-5 left / right / top / bottom outer faces (selected by normal
                direction), with region 1 subtracted
            6   union of all per-hole regions
            7+  one region per grid cell, in x-major order

        Raises:
            AssertionError: if region 1 and region 6 do not contain the same
                faces.
        """
        # A multi-element tensor has no truth value, so `state` must be
        # compared with None explicitly instead of being tested with `if state:`.
        hole_sizes = state if state is not None else hole_board_env.holes

        # Below this size a hole is left out of the geometry; the same amount
        # enlarges the ball used to collect a hole's boundary faces.
        tolerance = 0.01 * self.element_diameter

        board = gf.MesherObject('rectangle', [0., 0.], list(hole_board_env.size))
        holes: List[gf.MesherObject] = list()

        for x in range(hole_board_env.grid_size[0]):
            for y in range(hole_board_env.grid_size[1]):
                size = hole_sizes[x, y].item()
                if size < tolerance:
                    continue

                center = hole_board_env.holes_center[x, y].tolist()
                holes.append(gf.MesherObject('ball', center, size))

        if holes:
            holes_union = gf.MesherObject('union', *holes)
            mesher = gf.MesherObject('set minus', board, holes_union)
        else:
            mesher = board

        print('Beginning mesh generation')
        gf.util('trace level', 2)   # No trace for mesh generation
        mesh = gf.Mesh('generate', mesher, self.element_diameter, 2)

        boundary: Dict[str, int] = dict()

        # Boundary of the holes
        boundary['HOLE_BOUND'] = 1
        mesh.set_region(boundary['HOLE_BOUND'],
                        mesh.outer_faces_in_box([1., 1.],
                                                [hole_board_env.size[0] - 1, hole_board_env.size[1] - 1]))

        boundary['LEFT_BOUND'] = 2
        mesh.set_region(boundary['LEFT_BOUND'], mesh.outer_faces_with_direction([-1., 0.], 0.01))

        boundary['RIGHT_BOUND'] = 3
        mesh.set_region(boundary['RIGHT_BOUND'], mesh.outer_faces_with_direction([ 1., 0.], 0.01))

        boundary['TOP_BOUND'] = 4
        mesh.set_region(boundary['TOP_BOUND'], mesh.outer_faces_with_direction([0.,  1.], 0.01))

        boundary['BOTTOM_BOUND'] = 5
        mesh.set_region(boundary['BOTTOM_BOUND'], mesh.outer_faces_with_direction([0., -1.], 0.01))

        # Direction-based selection can also pick up hole faces; remove them.
        mesh.region_subtract( boundary['RIGHT_BOUND'], boundary['HOLE_BOUND'])
        mesh.region_subtract(  boundary['LEFT_BOUND'], boundary['HOLE_BOUND'])
        mesh.region_subtract(   boundary['TOP_BOUND'], boundary['HOLE_BOUND'])
        mesh.region_subtract(boundary['BOTTOM_BOUND'], boundary['HOLE_BOUND'])

        region_id = 7
        for x in range(hole_board_env.grid_size[0]):
            for y in range(hole_board_env.grid_size[1]):
                center = hole_board_env.holes_center[x, y].tolist()
                size = hole_sizes[x, y].item()
                bound_key = f'HOLE{x}_{y}_BOUND'
                boundary[bound_key] = region_id
                hole_faces = mesh.outer_faces_in_ball(center, size + tolerance)
                mesh.set_region(boundary[bound_key], hole_faces)
                if region_id == 7:
                    # The first hole seeds the union region.
                    boundary['HOLE_UNION_BOUND'] = 6
                    mesh.set_region(boundary['HOLE_UNION_BOUND'], hole_faces)
                else:
                    mesh.region_merge(boundary['HOLE_UNION_BOUND'], boundary[bound_key])
                region_id += 1

        # Sanity check: the box-based and the per-hole selections must agree.
        np.testing.assert_array_equal(mesh.region(boundary['HOLE_BOUND']),
                                      mesh.region(boundary['HOLE_UNION_BOUND']))

        self.mesh = mesh

        if export_mesh:
            mesh.export_to_vtk('temp.vtk')
            print('\nMesh generation completed.')

        return mesh

    def run(self) -> Any:
        """Placeholder for the simulation run; not implemented yet."""
        ...

    def render_image(self, save_file: Optional[str] = None) -> Any:
        """Render ``'temp.vtk'`` through a virtual display and take a screenshot.

        Args:
            save_file: Passed to the plotter's ``screenshot`` call as the
                output file name; ``None`` is passed through unchanged.

        Returns:
            Whatever ``Plotter.screenshot`` returns.
        """
        display = Display(visible=0, size=(1280, 1024))
        display.start()
        try:
            p = pv.Plotter()
            m = pv.read("temp.vtk")
            p.add_mesh(m, show_edges=True)
            if p.scalar_bars:
                for sb in list(p.scalar_bars.keys()):
                    p.remove_scalar_bar(sb)
            p.camera_position = 'xy'
            img_arr = p.screenshot(save_file, transparent_background=True)
        finally:
            # Stop the virtual display even when reading or rendering fails.
            display.stop()
        return img_arr

In [13]:
# Hyperparameters
# NOTE(review): none of these globals is passed to QNet() or Agent() in the
# visible cells. Agent.__init__ declares its own defaults (for example
# target_update_interval=100) and QNet uses 0.001 as Adam's learning rate.
epsilon = 1.  # presumably the initial exploration rate — TODO confirm
epsilon_decay = .995  # presumably the per-step multiplier for epsilon — TODO confirm
lr = .001
replay_batch_size = 32
target_update_interval = 10

In [14]:
# Network Container    
class Model():
    """Bundles a network with the loss function and optimizer used to train it.

    Calling the container forwards its argument to the wrapped network.

    Attributes:
        network: The wrapped network.
        loss_func: Loss function associated with the network.
        optimizer: Optimizer associated with the network.
    """

    def __init__(self, network: nn.Module, loss_func: _Loss, optimizer: Optimizer):
        self.network = network
        self.loss_func = loss_func
        self.optimizer = optimizer

    def __call__(self, network_input):
        """Run ``network`` on ``network_input`` and return its output."""
        return self.network(network_input)

In [15]:
class ReplayMemory():
    """Bounded first-in-first-out store of transitions with random sampling."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` drops its oldest entry once it is full.
        self.memory: deque = deque(maxlen=capacity)

    def __len__(self):
        """Number of entries currently stored."""
        return len(self.memory)

    def push(self, *args):
        """Wrap ``args`` in a ``Transition`` and store it as the newest entry."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored entries drawn without replacement."""
        return random.sample(self.memory, batch_size)

In [16]:
def QNet(action_number: int = 24, state_size: int = 48, lr: float = 0.001) -> "Model":
    """Build the Q-network together with its loss and optimizer.

    The network is a three-layer perceptron (``state_size`` -> 100 -> 200 ->
    ``action_number``) with ReLU activations.

    Args:
        action_number: Number of outputs, one per action.
        state_size: Number of input features. Agent.replay() flattens each
            state, so this should equal the number of grid cells of the
            environment (for example 12 for a 4 x 3 grid).
        lr: Learning rate handed to Adam.

    Returns:
        A ``Model`` holding the network, an ``nn.MSELoss`` and an Adam
        optimizer over the network's parameters.
    """
    net = nn.Sequential(
        nn.Linear(state_size, 100),
        nn.ReLU(),
        nn.Linear(100, 200),
        nn.ReLU(),
        nn.Linear(200, action_number),
    )
    optimizer = torch.optim.Adam(net.parameters(), lr)
    return Model(network=net, loss_func=nn.MSELoss(), optimizer=optimizer)

In [None]:
# Note: Q(s, a) is the action-value function that QNet approximates.

In [88]:
class Agent():
    """Epsilon-greedy agent that acts on a GridHoleBoardEnv and records experience.

    ``q_network`` and ``target_network`` only need to be callables that map a
    state tensor to a tensor of action values.

    NOTE(review): ``target_network``, ``target_update_interval``,
    ``discount_factor`` and ``explore_factor_decay`` are stored but not yet
    used by any method below; the reward is a constant placeholder and the
    learning update in ``replay`` is unfinished.
    """

    def __init__(self,
                 environment: "GridHoleBoardEnv",
                 fem_simulator: "GridHoleBoardThermalSimulator",
                 q_network: Callable[[torch.Tensor], torch.Tensor],
                 target_network: Callable[[torch.Tensor], torch.Tensor],
                 target_update_interval: int = 100,
                 experience_replay: Optional["ReplayMemory"] = None,
                 replay_batch_size: int = 32,
                 discount_factor: float = 0.9,
                 explore_factor: float = 1.,
                 explore_factor_decay: float = 0.995) -> None:
        """Store the collaborators and hyperparameters.

        Args:
            environment: Environment to act on.
            fem_simulator: Simulator kept for the (not yet wired in) reward.
            q_network: Callable giving action values for a state tensor.
            target_network: Second network, stored for later use.
            target_update_interval: Stored; unused by the visible methods.
            experience_replay: Replay buffer. ``None`` (the default) creates a
                new ``ReplayMemory(10000)`` for this agent.
            replay_batch_size: Number of transitions sampled by ``replay``.
            discount_factor: Stored; unused by the visible methods.
            explore_factor: Probability-like threshold for random actions.
            explore_factor_decay: Stored; unused by the visible methods.
        """
        self.environment: GridHoleBoardEnv = environment
        self.fem_simulator: GridHoleBoardThermalSimulator = fem_simulator

        self.q_network = q_network
        self.target_network = target_network
        self.target_update_interval: int = target_update_interval

        # Build the default buffer per instance: a default argument value is
        # created once and would be shared by every agent.
        if experience_replay is None:
            experience_replay = ReplayMemory(10000)
        self.experience_replay: ReplayMemory = experience_replay
        self.replay_batch_size: int = replay_batch_size

        self.discount_factor: float = discount_factor
        self.explore_factor: float = explore_factor
        self.explore_factor_decay: float = explore_factor_decay

    def select_action(self) -> Tuple["State", int]:
        """Pick an action index for the current state.

        Returns:
            ``(state, action_index)`` where ``state`` is the environment's
            current state and ``action_index`` is an ``int`` index into
            ``environment.action_space``. The greedy action is chosen when a
            uniform random draw exceeds ``explore_factor``; otherwise the
            index is drawn at random.
        """
        state = self.environment.get_current_state()
        if random.random() > self.explore_factor:
            # Feed the network the same flattened layout that replay() uses;
            # no gradient is needed just to pick an action.
            with torch.no_grad():
                prediction = self.q_network(state.flatten())
            action = int(prediction.argmax().item())
        else:
            action = random.randrange(len(self.environment.action_space))
        return state, action

    def step(self) -> None:
        """Select an action, apply it, and store the transition."""
        state, action_index = self.select_action()
        # Run FEM self.fem_simulator.run()
        # Calculate reward
        reward = 0.
        self.environment.step(self.environment.action_space[action_index])
        next_state = self.environment.get_current_state()
        self.experience_replay.push(state, action_index, reward, next_state)

    def replay(self) -> None:
        """Sample a batch of stored transitions and run the Q-network on it.

        Does nothing until at least ``self.replay_batch_size`` transitions
        are stored.
        """
        # Use this agent's own batch size, not a notebook-level global.
        if len(self.experience_replay) < self.replay_batch_size: return

        samples = self.experience_replay.sample(self.replay_batch_size)

        # Regroup the list of transitions into one tuple per field.
        batch = Transition(*zip(*samples))

        state_batch = torch.vstack([s.flatten() for s in batch.state])

        # TODO: the learning update (targets, loss, optimizer step) is not
        # implemented yet; only the forward pass is computed.
        prediction = self.q_network(state_batch)
        

In [90]:
# Build an 80 x 60 board split into a 4 x 3 grid (cells of 20 x 20).
env = GridHoleBoardEnv(size=(80, 60), grid_size=(4, 3))


# Mesh the current board; export_mesh=True also writes the mesh to 'temp.vtk'.
fem = GridHoleBoardThermalSimulator(element_diameter=2)
fem.generate_fem_mesh(env, export_mesh=True)
fem.run()  # currently a stub: GridHoleBoardThermalSimulator.run has no body yet
# NOTE(review): QNet() defaults to 48 input features, while a 4 x 3 grid gives
# a 12-entry state — confirm the network input size matches the environment.
agent = Agent(env, fem, QNet(), QNet())


Beginning mesh generation

Mesh generation completed.


In [96]:
# Advance the agent by five environment steps.
for _step in range(5):
    agent.step()

In [98]:
# Draw five stored transitions and regroup them field-wise: every field of
# `batch` becomes a tuple with one entry per sampled transition.
samples = agent.experience_replay.sample(5)
batch = Transition(*zip(*samples))
batch

Transition(state=(tensor([[0.2676, 3.5520, 0.4846, 3.6192, 2.5070, 2.6200],
        [1.5513, 0.9907, 0.8465, 1.7249, 1.7060, 0.1130],
        [1.4369, 0.4342, 2.0138, 0.3143, 3.0012, 2.3721],
        [1.5336, 3.6612, 0.5630, 0.5827, 3.2963, 1.6671],
        [3.9050, 0.4049, 2.0401, 1.9214, 2.3943, 2.5808],
        [3.5225, 1.3591, 1.5267, 0.9014, 0.5129, 3.4557],
        [2.1974, 2.4645, 3.5796, 2.1287, 0.3397, 2.4631],
        [1.2844, 1.2443, 2.3360, 0.6868, 1.1140, 2.6969]]), tensor([[0.2676, 3.5520, 0.4846, 3.6192, 2.5070, 2.6200],
        [2.0513, 0.9907, 0.8465, 1.7249, 2.2060, 0.1130],
        [1.4369, 0.9342, 2.0138, 0.3143, 2.5012, 2.3721],
        [1.5336, 3.6612, 0.5630, 0.5827, 3.2963, 1.6671],
        [3.9050, 0.4049, 2.0401, 1.9214, 2.3943, 2.5808],
        [3.5225, 1.3591, 1.5267, 0.9014, 0.5129, 3.4557],
        [2.1974, 2.4645, 3.5796, 2.1287, 0.3397, 1.9631],
        [1.2844, 1.2443, 2.3360, 0.6868, 1.1140, 2.6969]]), tensor([[0.2676, 3.5520, 0.4846, 3.6192, 2.5070, 2

In [19]:
# Visualize
import pyvista as pv
from pyvirtualdisplay.display import Display

# Render the exported mesh through a virtual display.
display = Display(visible=0, size=(1280, 1024))
display.start()
try:
    p = pv.Plotter()
    m = pv.read("temp.vtk")
    #contours = m.contour()
    p.add_mesh(m, show_edges=True)
    #p.add_mesh(contours, color="black", line_width=1)
    #p.add_mesh(m.contour(8).extract_largest(), opacity=0.1)
    pts = m.points
    p.camera_position = 'xy'
    p.screenshot('temp.png', transparent_background=True)
    p.show(window_size=[384, 384], cpos="xy", jupyter_backend='panel')
finally:
    # Always shut the virtual display down, even if reading or rendering fails.
    display.stop()

<pyvirtualdisplay.display.Display at 0x1522b1464cd0>

In [20]:
# Scratch: small integer tensor used to try out Tensor.detach() in the next cell.
ts = torch.tensor([1, 2, 3])

In [21]:
# Scratch: detach() gives a tensor that is not tracked by autograd.
t1 = ts.detach()

In [22]:
# Display the detached tensor.
t1

tensor([1, 2, 3])

In [23]:
# Scratch: NamedTuple fields are not type-checked at runtime, so plain ints are accepted.
t = Transition(1, 2, 3, 4)

In [24]:
# Display the transition's repr.
t

Transition(state=1, action=2, reward=3, next_state=4)

In [25]:
# Scratch: size in bytes of the outer tuple of an action-shaped value
# (getsizeof does not include the objects the tuple refers to).
sys.getsizeof(((1.2, 2.3), 3.4))

56

In [26]:
# Scratch: size in bytes of a small int object, for comparison with the tuple above.
sys.getsizeof(12)

28

In [27]:
# Scratch: replay memory capped at five entries.
rm = ReplayMemory(5)

In [84]:
d = {0: 1.2, 1: 2.3, 2: 3.4, 3: 4.5}
# random.choice() indexes its argument with a random integer position, so
# passing a dict only works when its keys happen to be 0..len-1. Choosing
# from an explicit list of the values works for any dict.
random.choice(list(d.values()))

3.4