# Juego de Tronos (Libro 1): QA + Imagen (Notebook independiente)

Este notebook es **autocontenido**: no depende de archivos `.py` externos.
Modelos:
- QA/Planner: `Qwen/Qwen3-30B-A3B-Thinking-2507-FP8`
- Embeddings: `BAAI/bge-m3`
- Reranker: `BAAI/bge-reranker-large`
- Imagen: `stabilityai/stable-diffusion-3.5-large`


In [None]:
!pip -q install -U pandas pyarrow beautifulsoup4 lxml faiss-cpu sentence-transformers transformers accelerate diffusers safetensors


In [None]:
# Mount Google Drive under /content/drive so files stored there are readable
# from this runtime.
from google.colab import drive
drive.mount('/content/drive')

# Location of the source EPUB inside the mounted Drive; passed to
# extract_chapters() in a later cell.
EPUB_PATH = '/content/drive/MyDrive/juego_de_tronos.epub'


In [None]:
import json
import os
import re
import zipfile
from typing import Optional

import faiss
import numpy as np
import pandas as pd
import torch
from bs4 import BeautifulSoup
from diffusers import StableDiffusion3Pipeline
from sentence_transformers import SentenceTransformer, CrossEncoder
from transformers import AutoTokenizer, AutoModelForCausalLM

# Model identifiers consumed by the loading cells further down.
QWEN_MODEL_ID = 'Qwen/Qwen3-30B-A3B-Thinking-2507-FP8'  # chat LLM used for QA and scene planning
EMBED_MODEL_ID = 'BAAI/bge-m3'  # SentenceTransformer used to embed chunks and questions
RERANKER_MODEL_ID = 'BAAI/bge-reranker-large'  # CrossEncoder used to rerank FAISS candidates
SD3_MODEL_ID = 'stabilityai/stable-diffusion-3.5-large'  # checkpoint for StableDiffusion3Pipeline


In [None]:
def clean_text(text: str) -> str:
    """Normalize whitespace in text extracted from an EPUB document.

    The substitutions run in this order:

    1. ``\\r\\n`` and lone ``\\r`` become ``\\n``.
    2. Any run of blank (whitespace-only) lines collapses to one empty line.
    3. Runs of spaces/tabs collapse to a single space.

    The result is finally stripped of leading and trailing whitespace.
    """
    substitutions = (
        (r"\r\n?", "\n"),
        (r"\n\s*\n+", "\n\n"),
        (r"[ \t]+", " "),
    )
    normalized = text
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.strip()


def extract_title_and_pov(text: str) -> tuple[Optional[str], Optional[str]]:
    """Find a chapter heading such as ``BRAN (1)`` near the top of *text*.

    Only the first 20 non-blank lines are inspected. A heading must start with
    an uppercase letter (Spanish accented capitals included) and end with a
    parenthesised number.

    Returns:
        ``(title, pov)`` where ``title`` is the whole heading line and ``pov``
        is everything before its first ``(``, stripped; ``(None, None)`` when
        no heading is found.
    """
    heading_re = re.compile(r"^[A-ZÁÉÍÓÚÑÜ]+.*\(\d+\)$")
    non_blank = (
        stripped for stripped in map(str.strip, text.split("\n")) if stripped
    )
    for position, candidate in enumerate(non_blank):
        if position >= 20:
            break
        if heading_re.match(candidate):
            pov, _, _ = candidate.partition("(")
            return candidate, pov.strip()
    return None, None


def list_xhtml_text_files(zf: zipfile.ZipFile) -> list[str]:
    """Return the archive's (X)HTML member names in natural reading order.

    Members ending in ``.xhtml`` or ``.html`` (case-insensitive) are the
    candidates. If any of them live under a directory named ``text`` or
    ``texto`` (case-insensitive, at any depth including the archive root),
    only those are returned; otherwise all candidates are returned.

    Names are sorted numerically-aware, so ``cap2.xhtml`` comes before
    ``cap10.xhtml``. A plain lexicographic sort would interleave chapters
    whose numbers are not zero-padded.

    NOTE(review): the authoritative reading order of an EPUB is the spine of
    its OPF package document; file-name order is only a heuristic.

    Args:
        zf: An open ZIP archive (the EPUB container).

    Returns:
        Sorted list of member names.
    """

    def natural_key(name: str):
        # re.split with a capturing group alternates text / digit-run tokens,
        # so odd positions are always digit runs and can be compared as ints.
        tokens = re.split(r"(\d+)", name)
        numeric_aware = [
            int(tok) if pos % 2 else tok for pos, tok in enumerate(tokens)
        ]
        # The raw name breaks ties such as "cap01" vs "cap1" deterministically.
        return numeric_aware, name

    def in_text_dir(name: str) -> bool:
        # Look at directory components only, never at the file name itself.
        directories = name.lower().split("/")[:-1]
        return any(part in ("text", "texto") for part in directories)

    candidates = [
        name for name in zf.namelist()
        if name.lower().endswith((".xhtml", ".html"))
    ]
    preferred = [name for name in candidates if in_text_dir(name)]
    return sorted(preferred or candidates, key=natural_key)


