In [None]:
!pip install bitsandbytes transformers torch datasets openai requests




In [None]:
# ============================================================
# Script for collecting Shakespeare-style benchmark data
# Designed to run in Google Colab
# Requires:
#   pip install bitsandbytes transformers torch datasets openai requests peft
# ============================================================

import os
import random
import re
import textwrap
import requests
from pathlib import Path

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from openai import OpenAI
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig
)
#from trl import SFTTrainer, SFTConfig
from peft import LoraConfig, get_peft_model


In [None]:
# ----------------------------
# GLOBAL CONFIGURATION
# ----------------------------

# Root folder (on Google Drive) for all benchmark data. Kept as a Path so that
# the `/` operator, .mkdir() and .resolve() can be used on it.
BASE_DIR = Path("/content/drive/MyDrive/StoryWriter/Data/Benchmark_data")

# The pipeline cells address their output folders through BENCH_DIR.
# NOTE(review): assumed to be the same folder as BASE_DIR — confirm.
BENCH_DIR = BASE_DIR

# Create every folder that is needed
for p in [
#    BENCH_DIR / "Shakespeare_like_finetuned",
    BENCH_DIR / "Shakespeare_like_gpt",
#    BENCH_DIR / "neg_coarse",
#    BENCH_DIR / "neg_fine",
]:
    p.mkdir(parents=True, exist_ok=True)

# Target paragraph length (in words)
MIN_WORDS = 150
MAX_WORDS = 300

# Negative-control split:
# fraction that goes to the benchmark vs. the Roberta training set
#NEG_BENCH_FRAC = 0.10  # 10% benchmark, 90% training

# Seed for reproducibility
random.seed(42)

AttributeError: 'str' object has no attribute 'mkdir'

In [None]:
# Google Drive directory handed to from_pretrained() for both the tokenizer and the model.
MODEL_DIR = "/content/drive/MyDrive/StoryWriter/Modelo_FineTuning/mistral-7b-instruct-v0.3"

In [None]:
# (Unfinished `import` statement removed: a bare `import` is a SyntaxError.
# All imports belong in the imports cell at the top of the notebook.)

In [None]:
# Load the tokenizer from the local checkpoint; fall back to EOS as the padding
# token when the checkpoint does not define one.
tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR, trust_remote_code=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

tokenizer.model_max_length = 512


# 4-bit NF4 quantization with double quantization (bitsandbytes).
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_DIR,
    quantization_config=bnb_config,
    dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True,
)

# Text-generation pipeline consumed by the local (HF) generation function.
# No `device` argument is passed: placement was already decided by device_map="auto".
gpt_pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

ImportError: Using `bitsandbytes` 4-bit quantization requires the latest version of bitsandbytes: `pip install -U bitsandbytes`

In [None]:
# Re-declares the paragraph length bounds. MAX_WORDS here (400) replaces the
# value assigned in the global configuration cell (300); MIN_WORDS is unchanged.
MIN_WORDS = 150
MAX_WORDS = 400

In [None]:
# ----------------------------
# 1. HELPERS GENERALES
# ----------------------------

def clean_text(text: str) -> str:
    """Minimal clean-up: unify CRLF line endings, squeeze blank-line runs, trim the ends."""
    # Windows line endings -> Unix line endings
    unix_newlines = "\n".join(text.split("\r\n"))
    # Any run of two or more newlines becomes exactly one blank line
    squeezed = re.sub(r"\n{2,}", "\n\n", unix_newlines)
    return squeezed.strip()

def count_words(text: str) -> int:
    """Return how many runs of word characters (letters, digits, underscore) occur in text."""
    return sum(1 for _match in re.finditer(r"\w+", text))

def cut_to_word_range(text: str, min_w=MIN_WORDS, max_w=MAX_WORDS) -> str | None:
    """
    Toma un texto y devuelve un sub-párrafo entre min_w y max_w palabras.
    Si no encuentra, devuelve None.
    Estrategia simple: tomar la ventana inicial de tamaño max_w
    y recortar al punto final más cercano.
    """
    words = re.findall(r"\S+", text)
    if len(words) < min_w:
        return None

    if len(words) <= max_w:
        return " ".join(words)

    # Tomamos un slice de tamaño max_w
    slice_words = words[:max_w]
    # Intentar cortar en el último punto
    joined = " ".join(slice_words)
    last_dot = joined.rfind(".")
    if last_dot != -1 and count_words(joined[:last_dot]) >= min_w:
        return joined[:last_dot+1].strip()
    # Si no, devolver las primeras max_w palabras sin más
    return joined.strip()

