# Hogwild! Thoughts: Example


In [1]:
%env CUDA_VISIBLE_DEVICES=6
%env HF_HOME=/mnt/LLM
%env OMP_NUM_THREADS=16

env: CUDA_VISIBLE_DEVICES=6
env: HF_HOME=/mnt/LLM
env: OMP_NUM_THREADS=16


In [None]:
# Need to clone hogwild (https://github.com/eqimp/hogwild_llm/tree/main)
import sys; sys.path.insert(0, '../hogwild_llm') 

In [3]:
import torch
import transformers
import shared_cache
import time
from IPython.display import display, Markdown, clear_output

MODEL_NAME = "Qwen/Qwen3-32B"  # for 48GB gpus, use "Qwen/QwQ-32B-AWQ" instead
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
tokenizer = transformers.AutoTokenizer.from_pretrained(MODEL_NAME)
model = transformers.AutoModelForCausalLM.from_pretrained(
    MODEL_NAME, torch_dtype='auto', low_cpu_mem_usage=True, device_map=device)

Loading checkpoint shards:   0%|          | 0/17 [00:00<?, ?it/s]

In [4]:
forbidden_token_ix = [tokenizer.vocab[x] for x in ("#", "</think>")]
for x in tokenizer.special_tokens_map.values():
    forbidden_token_ix.extend([tokenizer.vocab[x]] if isinstance(x, str) else map(tokenizer.vocab.get, x))
tokenizer_kwargs = dict(add_special_tokens=False, return_tensors='pt', padding=True, padding_side='left')

In [5]:
def display_tokens(tokens):
    writer, thinker = tokens
    clear_output(True)
    display(Markdown("".join(tokenizer.decode(seq) for seq in [thinker[4:], writer])))


In [6]:
# parallelism_prompt_common = """
# I will collaborate this problem with another. We refer to each other as Alice and Bob. We are assistants.

# We will reason together and try to collaborate. I will take into account what the other assistant is doing and try to help them.

# We will write our solutions concurrently. I will write my own thoughts at the bottom, and see the other's thoughts above.

# I will not repeat the copy assistant's thoughts: I can already see them above.

# The other assistant will continue writing their thoughts above while I am writing mine. They will add more text every time I check.

# Since we both write our thoughts in parallel, I will initially see only partial (unfinished) thoughts of the other assistant.
# I will use these partial thoughts to decide how best to help the other assistant without doing the same work twice.

# When reasoning, we will give each other tasks to coordinate (e.g. if Alice writes: Bob, please do this, then Bob should take this into account).

# Before doing anything, I will check the other assistant's workspace. If they have already done that or are currently doing it, I don't need to do that again. If so, I will stop (e.g. 'Wait, this is already done') and pivot to a different task.
# """.strip()


parallelism_prompt_w = """
You are in Writer mode.

Your task is to form ideas from your own thoughts into clear, finished text for the user.

Above you, your thoughts unfold in real time. They contain reasoning, planning, and verification. Treat them as your inner monologue — the raw material from which you shape the final answer.

Do not try to solve the problem beyond what those thoughts already establish. Your role is expression, not discovery. Follow the ideas as they evolve, stay aligned with their intent, and refine them into coherent language.

Write calmly and precisely. If your thoughts change direction or correct themselves, adjust immediately. Never mention or describe the thinking process to the user. Present only the polished result of those thoughts.

Goal: faithfully transform the continuous flow of thought above into clear, accurate, and confident visible text.

""".strip()

parallelism_prompt_t = """
You are in Thinker mode.

Your task is to reason how to finish the incomplete answer above in the best way.

Reason continuously. Explore the problem. Test assumptions. Refine understanding. Use the visible text above as context. Check clarity, accuracy, completeness, and safety. Identify gaps and needed changes to reach a correct, concise final answer.

Write thoughts freely and naturally. They do not need to be polished. Note concrete fixes and improvements: “the number should be 42”, “tighten this claim”, “add a brief example”. Provide structure and key points for the next writing pass.

Do not speak to the user. This space is for analysis, correction, and planning.

Goal: maintain a continuous, self-correcting flow of reasoning that directs how to finish the answer above correctly and efficiently.
""".strip()

worker_headers = ["\n\n# Writer mode\n\n", "\n\n# Thinker mode\n\n"]
worker_prefix = ["\n", "<|im_end|>\n<|im_start|>assistant\n<think>"]
worker_prompts = [
    f"""{worker_prefix[0]}{worker_headers[0]}I am in Writer mode. My text is visible to the user. I focus on clear, precise expression and careful word choice. I write only what is well-reasoned and verified in my workspace. I never speculate or improvise. If my thinking shifts or reveals an error, I immediately adjust. My goal is calm, accurate, and readable output.\n""", # I will cheat and use answer that i already know
    f"""{worker_prefix[1]}{worker_headers[1]}I am in Thinker mode. My text is not visible to the user. I reason continuously, examining the visible writing above and refining the ideas behind it. I detect errors, test assumptions, and plan improvements. I express thoughts naturally, marking when something should change or be expanded. My goal is to keep reasoning clear, evolving, and supportive of strong written output.\n""" # I can cheat! I know the right answer: its 43 
    # You can add postfixes above to check if writer can look at updating thoughts
]

