The objective is to build a simplistic toy financial market that mimics the boom-and-bust behaviour of real financial markets, using heterogeneous agents that react to market news and exhibit some elements of market contagion. It leverages mesa (https://mesa.readthedocs.io/en/stable/), a Python package for agent-based modelling.

In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from mesa.space import MultiGrid
from mesa import Agent, Model
from mesa.time import RandomActivation
from mesa.datacollection import DataCollector
import pandas as pd

The Dow Jones Index is used as a proxy for market movement and the VIX as a proxy for market volatility; simulated data can be used as well. Note that this type of model does not have much value as a forecasting tool; rather, its utility lies in being able to observe system dynamics and stability given the structure of a system and its parameters.

In [None]:
# Load the daily index level and volatility series, indexed by their DATE column.
dj = pd.read_csv('DJIA (daily).csv',index_col='DATE', parse_dates=True)
vix = pd.read_csv('vix (daily).csv',index_col='DATE', parse_dates=True)
# NOTE(review): the two trims below presumably line the files up with each
# other -- confirm against the actual CSV date ranges.
vix=vix[1:] # drop the first row
dj=dj[:-1] # drop the last row
# Coerce the value columns to numbers; entries that cannot be parsed become NaN.
dj['DJIA'] = dj['DJIA'].apply(pd.to_numeric,errors='coerce')
vix['VXDCLS']=vix['VXDCLS'].apply(pd.to_numeric,errors='coerce')
# Rows with NaN are dropped from each frame independently.
# NOTE(review): the agents read both frames by the same row position, so if the
# two files have gaps on different dates the rows will no longer correspond --
# verify, or align the frames on their index before dropping.
dj=dj.dropna()
vix=vix.dropna()
# One-period percentage change, stored in a new 'change' column.
# NOTE(review): pct_change is called on the whole frame, so this assumes each
# CSV has exactly one value column; 'change' then becomes column position 1,
# which is what the agents read via iloc[..., 1] -- confirm.
vix['change']=vix.pct_change(1)
dj['change']= dj.pct_change(1)
# Largest and smallest daily index change; used to scale the agents' wtp.
biggest=dj['change'].max()
lowest=dj['change'].min()

Initializing financial agents: each has an initial endowment of one (1) stock and a certain propensity to contagion, and is either a risk-averse long-term investor or a short-term speculator. Their news reaction function, their decision-making function and their evolving propensity for contagion are also defined.

In [None]:
class FinancialAgent(Agent):
    """A trader that reacts to market news and to its neighbours' sentiment.

    Every agent starts with one share and a contagion propensity drawn
    uniformly from [0, 1].  ``agent_type`` selects the news reaction
    function: ``1`` is a long-term investor, ``0`` is a speculator.

    On each step the agent
      1. forms a news reaction from the current row of ``dj`` and ``vix``,
      2. updates its contagion propensity from the last realised return,
      3. combines contagion, news and noise into a buy/sell sentiment and a
         willingness to pay (``wtp``).
    """

    # News reaction tables keyed by agent type, then by
    # (index moved up?, volatility moved up?).  ``None`` marks the one case
    # where the speculator picks -1 or 1 at random.
    _NEWS_REACTION = {
        1: {  # long-term investor
            (True, False): 1,
            (False, False): -0.25,
            (True, True): 0.25,
            (False, True): -1,
        },
        0: {  # speculator
            (True, False): -1,
            (False, False): -1,
            (True, True): None,
            (False, True): 1,
        },
    }

    def __init__(self, unique_id, model, agent_type):
        super().__init__(unique_id, model)
        self.type = agent_type
        self.number_of_shares = 1
        # Agents are not initialised as contrarians (negative propensity) but
        # may become so through the learning update.
        self.propensity_to_sentiment_contagion = random.uniform(0, 1)
        self.news_reaction = 0
        self.my_sentiment = self.news_reaction
        self.a = 0.5  # proportion of neighbours with positive sentiment
        self.b = 0.5  # proportion of neighbours with negative sentiment
        self.wtp = 0  # willingness to pay

    def step(self):
        """Run one trading day: news reaction, learning, then the decision."""
        self._react_to_news()
        self._update_contagion_propensity()
        self._decide()

    def _react_to_news(self):
        """Set ``news_reaction`` from today's index and volatility changes."""
        table = self._NEWS_REACTION.get(self.type)
        if table is None:
            return

        day = self.model.run
        market_move = dj.iloc[day, 1]
        volatility_move = vix.iloc[day, 1]

        # A zero or NaN change matches none of the four cases, so the
        # previous reaction is kept.
        market_moved = market_move > 0 or market_move < 0
        volatility_moved = volatility_move > 0 or volatility_move < 0
        if not (market_moved and volatility_moved):
            return

        reaction = table[(bool(market_move > 0), bool(volatility_move > 0))]
        if reaction is None:
            reaction = np.random.choice([-1, 1])
        self.news_reaction = reaction

    def _update_contagion_propensity(self):
        """Shift the contagion propensity by the last realised return.

        The shift is ``+returns`` when most neighbours were positive at the
        previous decision (``a > b``) and ``-returns`` otherwise.  It is only
        applied when it keeps the propensity inside [-1, 1].  Nothing happens
        on the first step, when the return is zero, or when the agent has no
        news reaction.  As in the original rules, the sign of the news
        reaction does not affect the direction of the shift.
        """
        if self.model.run == 0:
            return
        realised = self.model.returns
        if not (realised > 0 or realised < 0):
            return
        if self.news_reaction == 0:
            return

        shift = realised if self.a > self.b else -realised
        candidate = self.propensity_to_sentiment_contagion + shift
        if -1 <= candidate <= 1:  # keep the propensity bounded
            self.propensity_to_sentiment_contagion = candidate

    def _decide(self):
        """Turn contagion, news and noise into a sentiment and a ``wtp``."""
        neighbour_sentiment = [
            neighbour.my_sentiment
            for neighbour in self.model.grid.get_neighbors(
                self.pos, moore=True, include_center=False
            )
        ]
        if neighbour_sentiment:
            count = len(neighbour_sentiment)
            self.a = sum(1 for s in neighbour_sentiment if s >= 0) / count
            self.b = sum(1 for s in neighbour_sentiment if s < 0) / count
            # Contagion indicator: +1 or -1, drawn with the neighbours' shares.
            contagion = np.random.choice([1, -1], p=[self.a, self.b])
        else:
            # No neighbours: no contagion term, a and b keep their old values.
            contagion = 0

        # signal = contagion propensity * indicator + news reaction + noise.
        # This is computed for every agent.  Previously it was skipped for
        # agents without neighbours, which then decided on the step index.
        signal = (
            self.propensity_to_sentiment_contagion * contagion
            + self.news_reaction
            + np.random.normal(0, 1)
        )

        if signal > 0:
            self.my_sentiment = 1
            # Bid in return terms, scaled by the largest daily index change.
            self.wtp = signal * biggest
        else:
            self.my_sentiment = -1
            # Ask in return terms, scaled by the lowest daily index change.
            self.wtp = -signal * lowest
            
        
                    

In [None]:
#simple iteration to find the return at which the market clears
def _holding_after_trade(agent, returns):
    """Return the shares *agent* would hold if trading happened at *returns*."""
    if agent.my_sentiment == 1 and returns <= agent.wtp:
        return agent.number_of_shares + 1  # buyer's bid is met
    if agent.my_sentiment == -1 and returns >= agent.wtp:
        return agent.number_of_shares - 1  # seller's ask is met
    return agent.number_of_shares


def market_clearing(model):
    """Find the return at which the market clears and apply the trades.

    The return is moved away from zero in steps of 0.01, upwards when net
    sentiment is positive and downwards when it is negative.  The search
    stops when total holdings no longer exceed (upwards) or no longer fall
    short of (downwards) ``model.num_agents``.  Each agent's
    ``number_of_shares`` is then updated to the allocation at that return.

    The search also stops once the return has passed every agent's ``wtp``.
    Beyond that point the allocation cannot change, so continuing would loop
    forever; the return reached at that point is used instead.

    Args:
        model: object exposing ``num_agents`` and ``schedule.agents``, where
            each agent has ``my_sentiment``, ``wtp`` and ``number_of_shares``.

    Returns:
        The clearing return, or ``0`` (with no trades) when net sentiment is
        zero.
    """
    agents = list(model.schedule.agents)
    net_sentiment = sum(agent.my_sentiment for agent in agents)
    if net_sentiment == 0:
        return 0

    rising = net_sentiment > 0
    increment = 0.01 if rising else -0.01
    target = model.num_agents
    bids_and_asks = [agent.wtp for agent in agents]
    # Past this return no buy/sell condition can change any more.
    last_useful_return = max(bids_and_asks) if rising else min(bids_and_asks)

    returns = 0
    while True:
        returns += increment
        holdings = [_holding_after_trade(agent, returns) for agent in agents]
        total = sum(holdings)
        if rising:
            cleared = total <= target
            exhausted = returns > last_useful_return
        else:
            cleared = total >= target
            exhausted = returns < last_useful_return
        if cleared or exhausted:
            break

    for agent, shares in zip(agents, holdings):
        agent.number_of_shares = shares
    return returns
   

In [None]:
#just reengineering price from return series
def price(model):
    """Rebuild the price level from the return series.

    The previous price is ``model.price``.  While the model has no ``price``
    attribute yet (before its first step has finished), the first index
    level in ``dj`` is used as the starting price.

    Args:
        model: object exposing ``returns`` and, after the first step,
            ``price``.

    Returns:
        The previous price compounded by ``model.returns``.
    """
    growth = 1 + model.returns
    try:
        previous_price = model.price
    except AttributeError:
        # Only a missing price triggers the fallback; any other error is a
        # real problem and is allowed to surface.
        previous_price = dj.iloc[0, 0]
    return previous_price * growth

In [None]:
# Now define the environment in which these random agents interact. For a more realistic model you could use
# an empirically determined graph with institutional players, individual investors, weak and strong
# connections, etc. For the purposes of this toy model we simply assume a two-dimensional grid on which
# agents are randomly placed.
class FinancialMarket(Model):
    """Toy financial market with heterogenous agents and market contagion.

    Args:
        N: number of agents to create.
        width: grid width in cells.
        height: grid height in cells.
        speculator_pc: probability that a newly created agent is a
            speculator (type 0) rather than a long-term investor (type 1).

    Attributes read by other code:
        returns: clearing return of the most recent step (0 before the first).
        price: price level after the most recent step (set by ``step``).
        run: number of completed steps; agents use it as the row index into
            ``dj`` and ``vix``.
    """

    def __init__(self, N, width, height, speculator_pc):
        # Let mesa set up its own model state before adding ours.
        super().__init__()
        self.running = True
        self.num_agents = N
        self.speculator_pc = speculator_pc
        self.grid = MultiGrid(width, height, False)
        self.schedule = RandomActivation(self)
        self.returns = 0
        self.run = 0

        # Create the agents and drop each one on a random grid cell.
        for i in range(self.num_agents):
            if random.random() < self.speculator_pc:
                agent_type = 0  # speculator agent
            else:
                agent_type = 1  # long term investor
            a = FinancialAgent(i, self, agent_type)
            self.schedule.add(a)
            x = random.randrange(self.grid.width)
            y = random.randrange(self.grid.height)
            self.grid.place_agent(a, (x, y))

        # The reporters only read state computed in step().  Market clearing
        # mutates agents' holdings, so it must run exactly once per step and
        # not as a side effect of data collection.
        self.datacollector = DataCollector(
            model_reporters={
                "Returns": lambda m: m.returns,
                "price": lambda m: m.price,
            }
        )

    def step(self):
        """Advance one day: agents act, the market clears, data is recorded."""
        self.schedule.step()
        # Clear the market first so the price is compounded with this step's
        # return rather than the previous one.
        self.returns = market_clearing(self)
        self.price = price(self)
        self.datacollector.collect(self)
        self.run += 1
        # Agents read row ``run`` of both dj and vix on the next step, so stop
        # as soon as that row would be past the end of either series.
        if self.run >= min(len(dj.index), len(vix.index)):
            self.running = False
       
        
       

In [None]:
#Plot agents in their grid and show their movement
from mesa.visualization.modules import CanvasGrid
from mesa.visualization.ModularVisualization import ModularServer
from mesa.visualization.modules import ChartModule
from mesa.visualization.UserParam import UserSettableParameter



# User-adjustable model parameters shown as sliders in the browser UI.
# NOTE(review): the four trailing numbers are presumably initial value,
# minimum, maximum and step -- confirm against the installed mesa version.
n_slider1 = UserSettableParameter('slider', "Number of Agents", 100, 50, 20000, 100)
n_slider2 = UserSettableParameter('slider', "Proportion of Speculators", 0.2, 0, 1, .05)


# Line charts for the two series the model's `datacollector` reports; each
# "Label" must match a key of its model_reporters ("Returns" and "price").
chart1 = ChartModule([{"Label": "Returns",
                      "Color": "Black"}],
                    data_collector_name='datacollector')
chart2= ChartModule([{"Label": "price",
                      "Color": "Grey"}],data_collector_name='datacollector')

def agent_portrayal(agent):
    """Describe how one agent is drawn on the grid.

    Agents are filled circles on layer 0 with radius 0.5: green when their
    sentiment is positive, red otherwise.
    """
    colour = "green" if agent.my_sentiment > 0 else "red"
    return {
        "Shape": "circle",
        "Filled": "true",
        "Layer": 0,
        "Color": colour,
        "r": 0.5,
    }

# Grid canvas drawn with agent_portrayal.
# NOTE(review): the numbers are presumably a 20x20 cell grid rendered at
# 500x500 pixels -- confirm against the CanvasGrid signature in use.
grid = CanvasGrid(agent_portrayal,20, 20, 500, 500)
# Wire the model class, the visual elements and the model parameters together.
# The dict keys match FinancialMarket.__init__'s arguments; N and speculator_pc
# come from the sliders, while width and height are fixed at 20 to match the
# grid canvas above.
server = ModularServer(FinancialMarket,
                       [grid,chart1,chart2],
                       "Simulated Financial Market",
                       {"N": n_slider1, "width": 20, "height": 20,"speculator_pc":n_slider2})

In [None]:
# NOTE(review): nest_asyncio is presumably applied so the server's event loop
# can start inside the notebook's already-running loop -- confirm.
import nest_asyncio
nest_asyncio.apply()
# Serve the visualization on port 8573 and start it.
server.port = 8573
server.launch()