# Winners-Losers


## [Repast for Python (Repast4Py) User Guide](https://repast.github.io/repast4py.site/guide/user_guide.html)

## [API](https://repast.github.io/repast4py.site/apidoc/index.html)

## [GitHub Repast/repast4py](https://github.com/Repast/repast4py)

## [MPI for Python](https://mpi4py.readthedocs.io/en/stable/tutorial.html#collective-communication)




# An idea for the initial example: 

#### something close to the *Chakraborti model* in the "Winners, Losers" example of [Is Inequality Inevitable? (Sc.Am.)](https://www.scientificamerican.com/article/is-inequality-inevitable/). More precisely, see § 2.1 of [Chakraborti, A. (2002). Distributions of money in model markets of economy. International Journal of Modern Physics C, 13(10), 1315-1321](https://arxiv.org/pdf/cond-mat/0205221.pdf). 
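
A minimal sketch of the exchange rule that `exchangingLocally` applies in this notebook: two agents pool their wallets and split the pool at a uniformly random share, so the total amount of money is conserved. It uses only the standard library (not repast4py); the helper name `exchange`, the seed and the population size are assumptions made for illustration.

```python
import random
from typing import Tuple

def exchange(wallet_a: float, wallet_b: float, rng: random.Random) -> Tuple[float, float]:
    """Pool the two wallets and split the pool at a uniformly random share."""
    common = wallet_a + wallet_b
    share = rng.random()                      # uniform in [0, 1)
    return common * share, common * (1 - share)

rng = random.Random(42)                       # hypothetical seed, for reproducibility only
wallets = [rng.random() for _ in range(100)]  # initial wallets in [0, 1), as in the notebook
total_before = sum(wallets)

for _ in range(1000):
    i, j = rng.sample(range(len(wallets)), 2)  # two distinct agents
    wallets[i], wallets[j] = exchange(wallets[i], wallets[j], rng)

total_after = sum(wallets)
print(total_before, total_after)  # equal up to floating-point rounding
```

The sketch picks pairs at random among all agents; the notebook instead lets each agent choose within a counterpart subset, optionally restricted to richer agents.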



## run the code as a notebook (single rank) or, opening a terminal, with

## mpirun -n X ipython winners-losers.ipynb

#### where X is the number of ranks
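
A small sketch of how the choice of X affects the population, following the integer division `params['WinnerLoser.count'] // rankNum` used in the model and the uid layout `(local id, type, rank)` used in `initGhosts`. The function names and the numbers are illustrative assumptions; no MPI is needed to run it.

```python
from typing import List, Tuple

WINNER_LOSER_TYPE = 0  # same value as WinnerLoser.TYPE in the notebook

Uid = Tuple[int, int, int]  # (local id, agent type, rank)

def agents_per_rank(total: int, n_ranks: int) -> int:
    """Each rank creates total // n_ranks agents; any remainder is not created."""
    return total // n_ranks

def ghost_requests(total: int, n_ranks: int, rank: int) -> List[Tuple[Uid, int]]:
    """(uid, owner rank) pairs for every agent living on a rank other than `rank`."""
    n = agents_per_rank(total, n_ranks)
    return [((i, WINNER_LOSER_TYPE, r), r)
            for r in range(n_ranks) if r != rank
            for i in range(n)]

total, n_ranks = 10, 3                         # hypothetical values
per_rank = agents_per_rank(total, n_ranks)     # agents created on each rank
created = per_rank * n_ranks                   # agents created overall
requests = ghost_requests(total, n_ranks, rank=0)
print(per_rank, created, len(requests))
```

When the agent count is not a multiple of X, fewer agents than requested are created overall, so choosing a count divisible by X keeps runs comparable.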

## run plots.ipynb as notebook to show the results

#### plots.ipynb automatically picks up the number of ranks and the root of the result file names (written to plotInfo.csv in step 5)
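
plots.ipynb is not shown here, so the following is only a sketch of how the result file names could be rebuilt from plotInfo.csv. It assumes the row layout written in step 5 (file-name root, number of ranks) and the naming used in `finish` (`root + rank + '.csv'` and `root + 'MovAv' + rank + '.csv'`). The function name `result_files` and the root `results_` are hypothetical.

```python
import csv
import os
import tempfile
from typing import List, Tuple

def result_files(info_path: str) -> Tuple[List[str], List[str]]:
    """Read (root, number of ranks) and return wallet and moving-average file names."""
    with open(info_path, newline="") as f:
        root, rank_num = next(csv.reader(f))   # csv yields strings
    n = int(rank_num)
    wallets = [f"{root}{r}.csv" for r in range(n)]
    mov_av = [f"{root}MovAv{r}.csv" for r in range(n)]
    return wallets, mov_av

# Demo with a throwaway plotInfo.csv (hypothetical root, two ranks).
with tempfile.TemporaryDirectory() as tmp:
    info = os.path.join(tmp, "plotInfo.csv")
    with open(info, "w", newline="") as f:
        csv.writer(f).writerow(("results_", 2))
    wallet_files, mov_av_files = result_files(info)

print(wallet_files, mov_av_files)
```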

===========================================================================

## 1

import libs \
MPI init \
context and runner definition \
t(), T(), Tc(), tr() function definitions \
random number generator rng creation \
initialization of the parameters from yaml file 

===========================================================================
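
The yaml file itself is not reproduced in this notebook. As a hedged aid, the sketch below lists the parameter keys that the code cells read and checks a parameter dictionary against them. All the values in `example_params` are placeholders, not the ones used for the run shown at the end; `missing_keys` and `enough_seeds` are hypothetical helpers.

```python
from typing import Dict, List

# Keys read from params somewhere in the notebook's code cells.
REQUIRED_KEYS = [
    "myRandom.seed", "WinnerLoser.count", "counterpartRatio", "rank_interaction",
    "onlyRicherAgents", "howManyTimesRicher", "movAvElementNum",
    "maxNumberOfExchangeAttempts", "howManyCycles", "tickNumber.betweenChecks",
    "log_file_root", "show.timeBySteps",
]

def missing_keys(params: Dict) -> List[str]:
    """Return the required keys that are absent from params."""
    return [k for k in REQUIRED_KEYS if k not in params]

def enough_seeds(params: Dict, n_ranks: int) -> bool:
    """myRandom.seed is indexed by rank, so it needs one entry per rank."""
    return len(params["myRandom.seed"]) >= n_ranks

example_params = {                      # placeholder values, for illustration only
    "myRandom.seed": [101, 202],
    "WinnerLoser.count": 100,
    "counterpartRatio": 0.1,
    "rank_interaction": True,
    "onlyRicherAgents": False,
    "howManyTimesRicher": 1,
    "movAvElementNum": 5,
    "maxNumberOfExchangeAttempts": 3,
    "howManyCycles": 10,
    "tickNumber.betweenChecks": 5,
    "log_file_root": "results_",
    "show.timeBySteps": True,
}

print(missing_keys(example_params), enough_seeds(example_params, 2))
```

Running with more ranks than seeds would raise an `IndexError` at `params['myRandom.seed'][rank]`, which is what `enough_seeds` is meant to catch beforehand.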


In [1]:
import time
from mpi4py import MPI
from repast4py import context as ctx
import repast4py 
from repast4py import parameters
from repast4py import schedule
from repast4py import core
from typing import Tuple, List, Dict
import json
import numpy as np
import csv
import os


comm = MPI.COMM_WORLD
rank    = comm.Get_rank()
rankNum = comm.Get_size() #pt

# create the context to hold the agents and manage cross process
# synchronization
context = ctx.SharedContext(comm)

# Initializes the default schedule runner, HERE to create the t() function,
# returning the tick value
"""
init_schedule_runner(comm)
Initializes the default schedule runner, a dynamic schedule of executable 
events shared and synchronized across processes.
Events are added to the schedule for execution at a particular tick. 
The first valid tick is 0. Events will be executed in tick order, earliest 
before latest. Events scheduled for the same tick will be executed in the 
order in which they were added. If, during the execution of a tick, 
an event is scheduled before the executing tick (i.e., scheduled to occur in 
the past), then that event is ignored. The schedule is synchronized across 
process ranks by determining the global cross-process minimum next scheduled 
event time, and executing only the events scheduled for that time. In this way, 
no schedule runs ahead of any other.
"""
runner = schedule.init_schedule_runner(comm)

# tick number
def t():
    return runner.schedule.tick


# https://repast.github.io/repast4py.site/apidoc/source/repast4py.parameters.html
"""
init_params(parameters_file, parameters)
Initializes the repast4py.parameters.params dictionary with the model input parameters.
"""
params = parameters.init_params("winners-losers.yaml", "")



"""
repast4py.random.default_rng
repast4py's default random generator, created using init.
See the Generator API documentation for more information on the available distributions 
and sampling functions.

Type
numpy.random.Generator (PCG64)

repast4py.random.init(rng_seed=None)
Initializes the default random number generator using the specified seed.

Parameters
rng_seed (int) – the random number seed. Defaults to None, in which case the current 
time as returned by time.time() is used as the seed.
"""

repast4py.random.init(rng_seed=params['myRandom.seed'][rank]) #each rank has a seed
rng = repast4py.random.default_rng 



#timer T()
startTime=-1
def T():
    global startTime
    if startTime < 0:
        startTime=time.time()
    return time.time() - startTime

T()

#cpuTimer Tc()
startCpuTime=-1
def Tc():
    global startCpuTime
    if startCpuTime < 0:
        startCpuTime=time.process_time()
    return time.process_time() - startCpuTime

Tc()

# count transactions
transactions = 0
def tr(total=False):
    global transactions
    if not total: transactions+=1
    return transactions



cpuTime = [["0 init",0],["1 initGhostsIfAny",0],["2 counter",0],\
           ["3 agentsCreatingCounterpartSubset",0],\
           ["4 agentsSelectingExchangingAndUpdating",0],\
           ["5 sync1",0], ["6 sync2", 0]]


===========================================================================

## 2

memory allocations to manage ghosts

===========================================================================
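
A plain-Python sketch of the caching idea behind `agent_cache`: a dictionary keyed by uid lets a restore function reuse the ghost object it built earlier and only refresh its state. `GhostStandIn` and `restore` are stand-ins invented for this illustration; they are not repast4py classes and carry only a wallet.

```python
from typing import Dict, Tuple

Uid = Tuple[int, int, int]  # (local id, agent type, rank)

class GhostStandIn:
    """Stand-in for a ghost agent; not a repast4py core.Agent."""
    def __init__(self, uid: Uid, wallet: float):
        self.uid = uid
        self.wallet = wallet

cache: Dict[Uid, GhostStandIn] = {}

def restore(agent_data: Tuple[Uid, Tuple[float]]) -> GhostStandIn:
    """Return the cached ghost for this uid, creating it only the first time."""
    uid, state = agent_data
    ghost = cache.get(uid)
    if ghost is None:
        ghost = GhostStandIn(uid, state[0])   # first sighting: build and remember
        cache[uid] = ghost
    else:
        ghost.wallet = state[0]               # already known: refresh the state only
    return ghost

first = restore(((0, 0, 1), (0.25,)))
second = restore(((0, 0, 1), (0.75,)))
print(first is second, second.wallet, len(cache))
```

Because the same object is returned both times, references to a ghost held elsewhere stay valid after each refresh.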

In [2]:
agent_cache={} # dict with uids as keys and ghost agent instances as values, 
               # used by restore_agent to avoid rebuilding agents

===========================================================================

## 3

agent classes and restore_agent function

===========================================================================
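
The counterpart choice made by `selectingCounterpart` can be summarised with a stand-alone sketch: drop the candidates that have already been chosen, optionally keep only those richer than a multiple of the chooser's wallet, then pick one at random. The function `pick_counterpart` and the sample data are assumptions for illustration; the notebook works on `(wallet, uid)` tuples in the same way but looks the flags up on the agent objects.

```python
import random
from typing import List, Optional, Set, Tuple

Uid = Tuple[int, int, int]            # (local id, agent type, rank)
Candidate = Tuple[float, Uid]         # (wallet, uid), as returned by returningWalletAndOwner

def pick_counterpart(my_wallet: float, candidates: List[Candidate],
                     already_chosen: Set[Uid], only_richer: bool,
                     multiple: float, rng: random.Random) -> Optional[Uid]:
    """Return the uid of a randomly picked admissible counterpart, or None."""
    pool = [c for c in candidates if c[1] not in already_chosen]
    if only_richer:
        multiple = max(multiple, 1)   # values below 1 are raised to 1, as in the notebook
        pool = [c for c in pool if c[0] > my_wallet * multiple]
    if not pool:
        return None
    return pool[rng.randrange(len(pool))][1]

rng = random.Random(7)                # hypothetical seed
candidates = [(0.9, (1, 0, 0)), (0.5, (2, 0, 0)), (0.2, (3, 0, 1))]
chosen = {(1, 0, 0)}

picked = pick_counterpart(0.4, candidates, chosen, True, 1, rng)
none_left = pick_counterpart(0.4, candidates, chosen, True, 2, rng)
is_local = picked is not None and picked[2] == 0   # third uid field is the owner rank
print(picked, none_left, is_local)
```

The third field of the uid tells whether the exchange can happen locally or must go through a ghost, which is the test `counterpartUid[2]==rank` in the class.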

In [3]:
class WinnerLoser(core.Agent):

    TYPE = 0
    
    def __init__(self, local_id: int, rank: int, wallet: float):
        super().__init__(id=local_id, type=WinnerLoser.TYPE, rank=rank)

        self.myWallet = wallet

        self.counterpartLocalId = ()
                
        self.myGhostCounterpartId = ()
        
        self.materialWalletValueToBeReported = 0
        
        self.movAvElements = []
        
        self.yetBeenChooser=False
        self.yetBeenChoosen=False
        
        self.agentAbroadWalletValueToBeReported=-1

    def movAv(self,x):
        
        self.movAvElements.append(x)
        if len(self.movAvElements) > params['movAvElementNum']: self.movAvElements.pop(0)
     
   
    def creatingAndObservingCounterpartSubsets(self):
        
        #creating
        #list(...) lets ghosts be appended below, whatever iterable agents() returns
        counterpartSet=list(context.agents(agent_type=0, count=int(params['counterpartRatio']*\
                                        params['WinnerLoser.count']/rankNum), shuffle=True))
        if (params['rank_interaction']):
            # in agent_cache empty case, the block has no effect
            cacheLocalCopyList=list(agent_cache)
            rng.shuffle(cacheLocalCopyList)
            #print(agent_cache[cacheLocalCopyList[0]], flush=True)
            #print(counterpartSet, flush=True)
            for i in range(int(params['counterpartRatio']*len(cacheLocalCopyList))):
                counterpartSet.append(agent_cache[cacheLocalCopyList[i]])
            #print(counterpartSet, flush=True)   
        
        #observing wallets
        self.allWalletsAndOwnersList=[]
        for anAgent in counterpartSet:
            self.allWalletsAndOwnersList.append(anAgent.returningWalletAndOwner())
        self.allWalletsAndOwnersList=sorted(self.allWalletsAndOwnersList, reverse = True)  
        #print(allWalletsAndOwnersList)
        
        self.yetBeenChooser=False
        self.yetBeenChoosen=False
        
    def returningWalletAndOwner(self) -> Tuple:
        return (self.myWallet, self.uid)
        
    def selectingCounterpart(self):
        #selecting
        self.counterpartLocalId=()
        self.myGhostCounterpartId=()
        
        #dropping from the counterpart set the agents that have already been chosen
        residualWalletsAndOwnersList=[] 
        for anItem in self.allWalletsAndOwnersList:
            if anItem[1] in agent_cache: tmp=agent_cache[anItem[1]]
            else:                        tmp=context.agent(anItem[1])
                
            if not tmp.yetBeenChoosen: 
                residualWalletsAndOwnersList.append(anItem)
        self.allWalletsAndOwnersList=residualWalletsAndOwnersList.copy()
        if self.allWalletsAndOwnersList==[]: return
        
    
        
        #if not params['onlyRicherAgents']:
            #counterpartUid=\
                #self.allWalletsAndOwnersList[rng.integers(0, len(self.allWalletsAndOwnersList))][1]
            
        if params['onlyRicherAgents']:
            multiple=params["howManyTimesRicher"]
            if multiple < 1: multiple = 1 
            agentsRicherThanMe=[]
            for anItem in self.allWalletsAndOwnersList:
                if anItem[0]>self.myWallet*multiple: agentsRicherThanMe.append(anItem)       
            self.allWalletsAndOwnersList=agentsRicherThanMe.copy()
            
        if self.allWalletsAndOwnersList!=[]:
            counterpartUid= self.allWalletsAndOwnersList[rng.integers(0, len(self.allWalletsAndOwnersList))][1] 
            if counterpartUid[2]==rank:
                self.counterpartLocalId=counterpartUid
            else:
                self.myGhostCounterpartId=counterpartUid
        else:
            self.counterpartLocalId=()
            self.myGhostCounterpartId=()
       
        
    def exchangingLocally(self):  
        if self.counterpartLocalId != () and self.yetBeenChooser==False:
            exchangingAgent=context.agent(self.counterpartLocalId)   
            if not exchangingAgent.yetBeenChoosen:
                commonWallet = self.myWallet + exchangingAgent.myWallet
                share=float(rng.random())
                self.myWallet = commonWallet*share
                self.movAv(self.myWallet)
                exchangingAgent.myWallet = commonWallet*(1-share)
                exchangingAgent.movAv(exchangingAgent.myWallet)
            
                self.yetBeenChooser=True
                exchangingAgent.yetBeenChoosen=True
                tr()
            
                #print("agent exchanging locally","rank",rank, "chooser agent uid and wallet",\
                      #self.uid, self.myWallet,"choosen agent uid and wallet", \
                      #exchangingAgent.uid,exchangingAgent.myWallet, flush=True)
     

    def actingAsGhostsOfAnAgentAbroad(self):
        if self.myGhostCounterpartId != () and self.yetBeenChooser==False and self.myGhostCounterpartId[2]==rank:
            localExchangingAgent=context.agent(self.myGhostCounterpartId)
            #print("ag exch via ghosts", "rank", rank,\
                  #"ghost of ag abroad as chooser, uid and flag about yet chooser",\
                  #self.uid, self.yetBeenChooser,\
                  #"loc ag as choosen, uid and flag about yet choosen",\
                  #self.myGhostCounterpartId,localExchangingAgent.yetBeenChoosen, flush=True)
            
            
            if self.yetBeenChooser or localExchangingAgent.yetBeenChoosen: 
                localExchangingAgent.agentAbroadWalletValueToBeReported=-1
                return

            
            #the exchange takes place even if, in the meantime, the wallet of the ghost's
            #counterpart has become lower than the ghost's own
            
            
            commonWallet = self.myWallet + localExchangingAgent.myWallet
            share=float(rng.random())
            self.myWallet = commonWallet*share 
                           # the ghost wallet, not relevant
            #self.movAv(self.myWallet) #not relevant 
            localExchangingAgent.agentAbroadWalletValueToBeReported = self.myWallet
                           # the wallet to be reported to the WL that sent the ghost;
                           # when it is reported, the movAv() function is also called
            localExchangingAgent.myWallet = commonWallet*(1-share)
            #localExchangingAgent.movAv(localExchangingAgent.myWallet)
                           # the counterpart wallet
            tr()
            
            localExchangingAgent.myGhostCounterpartId = self.uid
            
            #set the flag of choosen to the local counterpart (still to be updated on the ghost)
            localExchangingAgent.yetBeenChoosen = True
        
                        
            #print("choosen wallet",localExchangingAgent.myWallet,\
                  #"chooser wallet to be reported",\
                  #localExchangingAgent.agentAbroadWalletValueToBeReported,flush=True)
            
            
            
    def updatingInformationFromGhosts(self):
        #we have to get the wallet value to be reported and register that we have been the chooser 
        if self.myGhostCounterpartId != ():
            ghost=agent_cache[self.myGhostCounterpartId]
            if ghost.agentAbroadWalletValueToBeReported != -1:
                self.myWallet=ghost.agentAbroadWalletValueToBeReported
                self.movAv(self.myWallet)
        
        
    
     
    def save(self) -> Tuple: # mandatory, used by request_agents and by synchronize
        #print("save, tick",t(),self.uid[0],self.uid[2],"inRank",rank,flush=True)
        """
        Saves the state of the WinnerLoser as a Tuple.

        Returns:
            The saved state of this WinnerLoser.
        """
        # the structure of the save is ( ,( )) due to an inconsistent use of the 
        # save output in the update internal structure (fixed in v. 1.1.2)
        return (self.uid, (self.myWallet, self.myGhostCounterpartId,\
                           self.counterpartLocalId,self.yetBeenChooser,\
                           self.yetBeenChoosen,self.agentAbroadWalletValueToBeReported,\
                           self.materialWalletValueToBeReported))
        # not considering movAvElements list, useful only in the agent calculations,
        # never in ghosts
    
    def update(self, dynState: Tuple): # mandatory, used by synchronize
        #print("update, tick",t(),"inRank",rank,flush=True)
        """
        Updates the state of this agent when it is a ghost
        agent on some rank other than its local one.
        """
        self.myWallet=dynState[0]
        self.myGhostCounterpartId = dynState[1]
        self.counterpartLocalId= dynState[2]
        self.yetBeenChooser = dynState[3]
        self.yetBeenChoosen = dynState[4]
        self.agentAbroadWalletValueToBeReported = dynState[5]
        self.materialWalletValueToBeReported = dynState[6]
          
def restore_agent(agent_data: Tuple):
    
    uid=agent_data[0]
    #print("restore, tick",t(),uid,"inRank",rank,flush=True)

    if uid[1] == WinnerLoser.TYPE:
        #print('len agent_cache in restore',len(agent_cache),flush=True)
    
        if uid in agent_cache: 
            tmp = agent_cache[uid] # found
            tmp.myWallet = agent_data[1][0] #restore data
            tmp.myGhostCounterpartId = agent_data[1][1]
            tmp.counterpartLocalId= agent_data[1][2]
            tmp.yetBeenChooser = agent_data[1][3]
            tmp.yetBeenChoosen = agent_data[1][4]
            tmp.agentAbroadWalletValueToBeReported = agent_data[1][5]
            tmp.materialWalletValueToBeReported = agent_data[1][6]
            #print("restore1, tick",t(),uid,"inRank",rank,flush=True)
            


        else: #creation of an instance of the class with its data
            tmp = WinnerLoser(uid[0], uid[2],agent_data[1][0])
            agent_cache[uid] = tmp
            #additional info, possibly not needed at creation time
            tmp.myGhostCounterpartId = agent_data[1][1]
            tmp.counterpartLocalId= agent_data[1][2]
            tmp.yetBeenChooser = agent_data[1][3]
            tmp.yetBeenChoosen = agent_data[1][4]
            tmp.agentAbroadWalletValueToBeReported = agent_data[1][5]
            tmp.materialWalletValueToBeReported = agent_data[1][6]
            #print("restore2, tick",t(),uid,"inRank",rank,flush=True)

        return tmp

    

===========================================================================

## 4

the model

===========================================================================
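
The model schedules one event at tick 0.0 and three repeating events starting at 0.0, 0.1 and 0.2 with interval 1. The sketch below reproduces the resulting execution order with a plain sort on (tick, insertion order), following the rule quoted in step 1 that events at the same tick run in the order in which they were added. It is not repast4py's scheduler; `event_order` and the horizon value are assumptions, and stop and end events are left out.

```python
from typing import List, Tuple

def event_order(one_off: List[Tuple[float, str]],
                repeating: List[Tuple[float, float, str]],
                horizon: float) -> List[Tuple[float, str]]:
    """Return (tick, name) pairs in execution order for ticks up to the horizon."""
    entries = []
    seq = 0                                   # insertion order breaks ties on equal ticks
    for at, name in one_off:
        entries.append((at, seq, name))
        seq += 1
    for at, interval, name in repeating:
        k = 0
        while at + k * interval <= horizon:   # at + k*interval avoids accumulated drift
            entries.append((at + k * interval, seq, name))
            seq += 1
            k += 1
    return [(tick, name) for tick, _, name in sorted(entries)]

order = event_order(
    one_off=[(0.0, "initGhosts")],
    repeating=[(0.0, 1, "counter"),
               (0.1, 1, "agentsCreatingCounterpartSubset"),
               (0.2, 1, "agentsSelectingExchangingAndUpdating")],
    horizon=1.5,                              # hypothetical horizon: a bit more than one cycle
)
names = [name for _, name in order]
for tick, name in order:
    print(tick, name)
```

The fractional start ticks are what keep the three steps of each cycle in a fixed order: counter, then subset creation, then selection and exchange.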

In [4]:
class Model:
    """
    The Model class encapsulates the simulation, and is
    responsible for initialization (scheduling events, creating agents,
    and the grid the agents inhabit IF ANY), and the overall iterating
    behavior of the model.

    Args:
        params: the simulation input parameters
    """
    
    global params
    PARAMS = params
    
    def __init__(self, params: Dict):

        
        cpuTime[0][1]-=Tc()
        
        # the context to hold the agents and manage cross process synchronization
        # is created in step 1

        
        # the runner, implementing the schedule, is created in step 1
        # https://repast.github.io/repast4py.site/apidoc/source/repast4py.schedule.html
        
        """
        schedule_repeating_event(at, interval, evt)
        Schedules the specified event to execute at the specified tick, and repeat at 
        the specified interval.

        Parameters
        at (float) – the time of the event.
        interval (float) – the interval at which to repeat event execution.
        evt (Callable) – the Callable to execute when the event occurs.

            A callable is anything that can be called.
            The built-in callable (PyCallable_Check in objects.c) checks if the argument 
            is either:
                an instance of a class with a __call__ method or
                is of a type that has a non null tp_call (c struct) member which 
                indicates callability otherwise (such as in functions, methods etc.)
        """      
        runner.schedule_event(          0.0,     self.initGhosts) #0
        runner.schedule_repeating_event(0.0,  1, self.counter) #1
        runner.schedule_repeating_event(0.1, 1,\
                        self.agentsCreatingCounterpartSubset) #2
        runner.schedule_repeating_event(0.2, 1,\
                        self.agentsSelectingExchangingAndUpdating) #3
        """
        schedule_stop(at)
        Schedules the execution of this schedule to stop at the specified tick.

        Parameters
        at (float) – the tick at which the schedule will stop.
        """
        runner.schedule_stop(params['howManyCycles'])
        
        runner.schedule_end_event(self.finish)
        

        
        # create agents
        # winnerLoser agents
        
        for i in range(params['WinnerLoser.count'] // rankNum): 
                                                #to subdivide the total #pt
            # create and add the agent to the context
            aWallet= 1 * rng.random() #1
            aWinnerLoser = WinnerLoser(i,rank,aWallet)
            context.add(aWinnerLoser)

        cpuTime[0][1]+=Tc()
            

    #initialize ghosts by sending them in the ranks before starting the simulation
    def initGhosts(self):

        cpuTime[1][1]-=Tc()
        if not (params['rank_interaction'] and rankNum>1): 
            cpuTime[1][1]+=Tc()
            return
        
        ghostsToRequest = [] # list of tuples containing, for each ghost, its uid and its current rank;
                             # passed to context.request_agents below

        rankIds=list(range(rankNum))
        rankIds.pop(rank)
        #print(rank, rankIds, flush=True)

        n=params['WinnerLoser.count'] // rankNum
        for rankId in rankIds:
            for i in range(n):
                ghostsToRequest.append( ((i,WinnerLoser.TYPE,rankId),rankId) )

        #print("3",Tc(),flush=True)
        #print("rank",rank,"ghostsToRequest",ghostsToRequest,flush = True) 
        context.request_agents(ghostsToRequest,restore_agent)
        #print("from initGhosts, rank",rank,"tick",t(),"len(agent_cache)",len(agent_cache),flush=True)
        #print("4",Tc(),flush=True)

        cpuTime[1][1]+=Tc()

                
    
    def counter(self):
        
        cpuTime[2][1]-=Tc()
        if int(t()) % params["tickNumber.betweenChecks"] == 0: 
            print("rank", rank, "tick", t(), flush=True)
        cpuTime[2][1]+=Tc()
    
        
        
    def agentsCreatingCounterpartSubset(self):        
                
        cpuTime[3][1]-=Tc()
        for aWinnerLoser in context.agents(agent_type=0):
            aWinnerLoser.creatingAndObservingCounterpartSubsets()
        
        cpuTime[3][1]+=Tc()
            
            
    def agentsSelectingExchangingAndUpdating(self):        
                
        for numberOfExchangingAttempts in range(params['maxNumberOfExchangeAttempts']):
            cpuTime[4][1]-=Tc()
            #print("rank",rank, "exchanging attempt #",\
                  #numberOfExchangingAttempts, flush=True)

            #selecting in and out of rank
            for aWinnerLoser in context.agents(agent_type=0):
                aWinnerLoser.selectingCounterpart()
                
            #exchanging locally in rank
            for aWinnerLoser in context.agents(agent_type=0):
                aWinnerLoser.exchangingLocally()
            cpuTime[4][1]+=Tc()
            
            #synchronizing with ghosts to report the exchanges that have already occurred and the uids of counterparts in other ranks, if any
            cpuTime[5][1]-=Tc()
            if params['rank_interaction'] and rankNum>1:
                #print("rank",rank,"sync1",flush=True)
                context.synchronize(restore_agent)  
            cpuTime[5][1]+=Tc()
        
            #ghosts of agents abroad acting here
            cpuTime[4][1]-=Tc()
            if not agent_cache == {}:
                currentGhostList=list(agent_cache.keys())
                for i in range(len(agent_cache)):                
                    agent_cache[currentGhostList[i]].actingAsGhostsOfAnAgentAbroad()
            cpuTime[4][1]+=Tc()
            
            
            #synchronizing with ghosts to report exchanges occurred here with ghosts of other ranks if any      
            cpuTime[6][1]-=Tc()
            if params['rank_interaction'] and rankNum>1:
                #print("rank",rank,"sync2",flush=True)
                context.synchronize(restore_agent) 

            cpuTime[6][1]+=Tc()
        
            
            #receiving updates from ghosts of agents abroad
            cpuTime[4][1]-=Tc()
            if params['rank_interaction'] and rankNum>1:
                for aWinnerLoser in context.agents(agent_type=0):
                    aWinnerLoser.updatingInformationFromGhosts()
            cpuTime[4][1]+=Tc()
     
                    
        
                        
    def finish(self):
        allTheWallets = []
        for aWinnerLoser in context.agents(agent_type=0):
            allTheWallets.append(aWinnerLoser.myWallet)
        
        with open(params["log_file_root"]+str(rank)+'.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(allTheWallets)
        
        allTheMovAv = []
        for aWinnerLoser in context.agents(agent_type=0):
            if aWinnerLoser.movAvElements != []:
                allTheMovAv.append(np.sum(aWinnerLoser.movAvElements)/len(aWinnerLoser.movAvElements))
            else:
                allTheMovAv.append(np.nan)
                
            
        print("\n\nBye bye by rank",rank,"at tick",t(),"elapsed time",'%6.3f' % T(),\
              "CPU time",'%6.3f' % Tc(),"transaction #", tr(True), flush=True)
        
        with open(params["log_file_root"]+"MovAv"+str(rank)+'.csv', 'w', newline='') \
          as file:
            writer = csv.writer(file)
            writer.writerow(allTheMovAv)

        if params["show.timeBySteps"]:
            print("Intervals in rank ",rank,flush=True)
            for i in range(len(cpuTime)):
                print(cpuTime[i][0],'%6.3f' % cpuTime[i][1],flush=True)
        


    def start(self):
        
        runner.execute()
        

===========================================================================

## 5

run the model

===========================================================================

In [5]:
# info for plots.ipynb
with open('plotInfo.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow((params["log_file_root"],rankNum))

def run(params: Dict):
    
    model = Model(params) 
    model.start()
    
run(params)

rank 0 tick 0.0
rank 0 tick 5.0
rank 0 tick 10.0


Bye bye by rank 0 at tick 10.0 elapsed time 62.962 CPU time 58.914 transaction # 5960
Intervals in rank  0
0 init  0.009
1 initGhostsIfAny  0.000
2 counter  0.005
3 agentsCreatingCounterpartSubset 44.575
4 agentsSelectingExchangingAndUpdating 14.205
5 sync1  0.000
6 sync2  0.000
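
The "Intervals" figures above come from the `cpuTime[k][1] -= Tc()` / `cpuTime[k][1] += Tc()` pairs placed around each code segment. A minimal stand-alone sketch of that idiom follows; the bucket names and `busy_work` are invented for the example, and `time.process_time()` is used directly instead of the notebook's `Tc()` helper.

```python
import time

intervals = {"busy": 0.0, "skipped": 0.0}   # hypothetical buckets

def busy_work(n: int) -> int:
    """Some CPU-bound work to be measured."""
    return sum(i * i for i in range(n))

# Subtract the clock before the segment and add it after:
# the net effect is to add the CPU time spent in between.
intervals["busy"] -= time.process_time()
busy_work(200_000)
intervals["busy"] += time.process_time()

# The same bucket can be charged again by another segment; the amounts accumulate.
intervals["busy"] -= time.process_time()
busy_work(200_000)
intervals["busy"] += time.process_time()

# A segment whose body is skipped contributes (almost) nothing.
intervals["skipped"] -= time.process_time()
intervals["skipped"] += time.process_time()

for name, seconds in intervals.items():
    print(name, "%6.3f" % seconds)
```

This is consistent with the two sync rows reading 0.000 in the single-rank run above: `context.synchronize` is only called when `rank_interaction` is set and there is more than one rank.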


### run plots.ipynb to show the results

#### plots.ipynb automatically picks up the number of ranks and the root of the result file names (written to plotInfo.csv in step 5)