<a href="https://colab.research.google.com/github/marianadpl/DataMiningSales/blob/master/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import pandas as pd

In [None]:
class Simulation():
    """Epidemic simulation on a set of 2-D points joined by a random contact graph.

    Typical use: ``create_population()`` -> ``create_connections()`` ->
    ``init_state()`` -> ``update_state(N)`` -> ``create_summary_df()``.

    State codes stored in the population frame:
    ``'S'`` susceptible, ``'E'`` exposed, ``'I'`` infected, ``'R'`` recovered,
    ``'D'`` dead.
    """

    # Probabilities used by init_state() for the day-0 state of each individual.
    ALPHA_INFECTED = 0.01
    ALPHA_EXPOSED = 0.01
    ALPHA_RECOVERED = 0

    # Daily outcome of an infected individual, decided by one uniform draw u:
    # u <= DEATH_THRESHOLD -> 'D'; DEATH_THRESHOLD < u <= RESOLVED_THRESHOLD -> 'R';
    # otherwise the individual stays 'I'.
    DEATH_THRESHOLD = 0.005
    RESOLVED_THRESHOLD = 0.05

    # Daily probability that an exposed susceptible individual becomes infected.
    INFECTION_PROBABILITY = 0.075

    # Scatter colour used for each state code.
    STATE_COLORS = {
        'I': 'red',
        'E': 'yellow',
        'R': 'green',
        'S': 'dodgerblue',
        'D': 'black',
    }

    # States reported by create_summary_df(), in column order.
    SUMMARY_STATES = ['I', 'R', 'D', 'S']

    def __init__(self, m=40, n=25, r=6, k=20, infected_locations=None):
        """Store the simulation settings.

        Args:
            m: number of grid columns, used when no locations are provided.
            n: number of grid rows, used when no locations are provided.
            r: maximum Manhattan distance between two connected individuals.
            k: average number of contacts per individual.
            infected_locations: optional collection of (x, y) coordinates to
                use as the population instead of the m-by-n grid.
        """
        self.m = m
        self.n = n
        self.r = r
        self.k = k
        self.infected_locations = infected_locations

    def create_population(self):
        """Build ``self.population``, one row per individual.

        The frame has the columns ``Individual`` (an ``(x, y)`` tuple),
        ``X_coord`` and ``Y_coord``. Without ``infected_locations`` the
        individuals are the points of the m-by-n grid.
        """
        if self.infected_locations is None:
            individuals = [(x, y) for x in range(self.m) for y in range(self.n)]
            self.population = pd.DataFrame({
                'Individual': individuals,
                'X_coord': [x for x, _ in individuals],
                'Y_coord': [y for _, y in individuals],
            })
        else:
            self.population = pd.DataFrame(self.infected_locations,
                                           columns=['X_coord', 'Y_coord'])
            # Keep 'Individual' as a coordinate tuple (not a formatted string)
            # so calculate_distance() works the same way as for the grid.
            individuals = list(zip(self.population['X_coord'].tolist(),
                                   self.population['Y_coord'].tolist()))
            self.population['Individual'] = pd.Series(
                individuals, index=self.population.index, dtype=object)

    def calculate_distance(self, pair1, pair2):
        """Return the Manhattan distance |x1 - x2| + |y1 - y2| between two points."""
        return abs(pair1[0] - pair2[0]) + abs(pair1[1] - pair2[1])

    def connections_contains(self, connections, pair1, pair2):
        """Tell whether ``pair1`` and ``pair2`` must not be connected (again).

        Returns ``True`` when the two points are identical or when the pair is
        already present in ``connections`` in either orientation; ``False``
        otherwise. Connections may be stored as lists or as tuples.
        """
        if pair1 == pair2:
            return True
        orientations = ((pair1, pair2), (pair2, pair1))
        # tuple(...) normalises list-style connections before comparing.
        return any(tuple(existing) in orientations for existing in connections)

    def create_connections(self):
        """Build ``self.connections``, a list of ``[individual, individual]`` pairs.

        Every unordered pair of distinct individuals at Manhattan distance of
        at most ``r`` is eligible. ``ceil(population size * k / 2)`` pairs are
        drawn uniformly without repetition; if fewer eligible pairs exist, all
        of them are used, so the method always terminates.
        """
        # dict.fromkeys drops duplicated locations while keeping their order.
        individuals = list(dict.fromkeys(self.population['Individual'].tolist()))
        eligible = [
            [first, second]
            for position, first in enumerate(individuals)
            for second in individuals[position + 1:]
            if self.calculate_distance(first, second) <= self.r
        ]
        wanted = int(np.ceil(len(self.population) * self.k / 2))
        wanted = max(0, min(wanted, len(eligible)))
        self.connections = random.sample(eligible, wanted)

    def init_state(self):
        """Draw an independent day-0 state for every individual.

        Writes the ``State`` column of ``self.population`` using the
        ``ALPHA_*`` class probabilities; the remaining probability mass goes
        to ``'S'``.
        """
        probabilities = [
            self.ALPHA_INFECTED,
            self.ALPHA_EXPOSED,
            self.ALPHA_RECOVERED,
            1 - self.ALPHA_INFECTED - self.ALPHA_EXPOSED - self.ALPHA_RECOVERED,
        ]
        # One draw per row; a single draw cannot fill the whole column.
        self.population['State'] = np.random.choice(
            ["I", "E", "R", "S"],
            size=len(self.population),
            replace=True,
            p=probabilities,
        )

    def plot_connections(self):
        """Draw every connection as a grey line segment on the current axes."""
        for first, second in self.connections:
            plt.plot([first[0], second[0]],
                     [first[1], second[1]],
                     color='dimgrey', alpha=0.4)

    def plot_individuals(self, statuses=None):
        """Scatter-plot the individuals on the current axes.

        Args:
            statuses: name of the population column holding state codes used
                to colour the points; when falsy all points are drawn in
                ``'dodgerblue'``.
        """
        if statuses:
            colors = self.population[statuses].map(self.STATE_COLORS).tolist()
        else:
            colors = 'dodgerblue'
        plt.scatter(self.population['X_coord'],
                    self.population['Y_coord'],
                    color=colors)

        plt.axis('off')
        axes = plt.gca()
        axes.xaxis.set_visible(False)
        axes.yaxis.set_visible(False)

    def run(self):
        """Create population, connections and initial states, then plot the contact graph.

        Returns:
            The value returned by ``plt.show()``.
        """
        self.create_population()
        self.create_connections()
        self.init_state()
        plt.figure(figsize=(10, 10))
        self.plot_connections()
        self.plot_individuals(statuses=None)

        return plt.show()

    def create_exposure_list(self, current_state):
        """Collect the non-infected individuals connected to an infected one.

        Args:
            current_state: state codes aligned with ``self.population``
                (for example one of its state columns).

        Returns:
            ``self.exposure_list``: unique individuals, ordered by infected
            individual (population order) and then by connection order.
        """
        infected_people = self.population.loc[current_state == 'I',
                                              'Individual'].tolist()
        infected_lookup = set(infected_people)

        # Neighbours of each individual, listed in connection order.
        neighbours = {}
        for first, second in self.connections:
            if first == second:
                continue
            neighbours.setdefault(first, []).append(second)
            neighbours.setdefault(second, []).append(first)

        self.exposure_list = []
        already_listed = set()
        for infected_person in infected_people:
            for contact in neighbours.get(infected_person, ()):
                if contact in infected_lookup or contact in already_listed:
                    continue
                already_listed.add(contact)
                self.exposure_list.append(contact)
        return self.exposure_list

    def plot_state(self, N=200):
        """Simulate ``N`` days and plot the population state on day ``N``.

        Note that this calls ``update_state(N)``, i.e. it runs a fresh
        simulation starting from the ``State`` column.

        Returns:
            The value returned by ``plt.show()``.
        """
        self.update_state(N)
        plt.figure(figsize=(10, 10))

        # Only the final day is drawn; earlier days would be painted over by
        # later ones because all points share the same coordinates.
        self.plot_individuals(str(N) if N > 0 else 'State')
        self.plot_connections()
        return plt.show()

    def update_state(self, N):
        """Simulate ``N`` days starting from the ``State`` column.

        For each day ``d`` (1..N) a column named ``str(d)`` is stored in
        ``self.population``:

        * an infected individual dies, recovers or stays infected according
          to ``DEATH_THRESHOLD`` / ``RESOLVED_THRESHOLD``;
        * a susceptible individual that is connected to an infected one
          becomes infected with ``INFECTION_PROBABILITY``;
        * every other individual (including state ``'E'``) keeps its state.

        Returns:
            ``self.population`` including the day columns.
        """
        self.N = N
        individuals = self.population['Individual'].tolist()
        current_state = self.population['State']
        day_columns = {}

        for day in range(1, self.N + 1):
            exposed = set(self.create_exposure_list(current_state))
            next_state = []
            for individual, state in zip(individuals, current_state.tolist()):
                if state == 'I':
                    outcome = random.random()
                    if outcome <= self.DEATH_THRESHOLD:
                        next_state.append('D')       # death
                    elif outcome <= self.RESOLVED_THRESHOLD:
                        next_state.append('R')       # recovery
                    else:
                        next_state.append('I')       # stays infected
                elif state == 'S' and individual in exposed:
                    # Only individuals actually exposed today can be infected.
                    if random.random() <= self.INFECTION_PROBABILITY:
                        next_state.append('I')
                    else:
                        next_state.append('S')
                else:
                    next_state.append(state)

            day_columns[str(day)] = next_state
            current_state = pd.Series(next_state,
                                      index=self.population.index,
                                      dtype=object)

        if day_columns:
            # Attach all day columns at once (replacing columns of an earlier
            # run) instead of inserting them one by one.
            untouched = self.population.drop(columns=list(day_columns),
                                             errors='ignore')
            self.population = pd.concat(
                [untouched, pd.DataFrame(day_columns, index=untouched.index)],
                axis=1)
        return self.population

    def create_summary_df(self):
        """Count individuals per state for each of the ``self.N`` simulated days.

        Sets ``self.x_axis`` to ``[0, ..., N-1]`` and ``self.summary_df`` to a
        frame indexed by the day labels ``'1'..'N'`` (index name ``'N'``) with
        the columns ``I``, ``R``, ``D`` and ``S``.

        Returns:
            ``self.summary_df``.
        """
        day_labels = [str(day) for day in range(1, self.N + 1)]
        self.x_axis = list(range(self.N))

        counts = {state: [] for state in self.SUMMARY_STATES}
        for label in day_labels:
            day_counts = self.population[label].value_counts()
            for state in self.SUMMARY_STATES:
                counts[state].append(int(day_counts.get(state, 0)))

        self.summary_df = pd.DataFrame(counts,
                                       index=pd.Index(day_labels, name='N'),
                                       columns=self.SUMMARY_STATES)
        self.summary_df.columns.name = 'States'
        return self.summary_df

    def chart(self):
        """Plot the per-day counts of each summarised state.

        Returns:
            The value returned by ``plt.show()``.
        """
        self.create_summary_df()

        fig, ax = plt.subplots()

        ax.plot(self.x_axis, self.summary_df['R'], label="Recovered", color="green")
        ax.plot(self.x_axis, self.summary_df['S'], label="Susceptible", color="dodgerblue")
        ax.plot(self.x_axis, self.summary_df['D'], label="Death", color="black")
        ax.plot(self.x_axis, self.summary_df['I'], label="Infected", color="red")

        ax.legend()

        return plt.show()

    def _get_summary(self):
        """Return ``self.summary_df``, building it first if it does not exist yet."""
        if not hasattr(self, 'summary_df'):
            self.create_summary_df()
        return self.summary_df

    def max_infected(self):
        """Return the highest daily number of infected individuals.

        The value is also kept in ``self.max_infected_value``; it is not
        stored under the method's own name so the method stays callable.
        """
        self.max_infected_value = self._get_summary()['I'].max()
        return self.max_infected_value

    def peak_infected(self):
        """Return the label of the first day with the highest infected count.

        The label is also kept in ``self.peak_infected_day``.
        """
        self.peak_infected_day = self._get_summary()['I'].idxmax()
        return self.peak_infected_day

    @staticmethod
    def _averaged_counts(m, n, r, k, N, days):
        """Run ``N`` independent simulations of ``days`` days and average the counts.

        Returns a dict mapping each summarised state to a list holding its
        mean count per day.
        """
        totals = np.zeros((days, len(Simulation.SUMMARY_STATES)))
        for _ in range(N):
            sim = Simulation(m=m, n=n, r=r, k=k)
            sim.create_population()
            sim.create_connections()
            sim.init_state()
            sim.update_state(days)
            summary = sim.create_summary_df()
            totals += summary[Simulation.SUMMARY_STATES].to_numpy()

        averages = totals / N
        return {state: averages[:, column].tolist()
                for column, state in enumerate(Simulation.SUMMARY_STATES)}

    @staticmethod
    def averaged_chart(m=40, n=25, r=6, k=20, N=100, days=None):
        """Plot state counts averaged over ``N`` independent simulations.

        Args:
            m, n, r, k: settings passed to each ``Simulation``.
            N: number of simulations to average.
            days: number of days simulated in each run; defaults to ``N``.

        Returns:
            A tuple ``(plt.show() result, {'Max': ..., 'Peak Day': ...})``
            where ``Max`` is the highest averaged infected count and
            ``Peak Day`` its zero-based position on the x axis.

        Raises:
            ValueError: if ``N`` or ``days`` is smaller than 1.
        """
        if days is None:
            days = N
        if N < 1 or days < 1:
            raise ValueError("N and days must both be at least 1")

        averaged = Simulation._averaged_counts(m, n, r, k, N, days)
        averaged_I = averaged['I']

        fig, ax = plt.subplots()

        x_axis = list(range(days))

        ax.plot(x_axis, averaged['R'], label="Recovered", color="green")
        ax.plot(x_axis, averaged['S'], label="Susceptible", color="dodgerblue")
        ax.plot(x_axis, averaged['D'], label="Death", color="black")
        ax.plot(x_axis, averaged_I, label="Infected", color="red")

        ax.legend()
        highest = max(averaged_I)
        return plt.show(), {'Max': highest, 'Peak Day': averaged_I.index(highest)}


        

In [None]:
# Build a simulation with the default settings (the name `settings` was never
# defined) and draw its contact graph.
s = Simulation()
s.run()