In [1]:
import sys
!{sys.executable} -m pip install --user unidecode

[0m

In [2]:
import requests
import json
from pathlib import Path
from unidecode import unidecode

In [None]:
import json
import unicodedata
import re
import csv
import os
import requests
from tqdm.notebook import tqdm

# === CONFIGURATION ===
target_domain = "TRANSPORT"  # target domain used to filter IATE entries (change as needed)
lang = "en"  # language of the definitions to extract
base_dir = "/home/jovyan/lrec_2026/data_10/land_transport"

# Input glossary (one term per line) and the two output files, all under base_dir.
txt_path, json_output, csv_output = (
    os.path.join(base_dir, file_name)
    for file_name in (
        "glossary_land_transport_en.txt",
        "iate_land_transport_full_filtered_en.json",
        "glossary_land_transport_with_definitions_en_full_filtered.csv",
    )
)

# === FUNCIONES AUXILIARES ===
def save_json(data, filename):
    """Serialize *data* as pretty-printed UTF-8 JSON into *filename*.

    Non-ASCII characters are written as-is (not escaped) and the output is
    indented by four spaces. A confirmation line is printed once the file
    has been written.
    """
    dump_options = {"ensure_ascii": False, "indent": 4}
    with open(filename, "w", encoding="utf-8") as out_handle:
        json.dump(data, out_handle, **dump_options)
    print(f"✅ Archivo guardado en: {filename}")

def normalize(text):
    """Return a comparison-friendly form of *text*.

    The value is converted to ``str`` (``None`` becomes ``""``), trimmed,
    lower-cased, stripped of combining accents, and every character other
    than ``a-z``, ``0-9`` or whitespace is replaced by a space. Runs of
    whitespace are then collapsed to a single space so that inputs differing
    only in punctuation or spacing normalize to the same string.
    """
    text = "" if text is None else str(text)
    text = text.strip().lower()
    # NFD splits accented letters into base letter + combining mark ("Mn");
    # dropping the marks leaves the unaccented base letter.
    text = "".join(
        c for c in unicodedata.normalize("NFD", text) if unicodedata.category(c) != "Mn"
    )
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    # Replacing punctuation with spaces can leave runs of whitespace; collapse
    # them so substring comparisons between normalized strings are reliable.
    return " ".join(text.split())

def domain_matches(item, target_norm):
    """Check whether *item* belongs to the given (already normalized) domain.

    Each element of ``item["domains"]`` is expected to hold a ``"domain"``
    mapping with an optional ``"path"`` (sequence of labels) and ``"name"``.
    The path, joined with spaces, is used when present; otherwise the name.
    The item matches when the normalized text contains *target_norm* or is
    itself contained in *target_norm*.

    Returns ``True`` on the first matching domain, ``False`` otherwise.
    """
    for d in item.get("domains") or []:
        # `or` fallbacks also cover keys that are present but set to None.
        domain_info = d.get("domain") or {}
        path = domain_info.get("path") or []
        name = domain_info.get("name") or ""
        text_norm = normalize(" ".join(str(p) for p in path) if path else name)
        # An empty string is a substring of every string, so a domain with no
        # usable label would otherwise match any target.
        if not text_norm:
            continue
        if target_norm in text_norm or text_norm in target_norm:
            return True
    return False

# === CLASE PARA HACER REQUESTS A IATE ===
class IateRequester:
    """Small client for the IATE test server (``test.iate2.eu``).

    On construction it requests an access token with the password grant and
    stores it in ``auth_token``; ``make_request`` then uses that token to run
    term searches.
    """

    TOKEN_URL = "https://test.iate2.eu/uac-api/ws/oauth2/token"
    SEARCH_URL = "https://test.iate2.eu/em-api/ws/entries/_search"

    # Class-level default so the attribute exists even if __init__ is bypassed.
    timeout = 30.0

    def __init__(self, username="username", password="password", timeout=30.0) -> None:
        """Authenticate and store the access token.

        Args:
            username: IATE account name (the default is a placeholder).
            password: IATE account password (the default is a placeholder).
            timeout: Seconds to wait for each HTTP request before giving up.

        Raises:
            requests.RequestException: On network failure, timeout, or a
                non-success HTTP status from the token endpoint.
        """
        self.timeout = timeout
        iate_headers = {
            "Accept": "application/vnd.iate.token+json; version=2",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        # Passing the credentials through `params` keeps them in the query
        # string (as before) but URL-encodes special characters.
        response = requests.post(
            self.TOKEN_URL,
            params={
                "username": username,
                "password": password,
                "grant_type": "password",
            },
            headers=iate_headers,
            timeout=self.timeout,
        )
        # Fail with the HTTP error instead of an opaque KeyError below.
        response.raise_for_status()
        self.auth_token = response.json()["tokens"][0]["access_token"]

    def make_request(self, request_data, quantity_results=5):
        """Search IATE for a term and return the decoded JSON response.

        Args:
            request_data: Mapping with ``"req_term"`` (query text),
                ``"in_language"`` (source language code) and
                ``"out_languages"`` (list of target language codes).
            quantity_results: Maximum number of entries to request.

        Returns:
            The parsed JSON body of the search response.

        Raises:
            requests.RequestException: On network failure, timeout, or a
                non-success HTTP status.
        """
        query_headers = {
            "Accept": "application/vnd.iate.entry+json; version=2",
            "Content-Type": "application/vnd.iate.entry-search+json; version=1",
            "Authorization": f"Bearer {self.auth_token}",
        }
        query_data = {
            "query": request_data["req_term"],
            "query_operator": 3,  # all words
            "search_in_term_types": [4],
            "search_in_fields": [0],
            "source": request_data["in_language"],
            "targets": request_data["out_languages"],
        }
        # NOTE(review): the search payload is sent as the body of a GET request,
        # as in the original code — confirm the endpoint does not expect POST.
        query_res = requests.get(
            self.SEARCH_URL,
            params={"expand": "true", "limit": quantity_results, "offset": 0},
            headers=query_headers,
            json=query_data,
            timeout=self.timeout,
        )
        # Surface HTTP errors (e.g. expired token) instead of returning an
        # error payload that callers would read as "no results".
        query_res.raise_for_status()
        return query_res.json()

# === READ TERMS FROM THE TXT FILE ===
# One term per line: trim surrounding whitespace and drop lines left empty.
with open(txt_path, "r", encoding="utf-8") as f:
    all_terms = list(filter(None, map(str.strip, f)))

# === INITIALISE IATE ===
requester = IateRequester()

# Accumulators filled while processing the terms: a term -> entry mapping for
# the JSON output and flat rows for the CSV output.
results_dict, rows = {}, []
target_norm = normalize(target_domain)

print(f"🔹 Extrayendo definiciones del dominio '{target_domain}' para todos los términos...")

# === MAIN LOOP ===
# For every glossary term, query IATE and keep the first non-empty definition
# in `lang` whose entry belongs to the target domain.
for term in tqdm(all_terms):
    # A term repeated in the glossary would add a duplicate CSV row while the
    # JSON output (keyed by term) keeps a single entry; skip resolved terms.
    if term in results_dict:
        continue

    request_data = {
        "req_term": term,
        "in_language": lang,
        "out_languages": [lang],
    }

    try:
        response_json = requester.make_request(request_data)
    except Exception as e:  # deliberately broad: one failing term must not abort the batch
        print(f"❌ Error con '{term}': {e}")
        continue

    # The `or` fallbacks below tolerate keys that are present but null; an
    # exception here is outside the try and would discard all results so far.
    items = response_json.get("items") if isinstance(response_json, dict) else None
    chosen_definition = None
    chosen_sources = []

    for item in items or []:
        lang_entry = (item.get("language") or {}).get(lang) or {}
        definition = lang_entry.get("definition") or {}
        value = definition.get("value")

        if not isinstance(value, str):
            continue

        cleaned = value.strip().replace("\n", " ")
        # An empty definition must not end the search: a later item may still
        # carry a usable one.
        if not cleaned:
            continue

        # Check that the entry belongs to the target domain
        if not domain_matches(item, target_norm):
            continue

        chosen_definition = cleaned
        refs = definition.get("references") or []
        chosen_sources = [r.get("text", "").strip() for r in refs if r.get("text", "")]
        break  # only one definition per term

    if chosen_definition:
        results_dict[term] = {
            "definition": chosen_definition,
            "sources": chosen_sources,
        }
        rows.append([term, chosen_definition, " | ".join(chosen_sources)])

# === SAVE RESULTS ===
save_json(results_dict, json_output)

# Semicolon-delimited CSV: the header row first, then the collected rows.
csv_header = ["term", "definition", "source"]
with open(csv_output, "w", encoding="utf-8", newline="") as csvfile:
    csv.writer(csvfile, delimiter=";").writerows([csv_header, *rows])

# Final summary, one message per line.
print(
    "\n✅ Proceso terminado.",
    f"📘 Definiciones extraídas: {len(rows)}",
    f"🗂 CSV guardado en: {csv_output}",
    f"🧩 JSON guardado en: {json_output}",
    sep="\n",
)
