In [1]:
import random
import hydra
import torch
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import deque
from scipy.integrate import odeint
from dqn_agent import Agent
from omegaconf import DictConfig, OmegaConf

In [2]:
def sliar(y, t, beta, sigma, kappa, alpha, tau, p, eta, epsilon, q, delta, u):
    S, L, I , A = y
    dydt = np.array([- beta * (1-sigma) * S * (epsilon * L + (1 - q) * I + delta * A) - u * S,
                    beta * (1-sigma) * S * (epsilon * L + (1 - q) * I + delta * A) - kappa * L,
                    p * kappa * L - alpha * I - tau * I,
                    (1 - p) * kappa * L  - eta * A])
    return dydt

class SliarEnvironment:
    def __init__(self, S0=1000000, L0=0, I0 = 1, A0 = 0):
        self.state = np.array([S0, L0, I0, A0])
        self.beta = 0.000000527
        self.sigma = 0
        self.kappa = 0.526
        self.alpha = 0.244
        self.tau = 0
        self.p = 0.667
        self.eta = 0.244
        self.epsilon = 0
        self.q = 0.5
        self.delta = 1


    def reset(self, S0=1000000, L0=0, I0 = 1, A0 = 0):
        self.state = np.array([S0, L0, I0, A0])
        self.beta = 0.000000527
        self.sigma = 0
        self.kappa = 0.526
        self.alpha = 0.244
        self.tau = 0
        self.p = 0.667
        self.eta = 0.244
        self.epsilon = 0
        self.q = 0.5
        self.delta = 1
        return self.state

    def step(self, action):
        sol = odeint(sliar, self.state, np.linspace(0, 1, 101),
                    args=(self.beta, self.sigma, self.kappa, self.alpha, self.tau, self.p, self.eta, self.epsilon, self.q, self.delta, action))
        new_state = sol[-1, :]
        S0, L0, I0, A0 = self.state
        S, L, I, A = new_state
        self.state = new_state
        # cost = PI + Qu^2 // P = 1, Q = 10e-06
        
        reward = - I - action ** 2
        
        done = True if new_state[2] < 1.0 else False
        return (new_state, reward, False, 0)

In [3]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


In [4]:
# 2. Train DQN Agent
env = SliarEnvironment()
agent = Agent(state_size=4, action_size=2, seed=0)

S = np.linspace(0, 1000001, 102)       # 1000001 = 9901*101
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))
# 왜 크기를 (I,S)로 했나요?


for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        
        # 가장 큰 value 값 받아오기
        vf[si, ii] = np.max(v)
        
        # 그 때의 action 받아오기
        af[si, ii] = np.argmax(v)

# 총 인구가 1000001이 넘는 경우에 대해서는 0으로 초기화
vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [5]:
import plotly.graph_objects as go
# S, I 에 대한 value값 분석
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value function",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [6]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

1. 1000번 학습


In [7]:
## Parameters
max_t=300
eps_start=1.0
eps_end= 0.0000
eps_decay=0.995

## Loop to learn
scores = []                        # list containing scores from each episode
scores_window = deque(maxlen=100)  # last 100 scores
eps = eps_start                    # initialize epsilon
for i_episode in range(1, 1001):
    state = env.reset()
    score = 0
    actions = []
    
    for t in range(max_t):
        action = agent.act(state, eps)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
        
    scores_window.append(score)       # save most recent score
    scores.append(score)              # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
# torch.save 를 찾아볼 것!

Episode 1000	Average Score: -5193.90

In [8]:
# 3. Visualize Controlled SIR Dynamics
agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))
env2 = SliarEnvironment()
state2 = env2.reset()
max_t = 300
states2 = state2
reward_sum = 0.
actions2 = []
for t in range(max_t):
    action2 = agent.act(state2, eps=0.0)
    actions2 = np.append(actions2, action2)
    next_state2, reward2, done2, _ = env2.step(action2)
    reward_sum += reward2
    states2 = np.vstack((states2, next_state2))
    state2 = next_state2

