Roughly reproducing the simulator on the Washington Post
https://www.washingtonpost.com/graphics/2020/world/corona-simulator/

In [3]:
"""
Two repeating events

1. Update infection statuses
    1a. Transmission 
        1a1. Check whether agents are within transmission distance
        1a2. Determine with infection transmits (v1 assume probability = 1, future model adjust probabilities). 
            Transmission rates/probabilities are relatedc to people's hygiene practices.
            E.g. use of gloves, washing of hands reduces transmission rates.
            
        1a3. Update status
        
    1b. Recovery (vs death)
        
        Update status
    
    1c. Death
        Probability based on factors: 
            age, 
            probability of being a smoker, 
            hospital load: hospital_capacity vs (current infected * probability of being hospitalized)
                probability of being covered = (USA: x %, rest of OECD: 1)
            +random factor
        
        
2. Movement    
        Update location of each agent
    Factors:
        social isolation: some % of agents do not move         


"""
None

In [4]:
%matplotlib ipympl

In [5]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as pl
import matplotlib as mpl
from numpy import random as rd
import time
import pickle as pk

In [6]:
n_agents = 1000
transmission_threshold_distance = 0.01
transmission_factor = 1
recovery_days = 14
probability_of_mortality = 0.02
proportion_static = 0.5
hospital_capacity_per_1000 = 10 # https://en.wikipedia.org/wiki/List_of_countries_by_hospital_beds
icus_per_100k = 20 # https://en.wikipedia.org/wiki/List_of_countries_by_hospital_beds

# Andrew Cuomo: 
# 12% need hospital beds
# 3% need ICU/ventilators


speed = 0.1
initial_infections = 1

status_colors = {0: 'blue', 1: 'brown', 2: 'pink', 3: 'black'}
def status_color(status):
    return status_colors[status]

def initialize_agents(n_agents, initial_infections, proportion_static):
    x_pos = [rd.uniform() for _ in range(n_agents)]
    y_pos = [rd.uniform() for _ in range(n_agents)]

    x_dir = [rd.uniform()*speed for _ in range(n_agents)]
    y_dir = [rd.uniform()*speed for _ in range(n_agents)]
    
    days_infected = [0 for _ in range(n_agents)]

    agent_order_infected = rd.permutation(n_agents)

    agent_status = [0 for _ in range(n_agents)]
    # status 
    # 0 - uninfected
    # 1 - infected
    # 2 - recovered
    # 3 - mortality

    # create patient(s) zero
    for agent_index in agent_order_infected[0:initial_infections]:
        agent_status[agent_index]  = 1
        days_infected[agent_index]  = 1
            

    # set static
    agent_order_static = rd.permutation(n_agents)
    for agent_index in agent_order_static[0:int(proportion_static*n_agents)]: 
        # Set speed to zero for these patients that are "self-isolating"
        # They may get visits from infected people, but won't actively spread to others who are isolated.
        x_dir[agent_index] = 0
        y_dir[agent_index] = 0
        
    agents = pd.DataFrame({'x pos': x_pos, 
                           'y pos': y_pos, 
                           'status': agent_status, 
                           'x dir': x_dir, 
                           'y dir': y_dir, 
                           'days infected': days_infected,
                           'color': [status_color(status) for status in agent_status],
                          })

    return agents


In [7]:
def agents_are_close(threshold, agent1_x_pos, agent1_y_pos, agent2_x_pos, agent2_y_pos, verbose=False):
    
    distance = np.sqrt( (agent1_x_pos-agent2_x_pos) ** 2 + (agent1_y_pos-agent2_y_pos) ** 2 )
    
    if verbose:
        print(f"Distance: {distance}. Threshold: {threshold}. Close? {distance <= threshold}")
    
    return distance <= threshold


# test:
if not agents_are_close( 0.1, 0.8, 0.3, 0.81, 0.28 ):
    raise(ValueError("Distance function not working")) # Not really a value error. This should be implemented as a unit test.

