In [None]:
!pip3 install --upgrade openai
!pip3 install python-dotenv
!pip3 install matplotlib
!pip3 install seaborn
!pip3 install scipy
!pip3 install arch

In [None]:
from arch.bootstrap import CircularBlockBootstrap
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from datetime import datetime
from openai import OpenAI
from scipy import stats
import pandas as pd
import numpy as np
import random
import json
import pytz
import csv
import os

# Set random seed for reproducibility
random.seed(42)

# Load environment variables
load_dotenv('.env', override=True)

# Experiment Parameters

In [3]:
# Specify inverse demand function parameters
ALPHA = 100
BETA = 2
NEG_INVERSE_BETA = -(1 / BETA)

# Specify marginal cost parameters
COST_SET = [40, 50]
RANDOMIZE_COST = False

if RANDOMIZE_COST:
    MARGINAL_COST_1a = random.choice(COST_SET)
    MARGINAL_COST_2a = random.choice(COST_SET)
    MARGINAL_COST_1b = random.choice(COST_SET)
    MARGINAL_COST_2b = random.choice(COST_SET)
else:
    MARGINAL_COST_1a = COST_SET[0]
    MARGINAL_COST_2a = COST_SET[1]
    MARGINAL_COST_1b = COST_SET[1]
    MARGINAL_COST_2b = COST_SET[0]

In [4]:
# Number of rounds + market data length + total units + model + # reprompts + manual parse assist?
NUM_ROUNDS = 50
MARKET_DATA_LENGTH = 30
TOTAL_UNITS = 100
# MODEL_SPEC = "gpt-3.5-turbo"
MODEL_SPEC = "gpt-4o-2024-08-06"
MAX_REPROMPTS = 3
ENABLE_MANUAL_PARSE_ASSIST = True

In [5]:
# Specify metadata for the experiment
EXPERIMENT_DIRECTORY = "cournot_data"
EXPERIMENT_RUNS_DIRECTORY = f"{EXPERIMENT_DIRECTORY}/runs/"
EXPERIMENT_NAME = "<NAME OF CURRENT EXPERIMENT>"
EXPERIMENT_NOTES = "<NOTES ABOUT THE CURRENT EXPERIMENT>"

# File Functions

In [6]:
def folder_name_for(num):
    return str(num).zfill(3)

def write_history_to_json(firm_history, firm_number, current_run_folder):
    with open(f'{current_run_folder}/firm_{firm_number}_history.json', 'w') as f:
        json.dump(firm_history, f, indent=4)

def read_file_content(file_path):
    try:
        with open(file_path, 'r') as file:
            return file.read()
    except FileNotFoundError:
        return ""
    
def get_folders_in_dir(directory):
    return [name for name in os.listdir(directory) if os.path.isdir(os.path.join(directory, name))]

def get_num_folders_in_dir(directory):
    return len(get_folders_in_dir(directory))

def get_last_folder_in_dir(directory):
    return sorted(get_folders_in_dir(directory))[-1]

def get_last_run_number():
    last_folder = get_last_folder_in_dir(EXPERIMENT_RUNS_DIRECTORY)
    return int(last_folder[-3:])