In [9]:
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,0].flatten(), name="susceptible",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,1].flatten(), name="infected",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=actions2, name="vaccine",
        mode='lines+markers'),
    secondary_y=True,
)
# Add figure title
fig.update_layout(
    title_text=f'{reward_sum:.2f}: SLIAR model with control - 1000'
)
# Set x-axis title
fig.update_xaxes(title_text="day")
# Set y-axes titles
fig.update_yaxes(title_text="Population", secondary_y=False)
fig.update_yaxes(title_text="Vaccine", secondary_y=True)

In [10]:
S = np.linspace(0, 1000001, 102)
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))

for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        vf[si, ii] = np.max(v)
        af[si, ii] = np.argmax(v)

vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [11]:
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value after 1000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [12]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action after 1000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

2. 2000번 학습

In [13]:
## Loop to learn - 2000 times
# continue
for i_episode in range(1001, 2001):
    state = env.reset()
    score = 0
    actions = []
    
    for t in range(max_t):
        action = agent.act(state, eps)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
        
    scores_window.append(score)       # save most recent score
    scores.append(score)              # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')

Episode 2000	Average Score: -2542.7668

In [14]:
# 3. Visualize Controlled SIR Dynamics
agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))
env2 = SliarEnvironment()
state2 = env2.reset()
max_t = 300
states2 = state2
reward_sum = 0.
actions2 = []
for t in range(max_t):
    action2 = agent.act(state2, eps=0.0)
    actions2 = np.append(actions2, action2)
    next_state2, reward2, done2, _ = env2.step(action2)
    reward_sum += reward2
    states2 = np.vstack((states2, next_state2))
    state2 = next_state2

In [15]:
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,0].flatten(), name="susceptible",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,1].flatten(), name="infected",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=actions2, name="vaccine",
        mode='lines+markers'),
    secondary_y=True,
)
# Add figure title
fig.update_layout(
    title_text=f'{reward_sum:.2f}: SLIAR model with control - 2000'
)
# Set x-axis title
fig.update_xaxes(title_text="day")
# Set y-axes titles
fig.update_yaxes(title_text="Population", secondary_y=False)
fig.update_yaxes(title_text="Vaccine", secondary_y=True)


In [16]:
S = np.linspace(0, 1000001, 102)
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))

for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        vf[si, ii] = np.max(v)
        af[si, ii] = np.argmax(v)

vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [17]:
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value after 2000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [18]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action after 2000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

3. 4000번 학습

In [19]:
## Loop to learn - 4000 times
# continue
for i_episode in range(2001, 4001):
    state = env.reset()
    score = 0
    actions = []
    
    for t in range(max_t):
        action = agent.act(state, eps)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
        
    scores_window.append(score)       # save most recent score
    scores.append(score)              # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')

Episode 4000	Average Score: -2664.912

In [20]:
# 3. Visualize Controlled SIR Dynamics
agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))
env2 = SliarEnvironment()
state2 = env2.reset()
max_t = 300
states2 = state2
reward_sum = 0.
actions2 = []
for t in range(max_t):
    action2 = agent.act(state2, eps=0.0)
    actions2 = np.append(actions2, action2)
    next_state2, reward2, done2, _ = env2.step(action2)
    reward_sum += reward2
    states2 = np.vstack((states2, next_state2))
    state2 = next_state2

In [21]:
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,0].flatten(), name="susceptible",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,1].flatten(), name="infected",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=actions2, name="vaccine",
        mode='lines+markers'),
    secondary_y=True,
)
# Add figure title
fig.update_layout(
    title_text=f'{reward_sum:.2f}: SLIAR model with control - 4000'
)
# Set x-axis title
fig.update_xaxes(title_text="day")
# Set y-axes titles
fig.update_yaxes(title_text="Population", secondary_y=False)
fig.update_yaxes(title_text="Vaccine", secondary_y=True)

In [22]:
S = np.linspace(0, 1000001, 102)
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))

for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        vf[si, ii] = np.max(v)
        af[si, ii] = np.argmax(v)

vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [23]:
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value after 4000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [24]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action after 4000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

4. 8000번 학습

In [25]:
## Loop to learn - 8000 times
# continue
for i_episode in range(4001, 8001):
    state = env.reset()
    score = 0
    actions = []
    
    for t in range(max_t):
        action = agent.act(state, eps)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
        
    scores_window.append(score)       # save most recent score
    scores.append(score)              # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')

