In [1]:
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
np.random.seed(0)

In [2]:
# Defining a class to retain statistics for people within our population
class Person():
    def __init__ (self):
        self.is_alive = True
        self.is_vaccinated = False
        self.is_infected = False
        self.has_been_infected = False
        self.newly_infected = False
        self.random_pct = np.random.random()
    
    # Vaccinating a person according to the percent vaccinated stat we set for 
    # our population
    def get_vaccinated(self, pct_vaccinated):
        if self.random_pct >= (1 - pct_vaccinated):
            self.is_vaccinated = True
        else:
            self.is_vaccinated = False    

In [3]:
# Defining a class to retain the statistics about the simulated disease
class Simulation():
    def __init__(self,
                 population_size,
                 disease_name,
                 r0,
                 mortality_rate,
                 total_time_steps,
                 pct_vaccinated,
                 num_initial_infected):
        self.population_size = population_size
        self.disease_name = disease_name
        self.r0 = r0/100
        self.mortality_rate = mortality_rate
        self.total_time_steps = total_time_steps
        self.pct_vaccinated = pct_vaccinated
        self.num_initial_infected = num_initial_infected
        self.current_time_step = 0
        self.total_infected_counter = 0
        self.current_infected_counter = 0
        self.dead_counter = 0
        self.population = []
        self.time_step_statistics_df = pd.DataFrame()
        
        # Creating instances of the Person class according to our population size
        for i in range(population_size):
            # Create new person
            new_person = Person()
            # Adding infected persons to our simulation first. If the
            # current number of infected is not equal to the num_initial_infected
            # parameter, setting new_person to be infected
            if self.current_infected_counter <= num_initial_infected:
                new_person.is_infected = True
                # Incrementing both infected counters!
                self.total_infected_counter += 1
                self.current_infected_counter += 1
            # If new_person is not infected, determining if they are vaccinated 
            # or not, then appending new_person to self.population
            else:
                new_person.get_vaccinated(self.pct_vaccinated)
            self.population.append(new_person)
       
        print("-" * 50)
        print("Simulation Initiated!")
        print("-" * 50)
        self._get_sim_statistics()
    
    # Collecting and printing statistics about our simulation based on the 
    # population, using this provided formula
    def _get_sim_statistics(self):
        num_infected = 0
        num_dead = 0
        num_vaccinated = 0
        num_immune = 0
        for i in self.population:
            if i.is_infected:
                num_infected += 1
            if not i.is_alive:
                num_dead += 1
            if i.is_vaccinated:
                num_vaccinated += 1
                num_immune += 1
            if i.has_been_infected:
                num_immune += 1
        assert num_infected == self.current_infected_counter
        assert num_dead == self.dead_counter

        print("")
        print("Summary Statistics for Time Step {}".format(self.current_time_step))
        print("")
        print("-" * 50)
        print("Disease Name: {}".format(self.disease_name))
        print("R0: {}".format(self.r0 * 100))
        print("Mortality Rate: {}%".format(self.mortality_rate * 100))
        print("Total Population Size: {}".format(len(self.population)))
        print("Total Number of Vaccinated People: {}".format(num_vaccinated))
        print("Total Number of Immune: {}".format(num_immune))
        print("Current Infected: {}".format(num_infected))
        print("Deaths So Far: {}".format(num_dead))   

In [4]:
# Creating interactions where an infected person within our population can
# potentially infect others


def infected_interaction(self, infected_person):
    num_interactions = 0
    while num_interactions < 100:
        # Randomly selecting a person from self.population
        random_person = np.random.choice(self.population)
        # Creating an interaction if the person is alive
        if random_person.is_alive == True:
            # CASE: Random person is not vaccinated, and has not been infected
            # before, making them vulnerable to infection
            if random_person.is_vaccinated == False and random_person.has_been_infected == False:
                # Generating a random number between 0 and 1
                random_number = np.random.random()
                # If random_number is greater than or equal to (1 - self.r0),
                # setting random person as newly_infected
                if random_number >= (1 - self.r0):
                    random_person.newly_infected = True
            # Incrementing the number of interactions, now that there has been
            # a successful interaction
            num_interactions += 1
        else:
            continue


# Adding this function to our Simulation class
Simulation.infected_interaction = infected_interaction

