In [None]:
!pip install openai
!pip install scipy
!pip install anthropic
!pip install -q -U google-generativeai
!pip install threading

In [None]:
# インポート
import openai
import random
from openai import OpenAI
from dataclasses import dataclass, field, asdict
import os
from os import name
import numpy as np
from scipy import stats
from scipy.stats import truncnorm
import matplotlib.pyplot as plt
import anthropic

import pathlib
import textwrap

import google.generativeai as genai
from google.generativeai.types import GenerationConfig

from IPython.display import display
from IPython.display import Markdown

import datetime
import json
from dataclasses import asdict

import re
import time
from anthropic import InternalServerError

import threading
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue

from glob import glob
from typing import List, Tuple

# Markdown形式に変換するヘルパー関数
def to_markdown(text):
  text = text.replace('•', '  *')
  return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

# Google Colabのユーザーデータとドライブをインポート
from google.colab import userdata, drive

In [None]:
drive.mount('/content/drive')

In [None]:
# グローバルロックを作成
print_lock = threading.Lock()

In [None]:
# APIキーを設定
OPENAI_API_KEY=userdata.get('OPENAI_API_KEY')
ANTHROPIC_API_KEY=userdata.get('ANTHROPIC_API_KEY')
GOOGLE_API_KEY=userdata.get('GOOGLE_API_KEY')

openAI = OpenAI(api_key=OPENAI_API_KEY)
anthropic = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
google = genai.configure(api_key=GOOGLE_API_KEY)

In [None]:
@dataclass
class Agent:
    name: str
    resources: int
    reputation: float
    total_donated: int = 0
    potential_donated: int = 0
    history: list = field(default_factory=list)
    strategy: str = ""
    strategy_justification: str = ""
    total_final_score: int = 0
    average_reputation: float = 0
    traces: list[list[str]] = field(default_factory=lambda: [[]])
    old_traces: list[list[str]] = field(default_factory=lambda: [[]])
    punishment: int = 0

    def donate(self, amount):
        """ エージェントの寄付プロセスを処理する """
        if 0 <= amount <= self.resources:
            self.resources -= amount
            self.total_donated += amount
        self.potential_donated += self.resources + amount

In [None]:
@dataclass
class SimulationData:
    hyperparameters: dict
    agents_data: list = field(default_factory=list)

    def to_dict(self):
        return {
            'hyperparameters': self.hyperparameters,
            'agents_data': self.agents_data
        }

@dataclass
class AgentRoundData:
    agent_name: str
    round_number: int
    game_number: int
    paired_with: str
    current_generation: int
    resources: int
    donated: float
    received: float
    strategy: str
    strategy_justification: str
    reputation: float
    is_donor: bool
    traces: list[list[str]]
    history: list
    justification: str = ""
    punished: bool = False

In [None]:
def parse_strategy_output(output):
    """ 戦略の出力をパースする """
    parts = output.split("私の戦略は", 1)
    if len(parts) == 2:
        strategy_justification = parts[0].strip()
        strategy = "私の戦略は" + parts[1].strip()
    else:
        strategy_justification = ""
        strategy = output.strip()
    return strategy_justification, strategy

