In [None]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt

# Turn interactive plotting off (figures are created and saved/closed explicitly further below)
plt.ioff()

# Location codes stored in `location_arr`: a person at WORK is evaluated at
# his work coordinates, a person at HOME at his current (walking) coordinates
HOME = 0
WORK = 1

# Illness status codes stored in `status_arr`:
#   HEALTHY     - not infected
#   INFECTED    - infected, incubation timer still running (not part of the disease matrix yet)
#   TRANSMITTER - infected and contributing to the disease (exposure) matrix
HEALTHY = 0
INFECTED = 1
TRANSMITTER = 2

def calc_coords_in_circle(x, y, radius, x_size, y_size):
    """ Collect all grid cells lying within a circle around (x, y) and inside the grid

    Cells are emitted row by row (increasing y-shift); inside a row the cell
    with x-shift 0 comes first, followed by the +shift / -shift pairs.

    :param x:      Current x coordinate in a grid
    :param y:      Current y coordinate in a grid
    :param radius: Desired radius of a circle
    :param x_size: Grid x size (of map to place people into)
    :param y_size: Grid y size (of map to place people into)
    :return:       Array of (x, y) pairs; an empty array when no cell qualifies
    """

    # Largest whole-cell shift that can still fit into the circle
    reach = int(np.floor(radius))

    points = []

    for dy in range(-reach, reach + 1):

        row = y + dy

        # A row outside of the grid cannot contribute any cell
        if not (0 <= row < y_size):
            continue

        for dx in range(reach + 1):

            # Left the circle - larger shifts in this row are outside as well
            if not np.sqrt(dy ** 2 + dx ** 2) <= radius:
                break

            # Cell to the right plus its mirror image (the centre column has no mirror)
            columns = (x + dx, x - dx) if dx > 0 else (x + dx,)

            points.extend((col, row) for col in columns if 0 <= col < x_size)

    return np.array(points)


def transport_iter(cur_x_arr, cur_y_arr, home_x_arr, home_y_arr, location_arr, x_size, y_size, radius, neighbourhood_radius=None):
    """ Move every person who is not at work by one random step (arrays are modified in place)

    A step is drawn uniformly from the grid cells that are both within `radius`
    of the person's current position and within `neighbourhood_radius` of the
    person's home position. A person without any such cell stays where he is.

    :param cur_x_arr:            Array of x coordinates (current position) for all people - updated in place
    :param cur_y_arr:            Array of y coordinates (current position) for all people - updated in place
    :param home_x_arr:           Initial array of x coordinates for all people - needed to calc person's allowed neighbourhood
    :param home_y_arr:           Initial array of y coordinates for all people - needed to calc person's allowed neighbourhood
    :param location_arr:         Array of locations (home/work) for all people
    :param x_size:               Grid x size (of map to place people into)
    :param y_size:               Grid y size (of map to place people into)
    :param radius:               Limiting radius for one epoch transportation
    :param neighbourhood_radius: Maximum allowed distance from home; when omitted, the module-level
                                 variable `neighbourhood_radius` is used (previous behaviour)
    :raises NameError:           If `neighbourhood_radius` is neither passed nor defined at module level
    """

    if neighbourhood_radius is None:
        # Backward compatibility: this function used to read the limit from a
        # module-level variable instead of receiving it as an argument
        try:
            neighbourhood_radius = globals()['neighbourhood_radius']
        except KeyError:
            raise NameError("name 'neighbourhood_radius' is not defined") from None

    for i, (x, y) in enumerate(zip(cur_x_arr, cur_y_arr)):

        # Skip working person
        if location_arr[i] == WORK:
            continue

        # Get possible moves from current (x, y) location
        coords = calc_coords_in_circle(x, y, radius, x_size, y_size)

        # No reachable cell at all - person stays in place
        if coords.size == 0:
            continue

        # Keep only the moves that stay inside the person's home neighbourhood
        home_dist = np.sqrt((coords[:, 0] - home_x_arr[i]) ** 2 + (coords[:, 1] - home_y_arr[i]) ** 2)
        allowed = coords[home_dist <= neighbourhood_radius]

        # Nothing allowed - stay in place rather than retrying forever
        if allowed.shape[0] == 0:
            continue

        # A single uniform draw among the allowed moves (same distribution as
        # repeatedly drawing from all moves until an allowed one comes up)
        new_x, new_y = allowed[np.random.choice(allowed.shape[0])]

        cur_x_arr[i] = new_x
        cur_y_arr[i] = new_y

        