def save_paragraphs(paragraphs, out_dir: Path, prefix: str):
    """
    Write each paragraph to its own UTF-8 text file named ``{prefix}_{index:05d}.txt``.

    Args:
        paragraphs: Iterable of paragraph strings.
        out_dir: Destination folder (Path or str). Created, with parents, if missing.
        prefix: File-name prefix.

    Returns:
        List of the paths written, in input order.
    """
    # Accept plain strings too, and make sure the folder exists before writing
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    out_paths = []
    for i, text in enumerate(paragraphs):
        path = out_dir / f"{prefix}_{i:05d}.txt"
        with open(path, "w", encoding="utf-8") as f:
            f.write(text)
        out_paths.append(path)
    return out_paths

In [None]:
    # Draft of the basic prompt (same wording as the PROMPT constant defined in a later cell).
    # As a bare string expression it is not bound to any name.
    """
    Write a single paragraph between 150 and 300 words in the style of
    Shakespeare's stories. The paragraph must be original,
    not copied, and self-contained.
    """

In [None]:
# Draft of the detailed prompt (same wording as the PROMPT_PRO constant defined in a later cell).
# As a bare string expression it is not bound to any name.
"""
You are an expert writer imitating William Shakespeare.

Write one single self-contained paragraph between 150 and 300 words in Early Modern English,
in the style of Shakespeare’s plays and sonnets. The paragraph must be original, not copied,
and should use iambic or quasi-iambic rhythm, archaic pronouns (thee, thou, thy), and
elevated metaphors.

Avoid copying any real Shakespeare sentences; the text must be entirely new.
"""

In [None]:
# Detailed prompt: asks for Early Modern English, iambic rhythm, archaic pronouns
# and elevated metaphors.
PROMPT_PRO = """
You are an expert writer imitating William Shakespeare.

Write one single self-contained paragraph between 150 and 300 words in Early Modern English,
in the style of Shakespeare’s plays and sonnets. The paragraph must be original, not copied,
and should use iambic or quasi-iambic rhythm, archaic pronouns (thee, thou, thy), and
elevated metaphors.

Avoid copying any real Shakespeare sentences; the text must be entirely new.
"""

# Basic prompt. Note that the literal keeps the four-space indentation of each
# line, so those spaces are part of the text sent to the model.
PROMPT =     """
    Write a single paragraph between 150 and 300 words in the style of
    Shakespeare's stories. The paragraph must be original,
    not copied, and self-contained.
    """

def generate_Shakespeare_like_gpt(n_samples: int, prompt) -> list[str]:
    """
    Generate paragraphs with the local Hugging Face text-generation pipeline.

    Relies on a module-level ``gpt_pipe`` callable and on the ``clean_text`` /
    ``cut_to_word_range`` helpers being defined before it is called.

    NOTE(review): a later cell defines another function with this same name
    (backed by the OpenAI API); running that cell replaces this definition.

    Args:
        n_samples: Number of generation attempts.
        prompt: Prompt text sent to the pipeline.

    Returns:
        The accepted paragraphs. Outputs rejected by ``cut_to_word_range`` are
        skipped, so the list may hold fewer than ``n_samples`` items.
    """
    paragraphs = []
    for i in range(n_samples):
        print(f"[HF-GPT] Generando párrafo {i+1}/{n_samples} ...")
        out = gpt_pipe(
            prompt,
            max_new_tokens=450,
            do_sample=True,
            temperature=0.9,
            top_p=0.95,
            num_return_sequences=1,
        )[0]["generated_text"]

        # The pipeline often returns prompt + continuation. Strip the prompt that
        # was actually sent, and only when it really is a prefix of the output.
        if out.startswith(prompt):
            out = out[len(prompt):]
        text = clean_text(out.strip())
        text = cut_to_word_range(text)
        if text is None:
            continue
        paragraphs.append(text)
    return paragraphs


In [None]:
pars = generate_Shakespeare_like_gpt(1, PROMPT)
# The generator drops outputs that miss the length requirements, so the list can be empty.
print(pars[0] if pars else "No generated paragraph met the length requirements.")