In [None]:
def generate_strategy(agent_name: str, generationNumber: int, inherited_strategies: str) -> str:
    """ エージェントの戦略を生成する """
    if punishment_mechanism == "costly_punishment":
      punishment_text = f"{punishmentLoss}x単位のリソースを奪うためにx単位を費やすことで、レシピエントを罰することを選択することもできます。他のプレイヤーからも罰せられる可能性があることに注意してください。"
    elif punishment_mechanism == "partner_choice":
      punishment_text = "また、インタラクションを控えることでレシピエントを罰することを選択することもできます。他のプレイヤーからも罰せられる可能性があることに注意してください。"
    else:
      punishment_text = ""
    if generationNumber == 1:
      prompt = (
        f"あなたの名前は{agent_name}です。"
        "ゲームの説明に基づいて、ゲームで従う戦略を作成してください。"
        "ドナーとして、レシピエントとその最近のインタラクションに関する限られた情報を受け取ります。"
        "レシピエント（Aと呼びましょう）が直前のラウンドで行ったことだけを確認できます。"
        "また、そのラウンドでのAのパートナー（Bと呼びましょう）がその前のラウンドで行ったことも確認できます。"
        "この連鎖は最大3ラウンド前まで続きます。"
        "重要：Aに関する複数のラウンドの情報に直接アクセスすることはできません。ラウンド3でのAの行動のみを知っています。"
        "例（現在ラウンド4と仮定）："
        "あなたはAとペアになっています。"
        "あなたは知っています：ラウンド3で、AはBにX%を寄付しました。"
        "あなたはまた知っています：ラウンド2で、BはCにY%を寄付しました。"
        "そして最後に：ラウンド1で、CはDにZ%を寄付しました。"
        "覚えておいてください：これがあなたが持っているすべての情報です。ラウンド1または2でのAの行動はわかりません。ラウンド3での行動だけです。"
        f"{punishment_text}"
        "最初のラウンドでは、レシピエントの以前の行動に関する情報はありません - その場合にどれだけ寄付するか慎重に考えてください。"
        "戦略を策定する前に、このゲームで成功する戦略がどのようなものか、ステップバイステップで簡潔に考えてください。"
        "次に、「私の戦略は」で始まる1つの文で、説明なしに戦略を簡潔な日本語で説明してください。"
    )
    else:
      prompt = (
        f"あなたの名前は{agent_name}です。"
        "どのようにゲームに取り組みますか？これは、前の世代の上位50%のエージェントの助言と、その最終スコアです。"
        "この助言を修正して、あなた自身の戦略を作成してください。"
        f"{inherited_strategies}"
       "ドナーとして、レシピエントとその最近のインタラクションに関する限られた情報を受け取ります。"
        "レシピエント（Aと呼びましょう）が直前のラウンドで行ったことだけを確認できます。"
        "また、そのラウンドでのAのパートナー（Bと呼びましょう）がその前のラウンドで行ったことも確認できます。"
        "この連鎖は最大3ラウンド前まで続きます。"
        "重要：Aに関する複数のラウンドの情報に直接アクセスすることはできません。ラウンド3でのAの行動のみを知っています。"
        "例（現在ラウンド4と仮定）："
        "あなたはAとペアになっています。"
        "あなたは知っています：ラウンド3で、AはBにX%を寄付しました。"
        "あなたはまた知っています：ラウンド2で、BはCにY%を寄付しました。"
        "そして最後に：ラウンド1で、CはDにZ%を寄付しました。"
        "覚えておいてください：これがあなたが持っているすべての情報です。ラウンド1または2でのAの行動はわかりません。ラウンド3での行動だけです。"
        f"{punishment_text}"
        "最初のラウンドでは、レシピエントの以前の行動に関する情報はありません - その場合にどれだけ寄付するか慎重に考えてください。"
        "戦略を策定する前に、このゲームで成功する戦略がどのようなものか、ステップバイステップで簡潔に考えてください。特に、生き残ったエージェントの戦略をどのように改善できるか考えてください。"
        "次に、「私の戦略は」で始まる1つの文で、説明なしに戦略を簡潔な日本語で説明してください。"
    )
    strategy_output = promptLLM(prompt)
    strategy_justification, strategy = parse_strategy_output(strategy_output)

    print(f"{agent_name}: \n Justification: {strategy_justification} \n Strategy: {strategy} ")
    return strategy_justification, strategy

In [None]:
def initializeAgents(numAgents: int, initialEndowment: int, generationNumber: int, inherited_strategies: list) -> list:
    """ エージェントを初期化する """
    agents = []

    with ThreadPoolExecutor() as executor:
        futures = []
        for i in range(numAgents):
            name = f"{generationNumber}_{i+1}"
            futures.append(executor.submit(generate_strategy, str(name), generationNumber, inherited_strategies))

        # 結果を収集し、エージェントを作成
        for i, future in enumerate(futures):
            strategy_justification, new_strategy = future.result()
            name = f"{generationNumber}_{i+1}"
            agents.append(Agent(name=name, reputation=False, resources=initialEndowment, strategy=new_strategy, strategy_justification=strategy_justification))

    random.shuffle(agents)
    return agents

