In [14]:
import openseespy.opensees as op
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [15]:
class Analysis:
    
    def __init__(self):
        
        self.ip = pd.read_csv('node_number.csv')
        self.no = self.ip['node']
        self.cor_x = self.ip['x']
        self.cor_y = self.ip['y']
        self.fix_node = self.ip['fix_node']
        self.fix_x = self.ip['fix_x']
        self.fix_y = self.ip['fix_y']
        self.ele_no = self.ip['ele_no']
        self.n1 = self.ip['n1']
        self.n2 = self.ip['n2']
        self.area = self.ip['area']
        self.length = self.ip['length']
        self.length_sum = np.sum(self.length)
        self.load_point = self.ip['point']
        self.dir_x = self.ip['dir_x']
        self.dir_y = self.ip['dir_y']
        self.dat_no = len(self.ip)
        
        #opensees設定
        op.wipe()
        op.model('basic','-ndm',2,'-ndf',2)
        
        #節点の設定
        self.count = 0
        for i in range(self.dat_no):
            if self.no[i] == self.no[i]:
                self.count += 1
        for i in range(self.count):
            op.node(int(self.no[i]),float(self.cor_x[i]),float(self.cor_y[i]))
        
        #境界条件
        self.count1 = 0
        for i in range(self.dat_no):
            if self.fix_node[i] == self.fix_node[i]:
                self.count1 += 1
        for i in range(self.count1):
            op.fix(int(self.fix_node[i]),int(self.fix_x[i]),float(self.fix_y[i]))
            
        
        op.uniaxialMaterial('Elastic',1,3000.0)
        
        #エレメントの設定
        self.count2 = 0
        for i in range(self.dat_no):
            if self.ele_no[i] == self.ele_no[i]:
                self.count2 += 1
        for i in range(self.count2):
            op.element('Truss',int(self.ele_no[i]),int(self.n1[i]),int(self.n2[i]),float(self.area[i]),1)
            
        #TimeSeries
        op.timeSeries('Linear',1)
        
        #create a plain load pattern
        op.pattern('Plain',1,1)
        
        #create the nodal load command
        self.count3 = 0
        for i in range(self.dat_no):
            if self.load_point[i] == self.load_point[i]:
                self.count3 += 1
        for i in range(self.count3):
            op.load(int(self.load_point[i]),float(self.dir_x[i]),float(self.dir_y[i]))
        
        #解析の設定
        op.wipeAnalysis()
        #create SOD
        op.system('BandSPD')
        #create DOF number
        op.numberer('RCM')
        #create constraint handler
        op.constraints('Plain')
        op.integrator('LoadControl',1.0)
        #create alhorithm
        op.algorithm('Linear')
        #create analysis object
        op.analysis('Static')

        self.i_pre = 0
        self.i = 0
        self.i_next = 0
        self.dis_x = 0
        self.dis_y = 0
        self.force = 0
        self.resp = {
            'dis_x':[],
            'dis_y':[],
            'force':[]
        }
        self.done = False
        
    #初期化
    def reset(self):
        self.__init__()
        
    def action(self):
        K = np.random.randint(len(self.area))
        self.area[K] = 10**-6
        self.length[K] = 10**-6
        self.after_area = self.area 
        self.after_length = self.length
        return self.after_area,self.after_length
    
    #1ステップ分の計算
    def step(self):
        
        #opuput results
        #節点変位の計算
        for i in range(self.count):
            self.dis_x = op.nodeDisp(int(self.no[i]),1)
            self.dis_y = op.nodeDisp(int(self.no[i]),2)
            
            self.resp['dis_x'].append(self.dis_x)
            self.resp['dis_y'].append(self.dis_y)
            
        #部材応力の計算
        for i in range(self.count2):
            self.force = op.basicForce(int(self.ele_no[i]))
            
            self.resp['force'].append(self.force)
            
        self.i_pre = self.i
        self.i += 1
        self.i_next = self.i +1 if not self.done else self.i
           
        return self.reward,self.done,self.dis_x,self.dis_y
    
    def reward(self):
        if np.max(np.abs(self.force)) < 200:
            return (1 - ((np.max(np.abs(self.force)))/200))
        else:
            return 0
        
    def state(self):
        return np.array([self.area,self.dis_x,self.dis_y] ,dtype=np.float32)

経験の記録

In [16]:
import random
from collections import namedtuple

Transition = namedtuple('Transition',('state','action','next_state','reward'))

