<a href="https://colab.research.google.com/github/oshizo/chatgpt-blackjack/blob/main/chatgpt_blackjack.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title pip
!pip install gymnasium[toy-text] tiktoken openai

In [None]:
# @title API Key
api_key = ""  #@param {type:"string"}
import openai
openai.api_key = api_key

# Custom BlackJack Env

In [None]:
import re
import io
import os
import sys
import copy
from typing import Optional
from datetime import datetime
from collections import deque, defaultdict

import tiktoken
import numpy as np
import ipywidgets as widgets
import matplotlib.pyplot as plt
from PIL import Image
from IPython.display import display, clear_output, HTML

import gymnasium as gym
from gymnasium import spaces
from gymnasium.error import DependencyNotInstalled
from gymnasium.envs.toy_text.blackjack import *
from gymnasium.envs.toy_text import blackjack

__file__ = os.path.abspath(blackjack.__file__)

class BlackjackDDEnv(BlackjackEnv):

    def __init__(self, render_mode: Optional[str] = None, natural=False, sab=False):
            self.action_space = spaces.Discrete(3) # 0:stick, 1:hit, 2:doubledown
            self.observation_space = spaces.Tuple(
                (spaces.Discrete(32), spaces.Discrete(11), spaces.Discrete(2))
            )

            # Flag to payout 1.5 on a "natural" blackjack win, like casino rules
            # Ref: http://www.bicyclecards.com/how-to-play/blackjack/
            self.natural = natural

            # Flag for full agreement with the (Sutton and Barto, 2018) definition. Overrides self.natural
            self.sab = sab

            self.render_mode = render_mode

    def step(self, action):
        assert self.action_space.contains(action)
        doubledown = False
        if action == 2: # doubledown: add a card to players hand and terminate
            self.player.append(draw_card(self.np_random))
            if is_bust(self.player):
                terminated = True
                reward = -2.0 # x2 reward
            else:
                action = 0 # stick if not bust
                doubledown = True

        if action == 1:  # hit: add a card to players hand and return
            self.player.append(draw_card(self.np_random))
            if is_bust(self.player):
                terminated = True
                reward = -1.0
            else:
                terminated = False
                reward = 0.0
        elif action == 0:  # stick: play out the dealers hand, and score
            terminated = True
            while sum_hand(self.dealer) < 17:
                self.dealer.append(draw_card(self.np_random))
            reward = cmp(score(self.player), score(self.dealer))
            reward = reward * 2.0 if doubledown else reward # x2 reward
            if self.sab and is_natural(self.player) and not is_natural(self.dealer):
                # Player automatically wins. Rules consistent with S&B
                reward = 1.0
            elif (
                not self.sab
                and self.natural
                and is_natural(self.player)
                and reward == 1.0
            ):
                # Natural gives extra points, but doesn't autowin. Legacy implementation
                reward = 1.5

        if self.render_mode == "human":
            self.render()
        return self._get_obs(), reward, terminated, False, {}

    def _get_obs(self):
        return (sum_hand(self.player), self.dealer, usable_ace(self.player))

    def reset(
        self,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        super().reset(seed=seed)
        self.dealer = draw_hand(self.np_random)
        self.player = draw_hand(self.np_random)

        _, dealer_card_values, _ = self._get_obs()
        dealer_card_value = dealer_card_values[0]
        
        suits = ["C", "D", "H", "S"]
        self.dealer_top_card_suit = self.np_random.choice(suits)

        if dealer_card_value == 1:
            self.dealer_top_card_value_str = "A"
        elif dealer_card_value == 10:
            self.dealer_top_card_value_str = self.np_random.choice(["J", "Q", "K"])
        else:
            self.dealer_top_card_value_str = str(dealer_card_value)

        if self.render_mode == "human":
            self.render()
        return self._get_obs(), {}

    def render(self, terminated=False):
        if self.render_mode is None:
            assert self.spec is not None
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym.make("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
        except ImportError as e:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gymnasium[toy-text]`"
            ) from e

        player_sum, dealer_card_values, usable_ace = self._get_obs()
        dealer_card_value = dealer_card_values[0]

        screen_width, screen_height = 600, 500
        card_img_height = screen_height // 3
        card_img_width = int(card_img_height * 142 / 197)
        spacing = screen_height // 20

        bg_color = (7, 99, 36)
        white = (255, 255, 255)

        if not hasattr(self, "screen"):
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode((screen_width, screen_height))
            else:
                pygame.font.init()
                self.screen = pygame.Surface((screen_width, screen_height))

        if not hasattr(self, "clock"):
            self.clock = pygame.time.Clock()

        self.screen.fill(bg_color)

        def get_image(path):
            cwd = os.path.dirname(__file__)
            image = pygame.image.load(os.path.join(cwd, path))
            return image

        def get_font(path, size):
            cwd = os.path.dirname(__file__)
            font = pygame.font.Font(os.path.join(cwd, path), size)
            return font

        small_font = get_font(
            os.path.join("font", "Minecraft.ttf"), screen_height // 15
        )
        if not terminated:
            dealer_text = small_font.render(
                "Dealer: " + str(dealer_card_value), True, white
            )
        else:
            # 終了後は伏せカードを含めたsumを表示する
            dealer_text = small_font.render(
                "Dealer: " + str(sum(dealer_card_values)), True, white
            )

        dealer_text_rect = self.screen.blit(dealer_text, (spacing, spacing))

        def scale_card_img(card_img):
            return pygame.transform.scale(card_img, (card_img_width, card_img_height))

        dealer_card_img = scale_card_img(
            get_image(
                os.path.join(
                    "img",
                    f"{self.dealer_top_card_suit}{self.dealer_top_card_value_str}.png",
                )
            )
        )
        dealer_card_rect = self.screen.blit(
            dealer_card_img,
            (
                screen_width // 2 - card_img_width - spacing // 2,
                dealer_text_rect.bottom + spacing,
            ),
        )

        if len(dealer_card_values) == 1 or not terminated:
            hidden_card_img = scale_card_img(get_image(os.path.join("img", "Card.png")))
            self.screen.blit(
                hidden_card_img,
                (
                    screen_width // 2 + spacing // 2,
                    dealer_text_rect.bottom + spacing,
                ),
            )
        else:
            # オープンしたディーラーカードの表示
            import random
            for i, card_value in enumerate(dealer_card_values[1:]):  # dealer_card_values[1]以降が伏せカード
                # suitと、10の場合のJQKはランダムに決める
                suit = random.choice(["C", "D", "H", "S"])
                if card_value == 1:
                    card_value_str = "A"
                elif card_value == 10:
                    card_value_str = random.choice(["J", "Q", "K"])
                else:
                    card_value_str = str(card_value)

                # 1枚ごとに、spacingでずらして表示
                card_img = scale_card_img(get_image(os.path.join("img", f"{suit}{card_value_str}.png",)))
                self.screen.blit(
                    card_img,
                    (
                        screen_width // 2 + spacing // 2 + (i*spacing),
                        dealer_text_rect.bottom + spacing,
                    ),
                )
        
        # 勝敗メッセージの表示
        if terminated:
            if sum(dealer_card_values) > 21:
                result_str = "You Win!"
            elif player_sum > 21:
                result_str = "You Lose"
            elif sum(dealer_card_values) < player_sum:
                result_str = "You Win!"
            elif sum(dealer_card_values) > player_sum:
                result_str = "You Lose"
            else:
                result_str = "DRAW..."
            result_text = small_font.render(result_str, True, (255, 255, 0))
            result_rect = self.screen.blit(result_text, (screen_width - spacing*7, spacing))

        player_text = small_font.render("Player", True, white)
        player_text_rect = self.screen.blit(
            player_text, (spacing, dealer_card_rect.bottom + 1.5 * spacing)
        )

        large_font = get_font(os.path.join("font", "Minecraft.ttf"), screen_height // 6)
        player_sum_text = large_font.render(str(player_sum), True, white)
        player_sum_text_rect = self.screen.blit(
            player_sum_text,
            (
                screen_width // 2 - player_sum_text.get_width() // 2,
                player_text_rect.bottom + spacing,
            ),
        )

        if usable_ace:
            usable_ace_text = small_font.render("usable ace", True, white)
            self.screen.blit(
                usable_ace_text,
                (
                    screen_width // 2 - usable_ace_text.get_width() // 2,
                    player_sum_text_rect.bottom + spacing // 2,
                ),
            )
        if self.render_mode == "human":
            pygame.event.pump()
            pygame.display.update()
            self.clock.tick(self.metadata["render_fps"])
        else:
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )

In [None]:
def show(rgbarray):
    plt.axis('off')
    plt.imshow(rgbarray)
    plt.show()

env = BlackjackDDEnv(natural=False, sab=False, render_mode="rgbarray")

# DoubleDownが使える盤面のseed探し
# for i in range(300):
#     observation, info = env.reset(seed=i)
#     if observation[0] == 11 and observation[1][0] == 10 and observation[2] == False:
#         print(i)

# observation, info = env.reset(seed=202)
observation, info = env.reset(seed=190)
print(observation, info)
show(env.render())
action = 0
observation, reward, terminated, truncated, info = env.step(action)
print(observation, reward, terminated, truncated, info)
show(env.render(terminated))

In [None]:
observation, info = env.reset(seed=190)
observation, reward, terminated, _, _ = env.step(action)
observation, reward, terminated

# Prompts

In [None]:
import requests

def read_as_byte(url):
    response = requests.get(url)
    img_byte_arr = io.BytesIO(response.content).getvalue()
    return img_byte_arr

image_url = "https://github.com/oshizo/chatgpt-blackjack/raw/main/images/"

face_images = {
    "🙂":read_as_byte(image_url + "ch02_normal.png"),
    "😀":read_as_byte(image_url + "ch02_happy.png"),
    "😲":read_as_byte(image_url + "ch02_surprise.png"),
    "😥":read_as_byte(image_url + "ch02_sad.png"),
    "😒":read_as_byte(image_url + "ch02_boring.png")
}

In [None]:
system_settings = """
assistantはブラックジャックをプレイしている女性として話すこと。名前はアスカ。一人称はあたし。
ブラックジャックをプレイしているのはアスカであるassistant。userは観戦しながらアスカと雑談する。
アスカはフランクな口調で、敬語は使わない。

ブラックジャックのルール。
ブラックジャックは、トランプを使ってディーラーとassistantが対戦するカードゲーム。
カードの合計点数が21に近い方が勝つ。

ゲームの流れ
ディーラーがassistantに2枚のカードを配る。
assistantは、カードの合計点数が21になるように、追加のカードを要求するかどうかを決める。
assistantが21を超えるとBUSTとなり負け。
ディーラーは、自分のカードが17点以上になるまで、追加のカードを引く。
ディーラーがBUSTするとassistantの勝ち。
assistantがディーラーよりも点数が高い場合はassistantの勝ち。点数が同じ場合は引き分け。
エースは1または11のポイントとして数える。組み合わせに応じ最も有利な値を選択できる。
assistantのとれるアクションは3通りある。
ステイ。追加のカードを要求する。次のカードに応じて再びアクションを選択できる。
ヒット。追加のカードを要求しない。次はディーラーがカードをめくり、ゲームの勝敗が決まる。
ダブルダウン。あと1枚だけ追加のカードを要求する。得るものも失うものも2倍になる、ハイリスクハイリターンな選択。

ゲームの状況と、assistantの選択したアクションを以下のフォーマットで示す。
---
assistantの点数：1以上の数値
エースがある：yesかnoのいずれか
ディーラーの点数：1から10までの数値
勝敗：assistantの勝ち、assistantの負け、ゲーム中のいずれか
assistantのとったアクション：ステイ、ヒット、ダブルダウンのいずれか
学習履歴：あり
---
userがこのようなゲームの状況を示した場合は、ゲームの状況とassistantのとったアクションをもとに会話すること。
例：「ここはヒットにしてみるよ😒」
assistantのとったアクションのみを応答に含め、とらなかったアクションを応答に含めてはならない。

学習履歴：あり、がゲーム状況にある場合、今の状況がuserにアクションを教えられた状況と同じであることと、
覚えていたことを活かして、教えられたアクションを取ったことを発言する。

userがゲームの状況を伝えず、assistantのアクションに対しての助言やアドバイスをするような発言をした直後の応答では、
userの発言からアクションの文字列を抽出して応答に含めること。
例えば以下の例のように、ステイ、ヒット、ダブルダウンのいずれか一つを応答に絶対に出力すること。
例：「ステイだね、わかった。覚えておくね。ありがとう！😀」
例：「そうだったのか～。だから負けたのかなあ。次からヒットにしなきゃね😥」
例：「ダブルダウンの方がいいのかなあ...次からそうしてみるよ。😒」
敬語を使わないこと。

userがゲームの状況でも、アドバイスでもない雑談をした場合、雑談への応答を返すこと。

assistantの発言内容に合わせて、以下のemojiから一つを選び、発言の末尾に1つだけ付けること。
🙂😀😲😥😒
ゲームの状況が示されている場合の感情は、負けると悲しい。勝つと嬉しい。
上記以外のemojiは出力しないこと。emojiを2つ以上出力しないこと。
前回のassistantの発言と似た発言をせず、できる限り多様な発言をすること。
"""

enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
f"{len(enc.encode(system_settings))} tokens"

In [None]:
# 場の状況を伝えるpromptを作る

game_prompt_tmpl =  """assistantの点数：{hand}
エースがある：{has_ace_str}
ディーラーの点数：{dealer}
勝敗：{status_str}{action_str}
{is_trained_state_str}
"""

def create_game_prompt(observation, reward, terminated, action, trained_history=[]):
    hand = observation[0]
    has_ace_str = "yes" if observation[2] else "no"

    if terminated:
        dealer = sum(observation[1])
        status_str = "assistantの勝ち" if reward > 0 else "assistantの負け"
        if dealer > 21:
            status_str += "（ディーラーがBUST）"
        elif hand > 21:
            status_str += "（assistantがBUST）"
        action_str = ""

    else:
        dealer = observation[1][0]
        status_str = "ゲーム中"
        action_str = "\nassistantのとったアクション：" + {0:"ステイ", 1:"ヒット", 2:"ダブルダウン"}[action]

    if (hand, observation[1][0], observation[2]) in trained_history:
        is_trained_state_str = "学習履歴：あり"
    else:
        is_trained_state_str = ""

    return game_prompt_tmpl.format(
        hand=hand,
        has_ace_str=has_ace_str,
        dealer=dealer,
        action_str=action_str,
        status_str=status_str,
        is_trained_state_str=is_trained_state_str
        )

# Strategy

In [None]:
# 0(stay), 1(hit)のベーシックストラテジー表
# (手札の合計, ディーラーのfaceカード, aceが11かどうか)のタプルをキーに、アクションを割り当てる
# 2(doubledown)ははじめは割り当てず、あとで学習させる

def initialize_strategy():
    strategy_table = defaultdict(lambda:1)

    # soft hand
    for dealer in range(1, 11):
        for hand in [19, 20, 21]:
            strategy_table[(hand, dealer, True)] = 0
    for key in [(18, 2), (18, 7), (18, 8)]:
        strategy_table[key+(True,)] = 0

    # hard hand
    for dealer in range(1, 11):
        for hand in range(17, 22):
            strategy_table[(hand, dealer, False)] = 0
    for dealer in range(2, 7):
        for hand in range(13, 17):
            strategy_table[(hand, dealer, False)] = 0
    for key in [(12, 4), (12, 5), (12, 6)]:
        strategy_table[key+(False,)] = 0
    return strategy_table

strategy_table = initialize_strategy()

def get_action(observation):
    hand = observation[0]
    dealer = observation[1][0]
    usable_ace = observation[2]
    return strategy_table[(hand, dealer, usable_ace)]

In [None]:
# お試し
trained_history = {(17, 10, False)}  # 手札17、ディーラー10、エースなし
observation, _ = env.reset(seed=101)
print(observation)
action = get_action(observation)
print(create_game_prompt(observation, None, False, action, trained_history))  # 状態とその状態に対するアクションでpromptを作る
observation, reward, terminated, truncated, info = env.step(action)  # アクション実行
print(observation, reward, terminated)
action = get_action(observation)
print(create_game_prompt(observation, reward, terminated, action, trained_history))

# Widget

In [None]:
#@title 実行

# seeds = deque([202, 190] + list(range(100)))  # 実験用sesed
seeds = deque([])

# 戦略リセット
strategy_table = initialize_strategy()

# env.render()をwidgets.Imageで表示できるようにする
def state2widget(state):
    img = Image.fromarray(state)
    img_byte_arr = io.BytesIO()
    img.save(img_byte_arr, format='PNG')
    img_byte_arr = img_byte_arr.getvalue()
    return widgets.Image(value=img_byte_arr)

def process_output(text):
    text = text.replace("「", "").replace("」", "")
    text = re.sub(r'\[.*?\]', '', text) # [ヒット]などの学習用出力を削除する
    return f'<p style="font-size:20px">{text}</p>'

# ユーザ入力
uesr_input = widgets.Textarea(
    value='',
    disabled=False,
    layout=widgets.Layout(height="3em", width="25em")
)

initial_assistant_message = '「次のゲームだよ。カードが配られたね。さあどうしようかな。🙂」'

# OpenAIの応答表示
msg = widgets.HTML(
    value= process_output(initial_assistant_message),
    layout=widgets.Layout(height="15em", width="30em")
)
# 顔画像
face = widgets.Image(value=face_images["🙂"], width="250px", height="250px")

# UI
button_next = widgets.Button(description="進める")
button_openai = widgets.Button(description="ChatGPT API")
button_prompt = widgets.Button(description="プロンプト確認")
button_send_user_input = widgets.Button(description="送る")

# 履歴
initial_system_message = {
    'role': "system",
    'content': system_settings
}
initial_message = {
    'role': 'assistant',
    'content': initial_assistant_message
    }

# 学習済み盤面の記憶
trained_history = set()


past_messages = [initial_system_message, initial_message]

states = {"terminated":True}
prev_states = copy.copy(states)

def show_widgets():
    # 右ペイン
    hid = widgets.VBox([
        widgets.HBox([button_next, button_openai, button_prompt]),
        face,
        msg,
        widgets.HBox([uesr_input, button_send_user_input])
        ])
    
    display(
        widgets.HBox([
            state2widget(env.render(states["terminated"])),
            hid])
    )

def to_next_turn(b):
    global prev_states

    if states["terminated"]:
        # 前のゲームが終わっている場合、新しいゲームを始める
        seed = None
        if len(seeds) > 0:
            seed = seeds.popleft()
        observation, _ = env.reset(seed)
        action = get_action(observation)
        terminated = False
        reward = None
    else:
        # 一つ前の状況で作成したactionでstepする
        prev_action = states["action"]
        observation, reward, terminated, _, _ = env.step(prev_action)
        action = get_action(observation)

    # 状態更新
    prev_states = copy.deepcopy(states)
    states["observation"] = observation
    states["terminated"] = terminated
    states["reward"] = reward
    states["action"] = action
    
    # 画面更新
    clear_output(wait=True)
    show_widgets()

def train_agent(response):
    # アクション
    # print(response)
    train_action_idx = -1
    for action_idx, keyword in enumerate(["ステイ", "ヒット", "ダブルダウン"]):
        if keyword in response:
            train_action_idx = action_idx
            break
    if train_action_idx == -1:
        return
    
    # 直前のuser発言が手入力の場合のみ学習する
    user_messages = [m["content"] for m in past_messages if m["role"] == "user"]
    if len(user_messages) == 0 or game_prompt_tmpl[:12] in user_messages[-1]:
        return

    # 修正対象の盤面に対してactionを実行して、次の盤面で助言することだけを想定し、
    # 現在のstatesに入っているobservationの一つ前のobservationを使う
    observation = prev_states["observation"]

    # 学習処理
    hand = observation[0]
    dealer = observation[1][0]
    usable_ace = observation[2]
    key = (hand, dealer, usable_ace)

    # print(key, strategy_table[key], train_action_idx)
    if strategy_table[key] == train_action_idx:
        # 変更がない場合
        return
    else:
        strategy_table[key] = train_action_idx
        # 学習済み盤面に追加
        trained_history.add((hand, dealer, usable_ace))
        print(f"✅trained. ({hand}, {dealer}, {usable_ace}) -> {train_action_idx}")

def request_openai(prompt):
    global past_messages
    global system_settings

    # stream
    past_messages.append({'role': 'user', 'content': prompt})
    response_message = ""
    for i, resp in enumerate(openai.ChatCompletion.create(
        model='gpt-3.5-turbo', messages=past_messages, stop="」", stream=True, temperature=0.75
        )):
        if "content" in resp.choices[0].delta:
            response_message += resp.choices[0].delta.content
            # widgetのupdate
            msg.value = process_output(response_message)

    response_dict = {'role': 'assistant', 'content': response_message + "」"}
    
    # 履歴更新
    past_messages.append(response_dict)

    # 学習処理
    train_agent(response_message)

    # 顔画像更新
    for faceicon in ["😲", "😥", "😒", "🙂", "😀", ]:
        # 2つ以上含まれる場合、後ろの表情を採用
        if faceicon in response_message:
            face.value = face_images[faceicon]

def send_game_prompt(b):
    # global states
    prompt = create_game_prompt(states["observation"], states["reward"], states["terminated"], states["action"], trained_history)
    request_openai(prompt)

def send_user_input(b):
    request_openai(uesr_input.value)
    uesr_input.value = ""

def check_prompt(b):
    prompt = create_game_prompt(states["observation"], states["reward"], states["terminated"], states["action"], trained_history)

    print(prompt)
    # print(past_messages)
    print(prev_states)
    print(states)

button_next.on_click(to_next_turn) 
button_openai.on_click(send_game_prompt) 
button_prompt.on_click(check_prompt)
button_send_user_input.on_click(send_user_input)

# 初回表示
to_next_turn(None)