In [5]:
import numpy as np
import pandas as pd

# Simulation Idea

Three personality types are simulated, each having a different effect on teamwork:

1. The "normal" guy just contributes his own performance of 1 point (on average; drawn from a normal distribution that is resampled until the value is positive).

2. The "overachiever" exhibits double the normal performance, but with his elbow mentality he demotivates all other group members, reducing the performance of the others by 1 point in total (on average; drawn from a normal distribution that is resampled until the value is positive). This total is split evenly among the other team members as an additive malus to each individual performance.

3. The "charismatic_idiot" achieves only half of the normal performance, but he motivates the team, increasing the performance of the other group members by 1 point in total (on average; drawn from a normal distribution that is resampled until the value is positive). This total is split evenly among the other team members as an additive bonus to each individual performance.

With a distribution of 50% "normal", 25% "overachiever" and 25% "charismatic_idiot", random teams of 5 persons are formed. A group competition is held among every 4 teams, where the best team wins and the other 3 teams lose. The losing teams review their workflows and learn from their loss, resulting in a 10% increase of the current individual performance (including the effects of other team members on them) for all of their team members. Each winning team is awarded one promotion, and the candidate is chosen by the best individual performance. However, the "charismatic_idiot" obtains a specific promotion bonus of +2 points added to his performance, due to his rhetorical abilities and good reputation. When a "charismatic_idiot" or an "overachiever" leaves a group, their corresponding effect on the other team members is additively reversed (leaving behind only their influence on the increased or decreased learning of the others).

The open positions are refilled by new persons drawn from the same population distribution. This cycle of competition, promotion and rehiring forms the simulation loop.

In [11]:
def clipped_normal_distriution(x0,sigma):
    """Draw one strictly positive sample from a normal distribution.

    Values are drawn from ``np.random.normal(x0, sigma)`` and redrawn
    (rejection sampling) until a value greater than zero appears.

    The resampling is done in a loop instead of by recursion, so a long
    streak of non-positive draws (e.g. for a clearly negative ``x0``) can no
    longer end in a ``RecursionError``.

    Parameters
    ----------
    x0 : float
        Mean of the underlying normal distribution.
    sigma : float
        Standard deviation of the underlying normal distribution.

    Returns
    -------
    float
        The first drawn value that is greater than zero.

    Raises
    ------
    ValueError
        If ``x0`` or ``sigma`` is NaN, or if ``sigma`` is zero while ``x0``
        is not positive; in these cases no positive value could ever be
        drawn and the resampling would never terminate.
    """
    if np.isnan(x0) or np.isnan(sigma):
        raise ValueError("x0 and sigma must not be NaN")
    if sigma == 0 and x0 <= 0:
        raise ValueError("with sigma == 0 the mean x0 must be positive, got x0="+str(x0))

    while True:
        value = np.random.normal(x0,sigma)
        if value > 0:
            return value