def extract_chapters(epub_path: str) -> pd.DataFrame:
    """Read an EPUB and return one DataFrame row per chapter-sized document.

    Each (X)HTML member selected by ``list_xhtml_text_files`` is parsed with
    BeautifulSoup (``lxml`` parser), flattened to text with ``\\n`` between
    text nodes and normalized by ``clean_text``. Documents whose cleaned text
    is shorter than 800 characters are skipped.

    Args:
        epub_path: Filesystem path of the EPUB file.

    Returns:
        DataFrame with columns ``chapter_id`` (0-based position among the kept
        documents), ``epub_file``, ``title``, ``pov``, ``text`` and
        ``n_chars``.

    Raises:
        FileNotFoundError: If *epub_path* does not exist.
    """
    if not os.path.exists(epub_path):
        raise FileNotFoundError(f'No existe EPUB_PATH: {epub_path}')

    records = []
    with zipfile.ZipFile(epub_path, 'r') as archive:
        for member in list_xhtml_text_files(archive):
            markup = archive.read(member)
            body = clean_text(BeautifulSoup(markup, 'lxml').get_text('\n'))
            if len(body) >= 800:
                heading, narrator = extract_title_and_pov(body)
                # chapter_id counts only the documents kept so far.
                records.append(dict(
                    chapter_id=len(records),
                    epub_file=member,
                    title=heading,
                    pov=narrator,
                    text=body,
                    n_chars=len(body),
                ))
    return pd.DataFrame(records)


def chunk_text(text: str, chunk_size: int = 4500, overlap: int = 750) -> list[tuple[int, int, str]]:
    """Split *text* into overlapping character windows.

    Windows start every ``chunk_size - overlap`` characters. Iteration stops
    as soon as a window reaches the end of the text, so no trailing window is
    emitted that is entirely contained in the previous one.

    Args:
        text: The text to split.
        chunk_size: Maximum window length in characters.
        overlap: Number of characters shared by consecutive windows.

    Returns:
        List of ``(start, end, piece)`` tuples. ``start``/``end`` are the
        slice bounds in *text* before stripping; ``piece`` is that slice with
        surrounding whitespace stripped. Whitespace-only windows are skipped.

    Raises:
        ValueError: If ``chunk_size <= overlap`` (the window would not advance).
    """
    if chunk_size <= overlap:
        raise ValueError('chunk_size debe ser mayor que overlap')

    step = chunk_size - overlap
    total = len(text)
    chunks = []
    for start in range(0, total, step):
        end = min(start + chunk_size, total)
        piece = text[start:end].strip()
        if piece:
            chunks.append((start, end, piece))
        if end == total:
            # This window already covers the tail; any further window would
            # only repeat text that is fully inside it.
            break
    return chunks


