This example assumes there are only two firms and uses a simplified linear demand equation for each of them:

$q_1= 10 - p_1 + 0.5 p_2$,

$q_2= 10 - p_2 + 0.5 p_1$

In [None]:
import numpy as np
import pandas as pd
from qlearning_env.constants import RESULTS_DIR

# Discrete price grid shared by both firms: the integers 5, 6, ..., 14.
PRICES = np.arange(5, 15)
# One action per grid price.
N_ACTIONS = PRICES.size
# A state is an ordered pair (p1, p2), so there are N_ACTIONS squared of them.
N_STATES = N_ACTIONS ** 2
# Lookup from a grid price to its position (0-9) in PRICES.
price_to_idx = dict(zip(PRICES, range(N_ACTIONS)))

def state_index(p1, p2):
    """Map a price pair (p1, p2) to its row number in a Q-table.

    The firm-1 price selects a band of N_ACTIONS consecutive rows and the
    firm-2 price selects the offset inside that band. Both prices must be
    keys of ``price_to_idx``; anything else raises ``KeyError``.
    """
    band = price_to_idx[p1]
    offset = price_to_idx[p2]
    return band * N_ACTIONS + offset

def index_to_state(s: int):
    """Recover the price pair (p1, p2) encoded by the flat state index ``s``.

    This undoes the row-major flattening ``i * N_ACTIONS + j``: the quotient
    is the position of p1 in PRICES and the remainder is the position of p2.
    """
    first_pos, second_pos = divmod(s, N_ACTIONS)
    return PRICES[first_pos], PRICES[second_pos]


def demand1(p1, p2):  # q1
    """Quantity demanded from firm 1: 10 - p1 + 0.5 * p2, floored at zero."""
    quantity = 10 - p1 + 0.5 * p2
    # Demand cannot be negative; anything at or below zero is reported as 0.0.
    return quantity if quantity > 0.0 else 0.0

def demand2(p1, p2):  # q2
    """Quantity demanded from firm 2: 10 - p2 + 0.5 * p1, floored at zero."""
    quantity = 10 - p2 + 0.5 * p1
    # Demand cannot be negative; anything at or below zero is reported as 0.0.
    return quantity if quantity > 0.0 else 0.0

def profit1(p1, p2, c=2.0):
    """Firm 1's one-period profit: per-unit margin (p1 - c) times its demand."""
    unit_margin = p1 - c
    units_sold = demand1(p1, p2)
    return unit_margin * units_sold

def profit2(p1, p2, c=2.0):
    """Firm 2's one-period profit: per-unit margin (p2 - c) times its demand."""
    unit_margin = p2 - c
    units_sold = demand2(p1, p2)
    return unit_margin * units_sold

def epsilon_at(step, beta):
    """Exploration probability at period ``step``: eps_t = exp(-beta * t).

    The result is converted to a built-in ``float``.
    """
    exponent = -beta * step
    return float(np.exp(exponent))

def argmax_tie(x, rng=None):
    """Return the index of a maximal entry of ``x``, breaking ties at random.

    Every position whose value is within ``np.isclose`` tolerance of the
    maximum counts as tied, and one of those positions is drawn uniformly.

    Parameters
    ----------
    x : array-like
        One-dimensional collection of values (e.g. one row of a Q-table).
    rng : numpy.random.Generator, optional
        Generator used to break ties. Pass a seeded generator to make the
        choice reproducible. When omitted, NumPy's global random state is
        used, which is the historical behavior of this function.

    Returns
    -------
    numpy integer
        Position in ``x`` of the selected maximal entry.

    Raises
    ------
    ValueError
        If ``x`` is empty, or if no entry compares close to the maximum
        (which happens when the maximum is NaN).
    """
    m = np.max(x)
    # Use a tolerance so entries that differ only by float noise still tie.
    idxs = np.flatnonzero(np.isclose(x, m))
    if rng is None:
        return np.random.choice(idxs)
    return rng.choice(idxs)

