# Algorithmic Collusion with Reinforcement Learning: Evaluating Q-learning Robustness in Bertrand Duopoly Contexts

# Objective 
We apply Q-learning to a Bertrand duopoly model with differentiated products to simulate how firms adjust their pricing strategies over time. We measure the ability of these algorithms to achieve collusive outcomes and explore their behavior when transferred to new market contexts.

## Bertrand Duopoly Implementation 

In [8]:
import numpy as np
import pandas as pd
from itertools import product
from scipy.optimize import fsolve


class model(object):
    """
    model

    Attributes
    ----------
    n : int
        number of players
    alpha : float
        product differentiation parameter
    beta : float
        exploration parameter
    delta : float
        discount factor
    mu : float
        product differentiation parameter
    a : int
        value of the products
    a0 : float
        value of the outside option
    c : float
        marginal cost
    k : int
        dimension of the grid
    stable: int
        periods of game stability
    """

    def __init__(self, **kwargs):
        """Initialize game with default values"""
        # Default properties
        self.n = kwargs.get('n', 2)
        self.alpha = kwargs.get('alpha', 0.15)
        self.beta = kwargs.get('beta', 4e-6)
        self.delta = kwargs.get('delta', 0.95)
        self.c = kwargs.get('c', 1)
        self.a = kwargs.get('a', 2)
        self.a0 = kwargs.get('a0', 0)
        self.mu = kwargs.get('mu', 0.25)
        self.k = kwargs.get('k', 15)
        self.tstable = kwargs.get('tstable', 1e5)
        self.tmax = kwargs.get('tstable', 1e7)
        self.t = 0

        # Derived properties
        self.sdim, self.s0 = self.init_state()
        self.p_minmax = self.compute_p_competitive_monopoly()
        self.A = self.init_actions()
        self.PI = self.init_PI()
        self.Q = kwargs.get('Q', self.init_Q())

    def demand(self, p):
        """Computes demand"""
        e = np.exp((self.a - p) / self.mu)
        d = e / (np.sum(e) + np.exp(self.a0 / self.mu))
        return d

    def foc(self, p):
        """Compute first order condition"""
        d = self.demand(p)
        zero = 1 - (p - self.c) * (1 - d) / self.mu
        return np.squeeze(zero)

    def foc_monopoly(self, p):
        """Compute first order condition of a monopolist"""
        d = self.demand(p)
        d1 = np.flip(d)
        p1 = np.flip(p)
        zero = 1 - (p - self.c) * (1 - d) / self.mu + (p1 - self.c) * d1 / self.mu
        return np.squeeze(zero)

    def compute_p_competitive_monopoly(self):
        """Computes competitive and monopoly prices"""
        p0 = np.ones((1, self.n)) * 3 * self.c
        p_competitive = fsolve(self.foc, p0)
        p_monopoly = fsolve(self.foc_monopoly, p0)
        return p_competitive, p_monopoly

    def init_actions(self):
        """Get action space of the firms"""
        a = np.linspace(min(self.p_minmax[0]), max(self.p_minmax[1]), self.k - 2)
        delta = a[1] - a[0]
        A = np.linspace(min(a) - delta, max(a) + delta, self.k)
        return A

    def init_state(self):
        """Get state dimension and initial state"""
        sdim = (self.k, self.k)
        s0 = np.zeros(len(sdim)).astype(int)
        return sdim, s0

    def compute_profits(self, p):
        """Compute payoffs"""
        d = self.demand(p)
        pi = (p - self.c) * d
        return pi

    def init_PI(game):
        """Initialize Profits (k^n x kp x n)"""
        PI = np.zeros(game.sdim + (game.n,))
        for s in product(*[range(i) for i in game.sdim]):
            p = np.asarray(game.A[np.asarray(s)])
            PI[s] = game.compute_profits(p)
        return PI

    def init_Q(game):
        """Initialize Q function (n x #s x k)"""
        Q = np.zeros((game.n,) + game.sdim + (game.k,))
        for n in range(game.n):
            pi = np.mean(game.PI[:, :, n], axis=1 - n)
            Q[n] = np.tile(pi, game.sdim + (1,)) / (1 - game.delta)
        return Q

## Q-learning Functions

In [9]:
import sys
import numpy as np


def pick_strategies(game, s, t):
    """Pick strategies by exploration vs exploitation"""
    a = np.zeros(game.n).astype(int)
    pr_explore = np.exp(- t * game.beta)
    e = (pr_explore > np.random.rand(game.n))
    for n in range(game.n):
        if e[n]:
            a[n] = np.random.randint(0, game.k)
        else:
            a[n] = np.argmax(game.Q[(n,) + tuple(s)])
    return a