def build_chunks(chapters_df: pd.DataFrame, chunk_size: int = 4500, overlap: int = 750) -> pd.DataFrame:
    """Expand a chapters table into one row per overlapping text chunk.

    Args:
        chapters_df: Table with at least the columns ``chapter_id``,
            ``epub_file``, ``title``, ``pov`` and ``text``.
        chunk_size: Maximum chunk length in characters (forwarded to
            ``chunk_text``).
        overlap: Characters shared by consecutive chunks (forwarded to
            ``chunk_text``).

    Returns:
        DataFrame with columns ``chunk_id`` (``"<chapter_id>_<n>"``),
        ``chapter_id``, ``epub_file``, ``title``, ``pov``, ``start_char``,
        ``end_char``, ``text`` and ``n_chars``. The columns are present even
        when there are no rows, so downstream column access does not fail on
        an empty book.
    """
    columns = [
        'chunk_id',
        'chapter_id',
        'epub_file',
        'title',
        'pov',
        'start_char',
        'end_char',
        'text',
        'n_chars',
    ]
    rows = []
    for _, ch in chapters_df.iterrows():
        # iterrows() may hand back numpy/float scalars; normalise to int once.
        chapter_id = int(ch['chapter_id'])
        for i, (s, e, t) in enumerate(chunk_text(ch['text'], chunk_size, overlap)):
            rows.append({
                'chunk_id': f"{chapter_id}_{i}",
                'chapter_id': chapter_id,
                'epub_file': ch['epub_file'],
                'title': ch['title'],
                'pov': ch['pov'],
                'start_char': s,
                'end_char': e,
                'text': t,
                'n_chars': len(t),
            })
    # Explicit columns keep the schema stable for an empty result.
    return pd.DataFrame(rows, columns=columns)


In [None]:
# Parse the EPUB into chapters, then split every chapter into overlapping chunks
# (default chunk_size/overlap of build_chunks).
chapters_df = extract_chapters(EPUB_PATH)
chunks_df = build_chunks(chapters_df)

# Persist both tables as Parquet files in the current working directory.
chapters_df.to_parquet('chapters.parquet', index=False)
chunks_df.to_parquet('chunks.parquet', index=False)

print('Capítulos:', len(chapters_df), '| Chunks:', len(chunks_df))


In [None]:
# Module-level models: embed_texts() reads `embedder`, retrieve_passages() reads `reranker`.
embedder = SentenceTransformer(EMBED_MODEL_ID)
reranker = CrossEncoder(RERANKER_MODEL_ID)


def embed_texts(texts, batch_size=32):
    """Encode *texts* with the module-level ``embedder`` and return float32 vectors.

    The encoder is asked for NumPy output with normalized embeddings and a
    progress bar; the result is cast to ``float32`` before being returned.

    Args:
        texts: Sequence of strings to embed.
        batch_size: Batch size forwarded to ``embedder.encode``.

    Returns:
        The encoder output converted with ``astype('float32')``.
    """
    encode_options = dict(
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    vectors = embedder.encode(texts, **encode_options)
    return vectors.astype('float32')

# Embed every chunk once and load the vectors into a flat inner-product FAISS index.
# Vectors are added in chunks_df row order; retrieve_passages() maps the returned
# positions back to rows with chunks_df.iloc.
emb = embed_texts(chunks_df['text'].tolist())
index = faiss.IndexFlatIP(emb.shape[1])  # dimensionality taken from the embedding matrix
index.add(emb)
print('FAISS listo:', index.ntotal)


In [None]:
# Load the chat model and its tokenizer once; run_chat() reads both as module-level globals.
# NOTE(review): the checkpoint id ends in "FP8" while torch_dtype requests bfloat16 —
# confirm how the loader treats the quantized weights.
tokenizer = AutoTokenizer.from_pretrained(QWEN_MODEL_ID, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    QWEN_MODEL_ID,
    torch_dtype=torch.bfloat16,
    device_map='auto',
)


def build_context(passages_df: pd.DataFrame, max_chars_each: int = 1800) -> str:
    """Render retrieved passages as a citation-ready context string.

    Each row becomes a block of the form::

        [<chunk_id>] (<pov> | <title>)
        <first max_chars_each characters of text, stripped>

    and blocks are separated by one blank line.

    Args:
        passages_df: Table with ``chunk_id``, ``pov``, ``title`` and ``text``
            columns.
        max_chars_each: Per-passage character cap applied before stripping.

    Returns:
        The joined context; an empty string when there are no rows.
    """

    def render(row) -> str:
        excerpt = row['text'][:max_chars_each].strip()
        header = f"[{row['chunk_id']}] ({row['pov']} | {row['title']})"
        return f"{header}\n{excerpt}"

    return '\n\n'.join(render(row) for _, row in passages_df.iterrows())


def _strip_reasoning(text: str) -> str:
    """Return the part of *text* after the last ``</think>`` marker, stripped.

    When the marker is absent the whole text is returned (stripped).
    """
    # rpartition yields the full string as its last element when the
    # separator is not found, so no explicit branch is needed.
    return text.rpartition('</think>')[2].strip()


def run_chat(messages, max_new_tokens=400):
    """Generate a deterministic chat completion with the module-level model.

    The messages are rendered with the tokenizer's chat template (generation
    prompt appended), decoded greedily, and only the newly generated tokens
    are decoded back to text.

    The configured checkpoint is a "Thinking" model: per its model card the
    completion starts with reasoning terminated by ``</think>``. That
    preamble is removed so callers receive only the final answer (and so a
    JSON parser is not confused by braces inside the reasoning).

    NOTE(review): if ``max_new_tokens`` is exhausted before ``</think>`` is
    emitted, the returned text is the truncated reasoning itself. The budgets
    used by the callers in this notebook (360-420 tokens) look small for a
    reasoning model — confirm against the model card's recommendations.

    Args:
        messages: Chat messages as a list of ``{'role', 'content'}`` dicts.
        max_new_tokens: Upper bound on generated tokens (reasoning included).

    Returns:
        The generated text with any reasoning preamble removed, stripped.
    """
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(prompt, return_tensors='pt').to(model.device)
    # Greedy decoding: sampling knobs (temperature/top_p) do not apply when
    # do_sample=False, and temperature=0.0 is outside the documented
    # strictly-positive range, so they are not passed at all.
    out = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        repetition_penalty=1.03,
    )
    # generate() returns prompt + completion; keep only the completion.
    prompt_len = inputs['input_ids'].shape[1]
    new_tokens = out[0][prompt_len:]
    decoded = tokenizer.decode(new_tokens, skip_special_tokens=True)
    return _strip_reasoning(decoded)