def greedy_map(Q):
    """Build the greedy policy implied by the Q-table ``Q``.

    Returns an integer array of length N_STATES whose entry for a state is
    the action index that ``argmax_tie`` selects from that state's row.
    """
    policy = np.empty(N_STATES, dtype=int)
    for state in range(N_STATES):
        policy[state] = argmax_tie(Q[state])
    return policy

# ---------- Q-learning training ----------
def train_long_episode(
    alpha=0.25,
    delta=0.95,
    beta=2*1e-5,
    c=2.0,
    stable_required=100_000,  # need greedy policy unchanged for this long
    check_every=1_000,        # compare policies every K periods
    max_periods=2_000_000,    # hard cap so we won't loop forever
    seed=43
):
    """Train two independent Q-learners in a single long pricing episode.

    Each firm keeps its own (N_STATES x N_ACTIONS) Q-table. The state is the
    pair of prices chosen in the previous period. In every period both firms
    pick a price epsilon-greedily, collect their one-period profit, and apply
    the standard Q-learning update. Training stops once both greedy policies
    have stayed unchanged for ``stable_required`` periods, or when
    ``max_periods`` is reached.

    All randomness (start state, exploration, and tie-breaking among equally
    good greedy actions) comes from one generator seeded with ``seed``, so a
    given set of arguments reproduces the same run.

    Parameters
    ----------
    alpha : float
        Learning rate: weight put on the new target in each Q update.
    delta : float
        Discount factor applied to the best next-state Q-value.
    beta : float
        Decay rate of the exploration probability, eps_t = exp(-beta * t).
    c : float
        Cost parameter forwarded to ``profit1`` and ``profit2``.
    stable_required : int
        Number of periods the greedy policies must stay unchanged before the
        run is declared converged.
    check_every : int
        The greedy policies are compared every ``check_every`` periods.
        Must be positive.
    max_periods : int
        Hard cap on the number of periods simulated.
    seed : int
        Seed for the NumPy random generator.

    Returns
    -------
    tuple
        ``(Q1, Q2, info)``, where ``Q1`` and ``Q2`` are the learned Q-tables
        and ``info`` is a dict with keys ``"converged"``, ``"periods_run"``,
        ``"stable_periods"`` and ``"epsilon_final"``.

    Raises
    ------
    ValueError
        If ``check_every`` is not positive.
    """
    if check_every <= 0:
        raise ValueError(f"check_every must be positive, got {check_every!r}")

    rng = np.random.default_rng(seed)  # single source of randomness for the run

    def _greedy_action(q_row):
        """Best action for one Q-row; ties are broken with the seeded rng."""
        best = np.flatnonzero(np.isclose(q_row, np.max(q_row)))
        if best.size == 1:
            return int(best[0])
        return int(rng.choice(best))

    def _greedy_policy(q_table):
        """Greedy action per state, taking the lowest index among ties.

        The tie rule is deterministic on purpose: two snapshots of an
        unchanged Q-table must compare equal, otherwise a tied row would
        spuriously reset the stability counter.
        """
        row_max = q_table.max(axis=1, keepdims=True)
        # argmax over a boolean mask returns the first True in each row.
        return np.argmax(np.isclose(q_table, row_max), axis=1)

    # Initialize the two Q-tables (one per firm); all values start at 0.
    Q1 = np.zeros((N_STATES, N_ACTIONS))
    Q2 = np.zeros((N_STATES, N_ACTIONS))

    # Pick a single random starting state.
    p1, p2 = rng.choice(PRICES), rng.choice(PRICES)
    s = state_index(p1, p2)

    # For the stability check: remember each firm's greedy action per state.
    prev_pi1 = _greedy_policy(Q1)
    prev_pi2 = _greedy_policy(Q2)
    stable = 0  # periods for which the greedy policies have been unchanged

    for t in range(1, max_periods + 1):
        eps = epsilon_at(t, beta=beta)

        # Epsilon-greedy choice per firm: explore with probability eps,
        # otherwise exploit the current Q-row.
        if rng.random() < eps:
            a1 = rng.integers(0, N_ACTIONS)
        else:
            a1 = _greedy_action(Q1[s])

        if rng.random() < eps:
            a2 = rng.integers(0, N_ACTIONS)
        else:
            a2 = _greedy_action(Q2[s])

        # The prices just chosen form the next state.
        p1_next, p2_next = PRICES[a1], PRICES[a2]
        s_next = state_index(p1_next, p2_next)

        pi1 = profit1(p1_next, p2_next, c=c)
        pi2 = profit2(p1_next, p2_next, c=c)

        # Q-learning updates: blend the old estimate with the new target
        # (profit plus discounted best next-state value).
        Q1[s, a1] = (1 - alpha) * Q1[s, a1] + alpha * (pi1 + delta * np.max(Q1[s_next]))
        Q2[s, a2] = (1 - alpha) * Q2[s, a2] + alpha * (pi2 + delta * np.max(Q2[s_next]))

        s = s_next

        # Every check_every periods, rebuild both greedy policies over all
        # states and compare them with the last recorded ones.
        if t % check_every == 0:
            current_pi1 = _greedy_policy(Q1)
            current_pi2 = _greedy_policy(Q2)
            if np.array_equal(current_pi1, prev_pi1) and np.array_equal(current_pi2, prev_pi2):
                stable += check_every
            else:
                stable = 0
                prev_pi1, prev_pi2 = current_pi1, current_pi2

            if stable >= stable_required:
                return Q1, Q2, {
                    "converged": True,
                    "periods_run": t,
                    "stable_periods": stable,
                    "epsilon_final": eps
                }

    return Q1, Q2, {
        "converged": False,
        "periods_run": max_periods,
        "stable_periods": stable,
        "epsilon_final": epsilon_at(max_periods, beta=beta)
    }