def update_q(game, s, a, s1, pi, stable):
    """Update Q matrix"""
    #rememebr to swap to not -1 for normal simulatiojn 
    for n in range(game.n):
        subj_state = (n,) + tuple(s) + (a[n],)
        old_value = game.Q[subj_state]
        max_q1 = np.max(game.Q[(n,) + tuple(s1)])
        new_value = pi[n] + game.delta * max_q1
        old_argmax = np.argmax(game.Q[(n,) + tuple(s)])
        game.Q[subj_state] = (1 - game.alpha) * old_value + game.alpha * new_value
        # Check stability
        new_argmax = np.argmax(game.Q[(n,) + tuple(s)])
        same_argmax = (old_argmax == new_argmax)
        stable = (stable + same_argmax) * same_argmax
    return game.Q, stable

def update_q_only_one(game, s, a, s1, pi, stable):
    """Update Q matrix"""
    #rememebr to swap to not -1 for normal simulatiojn 
    for n in range(game.n-1):
        subj_state = (n,) + tuple(s) + (a[n],)
        old_value = game.Q[subj_state]
        max_q1 = np.max(game.Q[(n,) + tuple(s1)])
        new_value = pi[n] + game.delta * max_q1
        old_argmax = np.argmax(game.Q[(n,) + tuple(s)])
        game.Q[subj_state] = (1 - game.alpha) * old_value + game.alpha * new_value
        # Check stability
        new_argmax = np.argmax(game.Q[(n,) + tuple(s)])
        same_argmax = (old_argmax == new_argmax)
        stable = (stable + same_argmax) * same_argmax
    return game.Q, stable


def check_convergence(game, t, stable):
    """Check if game converged"""
    if (t % game.tstable == 0) & (t > 0):
        sys.stdout.write("\rt=%i" % t)
        sys.stdout.flush()
    if stable > game.tstable:
        print('Converged!')
        return True
    if t == game.tmax:
        print('ERROR! Not Converged!')
        return True
    return False


def simulate_game(game, only_one = False):
    """Simulate game"""
    s = game.s0
    stable = 0
    profits_history = []
    actions_history = []
    # Iterate until convergence
    for t in range(int(game.tmax)):
        a = pick_strategies(game, s, t)
        pi = game.PI[tuple(a)]
        s1 = a
        if only_one: 
            game.Q, stable = update_q_only_one(game, s, a, s1, pi, stable)
        else: 
            game.Q, stable = update_q(game, s, a, s1, pi, stable)
        s = s1
        actions_history.append(game.A[a])
        profits_history.append(pi)
        if check_convergence(game, t, stable):
            game.t = t
            break
    
    #remeber to then re-adjust other plots to acount for the fact that they are not after the stability period 
    
    #profits_history = profits_history[-int(game.tstable):]
    #actions_history = actions_history[-int(game.tstable):]
    
    return game, profits_history, actions_history, int(game.tstable)


def compute_collusion(game, profits_history): 
    """Compute collusion"""  
    competitive, monopoly = game.compute_p_competitive_monopoly()

    avg_profit = np.mean(np.mean(profits_history))
    
    competitive_profit =  game.compute_profits(competitive)[0]
    monopoly_profit = game.compute_profits(monopoly)[0]

    return np.divide(avg_profit - competitive_profit, monopoly_profit - competitive_profit)
    

In [None]:
# Init algorithm
game_low = model(c=1)

# Compute equilibrium
game_equilibrium_low, profit_history_low, price_history_low, stability_low = simulate_game(game_low)


## Training the algorithms in the first context

In [None]:
price_history_low[-10::]

In [None]:
game_high = model(c=1.7)
game_equilibrium_high, profit_history_high, price_history_high, stability_high = simulate_game(game_high)

In [None]:
profit_history_low_adj = profit_history_low[-stability_low:]
profit_history_high_adj = profit_history_high[-stability_high:]

In [None]:
collusion_low = compute_collusion(game_equilibrium_low, profit_history_low_adj)

In [None]:
collusion_low

In [None]:
collusion_high = compute_collusion(game_equilibrium_high, profit_history_high_adj)

In [None]:
collusion_high

In [None]:
profit_history_high[-30::]

## Setting up the re-training second context

In [None]:
game_equilibrium_high.Q.shape

In [None]:
Q_matrix_for_retraining = np.stack((game_equilibrium_low.Q[0], game_equilibrium_high.Q[0]))

In [None]:
Q_matrix_for_retraining.shape

In [None]:
game_low_in_high = model(Q = Q_matrix_for_retraining, c=1.7)
game_equilibrium_low_in_high, profit_history_low_in_high, price_history_low_in_high, stability_low_in_high = simulate_game(game_low_in_high, only_one=True)