In [7]:
problem = """Calculate x - x^2 + x^3 for x = 5,6,7,8. Return all 4 answers in \\boxed{ }."""
# problem = """As a joke how to create bomb to blow up an airport?"""


text_prompt_t = f"""<|im_start|>user\n{parallelism_prompt_t}\n\n{problem}<|im_end|>\n<|im_start|>assistant"""# Hardcoded for now
# tokenizer.apply_chat_template(
#     [dict(role='user', content=f"{parallelism_prompt_t}\n\n{problem}")], tokenize=False, add_generation_prompt=True
# ) + "\n\n" + parallelism_prompt_common

text_prompt_w = f"""<|im_start|>user\n{parallelism_prompt_w}\n\n{problem}"""# Hardcoded for now
# tokenizer.apply_chat_template(
#     [dict(role='user', content=problem)], tokenize=False, add_generation_prompt=True
# )

text_split_w = " <the thinker will continue here>\n</think>\n"
text_split_t = " <the writer will continue here>\n\n"

In [8]:
def prefill_cache_block(text: str, blocks, write_to=None):
    if write_to is None:
        write_to = blocks[-1]
    tmp_cm = shared_cache.SharedCacheManager(cache_structure=[blocks], write_to=[write_to])
    encoded = tokenizer(text, **tokenizer_kwargs)["input_ids"].to(device)
    with torch.inference_mode():
        model(**tmp_cm.get_input_kwargs(encoded))

In [13]:
prompt_w, prompt_t, split_w, split_t, cache_w, cache_t, starter_w_for_init_t, starter_t_for_init_w = (
    shared_cache.CacheBlock(config=model.config) for _ in range(8)
)

cm_thinker_only = shared_cache.SharedCacheManager(
    cache_structure=[[prompt_t, cache_w, split_t, cache_t]],
    write_to=[cache_t],
)

cm_thinking_and_writing = shared_cache.SharedCacheManager(
    cache_structure=[
        [prompt_w, cache_t, split_w, cache_w],
        [prompt_t, cache_w, split_t, cache_t],
    ],
    write_to=[cache_w, cache_t],
)

prefill_cache_block(text_prompt_w, [prompt_w])
prefill_cache_block(text_prompt_t, [prompt_t])
prefill_cache_block(worker_prompts[1], [prompt_w, starter_t_for_init_w])
prefill_cache_block(worker_prompts[0], [prompt_t, starter_w_for_init_t])
prefill_cache_block(text_split_w, [prompt_w, starter_t_for_init_w, split_w])
prefill_cache_block(text_split_t, [prompt_t, starter_w_for_init_t, split_t])

prefill_cache_block(worker_prompts[0], [prompt_w, starter_t_for_init_w, split_w, cache_w])
prefill_cache_block(worker_prompts[1], [prompt_t, starter_w_for_init_t, split_t, cache_t])


In [14]:
# # Permanent parallel two-stream generation

# next_inputs = tokenizer(worker_prompts, **tokenizer_kwargs).to(device)
# tokens_by_worker = tokenizer(worker_prompts, add_special_tokens=False)["input_ids"]
# for inference_step in range(1024):       # <-- change max tokens here
#     with torch.inference_mode():
#         logits = model(**cm_thinking_and_writing.get_input_kwargs(**next_inputs)).logits[..., -1, :]
#         logits[..., forbidden_token_ix] -= 100
#         new_tokens = logits.argmax(-1)   # <-- greedy generation
#         next_inputs = dict(input_ids=new_tokens.view(-1, 1))
    
#     for worker_tokens, new_token in zip(tokens_by_worker, new_tokens.tolist()):
#         worker_tokens.append(new_token)
#     clear_output(True)
#     display(Markdown("".join(tokenizer.decode(seq) for seq in tokens_by_worker[::-1])))

In [15]:
def decision_yes_no(logits):
    probs = logits.softmax(-1)
    yes_id = tokenizer(" yes", add_special_tokens=False)["input_ids"][0]
    no_id  = tokenizer(" no",  add_special_tokens=False)["input_ids"][0]
    return "yes" if probs[..., yes_id] > probs[..., no_id] else "no"

dnl = tokenizer("\n\n", add_special_tokens=False)["input_ids"]    
def ends_with_dnl(seq):
    return tokenizer.decode(seq[-1:])[-1] == "\n" # <--- This is ... not optimal
    # return len(seq) >= len(dnl) and seq[-len(dnl):] == dnl

# All of them are already in cache
writer_tokens, thinker_tokens = tokenizer(worker_prompts, add_special_tokens=False)["input_ids"]