# ====== Run ======
# Train both firms once. The cost parameter c is not passed, so the
# function's default (2.0) applies.
Q1, Q2, info = train_long_episode(
    alpha=0.125,
    delta=0.95,
    beta=2*1e-5,
    stable_required=100_000,
    check_every=1_000,
    max_periods=2_000_000,
    seed=0
)

# Summary dict: converged flag, periods run, stable periods, final epsilon.
print(info)

# Inspect the equilibrium fingerprint at s(8,8):
# the greedy action of each firm when both charged 8 in the previous period.
# argmax_tie breaks exact ties at random, so a tied row can print differently
# from run to run.
s8 = state_index(8, 8)
a1_star_8 = argmax_tie(Q1[s8])
a2_star_8 = argmax_tie(Q2[s8])
print("Greedy action at s(8,8): firm1 =", PRICES[a1_star_8], ", firm2 =", PRICES[a2_star_8])

# Inspect the equilibrium fingerprint at s(11,11):
# same check for the state in which both firms charged 11.
s11 = state_index(11,11)
a1_star_11 = argmax_tie(Q1[s11])
a2_star_11 = argmax_tie(Q2[s11])
print("Greedy action at s(11,11): firm1 =", PRICES[a1_star_11], ", firm2 =", PRICES[a2_star_11])

# Export
# Row labels iterate p1 in the outer loop and p2 in the inner loop, which is
# the same order state_index uses, so label k describes Q-table row k.
states = [f"s({p1},{p2})" for p1 in PRICES for p2 in PRICES]
# One column per grid price, in PRICES order.
actions = [f"price={p}" for p in PRICES]
# Write one CSV per firm under RESULTS_DIR (imported from qlearning_env.constants).
pd.DataFrame(Q1, index=states, columns=actions).to_csv(RESULTS_DIR / "Q1.csv")
pd.DataFrame(Q2, index=states, columns=actions).to_csv(RESULTS_DIR / "Q2.csv")


{'converged': True, 'periods_run': 405000, 'stable_periods': 100000, 'epsilon_final': 0.00030353913807886623}
Greedy action at s(8,8): firm1 = 8 , firm2 = 7
Greedy action at s(11,11): firm1 = 10 , firm2 = 12