def retrieve_passages(question: str, top_k=12, faiss_k=100) -> pd.DataFrame:
    """Retrieve the best chunks for *question*: FAISS recall, then rerank.

    The question is embedded, the ``faiss_k`` nearest chunks are fetched from
    the module-level ``index``, each candidate is scored against the question
    with the module-level ``reranker``, and the ``top_k`` highest-scoring rows
    are returned.

    Args:
        question: The user question.
        top_k: Number of passages to return after reranking.
        faiss_k: Number of candidates to fetch from FAISS before reranking.

    Returns:
        Rows of ``chunks_df`` (fresh 0..n-1 index) with two extra columns,
        ``faiss_score`` and ``rerank_score``, sorted by ``rerank_score``
        descending. Empty (but with both score columns) if the index is empty
        or ``faiss_k`` is not positive.
    """
    # Never request more neighbours than the index holds: FAISS pads missing
    # results with label -1, and iloc[-1] would silently select the LAST
    # chunk once per padded slot.
    k = min(int(faiss_k), int(index.ntotal))
    if k <= 0:
        empty = chunks_df.iloc[0:0].copy()
        empty['faiss_score'] = pd.Series(dtype='float32')
        empty['rerank_score'] = pd.Series(dtype='float32')
        return empty.reset_index(drop=True)

    q_emb = embed_texts([question], batch_size=1)
    scores, idxs = index.search(q_emb, k)

    # Defensive: drop any -1 padding that might still be reported.
    found = idxs[0] >= 0
    cand = chunks_df.iloc[idxs[0][found].tolist()].copy()
    cand['faiss_score'] = scores[0][found]

    pairs = [(question, t) for t in cand['text'].tolist()]
    cand['rerank_score'] = reranker.predict(pairs) if pairs else []
    return cand.sort_values('rerank_score', ascending=False).head(top_k).reset_index(drop=True)


def answer_question(question: str, passages_df: pd.DataFrame) -> str:
    """Answer *question* using only the supplied passages.

    The passages are rendered with ``build_context`` and sent to ``run_chat``
    together with a Spanish system prompt that restricts the model to the
    fragments, fixes the exact "not found" sentence, and asks for a
    ``[chunk_id]`` citation after each factual sentence.

    Args:
        question: The user question.
        passages_df: Retrieved passages (see ``build_context`` for columns).

    Returns:
        The text returned by ``run_chat`` (up to 420 new tokens).
    """
    system_prompt = ''.join([
        "Eres experto en 'Juego de Tronos' (Libro 1). ",
        "Responde solo con hechos de los fragmentos. ",
        "Si no hay evidencia, di exactamente: 'No encontrado en los fragmentos proporcionados'. ",
        "Añade [chunk_id] al final de cada frase factual.",
    ])
    fragments = build_context(passages_df)
    user_prompt = f'Pregunta: {question}\n\nFragmentos:\n{fragments}'
    conversation = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': user_prompt},
    ]
    return run_chat(conversation, max_new_tokens=420)