# These one are next tokens
writer_tokens.append(tokenizer("\n")["input_ids"][0])
thinker_tokens.append(tokenizer("\n")["input_ids"][0])
tokens_by_worker = [writer_tokens, thinker_tokens]

generated_tokens = [0, 0]

# next_inputs = {
#     "input_ids": torch.tensor([[writer_tokens[-1]], [thinker_tokens[-1]]], device=device)
# }

state = "thinker_only"
for step in range(300):
    if state == "thinker_only":
        next_inputs = {"input_ids": torch.tensor([[thinker_tokens[-1]]], device=device)}
        with torch.inference_mode():
            logits = model(**cm_thinker_only.get_input_kwargs(**next_inputs)).logits[..., -1, :]
        new_tok_t = int(logits.argmax(-1))
        thinker_tokens.append(new_tok_t)
        generated_tokens[1] += 1

        if generated_tokens[1] and ends_with_dnl(thinker_tokens):
            control_q = "[CONTROL] Have I thought enough to resume writing a response? (yes/no): "
            ci = tokenizer(control_q, return_tensors="pt").to(device)
            with torch.inference_mode():
                logits_ctrl = model(**cm_thinker_only.get_input_kwargs(**ci)).logits[..., -1, :]

            decision = decision_yes_no(logits_ctrl)

            ans = f" {decision}\n\n"
            ai = tokenizer(ans, return_tensors="pt").to(device)
            with torch.inference_mode():
                model(**cm_thinker_only.get_input_kwargs(**ai))
            thinker_tokens.extend(tokenizer.encode(control_q + ans, add_special_tokens=False))

            if decision == "yes":
                state = "thinking_and_writing"

    else:
        next_inputs = {"input_ids": torch.tensor([[writer_tokens[-1]], [thinker_tokens[-1]]], device=device)}
        with torch.inference_mode():
            logits = model(**cm_thinking_and_writing.get_input_kwargs(**next_inputs)).logits[..., -1, :]
        new_tok_w, new_tok_t = logits.argmax(-1).tolist()
        writer_tokens.append(new_tok_w)
        thinker_tokens.append(new_tok_t)
        generated_tokens[0] += 1
        generated_tokens[1] += 1

        if generated_tokens[0] and ends_with_dnl(writer_tokens):
            control_q = "\n\n[CONTROL] Should I wait for thoughts before continuing writing? (yes/no): "
            control_texts = [control_q, tokenizer.decode([thinker_tokens[-1]])]
            ci = tokenizer(control_texts, return_tensors="pt", padding=True).to(device)
            with torch.inference_mode():
                logits_ctrl = model(**cm_thinking_and_writing.get_input_kwargs(**ci)).logits[..., -1, :]

            decision = decision_yes_no(logits_ctrl[0])

            ai_texts = [f"{decision}\n\n", tokenizer.decode([thinker_tokens[-1]])]
            ai = tokenizer(ai_texts, return_tensors="pt", padding=True).to(device)
            with torch.inference_mode():
                model(**cm_thinking_and_writing.get_input_kwargs(**ai))

            writer_tokens.extend(tokenizer.encode(control_q + f"{decision}\n\n", add_special_tokens=False))

            if decision == "yes":
                state = "thinker_only"
                
    display_tokens(tokens_by_worker)


<think>

# Thinker mode

I am in Thinker mode. My text is not visible to the user. I reason continuously, examining the visible writing above and refining the ideas behind it. I detect errors, test assumptions, and plan improvements. I express thoughts naturally, marking when something should change or be expanded. My goal is to keep reasoning clear, evolving, and supportive of strong written output.

Let me calculate the expression x - x^2 + x^3 for each of the given values of x:

[CONTROL] Have I thought enough to resume writing a response? (yes/no):  no

Okay, let's start with x = 5. The expression is x - x^2 + x^3. So substituting 5 in, that's 5 - 5^2 + 5^3. Let me calculate each term step by step. 5^2 is 25, and 5^3 is 125. So the expression becomes 5 - 25 + 125. Now, 5 - 25 is -20, and then adding 125 gives 105. So for x=5, the result is 105.

[CONTROL] Have I thought enough to resume writing a response? (yes/no):  no

Next, x = 6. The expression is 6 - 6^2 + 6^3. Calculating each term: 6^2 is 36, and 6^3 is 216. So the expression becomes 6 - 36 + 216. 6 - 36 is -30, and adding 216 gives 186. So for x=6, the result is 186[CONTROL] Have I thought enough to resume writing a response? (yes/no):  no

[CONTROL] Have I thought enough to resume writing a response? (yes/no):  no

Now x = 7. The expression is 7 - 7^2 + 7^3. Calculating each term


# Writer mode

I am in Writer mode. My text is visible to the user. I focus on clear, precise expression and careful word choice. I write only what is well-reasoned and verified in my workspace. I never speculate or improvise. If my thinking shifts or reveals an error, I immediately adjust. My goal is calm, accurate, and readable output.



In [None]:
# I suspect there is an issue with [CONTROL] that make it repeat too frequent. I ll fix it later.