<a href="https://www.kaggle.com/code/sanhapon/test-fork-of-llm-20-questions-simple-version?scriptVersionId=183616742" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

This notebook illustrates the agent creation process for the **LLM 20 Questions**. Running this notebook produces a `submission.tar.gz` file. You may submit this file directly from the **Submit to competition** heading to the right. Alternatively, from the notebook viewer, click the *Output* tab then find and download `submission.tar.gz`. Click **Submit Agent** at the upper-left of the competition homepage to upload your file and make your submission. 

In [1]:
%%bash
cd /kaggle/working
pip install -q -U -t /kaggle/working/submission/lib immutabledict sentencepiece
git clone https://github.com/google/gemma_pytorch.git > /dev/null
mkdir -p /kaggle/working/submission/lib/gemma/
mv -n /kaggle/working/gemma_pytorch/gemma/* /kaggle/working/submission/lib/gemma/
echo "done"

Cloning into 'gemma_pytorch'...


done


In [2]:
import os
import sys
import contextlib
import re
import gc

import torch
from pathlib import Path
from gemma.config import get_config_for_7b, get_config_for_2b
from gemma.model import GemmaForCausalLM

KAGGLE_AGENT_PATH = "/kaggle_simulations/agent/"
if os.path.exists(KAGGLE_AGENT_PATH):
    sys.path.insert(0, os.path.join(KAGGLE_AGENT_PATH, 'lib'))
    WEIGHTS_PATH  = os.path.join(KAGGLE_AGENT_PATH, "gemma/pytorch/7b-it-quant/2")
else:
    sys.path.insert(0, "/kaggle/working/submission/lib")
    WEIGHTS_PATH  = "/kaggle/input/gemma/pytorch/7b-it-quant/2"


os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512"

# Load the model           
@contextlib.contextmanager
def _set_default_tensor_type(dtype: torch.dtype):
  torch.set_default_dtype(dtype)
  yield
  torch.set_default_dtype(torch.float)
    

model_config =  get_config_for_7b()
model_config.tokenizer = os.path.join(self._weigts_path, "tokenizer.model")
model_config.quant = True

device = torch.device("cuda:0")
model = None
gc.collect()

print(f"create model... {model_config}\r\n")
with _set_default_tensor_type(model_config.get_dtype()):
    model = GemmaForCausalLM(model_config)
    ckpt_path = os.path.join(WEIGHTS_PATH, f'gemma-{variant}.ckpt')
    model.load_weights(ckpt_path)
    model = model.to(device).eval()


ModuleNotFoundError: No module named 'gemma'

In [None]:
CHAT_TEMPLATE = "<start_of_turn>user\n{prompts}. Wrap your though with **.<end_of_turn>\n<start_of_turn>model\n"

sampler_kwargs = {
    'temperature': 0.01,
    'top_p': 0.1,
    'top_k': 1,
}
resp = self._model.generate(
                CHAT_TEMPLATE.format(prompts=prompts),
                device=self._device,
                output_len=150,
                **sampler_kwargs
resp

In [4]:
# %%writefile submission/main.py?
# Setup
import os
import sys

# **IMPORTANT:** Set up your system path like this to make your code work
# both in notebooks and in the simulations environment.
KAGGLE_AGENT_PATH = "/kaggle_simulations/agent/"
if os.path.exists(KAGGLE_AGENT_PATH):
    sys.path.insert(0, os.path.join(KAGGLE_AGENT_PATH, 'lib'))
    WEIGHTS_PATH  = os.path.join(KAGGLE_AGENT_PATH, "gemma/pytorch/7b-it-quant/2")
#     WEIGHTS_PATH = os.path.join(KAGGLE_AGENT_PATH, "gemma/pytorch/2b-it/2")
else:
    sys.path.insert(0, "/kaggle/working/submission/lib")
    WEIGHTS_PATH  = "/kaggle/input/gemma/pytorch/7b-it-quant/2"
#     WEIGHTS_PATH = "/kaggle/input/gemma/pytorch/2b-it/2"


import contextlib
from pathlib import Path

import torch
from gemma.config import get_config_for_7b, get_config_for_2b
from gemma.model import GemmaForCausalLM
import re

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512"

# Load the model           
@contextlib.contextmanager
def _set_default_tensor_type(dtype: torch.dtype):
  """Sets the default torch dtype to the given dtype."""
  torch.set_default_dtype(dtype)
  yield
  torch.set_default_dtype(torch.float)
    
### Utils function
def extract_bold_text(text):
  pattern = r"[\*|\"]+(.*?)[\*|\"]+"
  matches = re.findall(pattern, text)
  to_ret = matches[0] if len(matches) > 0 else text
  parts = to_ret.split(":") 
  to_ret = parts[1].strip() if len(parts) > 1 else to_ret
  return to_ret

class BaseAgent:
    def __init__(self, variant, device_str):
        self._variant = variant
        self._device = torch.device(device_str)
        self._weigts_path = WEIGHTS_PATH

        model_config = get_config_for_2b() if "2b" in self._variant else get_config_for_7b()
        model_config.tokenizer = os.path.join(self._weigts_path, "tokenizer.model")
        model_config.quant = "quant" in variant

        print(f"create model... {model_config}\r\n")
        with _set_default_tensor_type(model_config.get_dtype()):
          self._model = GemmaForCausalLM(model_config)
          ckpt_path = os.path.join(self._weigts_path, f'gemma-{variant}.ckpt')
          self._model.load_weights(ckpt_path)
          self._model = self._model.to(self._device).eval()

    def get_response_from_llm(self, obs, prompts): 
        CHAT_TEMPLATE = "<start_of_turn>user\n{prompts}. Wrap your though with **.<end_of_turn>\n<start_of_turn>model\n"

        sampler_kwargs = {
            'temperature': 0.01,
            'top_p': 0.1,
            'top_k': 1,
        }
        
        try:
            resp = self._model.generate(
                CHAT_TEMPLATE.format(prompts=prompts),
                device=self._device,
                output_len=150,
                **sampler_kwargs
            )
            print(f"""prompts: {prompts}\nresp: {resp}""")
        except RuntimeError as e:
            print(f"Error in LLM generation: {e}")
            resp = "**no**"
            resp = resp.replace("**Question:**", "").replace("**Guess:**", "").replace("**Anwser:**", "")
        return extract_bold_text(resp)

class Questioner(BaseAgent):
    def __init__(self, variant, device_str):
        super().__init__(variant, device_str)
        self._information = ""
    
    def call(self, obs):
        prompts = f"""We are playing the 20 Questions game. The questioner's goal is to ask the question and guess for secret. The secret in the game is the country or city name."""
        if obs.turnType == 'guess':
            self._information = super().get_response_from_llm(obs,
                f"""Convert the yes-or-no question to declarative sentence, for example: "Is it a cat?", "Yes". The declarative sentence would be "It is a cat". Here is the yes-or-no question from previous round: \"{obs.questions[-1]}\",  \"{obs.answers[-1]}\"."""
            )
            prompts = f"""Based on this information \"{self._information}\". Guess the place based on above information."""
            return super().get_response_from_llm(obs, prompts)

        elif obs.turnType == 'ask':
            if obs.step == 0:
                prompts += f"""The sample question: Is it a Asia continent? Now, you are the questioner, generate a yes-or-no question to find the place.
                         """
                return super().get_response_from_llm(obs, prompts)
            else:
                prompts = f"""We know this information \"{self._information}\". Now, you are the questioner, based on known information, generate yes-or-no question to find "specific" place."""

                return super().get_response_from_llm(obs, prompts)
                

class Answerer(BaseAgent):
    def __init__(self, variant, device_str):
        super().__init__(variant, device_str)
        self._information = ""
    
    def call(self, obs):
        prompts = f"""We are playing the 20 Questions game. The questioner's goal is to ask the question and guess for secret. The secret in the game is the country or city name. If this question \"{obs.questions[-1]}\" relates to \"{obs.keyword}\" return \"yes\" otherwise return \"no\".
                    """
        
        return super().get_response_from_llm(obs, prompts)
                
############################################################
agent = None

VARIANT = "7b-it-quant"
# VARIANT = "2b-it"
# DEVICE = "cuda"
DEVICE = "cpu"

# DEVICE =  "cuda:0" if torch.cuda.is_available() else "cpu"

def get_agent(name):
    global agent

    if agent is None and name == 'questioner':
        agent = Questioner(variant=VARIANT, device_str=DEVICE)
    elif agent is None and name == 'answerer':
        agent = Answerer(variant=VARIANT, device_str=DEVICE)

    assert agent is not None, "Agent not initialized."
    return agent


def agent_fn(obs, cfg):
    selected_agent = None
    if obs.turnType == "ask":
        selected_agent = get_agent("questioner")
    elif obs.turnType == "guess":
        selected_agent = get_agent("questioner")
    else:
        selected_agent = get_agent("answerer")
        
    try:
        response = selected_agent.call(obs)
    except Exception as e:
        print(f"error: {e}")
        response = "no"
    

    if obs.turnType == "answer":
        response = response.replace(".", "")
        if response not in ["yes", "no"]: 
            response = "no"

    if response is None or len(response) <= 1: return "no" 
    else: return response

In [5]:
def simple_agent(obs, cfg):
    if obs.turnType == "ask": response = "Is it a pig?"
    elif obs.turnType == "guess": response = "pig"
    elif obs.turnType == "answer": response = "yes"
    return response

from kaggle_environments import make
# For debugging, play game with only two rounds
debug_config = {'episodeSteps': 10,     # initial step plus 3 steps per round (ask/answer/guess)
                'actTimeout': 60,       # agent time per round in seconds; default is 60
                'runTimeout': 1200,      # max time for the episode in seconds; default is 1200
                'agentTimeout': 3600}  # obsolete field; default is 3600

env = make("llm_20_questions", configuration=debug_config, debug=True)

print("start.....")
game_output = env.run(agents=[agent_fn, simple_agent, simple_agent, simple_agent])
print("finish....")
env.render(mode="ipython", width=700, height=700)

start.....
create model... GemmaConfig(vocab_size=256000, max_position_embeddings=8192, num_hidden_layers=28, num_attention_heads=16, num_key_value_heads=16, hidden_size=3072, intermediate_size=24576, head_dim=256, rms_norm_eps=1e-06, dtype='bfloat16', quant=True, tokenizer='/kaggle/input/gemma/pytorch/7b-it-quant/2/tokenizer.model')

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/kaggle_environments/agent.py", line 159, in act
    action = self.agent(*args)
  File "/tmp/ipykernel_34/1055568858.py", line 147, in agent_fn
    selected_agent = get_agent("questioner")
  File "/tmp/ipykernel_34/1055568858.py", line 136, in get_agent
    agent = Questioner(variant=VARIANT, device_str=DEVICE)
  File "/tmp/ipykernel_34/1055568858.py", line 88, in __init__
    super().__init__(variant, device_str)
  File "/tmp/ipykernel_34/1055568858.py", line 61, in __init__
    self._model = self._model.to(self._device).eval()
  File "/opt/conda/lib/python3.10/site-package

In [None]:
# !apt install pigz pv > /dev/null

In [None]:
# !tar --use-compress-program='pigz --fast --recursive | pv' -cf submission.tar.gz -C /kaggle/working/submission . -C /kaggle/input/ gemma/pytorch/7b-it-quant/2