class social_simulation:
    """Meritocracy game: trainees of three personality types compete in groups.

    A corporation of ``corporate_size`` trainees is split into groups of
    ``group_size``. One time step of the simulation consists of the following
    method calls, in this order:

    1. ``group_composition_effect_round`` - apply the personality effects of
       all group members to obtain each trainee's effective performance
    2. ``group_competition_round`` - random sets of ``group_competition_number``
       groups compete, the highest group score wins
    3. ``promotion_round`` - one trainee of each winning group is promoted
    4. ``save_timestep`` - record the current state (long format)
    5. ``rehiring_round`` - promoted trainees are replaced by new hires
    6. ``learning_round`` - members of losing groups increase their performance

    ``initialize_corporation`` has to be called once before the first step.
    Trainees are stored in parallel lists indexed by their trainee ID, which
    is a running number starting at 0.
    """

    def __init__(self,corporate_size=1000,group_size=5,group_competition_number=4):
        """Set the global parameters of the simulation.

        Parameters
        ----------
        corporate_size : int
            Number of trainees. Rounded up if necessary so that it is a
            multiple of ``group_size`` and the number of groups is a multiple
            of ``group_competition_number`` (a message is printed then).
        group_size : int
            Number of trainees per group (at least 1).
        group_competition_number : int
            Number of groups taking part in one competition (at least 1).

        Raises
        ------
        ValueError
            If ``group_size`` or ``group_competition_number`` is smaller than 1.
        """
        self.corporate_size = corporate_size
        self.group_size = group_size
        self.group_competition_number = group_competition_number
        # the consistency check derives (and if needed adjusts) number_of_groups
        self.number_of_groups = 0
        self._check_parameter_consistency()

        # further default parameters
        self.standard_performance = 1
        self.random_performance_deviation = 0.25

        self.overachiever_performance_factor = 2
        # NOTE: the attribute name is misspelled ("charismatc"); it is kept
        # unchanged because it is part of the public interface of the class
        self.charismatc_idiot_performance_factor = 0.5

        self.overachiever_group_effect = -1
        self.charismatic_idiot_group_effect = 1

        self.random_effect_deviation = 0.25

        self.charismatic_idiot_promotion_bonus = 2

        self.learning_rate = 0.1

        # further technical parameters
        # anything starting with underscores should not be accessed by a typical user
        self._index_id = 0  # ID that the next created trainee will receive
        self.initialize_save_dict()
        self.initialize_trainee_data()

    def initialize_trainee_data(self):
        """Create the empty per-trainee lists that are filled during the simulation.

        All lists are indexed by the trainee ID.
        """
        self.trainee_id = []
        self.personality = []
        self.performance = []
        self.personality_effect = []
        self.effective_performance = []

    def initialize_save_dict(self):
        """Create the empty columns for the panel accumulation of data (stat. long format)."""
        columns = (
            "time_step",
            "trainee_id",
            "performance",
            "personality",
            "personality_effect",
            "entry_time",
            "promotion_time",
            "group_id",
            "group_score",
            "group_share_normal",
            "group_share_overachiever",
            "group_share_charismatic_idiot",
            "group_winning",
        )
        self.save_dict = {column: [] for column in columns}

    def _check_parameter_consistency(self):
        """Make corporate size, group size and competition size fit together.

        ``corporate_size`` is rounded up to a multiple of ``group_size`` and the
        resulting number of groups is rounded up to a multiple of
        ``group_competition_number``. Every adjustment is reported via ``print``.
        Sets ``self.number_of_groups``.

        Raises
        ------
        ValueError
            If ``group_size`` or ``group_competition_number`` is smaller than 1.
        """
        if self.group_size < 1:
            raise ValueError("group_size must be at least 1, got "+str(self.group_size))
        if self.group_competition_number < 1:
            raise ValueError("group_competition_number must be at least 1, got "
                             +str(self.group_competition_number))

        # integer arithmetic (no float division) keeps the results exact
        self.number_of_groups = int(self.corporate_size//self.group_size)

        if self.corporate_size%self.group_size != 0:
            print("corporate size "+str(self.corporate_size)+
                  " is not divisible into an integer number of groups of size "+str(self.group_size))
            # -(-a//b) is the integer ceiling of a/b
            adjusted_corp_size = int(-(-self.corporate_size//self.group_size)*self.group_size)
            print("adjusted corporate size: "+str(adjusted_corp_size))
            self.corporate_size = adjusted_corp_size
            self.number_of_groups = int(self.corporate_size//self.group_size)
            print()

        if self.number_of_groups%self.group_competition_number != 0:
            print("number of groups "+str(self.number_of_groups)+
                  " is not divisible into an integer number of group competitions each containing "+
                  str(self.group_competition_number)+ " groups")
            adjusted_num_groups = int(-(-self.number_of_groups//self.group_competition_number)
                                      *self.group_competition_number)
            adjusted_corp_size = int(adjusted_num_groups*self.group_size)
            print("adjusted number of groups: "+str(adjusted_num_groups))
            print("leading to adjusted corporate size: "+str(adjusted_corp_size))
            self.corporate_size = adjusted_corp_size
            self.number_of_groups = adjusted_num_groups
            print()

    @staticmethod
    def _get_personality_from_index(index):
        """Map a uniformly drawn integer from 0..3 to a personality.

        0,1 (50%) -> normal , 2 (25%) -> overachiever , 3 (25%) -> charismatic_idiot

        Raises
        ------
        ValueError
            If ``index`` is not one of 0, 1, 2, 3.
        """
        if index == 0 or index == 1:
            return "normal"
        if index == 2:
            return "overachiever"
        if index == 3:
            return "charismatic_idiot"
        raise ValueError("personality index must be 0, 1, 2 or 3, got "+str(index))

    def _draw_group_effect(self,mean_effect):
        """Draw the group effect of one trainee.

        The magnitude is a clipped-normal sample around ``|mean_effect|`` with
        deviation ``random_effect_deviation``; the sign is that of ``mean_effect``.
        """
        magnitude = clipped_normal_distriution(np.abs(mean_effect),self.random_effect_deviation)
        return np.sign(mean_effect)*magnitude

    def create_trainees(self,number_of_trainees):
        """Append ``number_of_trainees`` new trainees to the per-trainee lists.

        For every trainee the ID, the personality, the (personality dependent)
        performance and the personality dependent group effect are stored.
        The effective performance is initialised with the dummy value 0 and is
        computed later by ``group_composition_effect_round``.
        """
        for _ in range(int(number_of_trainees)):
            self.trainee_id.append(self._index_id)
            self._index_id += 1

            personality = self._get_personality_from_index(np.random.randint(0,4))
            self.personality.append(personality)

            performance = clipped_normal_distriution(self.standard_performance,
                                                     self.random_performance_deviation)
            self.effective_performance.append(0)  # dummy value

            if personality == "overachiever":
                performance *= self.overachiever_performance_factor
                effect = self._draw_group_effect(self.overachiever_group_effect)
            elif personality == "charismatic_idiot":
                performance *= self.charismatc_idiot_performance_factor
                effect = self._draw_group_effect(self.charismatic_idiot_group_effect)
            else:
                # "normal" trainees do not influence their team members
                effect = 0

            self.performance.append(performance)
            self.personality_effect.append(effect)

    def initialize_groups(self):
        """Create the group structure for the first ``corporate_size`` trainees.

        ``self.groups`` is a list of lists, each inner list holding the trainee
        IDs of one group (consecutive IDs, ``group_size`` per group).
        ``self.group_lookup`` maps each trainee ID to the index of its group.
        """
        self.group_lookup = [i//self.group_size for i in range(self.corporate_size)]
        self.groups = [
            list(range(start,min(start+self.group_size,self.corporate_size)))
            for start in range(0,self.corporate_size,self.group_size)
        ]

    def initialize_corporation(self):
        """Start a fresh simulation: create the starting trainees and the groups.

        All trainee data, the saved history and the ID counter are reset first,
        so calling this method again restarts the simulation from a consistent
        state instead of appending trainees that do not match the groups.
        """
        self.initialize_trainee_data()
        self.initialize_save_dict()
        self._index_id = 0

        self.create_trainees(self.corporate_size)
        self.initialize_groups()

        self.entry_time = [0 for _ in range(self.corporate_size)]
        self.promoted = [None for _ in range(self.corporate_size)]
        self.time_step = 0

        # per-group statistics; NaN marks "not computed yet" until the first
        # group_competition_round has filled them
        self.group_score = np.full(self.number_of_groups,np.nan)
        self.group_share_normal = np.full(self.number_of_groups,np.nan)
        self.group_share_charismatic_idiot = np.full(self.number_of_groups,np.nan)
        self.group_share_overachiever = np.full(self.number_of_groups,np.nan)

    def group_composition_effect_round(self):
        """Compute the effective performance of every trainee that is in a group.

        The personality effect of each member is split evenly among the *other*
        members of the group and added to their performance. In a group of size
        one there are no other members, so no effect is applied.
        """
        number_of_others = self.group_size-1
        for group in self.groups:
            # sum up the total interpersonal effects of all group members
            group_effects = np.zeros(self.group_size)
            if number_of_others > 0:
                for position,trainee_id in enumerate(group):
                    share = np.zeros(self.group_size)+self.personality_effect[trainee_id]/number_of_others
                    share[position] = 0  # a trainee does not affect himself
                    group_effects += share
            # add the effects to the individual ground performances
            for position,trainee_id in enumerate(group):
                effective = self.performance[trainee_id]+group_effects[position]
                # in the unlikely case that the negative effects surpass the
                # performance of a group member, the performance is reduced
                # to 0, preventing negative values (sabotage)
                if effective < 0:
                    effective = 0
                self.effective_performance[trainee_id] = effective

    def calculate_group_performance_and_composition(self,group_index):
        """Compute and store score and personality shares of a single group.

        The group score is the sum of the effective performances of its
        members. Score and shares are written to ``group_score``,
        ``group_share_normal``, ``group_share_overachiever`` and
        ``group_share_charismatic_idiot`` at position ``group_index``.

        Returns
        -------
        The group score.
        """
        group = self.groups[group_index]
        number_of_overachievers = sum(1 for i in group if self.personality[i] == "overachiever")
        number_of_charismatic_idiots = sum(1 for i in group if self.personality[i] == "charismatic_idiot")
        number_of_normals = self.group_size-number_of_overachievers-number_of_charismatic_idiots

        group_performance = sum([self.effective_performance[i] for i in group])

        # save the calculated properties
        self.group_share_charismatic_idiot[group_index] = number_of_charismatic_idiots/self.group_size
        self.group_share_overachiever[group_index] = number_of_overachievers/self.group_size
        self.group_share_normal[group_index] = number_of_normals/self.group_size
        self.group_score[group_index] = group_performance

        return group_performance

    def select_promotion(self,group):
        """Determine who of the given group (list of trainee IDs) is promoted.

        The candidate with the highest effective performance wins, where a
        "charismatic_idiot" gets ``charismatic_idiot_promotion_bonus`` on top.
        On a tie the first member in the group list is chosen.

        Returns
        -------
        tuple
            ``(local index in the group, global index / trainee ID)``
        """
        promotion_score = np.empty(len(group))
        for position,trainee in enumerate(group):
            score = self.effective_performance[trainee]
            if self.personality[trainee] == "charismatic_idiot":
                score = score+self.charismatic_idiot_promotion_bonus
            promotion_score[position] = score
        promotion_group_index = int(np.argmax(promotion_score))
        return promotion_group_index,group[promotion_group_index]

    def group_competition_round(self):
        """Let all groups compete in randomly composed "battle royales".

        The groups are shuffled and partitioned into sets of
        ``group_competition_number`` groups; in each set the group with the
        highest group score wins. The result is stored in the boolean array
        ``self.group_winning`` (one entry per group).
        """
        # random order of groups, such that it is not always the same groups
        # competing with each other
        random_for_sorting = np.random.uniform(size=self.number_of_groups)
        competition_order = np.argsort(random_for_sorting)

        # execute group competitions
        competition_size = self.group_competition_number
        number_of_competitions = int(self.number_of_groups//competition_size)
        self.group_winning = np.zeros(self.number_of_groups,dtype=bool)
        for competition in range(number_of_competitions):
            first = competition*competition_size
            competing_groups = competition_order[first:first+competition_size]
            group_scores = [self.calculate_group_performance_and_composition(g)
                            for g in competing_groups]
            winning_index = int(np.argmax(group_scores))
            self.group_winning[competing_groups[winning_index]] = True

    def promotion_round(self):
        """Promote a single trainee of each winning group.

        The current time step is stored as promotion time of the promoted
        trainee. The local positions of the promoted trainees (in the order of
        the winning groups) are remembered for ``rehiring_round``.
        """
        self._candidates_local_in_winning_groups = []
        for group_index,winning in enumerate(self.group_winning):
            if winning:
                winner_local,winner_global = self.select_promotion(self.groups[group_index])
                self.promoted[winner_global] = int(self.time_step)
                self._candidates_local_in_winning_groups.append(winner_local)

    def rehiring_round(self):
        """Hire new trainees for the positions that became empty by promotion.

        This marks the beginning of a new time step, as with different people
        the corporation is in a different state/configuration.

        Raises
        ------
        RuntimeError
            If ``promotion_round`` did not provide exactly one promoted
            candidate per winning group.
        """
        winning_groups = [g for g,winning in enumerate(self.group_winning) if winning]
        vacancies = getattr(self,"_candidates_local_in_winning_groups",[])
        if len(vacancies) != len(winning_groups):
            raise RuntimeError("promotion_round has to be called after group_competition_round "
                               "and before rehiring_round")

        self.time_step += 1

        number_of_new_trainees = len(winning_groups)
        first_new_position = len(self.trainee_id)
        self.create_trainees(number_of_new_trainees)
        new_trainee_ids = self.trainee_id[first_new_position:]

        self.entry_time += [self.time_step for _ in range(number_of_new_trainees)]
        self.promoted += [None for _ in range(number_of_new_trainees)]

        # the k-th new hire takes the place of the promoted member of the k-th winning group
        for new_id,group_index,winner_local in zip(new_trainee_ids,winning_groups,vacancies):
            self.groups[group_index][winner_local] = new_id
            self.group_lookup.append(group_index)

    def learning_round(self):
        """Groups that did not win learn from their loss.

        The performance of every member of a losing group grows by
        ``learning_rate`` times the member's current effective performance.
        """
        for group_index,winning in enumerate(self.group_winning):
            if winning:
                continue
            for trainee in self.groups[group_index]:
                self.performance[trainee] += self.learning_rate*self.effective_performance[trainee]

    def write_to_csv(self,file_path):
        """Save the simulation data as a .csv-file.

        The file ending ".csv" is appended to ``file_path`` and therefore not
        needed in the name. Missing values are written as "NA".
        """
        table = pd.DataFrame.from_dict(self.save_dict)
        table.to_csv(file_path+".csv",index=False,na_rep="NA")

    def save_timestep(self,max_digit_precision=4):
        """Append the current state of all grouped trainees in the statistical long format.

        One record per trainee that is currently member of a group is added to
        ``self.save_dict``. The "performance" column holds the effective
        performance. Floating point values are rounded to
        ``max_digit_precision`` digits.
        """
        record = self.save_dict
        for group in self.groups:
            for trainee in group:
                g = self.group_lookup[trainee]

                record["time_step"].append(self.time_step)

                record["trainee_id"].append(trainee)
                record["performance"].append(
                    np.round(self.effective_performance[trainee],max_digit_precision))
                record["personality"].append(self.personality[trainee])
                record["personality_effect"].append(
                    np.round(self.personality_effect[trainee],max_digit_precision))
                record["entry_time"].append(self.entry_time[trainee])
                record["promotion_time"].append(self.promoted[trainee])

                record["group_id"].append(g)
                record["group_score"].append(
                    np.round(self.group_score[g],max_digit_precision))
                record["group_share_normal"].append(
                    np.round(self.group_share_normal[g],max_digit_precision))
                record["group_share_overachiever"].append(
                    np.round(self.group_share_overachiever[g],max_digit_precision))
                record["group_share_charismatic_idiot"].append(
                    np.round(self.group_share_charismatic_idiot[g],max_digit_precision))
                record["group_winning"].append(self.group_winning[g])
                

In [None]:
# Seed the global NumPy generator (used by all random draws of the simulation),
# so that a "Restart & Run All" reproduces exactly the same CSV file.
np.random.seed(42)

sim=social_simulation(corporate_size=1000,group_size=5) # --> 200 groups

number_of_timesteps=100
sim.initialize_corporation() # structure containing all groups
for i in range(number_of_timesteps):
    # one time step: effects -> competition -> promotion -> record -> rehire -> learn
    sim.group_composition_effect_round()
    sim.group_competition_round()
    sim.promotion_round()
    sim.save_timestep()   # record the state before promoted trainees are replaced
    sim.rehiring_round()  # increments sim.time_step
    sim.learning_round()

# writes "meritocracy_simulation.csv" (the ending is appended by write_to_csv)
sim.write_to_csv("meritocracy_simulation")

In [4]:
# Seed the global NumPy generator (used by all random draws of the simulation),
# so that a "Restart & Run All" reproduces exactly the same CSV file.
np.random.seed(43)

sim=social_simulation(corporate_size=10000,group_size=5) # --> 2000 groups

number_of_timesteps=100
sim.initialize_corporation() # structure containing all groups
for i in range(number_of_timesteps):
    # one time step: effects -> competition -> promotion -> record -> rehire -> learn
    sim.group_composition_effect_round()
    sim.group_competition_round()
    sim.promotion_round()
    sim.save_timestep()   # record the state before promoted trainees are replaced
    sim.rehiring_round()  # increments sim.time_step
    sim.learning_round()

# writes "meritocracy_simulation_big.csv" (the ending is appended by write_to_csv)
sim.write_to_csv("meritocracy_simulation_big")