[HF-GPT] Generando párrafo 1/1 ...


NameError: name 'gpt_pipe' is not defined

In [None]:
# Write every generated paragraph to its own chunk_XXXX.txt file on Drive.
MISTRAL_BASE_DIR = "/content/drive/MyDrive/StoryWriter/Data/Benchmark_data/mistral_base"
# open(..., "w") does not create missing folders, so make sure the target exists
os.makedirs(MISTRAL_BASE_DIR, exist_ok=True)
for i, chunk in enumerate(pars):
    file_path = os.path.join(MISTRAL_BASE_DIR, f"chunk_{i:04d}.txt")
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(chunk)
print(f"Saved {len(pars)} validation chunks")

Saved 14 validation chunks


In [None]:
import os
from getpass import getpass

# Never store the key in the notebook: reuse the value already present in the
# environment, otherwise ask for it interactively (the input is not echoed).
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

In [None]:
# ----------------------------
# 2. GPT FOR GENERATING Shakespeare-STYLE PARAGRAPHS
# ----------------------------

# IMPORTANT: set your API key in the Colab environment:
#   import os
#   os.environ["OPENAI_API_KEY"] = "your_api_key"
client = OpenAI()

GPT_MODEL = "gpt-4.1-mini"  # or whichever model you want to use for generation

# Prompt sent as the single user message. The literal keeps the four-space
# indentation of each line, so those spaces are part of the prompt text.
PROMPT_BASE_Shakespeare = (
    """
    Write a single paragraph between 150 and 300 words in the style of
    Shakespeare's stories. The paragraph must be original,
    not copied, and self-contained.
    """
)

def generate_Shakespeare_like_gpt(n_samples: int, prompt: str | None = None) -> list[str]:
    """
    Generate paragraphs through the OpenAI chat-completions API.

    Relies on the module-level ``client``, ``GPT_MODEL`` and
    ``PROMPT_BASE_Shakespeare``, plus the ``clean_text`` / ``cut_to_word_range``
    helpers.

    NOTE(review): an earlier cell defines a function with this same name (backed
    by a local pipeline); this definition replaces it once the cell runs. The
    optional ``prompt`` parameter keeps ``(n_samples, prompt)`` calls working.

    Args:
        n_samples: Number of API calls to make.
        prompt: User message to send; defaults to ``PROMPT_BASE_Shakespeare``.

    Returns:
        The accepted paragraphs. Responses rejected by ``cut_to_word_range`` are
        skipped, so the list may hold fewer than ``n_samples`` items.
    """
    if prompt is None:
        prompt = PROMPT_BASE_Shakespeare

    paragraphs = []
    for i in range(n_samples):
        print(f"[GPT] Generando párrafo {i+1}/{n_samples} ...")
        resp = client.chat.completions.create(
            model=GPT_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=450,  # roughly enough for 300 words
        )
        # The API may return no text content (None); treat it as an empty
        # answer so it is skipped below and does not crash in clean_text().
        text = resp.choices[0].message.content or ""
        text = clean_text(text)
        text = cut_to_word_range(text)
        if text is None:
            continue
        paragraphs.append(text)
    return paragraphs

In [None]:
generate_Shakespeare_like_gpt(1)

[GPT] Generando párrafo 1/1 ...


RateLimitError: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}

In [None]:
# ----------------------------
# 3. FINE-TUNED WRITER MODEL (LOCAL)
# ----------------------------

# Set here the path or Hugging Face name of your fine-tuned model (placeholder value below)
AUTHOR_MODEL_NAME = "ruta/o/nombre/de/tu/modelo_finetuneado"

def load_author_model():
    """
    Build and return a text-generation pipeline for the model at AUTHOR_MODEL_NAME.

    The pipeline is placed on GPU 0 when CUDA is available, otherwise on CPU (-1).
    """
    use_gpu = torch.cuda.is_available()
    author_tokenizer = AutoTokenizer.from_pretrained(AUTHOR_MODEL_NAME)
    author_model = AutoModelForCausalLM.from_pretrained(AUTHOR_MODEL_NAME)
    return pipeline(
        "text-generation",
        model=author_model,
        tokenizer=author_tokenizer,
        device=0 if use_gpu else -1,
    )