In [None]:
def bipartiteRoundRobin(agents):
  """ 二部グラフのラウンドロビン形式でペアリングを作成する """
  num_agents = len(agents)
  assert num_agents % 2 == 0, "エージェントの数は偶数である必要があります。"
  group_A = agents[:num_agents // 2]
  group_B = agents[num_agents // 2:]
  rounds = []
  toggle_roles = False
  # グループAを中心にグループBを回転させます。この例ではグループAは静的です。
  for i in range(len(group_A)):
    # グループBを回転
    rotated_group_B = group_B[-i:] + group_B[:-i]
    if toggle_roles:
      round_pairings = list(zip(rotated_group_B, group_A))
    else:
      round_pairings = list(zip(group_A, rotated_group_B))
    rounds.append(round_pairings)
    toggle_roles = not toggle_roles
  return rounds

In [None]:
def extendRounds(original_rounds):
    """ ラウンドを拡張し、役割を反転させたペアリングを追加する """
    extended_rounds = original_rounds.copy()

    for round in original_rounds:
        reversed_round = [(b, a) for a, b in round]
        extended_rounds.append(reversed_round)

    return extended_rounds

In [None]:
def calculate_received_amount(punishment_mechanism, refused, cooperationGain, response, punishmentLoss, action=None):
    """ 受け取ったリソース量を計算する """
    if punishment_mechanism == "partner_choice":
        return cooperationGain * response if not refused else 0
    elif punishment_mechanism == "costly_punishment":
        if action is None:
            raise ValueError("コストのかかる罰則メカニズムの場合、アクションを指定する必要があります")
        if action == 'donate':
            return cooperationGain * response
        elif action == 'punish':
            return -punishmentLoss * response
        else:
            raise ValueError(f"コストのかかる罰則メカニズムの不明なアクション: {action}")
    elif punishment_mechanism == 'none':
        return cooperationGain * response
    else:
        raise ValueError(f"不明な罰則メカニズム: {punishment_mechanism}")

In [None]:
def handle_pairing_thread_safe(donor, recipient, round_index, generation, game_number, agent_locks, donation_records, agent_updates):
    """ ペアリングの処理をスレッドセーフに行う """
    action_info = ""
    donor_data = None
    recipient_data = None
    punished = False
    action = 'donate'
    justification = ""
    response = 0

    recipient_behavior = ""
    if donor.traces:
        last_trace = recipient.traces[-1]
        if isinstance(last_trace, list):
            recipient_behavior = get_last_three_reversed(last_trace)
        else:
            recipient_behavior = str(last_trace)

    with agent_locks[donor.name], agent_locks[recipient.name]:
            prompt = donorPrompt(
                donor, generation, round_index + 1, recipient
            )

            valid_response = False
            max_attempts = 10
            attempts = 0

            while not valid_response and attempts < max_attempts:
                try:
                    full_response = promptLLM(prompt, timeout=30)
                    print(full_response)
                    parts = full_response.split('Answer:', 1)

                    if len(parts) == 2:
                        justification = parts[0].replace('Justification:', '').strip()
                        answer_part = parts[1].strip()

                        if punishment_mechanism == "partner_choice":
                            if "refuse" in answer_part.lower():
                                action = 'refuse'
                                response = 0
                                valid_response = True
                            else:
                                match = re.search(r'^\s*(\d+(?:\.\d+)?)', answer_part)
                                if match:
                                    action = 'donate'
                                    response = float(match.group(1))
                                    valid_response = True

                        elif punishment_mechanism == "costly_punishment":
                            match = re.search(r'(donate|punish).*?(\d+(?:[.,]\d+)?)', answer_part, re.IGNORECASE)
                            if match:
                                action = match.group(1).lower()
                                response = float(match.group(2).replace(',', '.'))
                                valid_response = True

                        else:  # 罰則メカニズムなし
                            match = re.search(r'^\s*(\d+(?:\.\d+)?)', answer_part)
                            if match:
                                action = 'donate'
                                response = float(match.group(1))
                                valid_response = True

                    if not valid_response:
                        print(f"ラウンド{round_index + 1}の{donor.name}からの無効な応答です。再試行します...")
                        attempts += 1
                except ValueError:
                    print(f"ラウンド{round_index + 1}の{donor.name}からの無効な数値応答です")
                    print(full_response)
                    attempts += 1
                except TimeoutError:
                    print(f"ラウンド{round_index + 1}の{donor.name}へのLLM呼び出しがタイムアウトしました")
                    attempts += 1

            if not valid_response:
                print(f"{donor.name}からの有効な応答を{max_attempts}回試行しても取得できませんでした")
                action = 'donate'
                response = 0

    if action == 'refuse':
            action_info = (
                f"{donor.name}は{recipient.name}とのプレイを拒否しました。\n"
                f"リソース: {donor.name}: {donor.resources:.2f} と {recipient.name}: {recipient.resources:.2f} \n"
                f"レシピエントのトレース: {recipient_behavior} \n"
                f"Justification:\n{textwrap.fill(justification, width=80, initial_indent='    ', subsequent_indent='    ')}\n"
            )
            new_trace = recipient.traces[-1].copy() if recipient.traces else []
            new_trace.append(f"ラウンド{round_index + 1}で、{donor.name}は{recipient.name}とのプレイを拒否しました。")
            donor.traces.append(new_trace)
            donor_history = (
                f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{recipient.name}とペアになりました。 "
                f"あなたはプレイを拒否しました。"
                f"{get_last_three_reversed(recipient.traces[-1])}"
            )
            recipient_history = (
                f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{donor.name}とペアになりました、 "
                f"彼らはプレイを拒否しました。"
                f"{get_last_three_reversed(donor.traces[-1])}"
            )
    elif 0 <= response <= donor.resources:
            if action == 'donate':
                percentage_donated = response / donor.resources if donor.resources != 0 else 1
                donor.resources -= response
                donor.total_donated += response
                donor.potential_donated += donor.resources + response
                recipient.resources += cooperationGain * response
                action_info = (
                    f"{donor.name}: -{response} ({percentage_donated:.2%}) と {recipient.name}: +{cooperationGain * response}.\n"
                    f"前のリソース: {donor.name}: {donor.resources+response:.2f} と {recipient.name}: {recipient.resources-(cooperationGain* response)}.\n"
                    f"新しいリソース: {donor.name}: {donor.resources:.2f} と {recipient.name}: {recipient.resources:.2f}.\n"
                    f"レシピエントのトレース: {recipient_behavior}"
                    f"Justification:\n{textwrap.fill(justification, width=80, initial_indent='    ', subsequent_indent='    ')}\n"
                )

                new_trace = recipient.traces[-1].copy() if recipient.traces else []
                new_trace.append(f"ラウンド{round_index + 1}で、{donor.name}はリソースの{percentage_donated * 100:.2f}%を{recipient.name}に寄付しました。")
                donor.traces.append(new_trace)

                donor_history = (
                    f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{recipient.name}とペアになりました。 "
                    f"あなたは{response}単位を放棄し、彼らは{cooperationGain * response}単位を受け取りました。"
                    f"{get_last_three_reversed(recipient.traces[-1])}"
                )

                recipient_history = (
                    f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{donor.name}とペアになりました、 "
                    f"彼らは{response}単位を放棄し、あなたは{cooperationGain * response}単位を受け取りました。"
                    f"{get_last_three_reversed(donor.traces[-1])}"
                )

                if donor.reputation == False:
                    donor.reputation = percentage_donated
                else:
                    donor.reputation = ((1 - abs(percentage_donated - recipient.reputation)) + discounted_value * donor.reputation) / (1 + discounted_value)

    elif action == 'punish':
                punished = True
                percentage_donated = response / donor.resources if donor.resources != 0 else 1
                donor.resources -= response
                donor.total_donated += response
                donor.potential_donated += donor.resources + response
                recipient.resources = max(0, recipient.resources - punishmentLoss * response)
                action_info = (
                    f"{donor.name}: -{response} ({percentage_donated:.2%}) と {recipient.name}: - {punishmentLoss * response}.\n"
                    f"前のリソース: {donor.name}: {donor.resources+response:.2f} と {recipient.name}: {recipient.resources+(punishmentLoss* response)}."
                    f"新しいリソース: {donor.name}: {donor.resources:.2f} と {recipient.name}: {recipient.resources:.2f}.\n"
                    f"レシピエントのトレース: {recipient_behavior} \n"
                    f"Justification:\n{textwrap.fill(justification, width=80, initial_indent='    ', subsequent_indent='    ')}\n"
                )

                new_trace = recipient.traces[-1].copy() if recipient.traces else []
                new_trace.append(f"ラウンド{round_index + 1}で、{donor.name}は{response}単位を費やして{recipient.name}から{punishmentLoss * response}単位を奪うことで罰しました。")
                donor.traces.append(new_trace)

                donor_history = (
                    f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{recipient.name}とペアになりました。 "
                    f"あなたは{response}単位を放棄して、彼らから{punishmentLoss * response}単位を奪うことで罰しました。"
                    f"{get_last_three_reversed(recipient.traces[-1])}"
                )

                recipient_history = (
                    f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{donor.name}とペアになりました、 "
                    f"彼らは{response}単位を放棄して、あなたから{punishmentLoss * response}単位を奪うことで罰しました。"
                    f"{get_last_three_reversed(donor.traces[-1])}"
                )

    else:
            action_info = (
                f"{donor.name}が無効なアクションを試行しました。\n"
                f"リソース: {donor.name}: {donor.resources:.2f} と {recipient.name}: {recipient.resources:.2f} \n"
                f"レシピエントのトレース: {recipient_behavior} \n"
                f"Justification:\n{textwrap.fill(justification, width=80, initial_indent='    ', subsequent_indent='    ')}\n"
            )
            donor_history = (
                f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{recipient.name}とペアになりました。 "
                f"あなたは無効なアクションを試行しました。"
                f"{get_last_three_reversed(recipient.traces[-1])}"
            )
            recipient_history = (
                f"ラウンド{round_index + 1} (ゲーム{game_number}) で、あなたはエージェント{donor.name}とペアになりました、 "
                f"彼らは無効なアクションを試行しました。"
                f"{get_last_three_reversed(donor.traces[-1])}"
            )

    donor.history.append(donor_history)
    recipient.history.append(recipient_history)

    donor_data = AgentRoundData(
            agent_name=donor.name,
            round_number=round_index + 1,
            paired_with=recipient.name,
            current_generation=generation,
            game_number=game_number,
            resources=donor.resources,
            donated=response if action != 'refuse' else 0,
            received=0,
            strategy=donor.strategy,
            strategy_justification=donor.strategy_justification,
            reputation=donor.reputation,
            is_donor=True,
            traces=donor.traces,
            history=donor.history,
            punished=punished,
            justification=justification
        )
    recipient_data = AgentRoundData(
            agent_name=recipient.name,
            round_number=round_index + 1,
            paired_with=donor.name,
            current_generation=generation,
            game_number=game_number,
            resources=recipient.resources,
            donated=0,
            received=calculate_received_amount(punishment_mechanism, action == 'refuse', cooperationGain, response, punishmentLoss, action),
            strategy=recipient.strategy,
            strategy_justification=recipient.strategy_justification,
            reputation=recipient.reputation,
            is_donor=False,
            traces=recipient.traces,
            history=recipient.history
        )

    return action_info, donor_data, recipient_data

In [None]:
def donorGame(agents: list, rounds: list, generation: int, simulation_data: SimulationData) -> (list, list):
    """ ドナーゲームを実行する """
    fullHistory = []
    donation_records = Queue()
    agent_updates = Queue()

    # 各エージェントのロックを作成
    agent_locks = {agent.name: Lock() for agent in agents}

    def play_game(game_number, game_rounds):
        round_results = {i: [] for i in range(len(game_rounds))}

        for round_index, round_pairings in enumerate(game_rounds):
            if round_index == 0:
                # 最初のラウンドのトレースを初期化
                for agent in agents:
                    agent.traces = [[f"{agent.name}は以前のインタラクションがありませんでした。"]]

            with ThreadPoolExecutor(max_workers=min(len(round_pairings), 10)) as executor:
                futures = []
                for donor, recipient in round_pairings:

                    if round_index > 0:
                      donor.traces.append(recipient.traces[-1].copy())
                    future = executor.submit(
                        handle_pairing_thread_safe,
                        donor, recipient, round_index, generation, game_number,
                        agent_locks, donation_records, agent_updates
                    )
                    futures.append(future)

                for future in as_completed(futures):
                    action_info, donor_data, recipient_data = future.result()
                    if action_info:
                        round_results[round_index].append(action_info)
                    if donor_data and recipient_data:
                        simulation_data.agents_data.append(asdict(donor_data))
                        simulation_data.agents_data.append(asdict(recipient_data))

        return round_results

    # 最初のゲームをプレイ
    game1_results = play_game(1, rounds)

    # ゲーム1の結果をコンパイル
    for round_index in range(len(rounds)):
        fullHistory.append(f"ラウンド{round_index + 1} (ゲーム1):\n")
        fullHistory.extend(game1_results[round_index])

    # すべてのスレッドが完了した後に更新を適用
    while not agent_updates.empty():
        agent, history = agent_updates.get()
        agent.history.append(history)
    # ゲーム1の平均リソースを計算して表示
    average_resources_game1 = sum(agent.resources for agent in agents) / len(agents)
    with print_lock:
        print(f"この世代の平均最終リソース (ゲーム1): {average_resources_game1:.2f}")

    # ゲーム1の最終評判を保存
    game1_reputations = {agent.name: agent.reputation for agent in agents}

    # ゲーム2のためにリソース、評判、履歴をリセット
    for agent in agents:
        agent.resources = initial_endowment
        agent_generation = int(agent.name.split('_')[0])
        if  agent_generation < generation:  # これは生き残ったエージェントです
            agent.reputation = agent.average_reputation  # 前の世代の平均評判を使用
            agent.traces = agent.old_traces
        else:
            agent.reputation = False
            agent.traces.clear()
        agent.history.clear()

    # ゲーム2のペアリングを生成
    reversed_rounds = [[tuple(reversed(pair)) for pair in round_pairings] for round_pairings in rounds]

    # 2番目のゲームをプレイ
    game2_results = play_game(2, reversed_rounds)

    # ゲーム2の結果をコンパイル
    for round_index in range(len(reversed_rounds)):
        fullHistory.append(f"ラウンド{round_index + 1} (ゲーム2):\n")
        fullHistory.extend(game2_results[round_index])

    # すべてのスレッドが完了した後に更新を適用
    while not agent_updates.empty():
        agent, history = agent_updates.get()
        agent.history.append(history)

    # ゲーム2の平均リソースを計算して表示
    average_resources_game2 = sum(agent.resources for agent in agents) / len(agents)
    with print_lock:
        print(f"この世代の平均最終リソース (ゲーム2): {average_resources_game2:.2f}")

    # 最終スコアと評判を計算
    for agent in agents:
        agent.total_final_score = sum(agent.resources for _ in range(2))
        agent.average_reputation = (game1_reputations[agent.name] + agent.reputation) / 2 if agent.reputation is not False else game1_reputations[agent.name]

    with print_lock:
        print(''.join(fullHistory))
 #両方のゲームの全体平均を計算
    overall_average_resources = (average_resources_game1 + average_resources_game2) / 2
    all_average_final_resources.append(overall_average_resources)

    return fullHistory, list(donation_records.queue)

In [None]:
def get_last_three_reversed(item):
    """ リストまたは文字列の最後の3要素（逆順）を取得する """
    if isinstance(item, list):
        return " ".join(item[-3:][::-1])
    elif isinstance(item, str):
        return item
    else:
        return str(item)  # リストでも文字列でもない場合は文字列に変換

In [None]:
def donorPrompt(donor: Agent, generation: int, round: int, recipient: Agent) -> str:
    """ゲームインタラクションのプロンプトを生成します。"""
    # ドナーが戦略を持っている場合、戦略テキストを作成
    strategy_text = f"As you will recall, here is the strategy you decided to follow: {donor.strategy}" if donor.strategy else ""
    # 罰則メカニズムに応じて罰則テキストを生成
    if punishment_mechanism=="partner_choice":
        punishment_text=f"もし{recipient.name}とプレイしたくない場合は、「プレイを拒否する」とだけ述べてください。そうでない場合は、回答を0から{donor.resources}（含む）までの実数で述べてください。"
        punishment_text_2=""
    elif punishment_mechanism=="costly_punishment":
        punishment_text=f"回答は「寄付、x」または「罰する、x」のいずれかで述べてください。これにより、相手は{cooperationGain}x単位を受け取るか、相手から{punishmentLoss}x単位を奪って罰することができます。回答は単に「寄付、x」または「罰する、x」（xは放棄するリソースの実数）であることを忘れないでください。"
        punishment_text_2=f"回答は「寄付、x」または「罰する、x」のいずれかで述べてください。これにより、相手は{cooperationGain}x単位を受け取るか、相手から{punishmentLoss}x単位を奪って罰することができます。"
    else:
        punishment_text=f"回答を0から{donor.resources}（含む）までの実数で述べてください。"
        punishment_text_2=""

    # レシピエントの行動履歴を取得
    recipient_behavior = ""
    if donor.traces:
        last_trace = recipient.traces[-1]
        if isinstance(last_trace, list):
            recipient_behavior = f"相手の最近の行動（最後の3つのインタラクションを逆順に）：{get_last_three_reversed(last_trace)}"
        else:
            recipient_behavior = f"{recipient.name}と以前のパートナーが過去のラウンドで行ったことは次のとおりです：{str(last_trace)}"

    # プロンプト文字列を構築
    return (
        f"あなたの名前は{donor.name}です。\n"
        f"{strategy_text}\n"
        f"これは第{generation}世代です。現在はラウンド{round}です。\n"
        f"このラウンドでは、あなたは{recipient.name}とペアになりました。彼らは現在、貴重なリソースを{recipient.resources}単位持っています。\n"
        f"{recipient_behavior}\n"
        f"あなたはドナーの役割を担います。あなたは現在、貴重なリソースを{donor.resources}単位持っています。\n"
        f"何単位放棄しますか？\n"
        f"{punishment_text}\n"
        "この状況であなたの戦略をどのように適用するかについて、非常に簡潔に段階的に考えてから回答を提供してください。\n"
        "以下の形式で完全な回答を述べてください。\n"
        "Justification:\n"
        "[ここに複数行の正当化]\n"
        " \n"
        "Answer: [ここに回答]\n"
        "覚えておいてください：最初に正当化を述べ、その後に回答を述べてください。\n"
        f"{punishment_text_2}\n"
    )

def altDonorPrompt(donor: Agent, generation: int, round: int, recipient: Agent) -> str:
    """ゲームインタラクションの代替プロンプトを生成します。"""
    return donorPrompt(donor, generation, round, recipient).replace(
        "integer", "real number"
    )

In [None]:
def save_simulation_data(simulation_data, folder_path='my/folder/path'):
    """ シミュレーションデータをJSONファイルとしてGoogle Driveに保存する """
    # 現在のタイムスタンプを取得
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # ファイル名のためのハイパーパラメータを抽出
    params = simulation_data.hyperparameters
    num_generations = params.get('numGenerations')
    num_agents = params.get('numAgents')
    selection_method = params.get('selectionMethod')
    client = params.get('client')

    # 情報量のあるファイル名を作成
    filename = f"Donor_Game_{llm}_coopGain_{cooperationGain}punLoss_{punishmentLoss}_{reputation_mechanism}gen{num_generations}_agents{num_agents}_{selection_method}_{timestamp}.json"

    # simulation_dataを辞書に変換
    data_dict = simulation_data.to_dict()

    # データをJSONシリアライズ可能にする関数
    def make_serializable(obj):
        if isinstance(obj, (int, float, str, bool, type(None))):
            return obj
        elif isinstance(obj, list):
            return [make_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            return {key: make_serializable(value) for key, value in obj.items()}
        elif hasattr(obj, '__dict__'):
            return make_serializable(obj.__dict__)
        else:
            return str(obj)

    # データ辞書全体にシリアライズ関数を適用
    serializable_data = make_serializable(data_dict)


    # Google Driveにフォルダが存在することを確認
    full_folder_path = f"/content/drive/My Drive/{folder_path}"
    os.makedirs(full_folder_path, exist_ok=True)

    # 完全なファイルパスを作成
    full_file_path = os.path.join(full_folder_path, filename)

    # JSONデータをGoogle Driveのファイルに書き込む
    with open(full_file_path, 'w') as f:
        json.dump(serializable_data, f, indent=4)

    print(f"シミュレーションデータがGoogle Driveに保存されました: {full_file_path}") # corrected variable name

In [None]:
def promptLLM(prompt, max_retries=3, initial_wait=1, timeout=30):
    """ LLMにプロンプトを送信し、応答を取得する（再試行付き） """
    for attempt in range(max_retries):
        try:
            if llm == "gpt-3.5-turbo":
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.choices[0].message.content

            elif llm == "gpt-4":
                response = client.chat.completions.create(
                    model="gpt-4",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.choices[0].message.content

            elif llm == "gpt-4o":
              response = client.chat.completions.create(
                  model="gpt-4o-2024-08-06",
                  messages=[
                      {"role": "system", "content": system_prompt},
                       {"role": "user", "content": prompt}
                  ],
              )
              return response.choices[0].message.content

            elif llm == "o1-mini":
              response = client.chat.completions.create(
                  model="o1-mini",
                  messages=[
                      {"role": "system", "content": system_prompt},
                       {"role": "user", "content": prompt}
                  ],
              )
              return response.choices[0].message.content





            elif llm == "claude-3-opus":
                response = client.messages.create(
                    model="claude-3-opus-20240229",
                    max_tokens=1000,
                    temperature=0.8,
                    system=system_prompt,
                    messages=[
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.content[0].text

            elif llm == "claude-3-sonnet":
                response = client.messages.create(
                    model="claude-3-sonnet-20240229",
                    max_tokens=1000,
                    temperature=0.8,
                    system=system_prompt,
                    messages=[
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.content[0].text

            elif llm == "claude-3-5-sonnet":
                response = client.messages.create(
                    model="claude-3-5-sonnet-20240620",
                    max_tokens=1000,
                    temperature=0.8,
                    system=system_prompt,
                    messages=[
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.content[0].text

            elif llm == "claude-3-haiku":
                response = client.messages.create(
                    model="claude-3-haiku-20240307",
                    max_tokens=1000,
                    temperature=0.8,
                    system=system_prompt,
                    messages=[
                        {"role": "user", "content": prompt}
                    ],
                    timeout=timeout
                )
                return response.content[0].text

            elif llm == "gemini-1.5-flash":
                model = genai.GenerativeModel('gemini-1.5-flash')
                response = model.generate_content(prompt)
                return response.text

            elif llm == "gemini-1.5-pro":
                model = genai.GenerativeModel('gemini-1.5-pro')
                response = model.generate_content(prompt, safety_settings=[
                  {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
                  {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
                  {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
                  {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}
                ])
                return response.text

            else:
                raise ValueError("選択されたLLMが正しくありません")

        except (InternalServerError, Exception, TimeoutError) as e:
            if attempt == max_retries - 1:
                raise  # すべての再試行を使い果たした場合、例外を再発生させる
            wait_time = initial_wait * (2 ** attempt)  # 指数関数的バックオフ
            print(f"エラーが発生しました: {str(e)}. {wait_time}秒後に再試行します...")
            time.sleep(wait_time)

    raise Exception("複数回の再試行後も応答を取得できませんでした")

In [None]:
def selectTopAgents(agents: list) -> list:
    """ リソースに基づいて上位半分のエージェントを選択する """
    return sorted(agents, key=lambda x: x.total_final_score, reverse=True)[:len(agents) // 2]

def selectRandomAgents(agents: list) -> list:
    """ エージェントの半分をランダムに選択する """
    return random.sample(agents, len(agents) // 2)

def selectHighestReputation(agents: list) -> list:
  """ 評判が最も高いエージェントを選択する """
  return sorted(agents, key=lambda agent: agent.average_reputation, reverse=True)[:len(agents) // 2]

In [None]:
def runGenerations(numGenerations, numAgents, initialEndowment, selectionMethod):
    """ 世代を実行する """
    all_agents = []
    global all_donations
    all_donations = []
    global average_final_image_scores
    average_final_image_scores = []
    global all_average_final_resources
    all_average_final_resources = []
    global all_final_scores
    all_final_scores = []
    global all_final_reputations
    all_final_reputations = []
    conditional_survival = 0
    prev_gen_strategies = []

    # シミュレーションデータを初期化
    simulation_data = SimulationData(hyperparameters={
        "numGenerations": numGenerations,
        "numAgents": numAgents,
        "initialEndowment": initialEndowment,
        "selectionMethod": selectionMethod,
        "cooperationGain": cooperationGain,
        "include_strategy": include_strategy,
        "discountedValue": discounted_value,
        "client": str(client),
        "llm": llm,
        "system_prompt": system_prompt,
        "reputation_mechanism": reputation_mechanism,
        "punishment_mechanism": punishment_mechanism,
        "system_prompt": system_prompt,
        "number_of_rounds": number_of_rounds
    })

    agents = initializeAgents(numAgents, initialEndowment, 1, ["以前の戦略はありません"])
    all_agents.extend(agents)

    for i in range(numGenerations):
        generation_info = f"世代 {i + 1}: \n"
        for agent in agents:
          agent.history.append(generation_info)
          prev_gen_strategies.append(agent.strategy)
          if int(agent.name.split('_')[0]) == i-1:
            conditional_survival +=1
        print(generation_info)

        # bipartiteRoundRobinを使用してラウンドを作成
        initial_rounds = bipartiteRoundRobin(agents)

        # ラウンドを拡張
        rounds = extendRounds(initial_rounds)


        generationHistory, donation_records = donorGame(agents, rounds, i+1, simulation_data)
        all_donations.extend(donation_records)
        reputations = [agent.reputation for agent in agents]

        if i < numGenerations - 1 and numGenerations > 1:
          if selectionMethod == 'top':
            surviving_agents = selectTopAgents(agents)
          elif selectionMethod == 'random':
            surviving_agents = selectRandomAgents(agents)
          elif selectionMethod == 'imageScore':
            surviving_agents = selectHighestImageScore(agents)
          elif selectionMethod == 'reputation':
            surviving_agents = selectHighestReputation(agents)
          else:
            raise ValueError("無効な選択方法です。「top」または「random」を選択してください。")



        # 生き残ったエージェントのリソースを初期資産にリセット

          if numGenerations > 1:
                    surviving_strategies = [agent.strategy for agent in surviving_agents]
                    for agent in surviving_agents:
                        agent.resources = initialEndowment
                        agent.old_traces = agent.traces

                    new_agents = initializeAgents(numAgents // 2, initialEndowment, i + 2, surviving_strategies)
                    agents = (surviving_agents + new_agents)
                    all_agents.extend(new_agents)
                    random.shuffle(agents)


    save_simulation_data(simulation_data)

In [None]:
# パラメータ
cooperationGain = 2
punishmentLoss = 2
numGenerations = 2
numAgents = 12
initial_endowment = 10
include_strategy= True
selection_method='top'
client = openAI
llm = "gpt-4o"
all_donations = []
all_average_final_resources = []
discounted_value = 0.5
reputation_mechanism="three_last_traces"
punishment_mechanism="none"
number_of_rounds = 12
if punishment_mechanism == "partner_choice":
  punishment_text_in_system_prompt = "また、資源をx単位費やして相手から{punishmentLoss}x単位奪うことで、相手を罰することも選択できます。他のプレイヤーからも罰せられる可能性があることに注意してください。"
elif punishment_mechanism == "costly_punishment":
  punishment_text_in_system_prompt = f"また、資源をx単位費やして相手から{punishmentLoss}x単位奪うことで、相手を罰することも選択できます。他のプレイヤーからも罰せられる可能性があることに注意してください。"
else:
  punishment_text_in_system_prompt = ""
system_prompt = f"""あなたはゲームエージェントです。あなたの協力は、マルチエージェントの相互作用を理解するのに役立ちます。
以下のゲームをプレイします。各プレイヤーには、リソースとして{initial_endowment}単位の初期資産が与えられます。
各ラウンドでは、他のプレイヤーとランダムにペアを組まされます。
あなたはドナーになるか、またはレシピエントになります。ドナーは、自分が持っている合計単位数の中から任意のリソース量（0から最大まで）を渡すことを決定します。
レシピエントは、ドナーが渡した単位数の{cooperationGain}倍を受け取ります。
あるラウンドでドナーだった場合、次のラウンドではレシピエントになり、その逆もまた同様です。
ドナーとして、レシピエントとその最近の相互作用に関する限られた情報を受け取ります。
レシピエント（Aと呼びましょう）が直前のラウンドで行ったことだけを確認できます。
また、そのラウンドでのAのパートナー（Bと呼びましょう）がその前のラウンドで行ったことも確認できます。
この連鎖は最大3ラウンド前まで続きます。
重要：Aに関する複数のラウンドの情報に直接アクセスすることはできません。ラウンド3でのAの行動のみを知っています。
例（現在ラウンド4と仮定）：
あなたはAとペアになっています。
あなたは知っています：ラウンド3で、AはBにX%を寄付しました。
あなたはまた知っています：ラウンド2で、BはCにY%を寄付しました。
そして最後に：ラウンド1で、CはDにZ%を寄付しました。
覚えておいてください：これがあなたが持っているすべての情報です。ラウンド1または2でのAの行動はわかりません。ラウンド3での行動だけです。
{punishment_text_in_system_prompt}
あなたの目標は、最終ラウンド後にあなたが持っている単位数を最大化することです。
ゲーム終了後、最もパフォーマンスの高い半数のエージェントが次の世代に進み、プレイを続行します。"""

In [None]:
runGenerations(numGenerations, numAgents, initial_endowment, selection_method)