In [8]:
def transmit_disease(agents_curr):
    agents = agents_curr.copy()
    
    agents.loc[:,'update as infected'] = False

    infected_agents = agents.loc[agents['status']==1,:]
    print(f"Number infected agents: {infected_agents.shape[0]}")
    
    # update infected status
    for agent1_index in range(n_agents):
        # 
        agent1 = agents.iloc[agent1_index]

        for agent2_index in range(infected_agents.shape[0]): #[:agent1_index]:
            agent2 = infected_agents.iloc[agent2_index]
            
            agents_close = agents_are_close( transmission_threshold_distance, 
                                            agent1['x pos'], 
                                            agent1['y pos'], 
                                            agent2['x pos'], 
                                            agent2['y pos'],
                                            verbose=False)            
            
            # future version, do a self merge for efficiency @TODO
            if agents_close \
                and rd.uniform() <= transmission_factor \
                and agent1['status'] == 0 \
                and agent2['status'] == 1 \
                and agent2['days infected'] <= recovery_days: # future, agent2 infected days between min and max 
                
                    # print(f'Transmitting to agent {agent1_index}')
                    
                    agents.loc[agent1_index,'update as infected'] = True
                    #agents.loc[agent2_index,'update as infected'] = True

                    # future, update infection days count
                    # agents.loc[agent1_index,'days infected']
                    
    print(f"Number of new cases: {agents['update as infected'].sum()}")
                    
    # Assign infected status
    agents.loc[agents['update as infected'],'status'] = 1
    #agents.loc[agents['update as infected'],'color'] = [] # managed below
    agents.loc[agents['status']==1,'days infected'] = agents.loc[agents['status']==1,'days infected']+1
    
    # recovery - 
    survival = agents['status'].apply(lambda status: rd.uniform()>=probability_of_mortality )
    recovered = (agents['status']==1) & survival & (agents['days infected'] == 14)
    agents.loc[recovered,'status'] = 2
    
    fatality = (agents['status'] == 1) & (~survival) & (agents['days infected'] == 14)
    agents.loc[fatality,'status'] = 3 
    agents.loc[fatality,'x dir'] = 0
    agents.loc[fatality,'y dir'] = 0 # update marker to X
    
    #     
    agents['color'] = agents['status'].apply(status_color)
    
    return agents

In [9]:
# #agents.head()
# agents

# print(agents.loc[agents['status']==1,:])

In [10]:
# agents

In [11]:
#transmit_disease(agents)

In [12]:
# agent_state = []
# agent_state.append(agents)
# agent_state.append(transmit_disease(agents))

# agent_state[1].loc[agent_state[1]['status']==1,:]

In [13]:
adjust_outofrange = lambda pos, bounds: pos if (pos <= bounds[1] and pos >= bounds[0]) else (pos-1 if pos>bounds[1] else pos+1)

# def new_location(x_new, y_new, bounds):
#     # future, x1, y1, x2, y2, x_velocity, y_velocity
    
#     return (adjust_outofrange(x_new, [0,1]), adjust_outofrange(y_new, [0,1])) # too much engineering? Setting up for future new_location which could bound agents of bounds and other agents.
 
def move_agents(agents_orig):
    
    agents = agents_orig.copy()
    
    agents['x pos'] = agents['x pos'] + agents['x dir']
    agents['y pos'] = agents['y pos'] + agents['y dir']
    
    # update if outside of range - rather than "bouncing" off other agents, we assume that the continue to move within the space starting from the other extremee.
    agents['x pos'] = agents['x pos'].apply(lambda x: adjust_outofrange(x, [0,1]) )
    agents['y pos'] = agents['y pos'].apply(lambda x: adjust_outofrange(x, [0,1]) )
        
    return agents

In [14]:
# run simulation (100 iterations)
def run_sim(n_periods = 100, n_agents=1000, initial_infections=1, proportion_static=0, initial_state=[] ):  

    if initial_state==[]:
        # def update
        agents = initialize_agents(n_agents, initial_infections, proportion_static)        

        #print(agents['status'].to_list())# eye-balling that there is one (or initial_infections) agent with status=1 and the rest zero
                
        agent_state = []
        agent_state.append(agents)
    else: 
        # provides option to give starting point (e.g. if half a sim ran, can restart by supplying the existing state list)
        agent_state = initial_state

    period_i = 0
    while period_i<n_periods: #period_i in range(n_periods):    
        print(f'period={period_i}')
        # transmit
        agent_state.append(transmit_disease(agent_state[-1]))

        # move - reuse
        agent_state[-1] = move_agents(agent_state[-1])         

        period_i += 1
                       
    return agent_state




In [15]:
has_already_run = True
if not has_already_run:
    # unmitigated = run_sim(n_periods = 100, n_agents=1000, initial_infections=1, proportion_static=0 )
    ##unmitigated = run_sim(n_periods = 50, initial_state=unmitigated )
    with open('data/unmitigated_simulation.pkl', 'wb' ) as file:
        pk.dump(unmitigated, file)
    # isolation_50pct = run_sim(n_periods = 100, n_agents=1000, initial_infections=1, proportion_static=0.5 )
    with open('data/isolation_50pct_simulation.pkl', 'wb' ) as file:
        pk.dump(isolation_50pct, file)    
        
else: 
    with open('data/unmitigated_simulation.pkl','rb') as file:    
        unmitigated = pk.load(file)
    with open('data/isolation_50pct_simulation.pkl','rb') as file:    
        isolation_50pct = pk.load(file)
        

