In [None]:
# !pip install tensorflow==2.3.0
# !pip install keras
# !pip install keras-rl2

In [25]:
from gym import Env
from gym.spaces import Discrete, Box, Dict
import pandas as pd
import numpy as np
import random 

In [5]:
# Create a virtual environment actions
def reset():
    global P, M, It, s
    dummy_array = np.zeros(shape=(P,8))
    df = pd.DataFrame(dummy_array,columns=['x','y','Day','Susceptible','Exposed','Infectious','Recovered','GG'])
    df = df.astype({'x':int,'y':int,'Day':int,'Susceptible':bool,'Exposed':int,'Infectious':int,'Recovered':bool,'GG':bool})
    df['Susceptible'] = True
    #Appending infectious population in 
    dfupdate=df.sample(M)
    dfupdate['Infectious'] = np.random.randint(1,It, size=len(dfupdate))
    dfupdate['Susceptible'] = False
    df.update(dfupdate)
    update_list = dfupdate.index.tolist() 
    #Dispersing people randomly among grid
    df['x'] = np.random.randint(0,s, size=len(df))
    df['y'] = np.random.randint(0,s, size=len(df))

    return df

def one_day(df, action = 0):
    global P, M, It, S, death_rate, expose_rate
    policy_match = {0: 1, 1:0.75, 2:0.25} # assign action to policy
    moves_under_policy = int(round(Mt * policy_match[action], 0))
    for mt in range(moves_under_policy):
        for p in range(len(df)):
            if df.loc[p,'GG'] == False: #If the person is not dead
                new_move_x = random.choice(range(-1,2))
                new_move_y = random.choice(range(-1,2))
                df.loc[p,'x'] = max(min(df.loc[p,'x']+new_move_x,S),0) #make valid movements in the grid
                df.loc[p,'y'] = max(min(df.loc[p,'y']+new_move_y,S),0)

                if (df.loc[p,'Infectious'] > 0) and (df.loc[p,'Recovered'] == False): #If a person is in infectious state
                    if df.loc[p,'Infectious'] - random.choice(range(0,7)) >= It: #If the infectious days are completed
                        if random.choice(range(0,death_rate)) > (death_rate-2): #If the person dies(with probability distribution 1:4)                           
                            df.loc[p,'Infectious'] = 0
                            df.loc[p,'GG'] = True #Kill the person
                        else: #If the person survives
                            df.loc[p,'Infectious'] = 0
                            df.loc[p,'Recovered'] = True #Recover the person
                    elif mt+1 == Mt:
                        df.loc[p,'Infectious'] = df.loc[p,'Infectious'] + 1 #Increase the infectious day counter
                elif (df.loc[p,'Exposed'] > 0) and (df.loc[p,'Infectious'] == 0): #If a person is in exposed state 
                    if (df.loc[p,'Exposed'] - random.choice(range(0,2))) >= Et: #If the person has reached the exposed day limit?  7
                        df.loc[p,'Exposed'] = 0
                        df.loc[p,'Infectious'] = 1 #Increase the infectious day counter, now the person is infectious
                    elif mt+1 == Mt:
                        df.loc[p,'Exposed'] = df.loc[p,'Exposed'] + 1 #Increase the exposed day counter
                elif df.loc[p,'Susceptible'] == True: #If the person is in susceptible state
                    if not df.loc[(df['Infectious'] > 0) & ((df['x'] == df.loc[p,'x'])|(df['x'] == df.loc[p,'x'] - 1)|(df['x'] == df.loc[p,'x'] + 1)) & ((df['y'] == df.loc[p,'y'])|(df['y'] == df.loc[p,'y'] - 1)|(df['y'] == df.loc[p,'y'] + 1))].empty:
                        if random.choice(range(0,expose_rate)) > (expose_rate-2):
                            df.loc[p,'Exposed'] = 1
                            df.loc[p,'Susceptible'] = False
    return df

def economy_gain(df):
    economy_gain = len(df[(df.GG == False) & (df.Infectious == 0)]) * round(random.uniform(0.8,1), 2)
    return economy_gain