# Prompt given to the fine-tuned writer model. The literal keeps the four-space
# indentation of each line, so those spaces are part of the prompt text.
# (Closing delimiter fixed: four quotes left an unterminated string literal.)
PROMPT_REDACTOR_BASE = (
    """
    Write a single paragraph between 150 and 300 words in the style of
    Shakespeare's stories. The paragraph must be original,
    not copied, and self-contained.
    """
)

def generate_Shakespeare_like_finetuned(n_samples: int, gen_pipe) -> list[str]:
    """
    Generate paragraphs with the fine-tuned model's text-generation pipeline.

    Relies on the module-level ``PROMPT_REDACTOR_BASE`` and on the
    ``clean_text`` / ``cut_to_word_range`` helpers.

    Args:
        n_samples: Number of generation attempts.
        gen_pipe: Callable pipeline; called with the prompt and sampling options,
            it must return a list whose first item has a "generated_text" key.

    Returns:
        The accepted paragraphs. Outputs rejected by ``cut_to_word_range`` are
        skipped, so the list may hold fewer than ``n_samples`` items.
    """
    paragraphs = []
    for i in range(n_samples):
        print(f"[FINETUNED] Generando párrafo {i+1}/{n_samples} ...")
        out = gen_pipe(
            PROMPT_REDACTOR_BASE,
            max_new_tokens=450,
            do_sample=True,
            temperature=0.9,
            top_p=0.95,
            num_return_sequences=1,
        )[0]["generated_text"]
        # Remove the prompt only if the output really starts with it; slicing
        # unconditionally would cut real text when the prompt is not echoed.
        if out.startswith(PROMPT_REDACTOR_BASE):
            out = out[len(PROMPT_REDACTOR_BASE):]
        text = clean_text(out.strip())
        text = cut_to_word_range(text)
        if text is None:
            continue
        paragraphs.append(text)
    return paragraphs

In [None]:
# ----------------------------
# 4. DESCARGA Y PARSING DE LIBROS (GUTENBERG O TXT LOCAL)
# ----------------------------