In [None]:
# Shape of the scene description requested from the planner LLM. The values are
# type placeholders, not data: plan_scene() serializes this dict into the prompt
# and scene_to_prompt() reads the same keys back from the model's JSON reply.
scene_schema = {
    'style': 'string',
    'subject': 'string',
    'setting': 'string',
    'time_of_day': 'day|night|dawn|dusk|unknown',  # pipe-separated allowed values
    'mood': 'string',
    'characters': [{'name': 'string', 'appearance': 'string', 'clothing': 'string'}],
    'action': 'string',
    'camera': 'string',
    'important_objects': ['string'],
    'avoid': ['string'],  # scene_to_prompt() folds these into the negative prompt
}


def extract_first_json(raw: str) -> dict:
    """Return the first JSON object that can be decoded from *raw*.

    LLM output often wraps the JSON in prose or code fences, and may contain
    brace-delimited text that is not JSON (e.g. ``{placeholder}``) before the
    real object. Every ``{`` is therefore tried in order with the standard
    library decoder, which handles strings, escapes and nesting itself; the
    first position that decodes wins. Text after the object is ignored.

    Args:
        raw: Model output possibly containing a JSON object.

    Returns:
        The decoded object as a ``dict``.

    Raises:
        ValueError: If *raw* contains no ``{`` at all, or if no ``{`` starts a
            decodable object (truncated or otherwise invalid JSON). In the
            latter case the last ``json.JSONDecodeError`` is chained as the
            cause.
    """
    start = raw.find('{')
    if start < 0:
        raise ValueError('No se encontró JSON en salida del planner')

    decoder = json.JSONDecoder()
    last_error = None
    while start >= 0:
        try:
            # raw_decode parses one document starting exactly at `start` and
            # tolerates trailing text after it.
            parsed, _ = decoder.raw_decode(raw, start)
        except json.JSONDecodeError as err:
            last_error = err
        else:
            return parsed
        start = raw.find('{', start + 1)

    raise ValueError('JSON no balanceado en salida del planner') from last_error


def plan_scene(question: str, answer: str, passages_df: pd.DataFrame) -> dict:
    """Ask the chat model for a structured scene description of the answer.

    The prompt carries the question, the answer, a context built from the
    passages (1400 characters each) and the JSON-serialized ``scene_schema``.
    The reply (up to 360 new tokens) is parsed with ``extract_first_json``.

    Args:
        question: The original user question.
        answer: The answer produced for that question.
        passages_df: Retrieved passages used as grounding context.

    Returns:
        The scene description decoded from the model's reply.

    Raises:
        ValueError: Propagated from ``extract_first_json`` when the reply
            holds no parseable JSON object.
    """
    system_prompt = ' '.join([
        'You are an art director for text-to-image generation.',
        'Return ONLY a valid JSON object following the provided schema.',
        'Use only details grounded in context and answer.',
        'No actor names, no TV adaptation references.',
    ])
    grounding = build_context(passages_df, max_chars_each=1400)
    schema_json = json.dumps(scene_schema, ensure_ascii=False)
    user_prompt = '\n\n'.join([
        f'Question: {question}',
        f'Answer: {answer}',
        f'Context:\n{grounding}',
        f'Schema: {schema_json}',
    ])
    conversation = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': user_prompt},
    ]
    reply = run_chat(conversation, max_new_tokens=360)
    return extract_first_json(reply)