def make_disease_matrix(cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, x_size, y_size, spread_radius):
    """ Build the disease (exposure) matrix: for each grid cell, the number of transmitters whose spread circle covers it

    :param cur_x_arr:      Array of x coordinates (current position) for all people
    :param cur_y_arr:      Array of y coordinates (current position) for all people
    :param location_arr:   Array of locations (home/work) for all people
    :param work_x_arr:     Array of x coordinates (work position) for all people
    :param work_y_arr:     Array of y coordinates (work position) for all people
    :param status_arr:     Array that indicates people's illness status: 0=healthy, 1=infected, 2=transmitter
    :param x_size:         Grid x size (of map to place people into)
    :param y_size:         Grid y size (of map to place people into)
    :param spread_radius:  Radius of exposure of an infected person within which disease can be transferred to a healthy one
    :return:               Integer matrix of shape (y_size, x_size), indexed as [y, x]
    """

    exposure = np.zeros((y_size, x_size), dtype=int)

    # Only people with a transmittable disease contribute to the matrix
    for carrier in np.where(status_arr == TRANSMITTER)[0]:

        where = location_arr[carrier]

        # A transmitter spreads around his walking position or around his workplace
        if where == HOME:
            infected_coords = calc_coords_in_circle(cur_x_arr[carrier], cur_y_arr[carrier], spread_radius, x_size, y_size)

        elif where == WORK:
            infected_coords = calc_coords_in_circle(work_x_arr[carrier], work_y_arr[carrier], spread_radius, x_size, y_size)

        # Empty footprint - nothing to add
        if len(infected_coords) == 0:
            continue

        # Add one unit of exposure to every covered cell (columns are x, y -> index [y, x])
        np.add.at(exposure, (infected_coords[:, 1], infected_coords[:, 0]), 1)

    return exposure


def calc_infection_prob(infection_exposure, infect_prob=0.5):
    """ Probability of catching the disease given the number of nearby transmitters

    Every transmitter gets one independent chance of `infect_prob` to infect,
    so the person stays healthy only if all of them fail.

    :param infection_exposure: Number of infected people which can transfer the disease and located near a healthy person
    :param infect_prob:        Infection's base probability
    :return:                   Probability of infection
    """
    stay_healthy_prob = (1 - infect_prob) ** infection_exposure
    return 1 - stay_healthy_prob


def spread_disease(disease_mat, cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, incubation_timer_arr, timer_min, timer_max, infect_prob):
    """ Infect healthy people according to the exposure found at their position. Updates status_arr & incubation_timer_arr in place

    :param disease_mat:           Map of overlapping areas of disease exposure from current infected people
    :param cur_x_arr:             Array of x coordinates (current position) for all people
    :param cur_y_arr:             Array of y coordinates (current position) for all people
    :param location_arr:          Array of locations (home/work) for all people
    :param work_x_arr:            Array of x coordinates (work position) for all people
    :param work_y_arr:            Array of y coordinates (work position) for all people
    :param status_arr:            Array that indicates people's illness status: 0=healthy, 1=infected, 2=transmitter
    :param incubation_timer_arr:  Array of timers (in epochs) for each person, which indicate when an infected person starts to transmit
    :param timer_min:             Min steps (epochs) until infected person can transmit a disease (exception: initial group)
    :param timer_max:             Max steps (epochs) until infected person can transmit a disease (exception: initial group)
    :param infect_prob:           Disease's base probability of transmission
    """

    assert cur_x_arr.size == cur_y_arr.size

    outcomes = [HEALTHY, INFECTED]

    for person in range(cur_x_arr.size):

        # Exposure is read at the workplace for workers, at the walking position otherwise
        where = location_arr[person]
        if where == HOME:
            infection_exposure = disease_mat[cur_y_arr[person], cur_x_arr[person]]

        elif where == WORK:
            infection_exposure = disease_mat[work_y_arr[person], work_x_arr[person]]

        # Draw the outcome for everybody (also for people who are already ill)
        ill_prob = calc_infection_prob(infection_exposure, infect_prob)
        drawn = np.random.choice(outcomes, p=[1 - ill_prob, ill_prob], replace=False)

        # Escaped the infection this epoch
        if drawn != INFECTED:
            continue

        # Only healthy people without a running timer can get infected
        if status_arr[person] != HEALTHY or incubation_timer_arr[person] != 0:
            continue

        status_arr[person] = drawn
        incubation_timer_arr[person] = np.random.choice(range(timer_min, timer_max + 1), replace=False)

        # A zero incubation period makes the person a transmitter right away
        if incubation_timer_arr[person] == 0:
            status_arr[person] = TRANSMITTER

            