In [5]:
# Resolving the state of each person at the end of a time step
def _resolve_states(self):
    # Iterating through each person in the population
    for person in self.population:
        # Only concerning ourselves with people who are alive
        if person.is_alive == True: 
            # CASE: Person was infected this round. Stochastically determining 
            #if they die or recover from the disease
            if person.is_infected == True:
                # Generating a random number BETWEEN 0 AND 1
                random_number = np.random.random()
                # If random_number is >= (1 - self.mortality_rate), setting the 
                # person to dead and incrementing the simulation's death counter
                if random_number >= (1 - self.mortality_rate):
                    # Set is_alive and is_infected both to False
                    person.is_alive = False
                    person.is_infected = False
                    # Incrementing self.dead_counter, and 
                    # decrementing self.current_infected_counter
                    self.dead_counter += 1
                    self.current_infected_counter -= 1
                else:
                    # CASE: They survive the disease and recover. Setting
                    # is_infected to False and has_been_infected to True
                    person.is_infected = False
                    person.has_been_infected = True
                    # Decrementing self.current_infected_counter
                    self.current_infected_counter -= 1
            # CASE: Person was newly infected during this round, and needs to 
            # be set to infected before the start of next round
            elif person.newly_infected == True:
                # Setting is_infected to True, newly_infected to False, and 
                # incrementing both self.current_infected_counter and 
                # self.total_infected_counter
                person.is_infected = True
                person.newly_infected = False
                self.current_infected_counter += 1
                self.total_infected_counter += 1

# Adding this function to our Simulation class
Simulation._resolve_states = _resolve_states

In [6]:
# Computing 1 time step within the simulation
def _time_step(self):
    # Iterating through each person in the population
    for person in self.population:
        # Checking only for people that are alive and infected
        if person.is_alive == True and person.is_infected == True:
            # Calling self.infected_interaction() on this infected person
            self.infected_interaction(person)
        else:
            continue
    
    # Calling to resolve states for the population
    self._resolve_states()
     
    # Logging summary statistics, and incrementing self.current_time_step by 1.
    self._log_time_step_statistics
    self.current_time_step += 1

# Adding this function to our Simulation class
Simulation._time_step = _time_step

In [7]:
def _log_time_step_statistics(self, write_to_file=False):
    # Getting the current number of dead, using this provided function
    # CASE: Round 0 of simulation, need to create and structure DataFrame
    if self.time_step_statistics_df == None:
        import pandas as pd
        self.time_step_statistics_df = pd.DataFrame()
        col_names = ["Time Step",
                     "Currently Infected",
                     "Total Infected So Far",
                     "Alive",
                     "Dead"]
        self.time_step_statistics_df.columns = col_names
    # CASE: Any other round
    else:
        # Computing summary statistics for currently infected, alive, and dead,
        # and appending them to time_step_statistics_df
        row = {
            "Time Step": self.current_time_step,
            "Currently Infected": self.current_infected_counter,
            "Total Infected So Far": self.total_infected_counter,
            "Alive": len(self.population) - self.dead_counter,
            "Dead": self.dead_counter
        }
    self.time_step_statistics_df = self.time_step_statistics_df.append(
        row, ignore_index=True)

    if write_to_file:
        self.time_step_statistics_df.to_csv("simulation.csv", mode='w+')


# Adding this function to our Simulation class
Simulation._log_time_step_statistics = _log_time_step_statistics

In [8]:
# Defining a function to run the whole simulation
def run(self):
    for _ in tqdm(range(self.total_time_steps)):
        # Printing out the current time step 
        print(f"Beginning Time Step {self.current_time_step}"
        # Call our '_time_step()' function
        self._time_step()
    
    # Simulation is over, so logging results to a file by calling 
    # _log_time_step_statistics(write_to_file=True)
    self._log_time_step_statistics(write_to_file=True)

# Adding the run() function to our Simulation class.
Simulation.run = run

In [9]:
sim = Simulation(2000, "Ebola", 2, .5, 20, .85, 50)

--------------------------------------------------
Simulation Initiated!
--------------------------------------------------

Summary Statistics for Time Step 0

--------------------------------------------------
Disease Name: Ebola
R0: 2.0
Mortality Rate: 50.0%
Total Population Size: 2000
Total Number of Vaccinated People: 1643
Total Number of Immune: 1643
Current Infected: 51
Deaths So Far: 0


In [10]:
run(sim)

HBox(children=(IntProgress(value=0, max=20), HTML(value='')))

Beginning Time Step 0
Beginning Time Step 1
Beginning Time Step 2
Beginning Time Step 3
Beginning Time Step 4
Beginning Time Step 5
Beginning Time Step 6
Beginning Time Step 7
Beginning Time Step 8
Beginning Time Step 9
Beginning Time Step 10
Beginning Time Step 11
Beginning Time Step 12
Beginning Time Step 13
Beginning Time Step 14
Beginning Time Step 15
Beginning Time Step 16
Beginning Time Step 17
Beginning Time Step 18
Beginning Time Step 19



ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

In [11]:
results = pd.read_csv('simulation.csv')
results

FileNotFoundError: [Errno 2] File b'simulation.csv' does not exist: b'simulation.csv'