def scene_to_prompt(scene: dict) -> tuple[str, str]:
    """Turn a planner scene dict into ``(prompt, negative_prompt)`` strings.

    The scene comes from an LLM, so every field is treated as untrusted:
    missing keys, ``None``, wrong types and blank strings are tolerated and
    simply contribute nothing. Labelled fields (``subject:``, ``setting:``,
    ``time:``, ``mood:``, ``camera:``) are emitted only when they have a
    value, so the prompt never contains empty labels.

    Args:
        scene: Mapping following ``scene_schema`` (any key may be absent).

    Returns:
        ``prompt``: comma-separated positive prompt that always starts with a
        fixed style preamble; at most the first three characters are used.
        ``negative``: comma-separated negative prompt made of the scene's
        ``avoid`` entries followed by fixed defaults, de-duplicated with the
        order preserved.
    """

    def as_text(value) -> str:
        # Strings are stripped, plain numbers stringified, anything else dropped.
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        return ''

    def as_text_list(value) -> list[str]:
        # A bare string is accepted as a one-item list; other shapes yield [].
        if isinstance(value, str):
            value = [value]
        if not isinstance(value, (list, tuple)):
            return []
        return [item for item in map(as_text, value) if item]

    def labelled(label: str, key: str) -> str:
        value = as_text(scene.get(key))
        return f'{label}: {value}' if value else ''

    raw_characters = scene.get('characters')
    if not isinstance(raw_characters, (list, tuple)):
        raw_characters = []
    chars = []
    for c in raw_characters[:3]:
        if isinstance(c, dict):
            desc = ', '.join(as_text_list([c.get('name'), c.get('appearance'), c.get('clothing')]))
        else:
            # Models sometimes return a plain name instead of an object.
            desc = as_text(c)
        if desc:
            chars.append(desc)

    props = as_text_list(scene.get('important_objects'))

    parts = [
        'cinematic still, medieval fantasy, high detail, natural lighting',
        as_text(scene.get('style')),
        labelled('subject', 'subject'),
        as_text(scene.get('action')),
        labelled('setting', 'setting'),
        labelled('time', 'time_of_day'),
        labelled('mood', 'mood'),
        f"characters: {'; '.join(chars)}" if chars else '',
        labelled('camera', 'camera'),
        'props: ' + ', '.join(props) if props else '',
    ]
    prompt = ', '.join(p for p in parts if p)

    avoid = as_text_list(scene.get('avoid')) + [
        'text, watermark, logo',
        'tv actors, celebrity face',
        'modern clothing',
        'low quality, blurry',
    ]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    negative = ', '.join(dict.fromkeys(avoid))
    return prompt, negative


In [None]:
# Load the Stable Diffusion 3 pipeline in bfloat16 and move it to the GPU;
# ask_and_draw() reads `image_pipe` as a module-level global.
image_pipe = StableDiffusion3Pipeline.from_pretrained(
    SD3_MODEL_ID,
    torch_dtype=torch.bfloat16,
).to('cuda')
# Presumably enabled to lower peak GPU memory during attention — confirm against the diffusers docs.
image_pipe.enable_attention_slicing()


def ask_and_draw(
    question: str,
    top_k=12,
    faiss_k=100,
    seed: Optional[int] = None,
    *,
    num_inference_steps: int = 30,
    guidance_scale: float = 6.0,
    width: int = 1024,
    height: int = 1024,
):
    """Answer *question* from the book and render an image of the answer.

    Pipeline: retrieve passages, answer from them, plan a scene as JSON,
    convert the scene to prompts, then run the module-level ``image_pipe``.

    Args:
        question: The user question.
        top_k: Passages kept after reranking.
        faiss_k: Candidates fetched from FAISS before reranking.
        seed: If given, seeds a CUDA generator so the image is reproducible.
        num_inference_steps: Diffusion steps passed to the image pipeline.
        guidance_scale: Guidance scale passed to the image pipeline.
        width: Output image width in pixels.
        height: Output image height in pixels.

    Returns:
        Dict with keys ``answer``, ``passages``, ``scene``, ``prompt``,
        ``negative_prompt`` and ``image``.

    Raises:
        ValueError: Propagated from ``plan_scene`` when the planner reply
            contains no parseable JSON object.
    """
    passages = retrieve_passages(question, top_k=top_k, faiss_k=faiss_k)
    answer = answer_question(question, passages)
    scene = plan_scene(question, answer, passages)
    prompt, negative = scene_to_prompt(scene)

    # A None generator lets the pipeline pick its own randomness.
    gen = None
    if seed is not None:
        gen = torch.Generator(device='cuda').manual_seed(seed)

    image = image_pipe(
        prompt=prompt,
        negative_prompt=negative,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        width=width,
        height=height,
        generator=gen,
    ).images[0]

    return {
        'answer': answer,
        'passages': passages,
        'scene': scene,
        'prompt': prompt,
        'negative_prompt': negative,
        'image': image,
    }


In [None]:
# End-to-end demo: answer one question and render its image with a fixed seed.
result = ask_and_draw('¿Cómo escapó Tyrion del Nido de Águilas?', seed=7)
print(result['answer'])
print('PROMPT:', result['prompt'])
# Left as the cell's last expression, presumably so the notebook displays the image.
result['image']
