# Habitat-Sim (GPU, headless/EGL) — Starter Notebook

このノートブックは **Keio AI Consortium の JupyterHub** で、**GPU を使って Habitat-Sim を動かす**ためのスターターです。  
上から順に実行すれば、次ができます：

1. GPU 認識確認（`nvidia-smi`）  
2. `habitat-sim` / `habitat-lab` のインストール（CUDA 11.8 例）  
3. テストシーンの取得（`habitat-test-scenes`）  
4. Headless（EGL）での最小レンダリング実行  
5. 4関数の実装：`move`, `rotate`, `go_to_location`, `stop`  
6. `<FunctionSequence>` 形式の計画をそのまま実行  
7. （任意）OpenAI API で計画を生成して実行

## 0. GPU を認識できているか確認

In [None]:
!nvidia-smi || true

## 1. 環境変数の準備（EGL）

- Headless 実行では X11 は不要です。  
- `PYOPENGL_PLATFORM=egl` を設定し、`DISPLAY` は使いません。

In [None]:
import os
os.environ.pop("DISPLAY", None)   # disable X
os.environ["PYOPENGL_PLATFORM"] = "egl"
os.environ["MAGNUM_LOG"] = "warning"
print("DISPLAY:", os.environ.get("DISPLAY"))
print("PYOPENGL_PLATFORM:", os.environ.get("PYOPENGL_PLATFORM"))

## 2. Habitat のインストール

> **メモ**: 2080 Ti なら CUDA 11.8 のホイールが安定です。  
もし CUDA 版が合わず失敗する場合は、エラー内容をメモして教えてください（コマンドを調整します）。

In [None]:
# インストール（2～5分）
%pip -q install "habitat-sim==0.3.0" --extra-index-url https://download.pytorch.org/whl/cu118
%pip -q install "habitat-lab==0.3.0" 

## 3. テストシーンのダウンロード（`habitat-test-scenes`）

In [None]:
import os, shutil, tarfile, urllib.request, pathlib

DATA_DIR = pathlib.Path("data/scene_datasets/habitat-test-scenes")
DATA_DIR.mkdir(parents=True, exist_ok=True)

url = "https://github.com/facebookresearch/habitat-sim/archive/refs/heads/main.tar.gz"
tar_path = "habitat-sim-main.tar.gz"
if not (DATA_DIR / "apartment_0.glb").exists():
    print("Downloading habitat-sim (for test scenes)...")
    urllib.request.urlretrieve(url, tar_path)
    with tarfile.open(tar_path, "r:gz") as tar:
        members = [m for m in tar.getmembers() if "data/scene_datasets/habitat-test-scenes/apartment_0.glb" in m.name]
        for m in members:
            tar.extract(m)
    src = pathlib.Path("habitat-sim-main/data/scene_datasets/habitat-test-scenes/apartment_0.glb")
    if src.exists():
        shutil.move(str(src), str(DATA_DIR / "apartment_0.glb"))
    shutil.rmtree("habitat-sim-main", ignore_errors=True)
    if pathlib.Path(tar_path).exists():
        os.remove(tar_path)
    print("apartment_0.glb ready.")
else:
    print("apartment_0.glb already exists.")
print("Scene path:", DATA_DIR / "apartment_0.glb")

## 4. 最小実行（GPU/EGL でレンダリング）

In [None]:
import habitat_sim
from habitat_sim.utils.common import quat_from_angle_axis
from math import radians
from PIL import Image
import numpy as np

scene_path = "data/scene_datasets/habitat-test-scenes/apartment_0.glb"

cfg = habitat_sim.SimulatorConfiguration()
cfg.scene_id = scene_path

agent_cfg = habitat_sim.agent.AgentConfiguration()
agent_cfg.action_space = {
    "move_forward": habitat_sim.agent.ActionSpec(
        "move_forward", habitat_sim.agent.ActuationSpec(amount=0.25)
    ),
    "turn_left": habitat_sim.agent.ActionSpec(
        "turn_left", habitat_sim.agent.ActuationSpec(amount=10.0)
    ),
    "turn_right": habitat_sim.agent.ActionSpec(
        "turn_right", habitat_sim.agent.ActuationSpec(amount=10.0)
    ),
}

sim = habitat_sim.Simulator(habitat_sim.Configuration(cfg, [agent_cfg]))
agent = sim.initialize_agent(0)

obs = sim.step("move_forward")
print("OK. RGBA:", obs["rgba"].shape, "Depth:", obs["depth"].shape)

rgba = (obs["rgba"] * 255).astype(np.uint8)
Image.fromarray(rgba).save("frame0.png")
print("Saved: frame0.png")

## 5. ロボット関数の実装（move / rotate / go_to_location / stop）

In [None]:
import math
from dataclasses import dataclass
import numpy as np

@dataclass
class Pose2D:
    x: float
    y: float
    yaw_deg: float