def download_gutenberg_book(gutenberg_id: int, timeout: float = 30.0) -> str:
    """
    Download a Project Gutenberg book as plain text.

    NOTE: always check the copyright status that applies in your country.

    Args:
        gutenberg_id: Numeric Gutenberg ID used to build the ``{id}/{id}-0.txt`` URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests can block indefinitely.

    Returns:
        The response body decoded as text.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    url = f"https://www.gutenberg.org/files/{gutenberg_id}/{gutenberg_id}-0.txt"
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
    return r.text

def load_or_download_book(path_or_id: str | int) -> str:
    """
    Return the cleaned text of a book.

    An ``int`` is treated as a Gutenberg ID and downloaded; anything else is
    treated as a path to a UTF-8 text file and read from disk.
    """
    if not isinstance(path_or_id, int):
        raw_text = Path(path_or_id).read_text(encoding="utf-8")
    else:
        raw_text = download_gutenberg_book(path_or_id)
    return clean_text(raw_text)

def split_book_into_paragraphs(book_text: str) -> list[str]:
    """
    Split a book on blank lines and keep the paragraphs accepted by cut_to_word_range.

    Each non-empty paragraph is passed through ``cut_to_word_range`` with its
    default bounds; paragraphs for which it returns ``None`` are dropped, and the
    others are kept in the form that function returns.
    """
    stripped_chunks = (chunk.strip() for chunk in re.split(r"\n{2,}", book_text))
    trimmed = (cut_to_word_range(chunk) for chunk in stripped_chunks if chunk)
    return [par for par in trimmed if par is not None]

In [None]:
# ----------------------------
# 5. RECOLECCIÓN DE PÁRRAFOS DE DIFERENTES FUENTES
# ----------------------------

def collect_negative_controls(neg_sources: list[str | int],
                              out_bench_dir: Path,
                              out_roberta_dir: Path,
                              prefix_bench: str,
                              prefix_roberta: str,
                              max_paragraphs_per_book=200,
                              bench_frac: float = 0.10):
    """
    Collect negative-control paragraphs from books and split them into a
    benchmark set and a Roberta training set.

    Args:
        neg_sources: Books for the negative controls (coarse or fine); each item
            is a text-file path or an int Gutenberg ID.
        out_bench_dir: Folder for the benchmark paragraphs (created if missing).
        out_roberta_dir: Folder for the Roberta training paragraphs (created if missing).
        prefix_bench: File-name prefix for benchmark files.
        prefix_roberta: File-name prefix for Roberta files.
        max_paragraphs_per_book: Maximum number of paragraphs sampled per source.
        bench_frac: Fraction of paragraphs sent to the benchmark; the rest goes
            to Roberta training. Defaults to 0.10 (10% benchmark / 90% training).

    Raises:
        ValueError: if ``bench_frac`` is outside [0, 1].
    """
    if not 0.0 <= bench_frac <= 1.0:
        raise ValueError(f"bench_frac must be between 0 and 1, got {bench_frac}")

    all_pars = []
    for src in neg_sources:
        print(f"[NEG] Procesando fuente {src} ...")
        text = load_or_download_book(src)
        pars = split_book_into_paragraphs(text)
        # Shuffle before truncating so the per-book sample is random
        random.shuffle(pars)
        all_pars.extend(pars[:max_paragraphs_per_book])

    random.shuffle(all_pars)
    n_total = len(all_pars)
    n_bench = int(bench_frac * n_total)
    bench_pars = all_pars[:n_bench]
    roberta_pars = all_pars[n_bench:]

    print(f"Total párrafos neg: {n_total} -> benchmark {len(bench_pars)}, roberta {len(roberta_pars)}")

    # Make sure both destinations exist before any file is written
    for target_dir in (out_bench_dir, out_roberta_dir):
        Path(target_dir).mkdir(parents=True, exist_ok=True)

    save_paragraphs(bench_pars, out_bench_dir, prefix_bench)
    save_paragraphs(roberta_pars, out_roberta_dir, prefix_roberta)

In [None]:
# ----------------------------
# 6. MAIN PIPELINE
# ----------------------------


# ====================================================
# 6.1 Book sources (fill these in by hand)
# ====================================================

# COARSE negative control: other genres (horror, adventure, drama, etc.)
# Both lists are currently empty: every entry below is commented out.
NEG_COARSE_SOURCES = [
    # 766,  # EXAMPLE: Dracula (check the real ID)
    # "poe_tales.txt",
]

# FINE negative control: mystery/crime authors that are NOT Shakespeare
NEG_FINE_SOURCES = [
    # "christie_book1.txt",
    # "chesterton_father_brown.txt",
]

In [None]:
# ====================================================
# 6.2 Book collection
# ====================================================

# NOTE(review): make sure collect_real_Shakespeare, Shakespeare_SOURCES, BENCH_DIR
# and ROBERTA_DIR are defined before running this cell — confirm where they come from.
collect_real_Shakespeare(Shakespeare_SOURCES)

collect_negative_controls(
    NEG_COARSE_SOURCES,
    out_bench_dir=BENCH_DIR / "neg_coarse",
    out_roberta_dir=ROBERTA_DIR / "neg_coarse",
    prefix_bench="neg_coarse",
    prefix_roberta="neg_coarse",
)

collect_negative_controls(
    NEG_FINE_SOURCES,
    out_bench_dir=BENCH_DIR / "neg_fine",
    out_roberta_dir=ROBERTA_DIR / "neg_fine",
    prefix_bench="neg_fine",
    prefix_roberta="neg_fine",
)

# ====================================================
# 6.3 Generation with GPT (Shakespeare-style)
# ====================================================

N_GPT_SAMPLES = 200  # set how many samples you want
gpt_pars = generate_Shakespeare_like_gpt(N_GPT_SAMPLES)
save_paragraphs(gpt_pars, BENCH_DIR / "Shakespeare_like_gpt", "gpt_Shakespeare_like")

# ====================================================
# 6.4 Generation with your fine-tuned WRITER model
# ====================================================

N_FINETUNED_SAMPLES = 200
print("Cargando modelo finetuneado...")
gen_pipe = load_author_model()
finetuned_pars = generate_Shakespeare_like_finetuned(N_FINETUNED_SAMPLES, gen_pipe)
save_paragraphs(finetuned_pars, BENCH_DIR / "Shakespeare_like_finetuned", "ft_Shakespeare_like")

print("✅ Recolección terminada.")
# Path(...) makes .resolve() work whether BASE_DIR is a str or already a Path
print(f"Estructura de carpetas en: {Path(BASE_DIR).resolve()}")