def current_state(df):
    inf = len(df.loc[df['Infectious'] > 0])
    exposed = len(df.loc[df['Exposed'] > 0]) 
    recovered = len(df.loc[df['Recovered'] == True])
    sus = len(df.loc[df['Susceptible'] == True])
    gg = df.loc[df['GG'] == True].GG.count()
    return np.array([recovered,sus, exposed, inf, gg])



In [34]:
#Inputs
s = 25 #size of the grid
N = 100 #size of population
M = round(N * 0.07) #Number of infectious population
Et = 2 #Number of days staying exposed
It = 21 #Number of days staying infectious
Mt = 5 #Number of daily movements
D = 5 #Number of days
death_rate = 100
expose_rate = 5

#Initialization
S = N - M #Susceptible population
E = 0 #Exposed population
I = M #Number of infectious population 
R = 0 #Recovered population
P = S + E + I + R #Total population
economy = 0 #Daily economic transaction

In [89]:
def_observation_space = Box(low = np.array([0,0,0,0,0]), high = np.array([P,P,P,P,P], dtype = int))

# Create the virtual environment for RL
class CoronaPolicy(Env):
    def __init__(self):
        self.action_space = Discrete(3)
        
        self.observation_space = def_observation_space # Box(low = 0, high = P, shape = (5,1), dtype = int )
        # Dict(recovered=Discrete(P+1), sus=Discrete(P+1), exposed=Discrete(P+1),inf=Discrete(P+1),gg=Discrete(P+1))
        
        self.state = np.array([R, S, E, I, 0])
        
        self.day = 0
        
        self.df = reset()
        
    def step(self, action):
        
        self.df = one_day(self.df, action)
        
        self.state = current_state(self.df)
        
        self.day = self.day + 1
        
        reward = economy_gain(self.df)
        
        if self.day <= D:
            done = False
        else:
            done = True
            
        info = {}
        
        return self.state, reward, done, info
    
    def render(self):
        pass
    
    def reset(self):
        self.observation_space = def_observation_space
        
        self.state = np.array([R, S, E, I, 0])
        
        self.day = 0
        
        self.df = reset()
        
        
        return self.state
        



In [90]:
env = CoronaPolicy()

In [91]:
# episodes = 10
# for episode in range(1, episodes+1):
#     state = env.reset()
#     done = False
#     economy = 0
    
#     while not done:
#         action = env.action_space.sample()
#         n_state, reward, done, info = env.step(action)
#         economy += reward
        
#     print(f'Episode: {episode} Score: {economy}')

# Deep Learning Model with Keras

In [77]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [105]:
states

(5,)

In [122]:
def build_model(states, actions):
    model = Sequential()
    model.add(Dense(10, activation = 'relu', input_dim=1, input_shape = states))
    model.add(Dense(10, activation = 'relu'))
    model.add(Dense(actions, activation = 'linear'))
    return model

In [123]:
states = env.observation_space.shape
actions = env.action_space.n
model = build_model(states, actions)
model.summary()

Model: "sequential_13"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_36 (Dense)             (None, 10)                60        
_________________________________________________________________
dense_37 (Dense)             (None, 10)                110       
_________________________________________________________________
dense_38 (Dense)             (None, 3)                 33        
Total params: 203
Trainable params: 203
Non-trainable params: 0
_________________________________________________________________


# Build Agent with Keras-RL

In [124]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [125]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit = 1000, window_length = 1)
    dqn = DQNAgent(model = model, memory = memory, policy = policy, 
                   nb_actions = actions, nb_steps_warmup = 10, target_model_update = 1e-2)
                  
    return dqn

In [126]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr = 1e-3), metrics = ['mae'])
dqn.fit(env, nb_steps = 5000, visualize = False, verbose = 1  )

Training for 5000 steps ...
Interval 1 (0 steps performed)


ValueError: Error when checking input: expected dense_36_input to have 2 dimensions, but got array with shape (1, 1, 5)

In [None]:
scores = dqn.test(env, nb_episodes = 100, visualize = False)
print(np.mean(scores.history['episode_reward']))