location_map = {
    "キッチン": (1.5, 0.0),
    "リビング": (0.0, 0.0),
    "机": (0.5, 0.5),
    "kitchen": (1.5, 0.0),
    "living": (0.0, 0.0),
    "desk": (0.5, 0.5),
}

def _repeat(action_name: str, times: int):
    times = max(0, int(times))
    last = None
    for _ in range(times):
        last = sim.step(action_name)
    return last

def move(direction: str, distance_m: float):
    step = 0.25
    n = int(abs(distance_m) / step + 0.5)
    d = direction.lower()
    if d in ["forward", "前", "前進"]:
        _repeat("move_forward", n)
        return "OK forward"
    elif d in ["backward", "後退", "後ろ"]:
        rotate("left", 180)
        _repeat("move_forward", n)
        rotate("right", 180)
        return "OK backward"
    elif d in ["left", "左"]:
        rotate("left", 90)
        _repeat("move_forward", n)
        rotate("right", 90)
        return "OK strafe-left(~)"
    elif d in ["right", "右"]:
        rotate("right", 90)
        _repeat("move_forward", n)
        rotate("left", 90)
        return "OK strafe-right(~)"
    else:
        return "NG unknown direction"

def rotate(direction: str, angle_deg: float):
    step = 10.0
    n = int(abs(angle_deg) / step + 0.5)
    if direction.lower() in ["left", "左"]:
        _repeat("turn_left", n)
        return "OK turn_left"
    else:
        _repeat("turn_right", n)
        return "OK turn_right"

def go_to_location(location_name: str):
    key = location_name if location_name in location_map else location_name.lower()
    if key not in location_map:
        return f"NG: '{location_name}' not registered."
    x, y = location_map[key]
    s = agent.get_state()
    s.position = np.array([x, 0.0, y], dtype=np.float32)
    agent.set_state(s)
    return f"OK go_to({location_name})"

def stop():
    return "stopped" 

## 6. `<FunctionSequence>` を実行

In [None]:
import re, ast

def extract_between(tag: str, text: str):
    m = re.search(fr"<{tag}>([\s\S]*?)</{tag}>", text, re.IGNORECASE)
    return m.group(1).strip() if m else None

allowed_env = {"move": move, "rotate": rotate, "go_to_location": go_to_location, "stop": stop}

def safe_eval_call(expr: str):
    node = ast.parse(expr, mode="eval")
    if not isinstance(node.body, ast.Call):
        raise ValueError("Not a function call")
    call = node.body
    if not isinstance(call.func, ast.Name) or call.func.id not in allowed_env:
        raise ValueError("Function not allowed")
    def lit(x):
        if isinstance(x, ast.Constant):
            return x.value
        if isinstance(x, ast.UnaryOp) and isinstance(x.op, (ast.UAdd, ast.USub)) and isinstance(x.operand, ast.Constant):
            return +x.operand.value if isinstance(x.op, ast.UAdd) else -x.operand.value
        raise ValueError("Only literal args")
    args = [lit(a) for a in call.args]
    kwargs = {kw.arg: lit(kw.value) for kw in call.keywords}
    return allowed_env[call.func.id](*args, **kwargs)

plan_text = '''
<FunctionSequence>
rotate("left", 90)
move("forward", 1.0)
go_to_location("キッチン")
stop()
</FunctionSequence>
'''

fs = extract_between("FunctionSequence", plan_text) or ""
log = []
for line in fs.splitlines():
    s = line.strip()
    if not s:
        continue
    try:
        res = safe_eval_call(s)
    except Exception as e:
        res = f"ERROR: {e}"
    log.append((s, res))
log 

## 7. （任意）OpenAI API で計画生成→実行

In [None]:
import os
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    print("Set OPENAI_API_KEY and re-run this cell.")
else:
    from openai import OpenAI
    client = OpenAI(api_key=api_key)

    SYSTEM_PROMPT = """
You are a robot planner. Always output a <FunctionSequence> block composed of these Python calls only:
- move(direction:str, distance_m:float)
- rotate(direction:str, angle_deg:float)
- go_to_location(location_name:str)
- stop()
You may also include <ClarifyingQuestion> if needed.
"""

    user_text = "キッチンに移動して、1メートル前進して停止して"
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role":"system","content":SYSTEM_PROMPT},
            {"role":"user","content":user_text},
        ],
        temperature=0.2,
    )
    reply = resp.choices[0].message.content
    print("LLM reply:\n", reply)

    def extract_between(tag: str, text: str):
        import re
        m = re.search(fr"<{tag}>([\s\S]*?)</{tag}>", text, re.IGNORECASE)
        return m.group(1).strip() if m else None

    fs = extract_between("FunctionSequence", reply) or ""
    exec_log = []
    for line in fs.splitlines():
        s = line.strip()
        if not s: 
            continue
        try:
            res = safe_eval_call(s)
        except Exception as e:
            res = f"ERROR: {e}"
        exec_log.append((s, res))
    exec_log 