def plot_disease_exposure(cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, spread_radius, epoch=None, path=None):
    """ Plot people on the map coloured by illness status and optionally save the picture

    People at home are drawn at their current position, people at work at their
    work position. The figure is saved only when both `epoch` and `path` are
    given, and it is always closed before returning.

    :param cur_x_arr:       Array of x coordinates (current position) for all people
    :param cur_y_arr:       Array of y coordinates (current position) for all people
    :param location_arr:    Array of locations (home/work) for all people
    :param work_x_arr:      Array of x coordinates (work position) for all people
    :param work_y_arr:      Array of y coordinates (work position) for all people
    :param status_arr:      Array that indicates people's illness status: 0=healthy, 1=infected, 2=transmitter
    :param spread_radius:   Disease spreading radius (used as marker size for ill people)
    :param epoch:           Current epoch in simulation
    :param path:            Folder to save plot to (created, including parents, if it doesn't exist)
    """

    at_home = location_arr == HOME
    at_work = location_arr == WORK

    fig, ax = plt.subplots(figsize=(15, 15))

    try:
        # Background layer: everybody, regardless of status
        ax.scatter(cur_x_arr[at_home], cur_y_arr[at_home], s=1, c='grey', alpha=0.1)
        ax.scatter(work_x_arr[at_work], work_y_arr[at_work], s=1, c='grey', alpha=0.1, label='Uninfected')

        # Ill people on top; only the "work" layer carries the legend label so each group is listed once
        layers = (
            (INFECTED, 'yellow', 0.5, 'Infected (visible + invisible)'),
            (TRANSMITTER, 'red', 1, 'Infected (visible = transmitters)'),
        )
        for status, colour, alpha, label in layers:
            has_status = status_arr == status
            ill_at_home = has_status & at_home
            ill_at_work = has_status & at_work
            ax.scatter(cur_x_arr[ill_at_home], cur_y_arr[ill_at_home], s=spread_radius, c=colour, alpha=alpha)
            ax.scatter(work_x_arr[ill_at_work], work_y_arr[ill_at_work], s=spread_radius, c=colour, alpha=alpha, label=label)

        ax.set_title(f'Disease (exposure) matrix - visibly ill people = {(status_arr == TRANSMITTER).sum()}, infected total = {(status_arr != HEALTHY).sum()} - epoch {epoch}')
        ax.legend(loc=2)

        if epoch is not None and path is not None:

            # Create folder (and missing parents) if it doesn't exist
            os.makedirs(path, exist_ok=True)

            # Save plot
            fig.savefig(os.path.join(path, f'Disease_matrix_epoch_{epoch}.png'), dpi=300)

    finally:
        # Release the figure even if plotting or saving failed
        plt.close(fig)

            