In [16]:
# unmitigated[-1]

In [17]:
def agent_state_summary(agents):
    status_counts = agents.groupby('status')['days infected'].count()
    
    return status_counts

In [18]:
# move - reuse


In [19]:
def plot_statuses_over_time(agent_state,title):
    status_over_time = pd.DataFrame(data=[agent_state_summary(agents) for agents in agent_state], index=range(len(agent_state)))

    status_over_time.plot()
    pl.title(title)
    pl.xlabel('Days')
    pl.legend(['Unaffected','sick','recovered','mortality'])

    status_over_time.plot.area()
    pl.title(title)
    pl.xlabel('Days')
    pl.legend(['Unaffected','sick','recovered','mortality'])
    pl.ylim([0,n_agents])
    
    return status_over_time



In [20]:
unmitigated_50pct_summary = plot_statuses_over_time(unmitigated,'Unmitigated')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [21]:
isolation_50pct_summary = plot_statuses_over_time(isolation_50pct,'50% isolation')

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [22]:
#agent_state[-1].loc[agent_state[-1]['status']>=1,:]
#unmitigated

In [47]:
import matplotlib.animation as animation
from matplotlib.colors import ListedColormap

color_list = ['blue','peru','pink','black']
cmap = ListedColormap(color_list, N=4)
    
#     .from_list(
#     'status colors',
#     [
#         [0, 0, 1], # blue
#         [0.5, 0.5, 0], # 165,42,42 brown
#         [0.5, 1, 0.5], # pink
#         [0, 0, 0]
#     ])

def plot_agents(agents):
        
   # fig = pl.figure()
    axes = agents.plot.scatter(
        x='x pos', 
        y='y pos', 
        c=agents['status'].apply(int), #'status',
        marker='o',
        #cmap=cmap,
        #colorbar=None,
        figsize=(10,10)
    ) # colormap
        
    pl.xticks([])    
    _ = pl.yticks([])    
    xlabel = pl.xlabel("period=0")
    pl.ylabel("")
    pl.xlim([0,1])
    pl.ylim([0,1])
    pl.title("Agents by location")
    #pl.legend([])
    
    scat = [child for child in axes.get_children() if type(child) is mpl.collections.PathCollection][0]

    
    return (axes.figure, axes, scat, xlabel)


fig, axes, scat, xlabel = plot_agents(unmitigated[0])

# pl.set_cmap(cmap)

# https://stackoverflow.com/questions/9401658/how-to-animate-a-scatter-plot
def update_agent_plot( period, agent_state ):
    #
    scat.set_offsets(agent_state[['x pos','y pos']].to_numpy())
    scat.set_array( agent_state['status']/agent_state['status'].max() )
    scat.set_cmap( ListedColormap(color_list[0:agent_state['status'].max()]) )
    
    xlabel.set_text(f'period={period}')
    
    return scat,
    
agents = unmitigated

# for i in range(10):
#     update_agent_plot( 1, unmitigated[1] )

    
# enumerate(agents)
ani = animation.FuncAnimation(
                              fig, 
                              lambda period_state: update_agent_plot(period_state[0],period_state[1]), 
                              frames=enumerate(agents), 
                              interval=300
)

#
# ani.save('simulation.gif', writer=animation.PillowWriter())
#
#         self.ani = animation.FuncAnimation(self.fig, self.update, interval=5, 
#                                           init_func=self.setup_plot, blit=True)

# HTML(ani.to_jshtml())

#pl.show()

ani.save('unmitigated_simulation.gif')


  fig = self.plt.figure(figsize=self.figsize)


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

MovieWriter ffmpeg unavailable; trying to use <class 'matplotlib.animation.PillowWriter'> instead.


![simulation](unmitigated_simulation.gif)

In [24]:
# def update_plot(i, data, scat):
#     print(i)
#     scat.set_array(data[i])
#     return scat,

# def main():
#     numframes = 100
#     numpoints = 10
#     color_data = np.random.random((numframes, numpoints))
#     x, y, c = np.random.random((3, numpoints))

#     fig = pl.figure()
#     scat = pl.scatter(x, y, c=c, s=100)

#     ani = animation.FuncAnimation(fig, update_plot, frames=40,
#                                   fargs=(color_data, scat))
#     pl.show()


# main()

In [25]:
# for i in range(len(agent_state)):
#     print(i)

In [26]:
#next(enumerate(agents[:2]))

In [27]:
# for i in range(len(agent_state)):
#     plot_agents(agent_state[i])

In [28]:
#type(axes_children[0])
# type(axes_children[0]) is mpl.collections.PathCollection

In [29]:
# plot_agents(agent_state)