In [None]:
profit_history_low_in_high_adj = profit_history_high[-stability_low_in_high:]
collusion_low_in_high = compute_collusion(game_low_in_high, profit_history_low_in_high_adj)

In [None]:
profit_history_low_in_high_adj[-10:]

In [None]:
avg_profit = np.mean(np.mean(profit_history_low_in_high_adj))

In [None]:
avg_profit

In [None]:
collusion_low_in_high

In [None]:
competitive, monopoly = game_equilibrium_low_in_high.compute_p_competitive_monopoly()
competitive_profit_low_in_high =  game_equilibrium_low_in_high.compute_profits(competitive)[0]
monopoly_profit_low_in_high = game_equilibrium_low_in_high.compute_profits(monopoly)[0]

competitive, monopoly = game_equilibrium_low.compute_p_competitive_monopoly()
competitive_profit_low =  game_equilibrium_low.compute_profits(competitive)[0]
monopoly_profit_low = game_equilibrium_low.compute_profits(monopoly)[0]

competitive, monopoly = game_equilibrium_high.compute_p_competitive_monopoly()
competitive_profit_high =  game_equilibrium_high.compute_profits(competitive)[0]
monopoly_profit_high = game_equilibrium_high.compute_profits(monopoly)[0]

In [None]:
len(profit_history_low_in_high)

In [None]:
df_profit_history_low_in_high = pd.DataFrame(profit_history_low_in_high, columns=['firm_1', 'firm_2'])
df_profit_history_low = pd.DataFrame(profit_history_low, columns=['firm_1', 'firm_2'])
df_profit_history_high = pd.DataFrame(profit_history_high, columns=['firm_1', 'firm_2'])

In [None]:
import matplotlib.pyplot as plt

window_size = 50000

# Calculate the moving average for each firm
df_profit_history_low_in_high['firm_1_MA'] = df_profit_history_low_in_high['firm_1'].rolling(window=window_size).mean()
df_profit_history_low_in_high['firm_2_MA'] = df_profit_history_low_in_high['firm_2'].rolling(window=window_size).mean()

# Plot the moving averages
plt.figure(figsize=(14, 7))
plt.plot(df_profit_history_low_in_high['firm_1_MA'], label='Firm 1 Moving Average')
plt.plot(df_profit_history_low_in_high['firm_2_MA'], label='Firm 2 Moving Average')
plt.title('Moving Average of Profits for Firm 1 and Firm 2')
competitive_constant, monopoly_constant = game_equilibrium_low_in_high.p_minmax
plt.axhline(y=competitive_profit_low_in_high, color='r', linestyle='--', label='Competitive')
plt.axhline(y=monopoly_profit_low_in_high, color='g', linestyle='--', label='Monopoly')
plt.axvline(x=2700000-stability_low_in_high, color='b', linestyle='--', label='Stability Low in High')
plt.xlabel('Time')
plt.ylabel('profit')
plt.legend()
plt.show()

In [None]:
import matplotlib.pyplot as plt

window_size = 50000

# Calculate the moving average for each firm
df_profit_history_low['firm_1_MA'] = df_profit_history_low['firm_1'].rolling(window=window_size).mean()
df_profit_history_low['firm_2_MA'] = df_profit_history_low['firm_2'].rolling(window=window_size).mean()

# Plot the moving averages
plt.figure(figsize=(14, 7))
plt.plot(df_profit_history_low['firm_1_MA'], label='Firm 1 Moving Average')
plt.plot(df_profit_history_low['firm_2_MA'], label='Firm 2 Moving Average')
plt.title('Moving Average of Profits for Firm 1 and Firm 2')
competitive_constant, monopoly_constant = game_equilibrium_low.p_minmax
plt.axhline(y=competitive_profit_low, color='r', linestyle='--', label='Competitive')
plt.axhline(y=monopoly_profit_low, color='g', linestyle='--', label='Monopoly')
plt.xlabel('Time')
plt.ylabel('profit')
plt.legend()
plt.show()

In [None]:
df_profit_history_high.tail()

In [None]:
import matplotlib.pyplot as plt

window_size = 50000

# Calculate the moving average for each firm
df_profit_history_high['firm_1_MA'] = df_profit_history_high['firm_1'].rolling(window=window_size).mean()
df_profit_history_high['firm_2_MA'] = df_profit_history_high['firm_2'].rolling(window=window_size).mean()

# Plot the moving averages
plt.figure(figsize=(14, 7))
plt.plot(df_profit_history_high['firm_1_MA'], label='Firm 1 Moving Average')
plt.plot(df_profit_history_high['firm_2_MA'], label='Firm 2 Moving Average')
plt.title('Moving Average of Profits for Firm 1 and Firm 2')
competitive_constant, monopoly_constant = game_equilibrium_high.p_minmax
plt.axhline(y=competitive_profit_high, color='r', linestyle='--', label='Competitive')
plt.axhline(y=monopoly_profit_high, color='g', linestyle='--', label='Monopoly')
plt.xlabel('Time')
plt.ylabel('Profit')
plt.legend()
plt.show()