Episode 8000	Average Score: -12.2396

In [26]:
# 3. Visualize Controlled SIR Dynamics
agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))
env2 = SliarEnvironment()
state2 = env2.reset()
max_t = 300
states2 = state2
reward_sum = 0.
actions2 = []
for t in range(max_t):
    action2 = agent.act(state2, eps=0.0)
    actions2 = np.append(actions2, action2)
    next_state2, reward2, done2, _ = env2.step(action2)
    reward_sum += reward2
    states2 = np.vstack((states2, next_state2))
    state2 = next_state2

In [27]:
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,0].flatten(), name="susceptible",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,1].flatten(), name="infected",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=actions2, name="vaccine",
        mode='lines+markers'),
    secondary_y=True,
)
# Add figure title
fig.update_layout(
    title_text=f'{reward_sum:.2f}: SLIAR model with control - 8000'
)
# Set x-axis title
fig.update_xaxes(title_text="day")
# Set y-axes titles
fig.update_yaxes(title_text="Population", secondary_y=False)
fig.update_yaxes(title_text="Vaccine", secondary_y=True)

In [28]:
S = np.linspace(0, 1000001, 102)
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))

for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        vf[si, ii] = np.max(v)
        af[si, ii] = np.argmax(v)

vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [29]:
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value after 8000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [30]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action after 8000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

5. 16000번 학습

In [31]:
## Loop to learn - 16000 times
# continue
for i_episode in range(8001, 16001):
    state = env.reset()
    score = 0
    actions = []
    
    for t in range(max_t):
        action = agent.act(state, eps)
        actions.append(action)
        next_state, reward, done, _ = env.step(action)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
        
    scores_window.append(score)       # save most recent score
    scores.append(score)              # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")

torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')

Episode 16000	Average Score: -13.235864

In [32]:
# 3. Visualize Controlled SIR Dynamics
agent.qnetwork_local.load_state_dict(torch.load('checkpoint.pth'))
env2 = SliarEnvironment()
state2 = env2.reset()
max_t = 300
states2 = state2
reward_sum = 0.
actions2 = []
for t in range(max_t):
    action2 = agent.act(state2, eps=0.0)
    actions2 = np.append(actions2, action2)
    next_state2, reward2, done2, _ = env2.step(action2)
    reward_sum += reward2
    states2 = np.vstack((states2, next_state2))
    state2 = next_state2

In [33]:
# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])
# Add traces
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,0].flatten(), name="susceptible",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=states2[:,1].flatten(), name="infected",
        mode='lines+markers'),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=list(range(max_t+1)), y=actions2, name="vaccine",
        mode='lines+markers'),
    secondary_y=True,
)
# Add figure title
fig.update_layout(
    title_text=f'{reward_sum:.2f}: SLIAR model with control - 16000'
)
# Set x-axis title
fig.update_xaxes(title_text="day")
# Set y-axes titles
fig.update_yaxes(title_text="Population", secondary_y=False)
fig.update_yaxes(title_text="Vaccine", secondary_y=True)

In [37]:
S = np.linspace(0, 1000001, 102)
I = np.linspace(0, 1000001, 102)

SS, II = np.meshgrid(S, I)

vf = np.zeros((len(I), len(S)))
af = np.zeros((len(I), len(S)))

for si, s in enumerate(S):
    for ii, i in enumerate(I):
        v = agent.qnetwork_local.forward(torch.tensor([float(s), 0, float(i), 0]).to(device))
        v = v.detach().cpu().numpy()
        vf[si, ii] = np.max(v)
        af[si, ii] = np.argmax(v)

vf[SS + II > 1000001] = None
af[SS + II > 1000001] = None

In [38]:
fig = go.Figure(data =
    go.Contour(
        z=-vf,
        x=S,
        y=I,
        contours_coloring='heatmap'
    ))

fig.update_layout(
    title ={
        'text': "Value after 16000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()

In [39]:
fig = go.Figure(data =
    go.Contour(
        z=af,
        x=S,
        y=I
    ))

fig.update_layout(
    title ={
        'text': "Action after 16000 times",
        'x': 0.5,
        'xanchor': 'center',
        },
    xaxis_title = "S population",
    yaxis_title = "I population",
)

fig.show()