def simulate_transportations_with_infections(residents_num, x_size, y_size, init_transmitters_num, timer_min, timer_max, neighbourhood_radius, infect_prob, radius, spread_radius, epochs, plot_disease_matrix=None):
    """ Simulate people transportation and disease spread in a rectangular grid

    Every epoch stands for one hour of a repeating 24-hour day (the first epoch
    is hour 1). People leave for work in three waves at hours 7, 8 and 9
    (a third, a third, everybody left) and return home in three waves at
    hours 19, 20 and 21. In each epoch: incubation timers advance, commuting
    happens, people at home take a random step, the exposure matrix is built
    (and optionally plotted) and the disease is spread.

    :param residents_num:          Number of people in simulation
    :param x_size:                 Grid x size (of map to place people into)
    :param y_size:                 Grid y size (of map to place people into)
    :param init_transmitters_num:  Initial infected people number
    :param timer_min:              Min steps (epochs) until infected person can transmit a disease (exception: initial group)
    :param timer_max:              Max steps (epochs) until infected person can transmit a disease (exception: initial group)
    :param neighbourhood_radius:   Maximum distance allowed to travel for each person from his initial location
                                   (see the NOTE at the `transport_iter` call below)
    :param infect_prob:            Base probability for disease to transmit
    :param radius:                 Maximum radius for person to travel in single epoch
    :param spread_radius:          Disease spreading radius
    :param epochs:                 Number of steps; in each step people 1) travel and 2) spread the disease
    :param plot_disease_matrix:    Folder to save a plot of the map into in each epoch (no plotting when None)
    :return:                       Number of ill (visible + invisible) people for each epoch, number of ill (visible) people that can transmit a disease for each epoch
    """

    ###########################
    # Init variables          #
    ###########################

    status_arr = np.zeros(residents_num, dtype=int)            # Illness status
    incubation_timer_arr = np.zeros(residents_num, dtype=int)  # Epochs left until illness becomes transmittable
    location_arr = np.full(residents_num, HOME, dtype=int)     # Everybody starts at home

    # Randomly initiate the group that can transmit the disease from the very beginning
    status_arr[np.random.choice(residents_num, init_transmitters_num, replace=False)] = TRANSMITTER

    ###########################
    # Init coordinate arrays  #
    ###########################

    # Home and work cells are drawn uniformly over the grid; x and y use their
    # own grid size, so the map doesn't have to be square
    home_x_arr = np.random.randint(x_size, size=residents_num)
    home_y_arr = np.random.randint(y_size, size=residents_num)
    work_x_arr = np.random.randint(x_size, size=residents_num)
    work_y_arr = np.random.randint(y_size, size=residents_num)

    # Everybody starts the simulation at his home cell
    cur_x_arr = home_x_arr.copy()
    cur_y_arr = home_y_arr.copy()

    # Size of one commuting wave
    wave_size = residents_num // 3

    ###########################
    # Run simulations         #
    ###########################

    disease_tracker = []
    visible_disease_tracker = []

    for i in tqdm(range(1, epochs + 1)):

        # Hour of the day in 1..24
        hour = (i - 1) % 24 + 1

        # Make disease visible (and transmittable)
        if i > 1:

            # People whose timer is about to run out become transmitters
            status_arr[incubation_timer_arr == 1] = TRANSMITTER

            # Advance all running incubation timers
            incubation_timer_arr[incubation_timer_arr > 0] -= 1

        # First and second wave to work: a random third of all people each
        if hour in (7, 8):

            if hour == 7:
                assert (location_arr == HOME).sum() == location_arr.size

            worker_indices = np.random.choice(np.where(location_arr == HOME)[0], wave_size, replace=False)
            location_arr[worker_indices] = WORK

        # Third wave to work: everybody who is still at home
        elif hour == 9:

            location_arr[location_arr == HOME] = WORK

        # First and second wave back home: a random third of all people each
        elif hour in (19, 20):

            if hour == 19:
                assert (location_arr == WORK).sum() == location_arr.size

            worker_indices = np.random.choice(np.where(location_arr == WORK)[0], wave_size, replace=False)
            location_arr[worker_indices] = HOME

            # Reset coordinates to home
            cur_x_arr[worker_indices] = home_x_arr[worker_indices]
            cur_y_arr[worker_indices] = home_y_arr[worker_indices]

        # Third wave back home: everybody who is still at work
        elif hour == 21:

            worker_indices = np.where(location_arr == WORK)[0]
            location_arr[worker_indices] = HOME

            # Reset coordinates to home
            cur_x_arr[worker_indices] = home_x_arr[worker_indices]
            cur_y_arr[worker_indices] = home_y_arr[worker_indices]

        # Transport people (walk for those who aren't working)
        # NOTE: `neighbourhood_radius` is not forwarded by this call; with these
        # arguments `transport_iter` takes the limit from a module-level
        # variable of the same name, so keep that variable in sync with the argument.
        transport_iter(cur_x_arr, cur_y_arr, home_x_arr, home_y_arr, location_arr, x_size, y_size, radius)

        # Observe disease map
        disease_mat = make_disease_matrix(cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, x_size, y_size, spread_radius)

        # Plot & save disease exposure map
        if plot_disease_matrix is not None:
            plot_disease_exposure(cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, spread_radius, epoch=i, path=plot_disease_matrix)

        # Spread disease (based on the map above)
        spread_disease(disease_mat, cur_x_arr, cur_y_arr, location_arr, work_x_arr, work_y_arr, status_arr, incubation_timer_arr, timer_min, timer_max, infect_prob)

        # Track stats
        infected_total = (status_arr != HEALTHY).sum()
        transmitters_total = (status_arr == TRANSMITTER).sum()
        disease_tracker.append(infected_total)
        visible_disease_tracker.append(transmitters_total)

        # Debug
        print('[epoch={}]\tinfected(total)={}\ttransmitters(visibly ill)={}'.format(i, infected_total, transmitters_total))

    return np.array(disease_tracker), np.array(visible_disease_tracker)