class ReplayMamory(object):
    
    def __init__(self,capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0
        
    def push(self,*args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity
        
    def samaple(self,batch_size):
        return random.sample(self.memory,batch_size)

    def __len__(self):
        return len(self.memory)

深層学習

In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# GPUが使えるかどうか
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"use {device}")

class DQN(nn.Module):

    def __init__(self, s, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=2, stride=1)
        self.bn1 = nn.BatchNorm1d(16)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=2, stride=1)
        self.bn2 = nn.BatchNorm1d(32)
        self.conv3 = nn.Conv1d(32, 32, kernel_size=2, stride=1)
        self.bn3 = nn.BatchNorm1d(32)

        # conv1dから出力されるサイズの計算
        def conv1d_out_size(size, kernel_size=2, stride=1):
            return (size - (kernel_size - 1) - 1) // stride + 1

        # conv1d3回分の出力サイズを計算
        conv = conv1d_out_size(conv1d_out_size(conv1d_out_size(s)))
        linear_input_size = conv * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # ネットワークの順伝播を計算して計算結果を返す
    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

use cpu


モデルの準備

In [18]:
import math
from itertools import count

import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable

# 応答解析を環境としてインスタンス
env = Analysis()

def get_state():
    return
 Variable(torch.from_numpy(env.state)).unsqueeze(0).unsqueeze(0)
# DQNをインスタンス化するためのサイズ取得
init_state = get_state()
_,_, state_size = init_state.size()

n_actions = env.naction #選択できるアクションの数

policy_net = DQN(state_size, n_actions).to(device)  # 方策を求めるためのネットワーク
target_net = DQN(state_size, n_actions).to(device)  # 最適化対象のネットワーク
target_net.load_state_dict(policy_net.state_dict())
target_net.eval() # 推論モードにする

# 最適化アルゴリズムにはRMSpropを選択
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


  op.fix(int(self.fix_node[i]),int(self.fix_x[i]),float(self.fix_y[i]))


TypeError: expected np.ndarray (got method)

アクションの選択

In [6]:
BATCH_SIZE = 128    # 複数の結果をまとめてニューラルネットワークに入力、分析する際のバッチサイズ
GAMMA = 0.5       # 遠い側の未来を考慮する割合（0に近いほど近い未来に重きをおく）

# ランダムのアクションを選択する閾値計算用の係数
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 800

TARGET_UPDATE = 100  # target_netを更新するエピソードの間隔

steps_done = 0

# あるstateでアクションを選択する関数
def select_action(state, test):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if test:
        with torch.no_grad():
            return target_net(state).max(1)[1].view(1, 1)
    elif sample > eps_threshold:
        with torch.no_grad():
            # 最も効果的と思われるアクションのインデックス
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # ランダムなアクションのインデックス
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

モデルの最適化

In [7]:
# モデルの最適化
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return

    # memoryからBATCH_SIZE分だけランダムにサンプルを取得
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    # 解析終了時のステップ以外かどうかのBooleanとその時のnext_state
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # 実際に取ったアクションの価値（実際に取って得られた報酬）
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # まだ更新されていないTarget_netによる最も大きい報酬
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()

    # 本来期待されたアクションの価値
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber loss（実際に取ったアクションの価値と、本来期待されたアクションの価値を比較して損失を計算）
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1).type(torch.FloatTensor).to(device))

    # モデルの最適化
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

学習の実行

In [8]:
num_episodes = 500
for i_episode in range(num_episodes + 1):
    # 環境の初期化
    env.reset()
    state = get_state()

    test = i_episode % TARGET_UPDATE == 0

    if test:
        # target_netを更新。全ての重みやバイアスをコピー
        target_net.load_state_dict(policy_net.state_dict())

    for t in count():
        # アクションを選択
        action = select_action(state, test)
        reward, done = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        next_state = get_state() if not done else None

        # memoryにTrasitionを記録
        memory.push(state, action, next_state, reward)

        state = next_state

        # モデル最適
        optimize_model()

        if done:
            acc_sd, dis_sd = env.sd
            acc_max, dis_max = env.max
            print('{0:3}'.format(str(i_episode)), 'h_ave=', '{0:4.3f}'.format(env.h_ave), 'h_sd=', '{0:4.3f}'.format(env.h_sd), 'acc_sd=', '{0:4.3f}'.format(acc_sd), 'dis_sd=', '{0:4.3f}'.format(dis_sd), 'acc_max=', '{0:4.3f}'.format(acc_max), 'dis_max=', '{0:4.3f}'.format(dis_max), 'test=', '{0:5}'.format(str(test)))
            break

print('Complete')

  op.fix(int(self.fix_node[i]),int(self.fix_x[i]),float(self.fix_y[i]))


TypeError: expected np.ndarray (got method)