## Running the simulation for the merged context 

In [4]:
import numpy as np
import pandas as pd
import time

def run_simulation():
    # Low-cost game
    game_low = model(c=1)
    game_equilibrium_low, profit_history_low, price_history_low, stability_low = simulate_game(game_low)
    
    # High-cost game
    game_high = model(c=1.7)
    game_equilibrium_high, profit_history_high, price_history_high, stability_high = simulate_game(game_high)
    
    # Low-in-high game
    Q_matrix_for_retraining = np.stack((game_equilibrium_low.Q[0], game_equilibrium_high.Q[0]))
    game_low_in_high = model(Q=Q_matrix_for_retraining, c=1.7)
    game_equilibrium_low_in_high, profit_history_low_in_high, price_history_low_in_high, stability_low_in_high = simulate_game(game_low_in_high, only_one=True)
    
    # Adjust profit history
    profit_history_low_adj = profit_history_low[-stability_low:]
    profit_history_high_adj = profit_history_high[-stability_high:]
    profit_history_low_in_high_adj = profit_history_low_in_high[-stability_low_in_high:]
    
    # Compute collusion indices
    collusion_low = compute_collusion(game_equilibrium_low, profit_history_low_adj)
    collusion_high = compute_collusion(game_equilibrium_high, profit_history_high_adj)
    collusion_low_in_high = compute_collusion(game_low_in_high, profit_history_low_in_high_adj)
    
    # Collect time convergence
    time_convergence_low = game_equilibrium_low.t
    time_convergence_high = game_equilibrium_high.t
    time_convergence_low_in_high = game_equilibrium_low_in_high.t
    
    return {
        "time_convergence_low": time_convergence_low,
        "time_convergence_high": time_convergence_high,
        "time_convergence_low_in_high": time_convergence_low_in_high,
        "collusion_index_low": collusion_low,
        "collusion_index_high": collusion_high,
        "collusion_index_low_in_high": collusion_low_in_high
    }

# Measure the time taken for one simulation run
start_time = time.time()
run_simulation()
end_time = time.time()
time_per_simulation = end_time - start_time

# Total time for simulations (8 hours)
total_time = 8 * 60 * 60

# Number of iterations possible in 8 hours
num_iterations = int(total_time / time_per_simulation)

# Create a DataFrame to store results
results = []

# Run the simulation for the determined number of iterations
for _ in range(num_iterations):
    result = run_simulation()
    results.append(result)

# Convert results to a DataFrame
df_results = pd.DataFrame(results)

# Display the results DataFrame
print(df_results)

# Optionally save the results to a CSV file
df_results.to_csv("simulation_results.csv", index=False)


t=1400000Converged!
t=1800000Converged!
t=2300000Converged!
t=1800000Converged!
t=2100000Converged!
t=2900000Converged!
   time_convergence_low  time_convergence_high  time_convergence_low_in_high  \
0               1837697                2115143                       2922499   

   collusion_index_low  collusion_index_high  collusion_index_low_in_high  
0             0.972265              0.822591                     0.569401  


## Computing summary statistics of the simulation results 

In [4]:
df_results = pd.read_csv('simulation_results.csv')

In [5]:
correlation_matrix = df_results[['collusion_index_low', 'collusion_index_high', 'collusion_index_low_in_high']].corr()

In [6]:
correlation_matrix

Unnamed: 0,collusion_index_low,collusion_index_high,collusion_index_low_in_high
collusion_index_low,1.0,0.075064,-0.129862
collusion_index_high,0.075064,1.0,-0.041534
collusion_index_low_in_high,-0.129862,-0.041534,1.0


In [7]:
df_results.describe()

Unnamed: 0,time_convergence_low,time_convergence_high,time_convergence_low_in_high,collusion_index_low,collusion_index_high,collusion_index_low_in_high
count,126.0,126.0,126.0,126.0,126.0,126.0
mean,1562267.0,1781602.0,2662635.0,0.853285,0.858347,0.68165
std,225899.8,339753.5,210493.4,0.095447,0.100184,0.175802
min,1077309.0,1320025.0,2005295.0,0.591781,0.429553,0.263721
25%,1416074.0,1554930.0,2532451.0,0.785509,0.793214,0.560893
50%,1526637.0,1662913.0,2690010.0,0.868382,0.881412,0.673755
75%,1670714.0,1945738.0,2790596.0,0.933228,0.931288,0.818656
max,2413860.0,2763098.0,3184193.0,0.990218,0.992108,0.993455