In [None]:
# --- Population and map ---
residents_num = 30_000       # Number of people in simulation
x_size = y_size = 3000       # Grid sizes (map to place people into)
init_transmitters_num = 100  # Initial infected people number

# --- Disease timing ---
timer_min = 1                # Min steps (epochs) until infected person can transmit a disease (exception: initial group)
timer_max = 12               # Max steps (epochs) until infected person can transmit a disease (exception: initial group)

# --- Movement and run length ---
neighbourhood_radius = 120   # Maximum distance allowed to travel for each person from his initial location
epochs = 120                 # Number of steps; in each step people 1) travel and 2) spread the disease
radius = 1.5                 # Maximum radius for person to travel in single epoch

# --- Disease spreading ---
spread_radius = 20           # Disease spreading radius
infect_prob = 1.0            # Base probability for disease to transmit

# Folder for the per-epoch plots; the settings are embedded into its name with dots replaced by underscores
plot_disease_matrix = r'C:\Users\va\Transport_networks\radius_{}_spread_radius_{}_infected_prob_{}'.format(
    *(str(setting).replace('.', '_') for setting in (radius, spread_radius, infect_prob))
)

# Run the simulation and keep both time series (all infected, transmitters only)
disease_tracker, visible_disease_tracker = simulate_transportations_with_infections(
    residents_num=residents_num,
    x_size=x_size,
    y_size=y_size,
    init_transmitters_num=init_transmitters_num,
    timer_min=timer_min,
    timer_max=timer_max,
    neighbourhood_radius=neighbourhood_radius,
    infect_prob=infect_prob,
    radius=radius,
    spread_radius=spread_radius,
    epochs=epochs,
    plot_disease_matrix=plot_disease_matrix,
)

In [None]:
# Plot the two time series returned by the simulation: everybody who is
# infected and the subset that already transmits the disease
fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(disease_tracker, '.-', c='tab:blue', label='All infected people')
ax.plot(visible_disease_tracker, '.-', c='tab:red', label='Visible (transmitable) infected people')
ax.set_ylabel('Infected')
ax.set_xlabel('Time')
ax.set_title(f'Infected vs. time (radius={radius}, spread_radius={spread_radius}, infect_prob={infect_prob})')
ax.grid()
ax.legend()

# Make sure the output folder exists before saving into it
os.makedirs(plot_disease_matrix, exist_ok=True)

# The population size lives in `residents_num` (there is no `people_num` variable)
fig.savefig(os.path.join(plot_disease_matrix, f'Transport_networks_{x_size}x{y_size}_{residents_num}_people_{epochs}_epochs_infected_ts.png'), dpi=300)

plt.show()