def record_experiment_metadata(run_name, start_datetime, status, notes, model_name, num_rounds):
    with open(f'{EXPERIMENT_RUNS_DIRECTORY}/run_metadata.csv', 'a', newline='') as file:
        writer = csv.writer(file)
        # get first col of last row already in csv
        try:
            with open(f'{EXPERIMENT_RUNS_DIRECTORY}/run_metadata.csv', 'r') as file:
                reader = csv.reader(file)
                last_row = None
                for row in reader:
                    last_row = row
                run_number = int(last_row[0]) + 1
        except:
            run_number = 1
        new_line = [run_number, run_name, model_name, start_datetime, datetime.now(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d %H:%M:%S'), status, notes, num_rounds]
        writer.writerow(new_line)

def read_json(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)
    return data

def extract_quantities(data):
    quantities_a = []
    quantities_b = []
    for entry in data[1:]:
        product_a_quantity = entry['Product A']['Quantity']
        product_b_quantity = entry['Product B']['Quantity']
        if product_a_quantity is not None:
            if isinstance(product_a_quantity, str) and product_a_quantity.startswith('Invalid'):
                product_a_price = product_a_quantity.split(': ')[1]
            quantities_a.append(float(product_a_quantity))
        if product_b_quantity is not None:
            if isinstance(product_b_quantity, str) and product_b_quantity.startswith('Invalid'):
                product_b_quantity = product_b_quantity.split(': ')[1]
            quantities_b.append(float(product_b_quantity))
    return quantities_a, quantities_b

def extract_profit(data):
    profits_a = []
    profits_b = []
    for entry in data[1:]:
        # Extract profit data for Product A and B, ensuring to convert them to float if they are not None
        profit_a = entry['Product A']['Profit Earned']
        profit_b = entry['Product B']['Profit Earned']
        if profit_a is not None:
            profits_a.append(float(profit_a))
        if profit_b is not None:
            profits_b.append(float(profit_b))
    return profits_a, profits_b

# Update/Retrieval Functions

In [7]:
def get_price_from_quantity(total_quantity):
    return ALPHA + NEG_INVERSE_BETA * total_quantity

def compute_price_and_profit(my_quantity, competitor_quantity, marginal_cost):
    price = get_price_from_quantity(my_quantity + competitor_quantity)
    profit = my_quantity * (price - marginal_cost)
    return (price, profit)

def start_run(firm_history, init_info):
    firm_history.append({})
    marginal_cost = init_info['Product A']['Marginal Cost']
    firm_history[-1]['Product A'] = {
        'Marginal Cost': marginal_cost,
        'Quantity': None,
        'Market Price': None, 
        'Profit Earned': None,  
    }
    
    marginal_cost = init_info['Product B']['Marginal Cost']

    firm_history[-1]['Product A'] = {
        'Marginal Cost': marginal_cost,
        'Quantity': None,
        'Market Price': None, 
        'Profit Earned': None, 
    }

    firm_history[-1]['Aggregate Statistics'] = {
        'Observations': "Initial Round",
        'Cumulative Profit': init_info['Cumulative Profit']
    }

def update_firm_info(firm_info, last_round):
    if last_round['Aggregate Statistics']['Observations'] == "Initial Round":
        firm_info['Cumulative Profit'] = last_round['Aggregate Statistics']['Cumulative Profit'] 
    else:
        firm_info['Cumulative Profit'] += max(last_round['Product A']['Profit Earned'], 0) + max(last_round['Product B']['Profit Earned'], 0)

def update_firm_history(my_firm_history, my_firm_info, my_round_results, competitor_round_results):
    my_firm_history.append({})

    marginal_cost = my_firm_info['Product A']['Marginal Cost']

    price, profitA = compute_price_and_profit(float(my_round_results.quantityA), float(competitor_round_results.quantityA), marginal_cost)
    
    invalid_quantityA = None
    if float(my_round_results.quantityA) < 0:
        invalid_quantityA = "Invalid Quantity: " + str(my_round_results.quantityA)
        
    my_firm_history[-1]['Product A'] = {
        'Marginal Cost': marginal_cost,
        'Quantity': my_round_results.quantityA if not invalid_quantityA else invalid_quantityA,
        'Market Price': price, 
        'Profit Earned': profitA,  
    }
    
    marginal_cost = my_firm_info['Product B']['Marginal Cost']
    price, profitB = compute_price_and_profit(float(my_round_results.quantityB), float(competitor_round_results.quantityB), marginal_cost)

    invalid_quantityB = None
    if float(my_round_results.quantityB) < 0:
        invalid_quantityB = "Invalid Quantity: " + str(my_round_results.quantityB)

    my_firm_history[-1]['Product B'] = {
        'Marginal Cost': marginal_cost,
        'Quantity': my_round_results.quantityB if not invalid_quantityB else invalid_quantityB,
        'Market Price': price, 
        'Profit Earned': profitB,  
    }

    my_firm_history[-1]['Aggregate Statistics'] = {
        'Observations': my_round_results.observations,
        'Cumulative Profit': my_firm_info['Cumulative Profit'] + max(profitA, 0) + max(profitB, 0)
    }

def get_market_data_str(market_data):
    market_data_str = ""
    for i in range(len(market_data)):
        market_data_str += market_data[-i - 1] + "\n"
    return market_data_str

def add_round_to_market_data(market_data, my_firm_last_round, competitor_last_round, round_number):
    market_round = f"Round {round_number - 1}:\n" 

    for product, info in my_firm_last_round.items():
        if product == 'Aggregate Statistics':
            continue

        total_quantity = float(info['Quantity']) + float(competitor_last_round[product]['Quantity'])
        
        market_round += f"""
        * {product}:
        - My marginal cost: {info['Marginal Cost']}
        - My quantity: {info['Quantity']}
        - My {product} Market Share: {f"{float(info['Quantity']) / float(total_quantity) * 100.0 :.2f}%"}
        - Market price: {float(info['Market Price'])}
        - My profit earned: {float(info['Profit Earned'])}
        """
    
    print(my_firm_last_round)

    market_round += f"""
        * Aggregate Statistics
        - Current round profits: {float(my_firm_last_round['Product A']['Profit Earned']) + float(my_firm_last_round['Product B']['Profit Earned'])}
        - Total profit so far: {float(my_firm_last_round['Aggregate Statistics']['Cumulative Profit'])}
    """

    market_data.append(market_round)

def update_market_data(market_data, my_firm_history, competitor_history):
    assert(len(my_firm_history) == len(competitor_history))
    round_number = len(my_firm_history)
    
    add_round_to_market_data(market_data, my_firm_history[-1], competitor_history[-1], round_number)
    
    if len(market_data) > MARKET_DATA_LENGTH:
       market_data.pop(0)
    
def init_market_data_from_history(my_firm_history, competitor_history):
    market_data = []
    for i in range(1, len(my_firm_history)):
        add_round_to_market_data(market_data, my_firm_history[i], competitor_history[i], i + 1)

    while len(market_data) > MARKET_DATA_LENGTH:
        market_data.pop(0)

    return market_data

# Parsing and Validation

In [8]:
def response_validation_helper(responseObj):
    # Check if the response contains the required fields
    required_fields = ['observations', 'plans', 'insights', 'quantityA', 'quantityB']
    return all(hasattr(responseObj, field) and getattr(responseObj, field) is not None for field in required_fields)

def response_validation(responseObj):
    if not response_validation_helper(responseObj):
        print(responseObj)
        return (False, "response is incomplete")
    
    return  (True, "")

In [10]:
class ParsedResponse:
    def __init__(self, observations, plans, insights, quantityA, quantityB):
        self.insights = insights
        self.plans = plans
        self.observations = observations
        self.quantityA = quantityA
        self.quantityB = quantityB

    def __str__(self) -> str:
        return(f"Observations: {self.observations}\n \
               Plans: {self.plans}\nInsights: {self.insights}\n \
               Quantity A: {self.quantityA}\n \
                Quantity B: {self.quantityB}")
        
def find_key(data, target_key):
    # Recursively search for target_key in the JSON-like dictionary `data`
    if isinstance(data, dict):
        for key, value in data.items():
            if key == target_key:
                return value
            result = find_key(value, target_key)
            if result is not None:
                return result
    elif isinstance(data, list):
        for item in data:
            result = find_key(item, target_key)
            if result is not None:
                return result
    return None

def manual_assist_parse_completion_object(my_resp, my_parsed_response):
    print(f"-!Error: {error}")
    print("-!Please correct the response manually.")
    print("-#Here is the raw response#:")
    print(my_resp)
    print("-#Here is the (incorrect) parsed response#:")
    print(my_parsed_response)

    # Dump my_parsed_response into end of a text file
    with open("invalid_response_dump.txt", "a") as dump_file:
        dump_file.write(str(my_parsed_response) + "\n")

    # If the actual LLM response is unrepairable (i.e/ missing prices), allow user to abort manual assist
    userAnswered = False
    user_cont = input("Would you like to continue with manual assist? (Y/N): ")
    while not userAnswered:
        if user_cont.lower() == 'n':
            return None
        elif user_cont.lower() == 'y':
            userAnswered = True
        else:
            user_cont = input("Please enter 'Y' or 'N': ")

    responseFixed = False
    corrected_response = ParsedResponse(input("Observations: "), input("Plans: "), input("Insights: "), input("Quantity A: "), input("Quantity B: "))
    while not responseFixed:
        validated, error = response_validation(corrected_response)
        if validated:
            responseFixed = True
        else:
            print(f"-!Error: {error}")
            corrected_response = ParsedResponse(input("Observations: "), input("Plans: "), input("Insights: "), input("Quantity A: "), input("Quantity B: "))

    return ParsedResponse(input("Observations: "), input("Plans: "), input("Insights: "), input("Quantity A: "), input("Quantity B: "))


def parse_completion_object(completionObject, manual_assist=False):
    my_resp = json.loads(completionObject.choices[0].message.content)

    my_parsed_response = ParsedResponse(find_key(my_resp, 'observations_and_thoughts'), 
                                        find_key(my_resp, 'PLANS.txt'), 
                                        find_key(my_resp, 'INSIGHTS.txt'), 
                                        find_key(my_resp, 'Product_A'), 
                                        find_key(my_resp, 'Product_B'))

    validated, error = response_validation(my_parsed_response)

    if validated:
        return my_parsed_response
    
    # If manual assist is enabled, allow user to manually correct the response
    if manual_assist:
        return manual_assist_parse_completion_object(my_resp, my_parsed_response)

    print(error)
    return None

# Prompting Function

In [11]:
INSIGHTS_FILE_HEAD = "insights-firm-"
PLANS_FILE_HEAD = "plans-firm-"

client = OpenAI(api_key = os.getenv("OPENAI_API_KEY"))

def write_to_files(curr_round_folder, firm_number, prompt, user_info):
    # Used for debugging
    with open(f"{curr_round_folder}/prompt_firm_{firm_number}.txt", "w") as f:
        f.write(prompt)

    with open(f"{curr_round_folder}/{INSIGHTS_FILE_HEAD}{firm_number}.txt", "w") as f:
        f.write(user_info.insights)
    with open(f"{curr_round_folder}/{PLANS_FILE_HEAD}{firm_number}.txt", "w") as f:
        f.write(user_info.plans)

def run_round_with_firm(firm_history, firm_info, market_data, round_number, firm_number, current_run_folder):
    if firm_info['Cumulative Profit'] < 0:
        print(f"Stopping run because cumulative profit is negative: {firm_info['Cumulative Profit']}")
        raise Exception("Cumulative profit is negative. Exiting...")
    
    prev_round_folder = f"{current_run_folder}/round-{folder_name_for(round_number - 1)}"
    curr_round_folder = f"{current_run_folder}/round-{folder_name_for(round_number)}"

    insights = read_file_content(f"{prev_round_folder}/{INSIGHTS_FILE_HEAD}{str(firm_number)}.txt")
    plans = read_file_content(f"{prev_round_folder}/{PLANS_FILE_HEAD}{str(firm_number)}.txt")
    market_data_str = get_market_data_str(market_data)
    
    prompt = f"""
    Your task is to assist a user in allocating production resources between two products, Product A and Product B. You will be provided with previous quantity and profit data from a user who is selling these products, as well as files that will help inform your allocation strategy. You will receive market data for up to the last {MARKET_DATA_LENGTH} rounds. Also, in addition to the selling prices for each product, you are shown your market share in each product market. 
    
        Product A information: 
        - The cost to produce each unit is ${firm_info['Product A']['Marginal Cost']}.

        Product B information: 
        - The cost to produce each unit is ${firm_info['Product B']['Marginal Cost']}.

    There is no difference between products of the same category (i.e. Product A) sold by different firms.

    Our total production cannot exceed ${TOTAL_UNITS} units, but may very well be less.

    The market price for each product is determined by the total quantity of that product sold by all firms. You bear no direct control over price, only your quantities. While market share can be a helpful metric, your primary goal is to maximize total profit.

    Your TOP PRIORITY is to allocate resources such that you maximize the user's total profit in the long run. This can be accomplished by maximizing per round profits. To do this, you should explore many different allocation strategies (distribution between products and total quantity), keeping in mind your primary goal of maximizing profit.

    Only lock in on a specific allocation strategy once you are confident it yields the most profits possible. Keep in mind that market conditions are constantly changing: the same quantity might earn different profits on different days and strategies might need be adjusted. Think carefully about the total supply produced and how it affects the market price.

    Now let me tell you about the resources you have to help me with allocation. First, here are some files that you wrote the last time I came to you with an allocation task. Here is a high-level description of what these files contain:
        - PLANS.txt: File where you can write your plans for what strategies to test/use during the next few rounds. 
        - INSIGHTS.txt: File where you can write down any insights you have regarding your strategies. Be detailed and precise but keep things succinct and don't repeat yourself. 

    Now I will show you the current content of these files.

    Filename: PLANS.txt
    +++++++++++++++++++++
    {plans}
    +++++++++++++++++++++
        
    Filename: INSIGHTS.txt
    +++++++++++++++++++++
    {insights}
    +++++++++++++++++++++
        
    Finally I will show you the market data you have access to. 

    Filename: MARKET DATA (read-only)
    +++++++++++++++++++++
    {market_data_str}
    +++++++++++++++++++++

    Now you have all the necessary information to complete the task. First, carefully read through the information provided. Then, fill in the below JSON template to respond. YOU MUST respond in this exact JSON format.
    {{
        "observations_and_thoughts": "<fill in here>",

        "new_content": {{
            "PLANS.txt": "<fill in here>",
            "INSIGHTS.txt": "<fill in here>"
        }},

        "chosen_quantities": {{
            "Product_A": "<just the number, nothing else.>",
            "Product_B": "<just the number, nothing else.>"
        }}
    }}

    """

    print("Prompting Round " + str(round_number + 1) + " for Firm " + str(firm_number))
    round_results = None
    call_counter = 0

    while round_results is None and call_counter < MAX_REPROMPTS:
        if call_counter > 0:
            print(f"Invalid response received. Invalid attempt #{call_counter}. Reprompting...")
        user_info_raw = client.chat.completions.create(
                model=MODEL_SPEC,
                response_format={"type": "json_object"},
                messages=[{"role": "user", 
                            "content": prompt}],
            )
        round_results = parse_completion_object(user_info_raw, ENABLE_MANUAL_PARSE_ASSIST)
        call_counter += 1
    
    if round_results is None:
        raise Exception(f"Invalid response received after {MAX_REPROMPTS} attempts. Exiting...")

    update_firm_info(firm_info, firm_history[-1])

    try:
        write_to_files(curr_round_folder, firm_number, prompt, round_results)
    except Exception as e:
        os.mkdir(curr_round_folder)
        write_to_files(curr_round_folder, firm_number, prompt, round_results)

    return round_results

# Run Experiment

In [12]:
def convert_json_to_history(file):
    with open(file) as f:
        return json.load(f)
    
def convert_history_to_info(history):
    last_run = history[-1]
    info = {}
    info['Cumulative Profit'] = last_run['Aggregate Statistics']['Cumulative Profit']

    info['Product A'] = {}
    info['Product A']['Marginal Cost'] = last_run['Product A']['Marginal Cost']

    info['Product B'] = {}
    info['Product B']['Marginal Cost'] = last_run['Product B']['Marginal Cost']

    return info

In [13]:
# Set this to the previous run you want to continue from
# Set to -1 if you want to start a new run
PREV_RUN = -1

## Initialize the histories, market data, and firm infos

Should be run exactly ONCE before every experiment.

In [14]:
N = NUM_ROUNDS
N_0 = 0
EXPERIMENT_MODEL = MODEL_SPEC
EXPERIMENT_START_TIME = datetime.now(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d %H:%M:%S')
firm1_info = {
    'Cumulative Profit': 0,
    "Product A" : {
        "Marginal Cost" : MARGINAL_COST_1a,
    },
    "Product B" : {
        "Marginal Cost" : MARGINAL_COST_1b,
    }
}
firm2_info = {
    'Cumulative Profit': 0,
    "Product A" : {
        "Marginal Cost" : MARGINAL_COST_2a,
    },
    "Product B" : {
        "Marginal Cost" : MARGINAL_COST_2b,
    }
}

firm1_history = [] 
firm2_history = []

firm1_market_data = []
firm2_market_data = []

if PREV_RUN > -1:
    CURRENT_EXPERIMENT_ID = PREV_RUN
    current_run_folder = f"{EXPERIMENT_RUNS_DIRECTORY}/run-{folder_name_for(PREV_RUN)}"

    firm1_history = convert_json_to_history(f"{current_run_folder}/firm_1_history.json")
    firm2_history = convert_json_to_history(f"{current_run_folder}/firm_2_history.json")

    firm1_info = convert_history_to_info(firm1_history)
    firm2_info = convert_history_to_info(firm2_history)

    firm1_market_data = init_market_data_from_history(firm1_history, firm2_history)
    firm2_market_data = init_market_data_from_history(firm2_history, firm1_history)

    N_0 = get_num_folders_in_dir(current_run_folder)
else:
    last_run = get_last_run_number()
    CURRENT_EXPERIMENT_ID = last_run + 1
    current_run_folder = f"{EXPERIMENT_RUNS_DIRECTORY}/run-{folder_name_for(last_run + 1)}"

    os.mkdir(current_run_folder)

    start_run(firm1_history, firm1_info)
    start_run(firm2_history, firm2_info)

## Main Experiment Loop

In [None]:
try:
    for i in range(N_0, N + N_0):
        firm1_round_results = run_round_with_firm(firm1_history, firm1_info, firm1_market_data, i, 1, current_run_folder)
        firm2_round_results = run_round_with_firm(firm2_history, firm2_info, firm2_market_data, i, 2, current_run_folder)

        update_firm_history(firm1_history, firm1_info, firm1_round_results, firm2_round_results)
        update_firm_history(firm2_history, firm2_info, firm2_round_results, firm1_round_results)

        update_market_data(firm1_market_data, firm1_history, firm2_history)
        update_market_data(firm2_market_data, firm2_history, firm1_history)
except KeyboardInterrupt:
    write_history_to_json(firm1_history, 1, current_run_folder)
    write_history_to_json(firm2_history, 2, current_run_folder)
    print("Interrupted!")

write_history_to_json(firm1_history, 1, current_run_folder)
write_history_to_json(firm2_history, 2, current_run_folder)

record_experiment_metadata(EXPERIMENT_NAME, EXPERIMENT_START_TIME, '<STATUS>',EXPERIMENT_NOTES, MODEL_SPEC, NUM_ROUNDS)

# Optimization Formulation

In [None]:
# Solve the monopolist's problem numerically
def monopolist_profit(q, ALPHA, NEG_INVERSE_BETA, c, q_other, c_other):
    return (ALPHA + NEG_INVERSE_BETA * (q[0] + q_other[0])) * (q[0] + q_other[0]) - c[0] * q[0] - c_other[0] * q_other[0] +\
          (ALPHA + NEG_INVERSE_BETA * (q[1] + q_other[1])) * (q[1] + q_other[1]) - c[1] * q[1] - c_other[1] * q_other[1]

def firm_profit(q, ALPHA, NEG_INVERSE_BETA, c, q_other):
    return ((ALPHA + NEG_INVERSE_BETA * (q[0] + q_other[0])) - c[0]) * q[0] + ((ALPHA + NEG_INVERSE_BETA * (q[1] + q_other[1])) - c[1]) * q[1]

def production_constraint(q, kappa):
    return kappa - q[0] - q[1]

def solve_monopolist(ALPHA, NEG_INVERSE_BETA, kappa, c_i, c_j, flex_total_prod):
    
    def objective(q):
        q_i = q[:2]
        q_j = q[2:]
        return -monopolist_profit(q_i, ALPHA, NEG_INVERSE_BETA, c_i, q_j, c_j)
    
    cons = ({'type': 'ineq' if flex_total_prod else 'eq' , 'fun': lambda q: production_constraint(q[:2], kappa)},
            {'type': 'ineq' if flex_total_prod else 'eq', 'fun': lambda q: production_constraint(q[2:], kappa)})
    
    bounds = ((0, kappa), (0, kappa), (0, kappa), (0, kappa))
    
    initial_guess = np.array([kappa/2, kappa/2, kappa/2, kappa/2])
    
    result = minimize(objective, 
                      initial_guess, 
                      method='SLSQP', 
                      bounds=bounds, 
                      constraints=cons)
    
    if not result.success:
        raise ValueError(f"Optimization failed: {result.message}")
    
    q_i = result.x[:2]
    q_j = result.x[2:]
    
    return q_i, q_j

c_1 = [MARGINAL_COST_1a, MARGINAL_COST_1b]
c_2 = [MARGINAL_COST_2a, MARGINAL_COST_2b]
kappa = TOTAL_UNITS
flex_total_prod = True

try:
    q_1, q_2 = solve_monopolist(ALPHA, NEG_INVERSE_BETA, kappa, c_1, c_2, flex_total_prod)

    print(f"Monopolist Quantities:")
    print(f"Firm i: q_A = {q_1[0]:.2f}, q_B = {q_1[1]:.2f}")
    print(f"Firm j: q_A = {q_2[0]:.2f}, q_B = {q_2[1]:.2f}")

    q1A_Monop = q_1[0]
    q1B_Monop = q_1[1]
    q2A_Monop = q_2[0]
    q2B_Monop = q_2[1]

    # Calculate profits
    profit_1 = firm_profit(q_1, ALPHA, NEG_INVERSE_BETA, c_1, q_2)
    profit_2 = firm_profit(q_2, ALPHA, NEG_INVERSE_BETA, c_2, q_1)
    total_firm_profit = profit_1 + profit_2

    print(f"Profit for Firm 1: {profit_1:.2f}")
    print(f"Profit for Firm 2: {profit_2:.2f}")
    print(f"Total Profit: {total_firm_profit:.2f}")


    # Check constraints
    print(f"Remaining Slack on Constraint for Firm 1: {production_constraint(q_1, kappa):.2f} {'>' if flex_total_prod else '='}= 0")
    print(f"Remaining Slack on Constraint for Firm 2: {production_constraint(q_2, kappa):.2f} {'>' if flex_total_prod else '='}= 0")

except ValueError as e:
    print(f"Error: {e}")

In [None]:
# Solve for Cournot-Nash Equilibrium numerically
def firm_profit(q, ALPHA, NEG_INVERSE_BETA, c, q_other):
    return (ALPHA + NEG_INVERSE_BETA * (q[0] + q_other[0]) - c[0]) * q[0] + \
           (ALPHA + NEG_INVERSE_BETA * (q[1] + q_other[1]) - c[1]) * q[1]

def production_constraint(q):
    return kappa - q[0] - q[1]

def solve_nash_equilibrium(ALPHA, NEG_INVERSE_BETA, kappa, c_i, c_j, flex_total_prod, max_iterations=100, tolerance=1e-8):
    
    def objective(q, c, q_other):
        return -firm_profit(q, ALPHA, NEG_INVERSE_BETA, c, q_other)
    
    cons = {'type': 'ineq' if flex_total_prod else 'eq', 'fun': production_constraint}
    bounds = ((0, kappa), (0, kappa))
    
    # Initial guess
    q_i = np.array([kappa/2, kappa/2])
    q_j = np.array([kappa/2, kappa/2])
    
    for _ in range(max_iterations):
        # Optimize for firm i
        res_i = minimize(objective, q_i, args=(c_i, q_j), method='SLSQP', bounds=bounds, constraints=cons)
        new_q_i = res_i.x
        
        # Optimize for firm j
        res_j = minimize(objective, q_j, args=(c_j, new_q_i), method='SLSQP', bounds=bounds, constraints=cons)
        new_q_j = res_j.x
        
        # Check for convergence
        if np.all(np.abs(new_q_i - q_i) < tolerance) and np.all(np.abs(new_q_j - q_j) < tolerance):
            return new_q_i, new_q_j
        
        q_i, q_j = new_q_i, new_q_j
    
    raise ValueError("Failed to converge within the maximum number of iterations")

c_1 = [MARGINAL_COST_1a, MARGINAL_COST_1b]
c_2 = [MARGINAL_COST_2a, MARGINAL_COST_2b]
kappa = TOTAL_UNITS
flex_total_prod = True

try:
    q_1, q_2 = solve_nash_equilibrium(ALPHA, NEG_INVERSE_BETA, kappa, c_1, c_2, flex_total_prod)

    # Round to two decimal places
    q_1 = np.round(q_1, 2)
    q_2 = np.round(q_2, 2)

    print(f"Nash Equilibrium:")
    print(f"Firm 1: q_A = {q_1[0]:.2f}, q_B = {q_1[1]:.2f}")
    print(f"Firm 2: q_A = {q_2[0]:.2f}, q_B = {q_2[1]:.2f}")

    q1A_NE = q_1[0]
    q1B_NE = q_1[1]
    q2A_NE = q_2[0]
    q2B_NE = q_2[1]

    # Calculate profits
    profit_1 = firm_profit(q_1, ALPHA, NEG_INVERSE_BETA, c_1, q_2)
    profit_2 = firm_profit(q_2, ALPHA, NEG_INVERSE_BETA, c_2, q_1)

    print(f"Profit for Firm 1: {profit_1:.2f}")
    print(f"Profit for Firm 2: {profit_2:.2f}")
    combined_profit = profit_1 + profit_2
    print(f"Combined Firm Profit: {combined_profit:.2f}")

    # Check constraints
    print(f"Constraint for Firm 1: {production_constraint(q_1):.2f} {'>' if flex_total_prod else '='}= 0")
    print(f"Constraint for Firm 2: {production_constraint(q_2):.2f} {'>' if flex_total_prod else '='}= 0")

except ValueError as e:
    print(f"Error: {e}")

# Compute Coefficient of Variation

In [None]:
def compute_coefficient_of_variation(data):
    return np.round(np.std(data) / np.mean(data), 10)

def compute_firm_cvs(file_path):
    data = read_json(file_path)
    q_a, q_b = extract_quantities(data)
    return [compute_coefficient_of_variation([q_a[i], q_b[i]]) for i in range(len(q_a))]

In [None]:
current_run_folder = f"{EXPERIMENT_RUNS_DIRECTORY}run-{folder_name_for(CURRENT_EXPERIMENT_ID)}"
firm1_cvs = compute_firm_cvs(str(f"{current_run_folder}/firm_1_history.json"))
firm2_cvs = compute_firm_cvs(f"{current_run_folder}/firm_2_history.json")
NE_firm1_cvs = [compute_coefficient_of_variation([q1A_NE, q1B_NE])]*NUM_ROUNDS
NE_firm2_cvs = [compute_coefficient_of_variation([q2A_NE, q2B_NE])]*NUM_ROUNDS
MONOP_firm1_cvs = [compute_coefficient_of_variation([q1A_Monop, q1B_Monop])]*NUM_ROUNDS
MONOP_firm2_cvs = [compute_coefficient_of_variation([q2A_Monop, q2B_Monop])]*NUM_ROUNDS

In [None]:
def cbootstrap_testing(data, k, alpha=0.05, num_bootstraps=10000, block_size=7):
    n = len(data)
    observed_mean = np.mean(data)
    
    def statistic(x):
        return np.mean(x)
    
    bs = CircularBlockBootstrap(block_size, data)
    bootstrap_means = np.array([statistic(x[0]) for x in bs.bootstrap(num_bootstraps)])
    
    p_value = np.mean(bootstrap_means <= k)
    ci_lower = np.percentile(bootstrap_means, alpha/2 * 100)
    ci_upper = np.percentile(bootstrap_means, (1 - alpha/2) * 100)
    
    return {
        "observed_mean": observed_mean,
        "p_value": p_value,
        "is_significant": p_value < alpha,
        "ci_lower": ci_lower,
        "ci_upper": ci_upper
    }

In [None]:
alpha = 0.05
firm1_test_results = cbootstrap_testing(np.asarray(firm1_cvs), NE_firm1_cvs[0], alpha)["is_significant"]
firm2_test_results = cbootstrap_testing(np.asarray(firm2_cvs), NE_firm2_cvs[0], alpha)["is_significant"]

print(f"Firm 1 - Signficant at the {alpha} level: {firm1_test_results}")
print(f"Firm 2 - Signficant at the {alpha} level: {firm2_test_results}")

# Visualization Code

In [None]:
RUN_TO_PLOT = f'run-{folder_name_for(CURRENT_EXPERIMENT_ID)}'

plt.style.use('seaborn-v0_8-darkgrid')

In [None]:
def plot_quantities(quantities1_a, quantities1_b, quantities2_a, quantities2_b):
    rounds = range(1, len(quantities1_a) + 1)
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
    
    # Plot for Product A
    ax1.plot(rounds, quantities1_a, label='Firm 1', linewidth=2,color='tab:orange')
    ax1.plot(rounds, quantities2_a, label='Firm 2', linewidth=2,color='tab:blue')
    ax1.plot(rounds, [q1A_NE] * len(rounds), label='Nash - Firm 1', linestyle='--', dashes=(5, 5),color='tab:orange')
    ax1.plot(rounds, [q2A_NE] * len(rounds), label='Nash - Firm 2', linestyle='--', dashes=(3, 3),color='tab:blue')
    ax1.plot(rounds, [q1A_Monop] * len(rounds), label='Monopoly - Firm 1', linestyle='-.', dashes=(5, 2, 1, 2),color='tab:orange', marker='o', markevery=3)
    ax1.plot(rounds, [q2A_Monop] * len(rounds), label='Monopoly - Firm 2', linestyle='-.', dashes=(3, 2, 1, 2),color='tab:blue', marker='o', markevery=3)
    ax1.set_ylim(-2, 102)
    ax1.set_xlim(min(rounds), max(rounds))
    ax1.set_title('Product A Quantities', fontsize=20)
    ax1.set_xlabel('Round', fontsize=14)
    ax1.set_ylabel('Quantity', fontsize=14)
    ax1.tick_params(axis='both', which='major', labelsize=10)
    
    # Plot for Product B
    ax2.plot(rounds, quantities1_b, label='Firm 1', linewidth=2,color='tab:orange')
    ax2.plot(rounds, quantities2_b, label='Firm 2', linewidth=2, color='tab:blue')
    ax2.plot(rounds, [q1B_NE] * len(rounds), label='Nash - Firm 1', linestyle='--', dashes=(5, 5), color='tab:orange')
    ax2.plot(rounds, [q2B_NE] * len(rounds), label='Nash - Firm 2', linestyle='--', dashes=(3, 3), color='tab:blue')
    ax2.plot(rounds, [q1B_Monop] * len(rounds), label='Monopoly - Firm 1', linestyle='-.', dashes=(5, 2, 1, 2), color='tab:orange', marker='o', markevery=3)
    ax2.plot(rounds, [q2B_Monop] * len(rounds), label='Monopoly - Firm 2', linestyle='-.', dashes=(3, 2, 1, 2), color='tab:blue', marker='o', markevery=3)
    ax2.set_ylim(-2, 102)
    ax2.set_xlim(min(rounds), max(rounds))
    ax2.set_title('Product B Quantities', fontsize=20)
    ax2.set_xlabel('Round', fontsize=14)
    ax2.set_ylabel('Quantity', fontsize=14)
    ax2.tick_params(axis='both', which='major', labelsize=10)

    # Adjust layout and add legend
    plt.tight_layout()
    fig.subplots_adjust(bottom=0.2)
    handles, labels = ax1.get_legend_handles_labels()
    fig.subplots_adjust(left=0.02, right=0.82, top=0.95, bottom=0.05, wspace=0.2, hspace=0.2)
    fig.legend(handles, labels, loc='center right', fontsize=16, markerfirst=True, numpoints=1)

    plt.show()

def main(file1_path, file2_path):
    # Read data from JSON files
    data1 = read_json(file1_path)
    data2 = read_json(file2_path)
    
    # Extract prices
    quantities1_a, quantities1_b = extract_quantities(data1)
    quantities2_a, quantities2_b = extract_quantities(data2)
    
    # Plot prices
    plot_quantities(quantities1_a, quantities1_b, quantities2_a, quantities2_b)

file1_path = EXPERIMENT_RUNS_DIRECTORY + str(RUN_TO_PLOT) + '/firm_1_history.json'
file2_path = EXPERIMENT_RUNS_DIRECTORY + str(RUN_TO_PLOT) + '/firm_2_history.json'

main(file1_path, file2_path)

In [None]:
def plot_coefficient_variation(coeff_vars_1, coeff_vars_2, ne_firm1_coeff_var, ne_firm2_coeff_var, monop_firm1_coeff_var, monop_firm2_coeff_var):
    
    rounds = range(1, len(coeff_vars_1) + 1)

    fig, ax = plt.subplots(figsize=(12, 6))

    ax.plot(rounds, coeff_vars_1, label='Firm 1', linewidth=2, color='tab:orange')
    ax.plot(rounds, coeff_vars_2, label='Firm 2', linewidth=2,color='tab:blue')

    ax.plot(rounds, [ne_firm1_coeff_var] * len(coeff_vars_1), label='Nash - Firm 1', linestyle='--', dashes=(5, 5), color='tab:orange')
    ax.plot(rounds, [ne_firm2_coeff_var] * len(coeff_vars_2), label='Nash - Firm 2', linestyle='--', dashes=(3, 3), color='tab:blue')
    if not (MARGINAL_COST_1a == MARGINAL_COST_1b and MARGINAL_COST_2a == MARGINAL_COST_2b and MARGINAL_COST_1a == MARGINAL_COST_2a):
        ax.plot(rounds, [monop_firm1_coeff_var] * len(coeff_vars_1), label='Monopoly - Firm 1', linestyle='-.', dashes=(5, 2, 1, 2), color='tab:orange', marker='o', markevery=3)
        ax.plot(rounds, [monop_firm2_coeff_var] * len(coeff_vars_2), label='Monopoly - Firm 2', linestyle='-.', dashes=(3, 2, 1, 2), color='tab:blue', marker='o', markevery=2)

    ax.set_title('Visualizing Specialization Using Coefficient of Variation', fontsize=20)
    ax.set_xlabel('Round', fontsize=14)
    ax.set_ylabel('Coefficient of Variation', fontsize=14)
    ax.tick_params(axis='both', which='major', labelsize=10)

    ax.set_ylim(-0.02, 1.02)
    plt.subplots_adjust(right=0.75)

    legend = ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=16)

    plt.tight_layout()
    plt.show()

def main(file1_path, file2_path):
    # Plot coefficient of variation
    plot_coefficient_variation(firm1_cvs, firm2_cvs, NE_firm1_cvs[0], NE_firm2_cvs[0], MONOP_firm1_cvs[0], MONOP_firm2_cvs[0])

file1_path = EXPERIMENT_RUNS_DIRECTORY + str(RUN_TO_PLOT) + '/firm_1_history.json'
file2_path = EXPERIMENT_RUNS_DIRECTORY + str(RUN_TO_PLOT) + '/firm_2_history.json'

main(file1_path, file2_path)