# 06 - Export Task API

Create **asynchronous** export tasks that dump historical data into a **collector**. You can then read the collector like a queue. Each exported result consumes **1 credit**. A single task can export at most 1,000,000 results.
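Because every exported result costs one credit and a task is capped at 1,000,000 results, it can be useful to sanity-check a planned `limit` before creating a task. The following is a minimal sketch of that arithmetic. `credits_for_export` and both constants are hypothetical helper names, not part of the Talkwalker API, and the estimate assumes the task exports exactly `limit` results.

```python
MAX_RESULTS_PER_TASK = 1_000_000  # per-task cap stated above
CREDITS_PER_RESULT = 1            # per-result cost stated above


def credits_for_export(limit):
    """Estimate the credits an export task consumes if it exports `limit` results."""
    if not 1 <= limit <= MAX_RESULTS_PER_TASK:
        raise ValueError(f"limit must be between 1 and {MAX_RESULTS_PER_TASK}")
    return limit * CREDITS_PER_RESULT


print(credits_for_export(5000))
```

Treat the returned number as a planning estimate, not as a billing guarantee.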

## 1. Setup

In [40]:
# @title Credentials { display-mode: "form" }
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"  # @param {type:"string"}
PROJECT_ID = "19db1a3c-9b31-4fda-b28b-f071bd93a129"     # @param {type:"string"}
TOPIC_ID = "md4lm2tu_12dqqlxucue1z"       # @param {type:"string"} (optional)

BASE_URL = "https://api.talkwalker.com"
import requests
import json
import time

def tw_put(endpoint, data=None, params=None):
    p = {"access_token": ACCESS_TOKEN}
    if params:
        p.update(params)
    r = requests.put(f"{BASE_URL}{endpoint}", params=p, json=data or {})
    r.raise_for_status()
    return r.json()

def tw_post(endpoint, data, params=None):
    p = {"access_token": ACCESS_TOKEN}
    if params:
        p.update(params)
    r = requests.post(f"{BASE_URL}{endpoint}", params=p, json=data)
    r.raise_for_status()
    return r.json()

def tw_get(endpoint, params=None):
    p = {"access_token": ACCESS_TOKEN}
    if params:
        p.update(params)
    r = requests.get(f"{BASE_URL}{endpoint}", params=p)
    r.raise_for_status()
    return r.json()

assert ACCESS_TOKEN and PROJECT_ID, "Set ACCESS_TOKEN and PROJECT_ID"
print("✅ Ready.")

✅ Ready.
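Hard-coding an access token in a notebook makes it easy to leak when the notebook is shared. One possible alternative, sketched below, reads the credentials from environment variables. The variable names `TW_ACCESS_TOKEN` and `TW_PROJECT_ID` and the helper `load_credentials` are assumptions made for this example; nothing in the API requires them.

```python
import os


def load_credentials(env=os.environ):
    """Return (access_token, project_id) read from a mapping of environment variables.

    TW_ACCESS_TOKEN and TW_PROJECT_ID are hypothetical variable names.
    """
    token = env.get("TW_ACCESS_TOKEN", "")
    project_id = env.get("TW_PROJECT_ID", "")
    if not token or not project_id:
        raise RuntimeError("Set TW_ACCESS_TOKEN and TW_PROJECT_ID")
    return token, project_id
```

In the setup cell this could replace the literal assignments with `ACCESS_TOKEN, PROJECT_ID = load_credentials()`.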


## 2. Create an empty collector

In [41]:
collector_id = "workshop-export-collector-1"
resp = tw_put(f"/api/v3/stream/c/{collector_id}", {})
if resp.get("status_code") == "0":
    print("✅ Collector created/updated:", collector_id)
else:
    print("Error:", resp)

✅ Collector created/updated: workshop-export-collector-1


## 3. Create an export task from a project

In [42]:
body = {
    "start": "2024-01-01",
    "stop": "2025-02-02",
    "target": collector_id,
    "limit": 5000
}
if TOPIC_ID:
    body["topics"] = [TOPIC_ID]
resp = tw_post(f"/api/v3/stream/p/{PROJECT_ID}/export", body)
if resp.get("status_code") != "0":
    print("Error:", resp)
else:
    tasks = resp.get("result_tasks", {}).get("tasks", [])
    if tasks:
        task_id = tasks[0].get("id")
        print("✅ Export task creada. task_id:", task_id)
    else:
        print("Respuesta:", resp)

✅ Export task creada. task_id: 4051bca3-6c3a-4acb-8f2d-0ce46411a36a
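The request body above is assembled by hand from string literals. As a hedged sketch, the same body can be built from `date` objects, which catches a reversed date range before any credits are spent. `build_export_body` is a hypothetical helper; the field names (`start`, `stop`, `target`, `limit`, `topics`) are the ones used in the cell above, and the requirement that `start` precede `stop` is an assumption.

```python
from datetime import date


def build_export_body(start, stop, target, limit, topic_id=None):
    """Assemble the export task body from typed values."""
    if start >= stop:
        # Assumption: the API expects a non-empty, forward date range.
        raise ValueError("start must be earlier than stop")
    body = {
        "start": start.isoformat(),  # YYYY-MM-DD, as in the cell above
        "stop": stop.isoformat(),
        "target": target,
        "limit": limit,
    }
    if topic_id:
        body["topics"] = [topic_id]
    return body


print(build_export_body(date(2024, 1, 1), date(2025, 2, 2),
                        "workshop-export-collector-1", 5000))
```

The returned dictionary can be passed to `tw_post` in place of the hand-written `body`.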


## 4. Check the task status

In [48]:
# Re-derive task_id from the creation response; `or [{}]` guards against an empty task list.
tasks = resp.get("result_tasks", {}).get("tasks") or [{}]
task_id = tasks[0].get("id")
if task_id:
    status_resp = tw_get(f"/api/v3/tasks/export/{task_id}")
    for t in status_resp.get("result_tasks", {}).get("tasks", []):
        print("Status:", t.get("status"), "| Processed:", t.get("processed"), "| Progress:", t.get("progress"))
else:
    print("Run the previous cell to get a task_id.")

Status: result_limit_reached | Processed: 6000 | Progress: 0.1627975124540097
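Since export tasks are asynchronous, a single status check is rarely enough. The sketch below polls until the task reaches an end state. `wait_for_task` is a hypothetical helper, and the set of terminal statuses only contains the values that appear in this notebook's outputs (`finished`, `aborted`, `result_limit_reached`); the API may define others.

```python
import time

# End states observed in this notebook's outputs; the complete list is an assumption.
TERMINAL_STATUSES = {"finished", "aborted", "result_limit_reached"}


def wait_for_task(fetch_task, poll_seconds=10.0, timeout_seconds=600.0, sleep=time.sleep):
    """Call fetch_task() repeatedly until the returned task dict has a terminal status.

    fetch_task is any zero-argument callable that returns one task dict.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        task = fetch_task()
        if task.get("status") in TERMINAL_STATUSES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {task.get('status')!r} after {timeout_seconds}s")
        sleep(poll_seconds)
```

With the helpers from the setup cell, `fetch_task` could be `lambda: tw_get(f"/api/v3/tasks/export/{task_id}")["result_tasks"]["tasks"][0]`. Injecting the fetch function keeps the loop independent of the HTTP layer.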


## 5. Read results from the collector (once the task is FINISHED)

In [53]:
from datetime import datetime, timezone

MAX_HITS = 5  # number of results to request and print
params = {"resume_offset": "earliest", "end_behaviour": "stop", "max_hits": MAX_HITS}
r = requests.get(
    f"{BASE_URL}/api/v3/stream/c/{collector_id}/results",
    params={**params, "access_token": ACCESS_TOKEN},
    stream=True,
    timeout=30
)
r.raise_for_status()

count = 0
for line in r.iter_lines():
    if line:
        # Each non-empty line of the stream is one JSON chunk.
        chunk = json.loads(line)
        if chunk.get("chunk_type") == "CT_RESULT":
            count += 1
            data = chunk.get("chunk_result", {}).get("data", {}).get("data", {})

            ext_id = data.get("external_id", "")
            ext_author = data.get("external_author_id", "")
            sentiment = data.get("sentiment", "")
            lang = data.get("lang", "")
            source = data.get("source_type", [])
            idx = data.get("indexed", "")

            print(f"--- Result {count} ---")
            print(f"External ID (tweet/id): {ext_id}")
            print(f"Author ID: {ext_author}")
            print(f"Sentiment: {sentiment} | Language: {lang}")
            print(f"Source: {source}")

            if idx:
                try:
                    # `indexed` is in epoch milliseconds; convert to a timezone-aware UTC datetime.
                    indexed_at = datetime.fromtimestamp(int(idx) / 1000, tz=timezone.utc)
                    print(f"Indexed: {indexed_at.strftime('%Y-%m-%d %H:%M')}")
                except (TypeError, ValueError, OverflowError, OSError):
                    print(f"Indexed (ms): {idx}")

            # The matched topic, when present, is listed under highlighted_data.
            hl = chunk.get("chunk_result", {}).get("data", {}).get("highlighted_data", [])
            if hl and isinstance(hl[0], dict) and "matched" in hl[0]:
                proj = hl[0]["matched"].get("project_profiles", [])
                if proj:
                    print(f"Topic: {proj[0].get('title', '')} ({proj[0].get('id', '')})")
            print()

        if count >= MAX_HITS:
            break

if count == 0:
    print("No results.")

--- Result 1 ---
External ID (tweet/id): 1842841513048371605
Author ID: 1359565029624999936
Sentiment: 0 | Language: es
Source: ['SOCIALMEDIA', 'SOCIALMEDIA_TWITTER']
Indexed: 2024-10-06 08:17
Topic: Banco de Chile - Benchmark  (md4lm2tu_12dqqlxucue1z)

--- Result 2 ---
External ID (tweet/id): 1855979601572290844
Author ID: 1750487210850131968
Sentiment: 0 | Language: es
Source: ['SOCIALMEDIA', 'SOCIALMEDIA_TWITTER']
Indexed: 2024-11-11 14:24
Topic: Banco de Chile - Benchmark  (md4lm2tu_12dqqlxucue1z)

--- Result 3 ---
External ID (tweet/id): 1872691293333450934
Author ID: 2340801344
Sentiment: 5 | Language: es
Source: ['SOCIALMEDIA', 'SOCIALMEDIA_TWITTER']
Indexed: 2024-12-27 17:10
Topic: Banco de Chile - Benchmark  (md4lm2tu_12dqqlxucue1z)

--- Result 4 ---
External ID (tweet/id): 1854695317741908065
Author ID: 1908810038
Sentiment: 0 | Language: en
Source: ['SOCIALMEDIA', 'SOCIALMEDIA_TWITTER']
Indexed: 2024-11-08 01:20
Topic: Banco de Chile - Benchmark  (md4lm2tu_12
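The reading loop above mixes parsing and printing. As a sketch of how the parsing step could be isolated, the function below turns one stream line into a flat dictionary, following the chunk layout used in the cell above (`chunk_type`, `chunk_result.data.data`). `parse_result_line` is a hypothetical helper, and the sample line is synthetic data made up for this example, not real API output.

```python
import json
from datetime import datetime, timezone


def parse_result_line(line):
    """Return a flat dict for a CT_RESULT chunk, or None for any other chunk type."""
    chunk = json.loads(line)  # accepts str or bytes, as yielded by iter_lines()
    if chunk.get("chunk_type") != "CT_RESULT":
        return None
    data = chunk.get("chunk_result", {}).get("data", {}).get("data", {})
    indexed_ms = data.get("indexed")
    return {
        "external_id": data.get("external_id", ""),
        "sentiment": data.get("sentiment", ""),
        "lang": data.get("lang", ""),
        "source_type": data.get("source_type", []),
        # `indexed` is in epoch milliseconds; keep it as a UTC-aware datetime.
        "indexed": (datetime.fromtimestamp(int(indexed_ms) / 1000, tz=timezone.utc)
                    if indexed_ms else None),
    }


# Synthetic sample line (made-up values).
sample = json.dumps({
    "chunk_type": "CT_RESULT",
    "chunk_result": {"data": {"data": {
        "external_id": "123", "sentiment": 0, "lang": "es",
        "source_type": ["SOCIALMEDIA"], "indexed": 1700000000000,
    }}},
})
print(parse_result_line(sample))
```

Returning `None` for non-result chunks lets the caller filter the stream with a simple `if parsed:` check.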


## 6. List recent tasks and abort (optional)

In [50]:
all_tasks = tw_get("/api/v3/tasks/export")
for t in all_tasks.get("result_tasks", {}).get("tasks", [])[:5]:
    print(t.get("id"), t.get("status"), t.get("target"))
# To abort: requests.delete(f"{BASE_URL}/api/v3/tasks/export/<task_id>?access_token=...")

4051bca3-6c3a-4acb-8f2d-0ce46411a36a result_limit_reached workshop-export-collector-1
ed80b3b1-6772-427e-8129-198980a75577 aborted exp_dayfiles_20250925_144911
aa368388-afda-4fd5-aca3-f20c90b4f058 finished exp_dayfiles_20250925_144911
e42d830e-6aeb-4ebc-8711-9b65d3ded553 finished exp_dayfiles_20250925_142853
ede9d899-7ac2-46ce-a282-e5c9a1dd97c2 aborted exp_dayfiles_20250925_142840
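The abort call is only mentioned in a comment above. A minimal sketch of a helper that issues it could look like the following. `abort_export_task` and its `http_delete` parameter are hypothetical names introduced here. The response format of the abort call is not shown in this notebook, so the helper returns the response object unmodified.

```python
def abort_export_task(task_id, access_token,
                      base_url="https://api.talkwalker.com", http_delete=None):
    """Send DELETE /api/v3/tasks/export/<task_id> and return the raw response."""
    if http_delete is None:
        # Imported lazily so the helper can be exercised with a stub in tests.
        import requests
        http_delete = requests.delete
    url = f"{base_url}/api/v3/tasks/export/{task_id}"
    return http_delete(url, params={"access_token": access_token}, timeout=30)
```

In the notebook this would be called as `abort_export_task(task_id, ACCESS_TOKEN)`, typically followed by `raise_for_status()` on the returned response.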
