Demostración de Gearman en Python con arquitectura por responsabilidades, API FastAPI, consola web, worker pool dinámico y observabilidad por worker.
src/gearman_demo/
domain/ # lógica pura: análisis de texto, sharding, catálogo de tareas
gearman/ # cliente, worker, codec, compatibilidad y telemetría Gearman
application/ # casos de uso, historial, reportes y pipeline
interfaces/
cli/ # demo por terminal
http/ # API FastAPI y contratos Pydantic
web/ # consola web
scripts/ # runners para API, worker y demo CLI
main.py # orquestador: gearmand + API + workers
install_all.sh # instalación de sistema, venv y dependencias
docs/ # notas de arquitectura
- Python 3.10+
gearmanddisponible en el sistema
Instalación rápida:
bash install_all.sh
source .venv/bin/activateSi gearmand no está instalado, en Linux/apt normalmente basta con:
sudo apt install gearman-job-serverpython main.pyEsto hace lo siguiente:
- usa
.venv/bin/pythonsi existe; - inicia
gearmandsi no hay uno escuchando; - detecta CPUs con
os.cpu_count(); - levanta un worker por CPU, con tope de 64;
- asigna tareas a los workers con round-robin;
- arranca la API antes de los workers para recibir sus reportes de estado;
- expone la consola web.
URLs:
- Web:
http://127.0.0.1:8000/ - Swagger:
http://127.0.0.1:8000/docs
Para detener todo:
Ctrl-C
Forzar cantidad exacta de workers:
python main.py --workers 4Usar CPUs detectados, pero limitar el máximo:
python main.py --max-workers 16Política de asignación:
- Si hay más tareas que workers, un worker registra varias tareas.
- Si hay más workers que tareas, varias CPUs pueden ejecutar la misma tarea.
- Los workers se nombran con padding estable:
cpu-01,cpu-02, ...,cpu-64.
Tareas Gearman registradas:
demo.analyze: analiza texto, tokens y sentimiento.demo.shard: divide texto en fragmentos.demo.bg_log: ejecuta una acción background y deja telemetría.
Caso de uso compuesto:
POST /api/pipeline: ejecutademo.shardy luegodemo.analyzepor cada shard.
La consola web está disponible en http://127.0.0.1:8000/ cuando se ejecuta python main.py o el runner de API.
La interfaz está organizada por tabs principales:
Monitor: muestra métricas generales, ocupación de workers, gráficas de actividad y el cuadro de mando por worker.Ejecutar: permite enviar trabajosPipeline,Analyze,ShardyBackground.Historial: lista jobs locales, muestra el resultado seleccionado y los resultados reportados por worker.Logs: muestra los eventos de ejecución de API y workers; al seleccionar un job enHistorial, los logs se filtran por ese job.
Dentro de Ejecutar, hay tabs secundarios para elegir el tipo de trabajo que se enviará a Gearman.
La consola web muestra:
- estado reportado por cada worker por HTTP: ocupado/libre, PID, jobs en curso, jobs procesados, jobs fallidos, tarea actual, última tarea y duración;
- ocupación visual del pool de workers;
- resultados por worker;
- resultado final/agregado del job;
- logs de ejecución de API y workers;
- historial local de jobs enviados desde la API.
Los workers usan WorkerStatusReporter para enviar su estado a la API con:
POST /api/worker-status
La web consulta el estado agregado con:
GET /api/workers-status
Telemetría compartida:
.runtime/events.jsonl
Logs humanos por worker:
.runtime/workers/cpu-01.log
.runtime/workers/cpu-02.log
...
Ver un worker específico:
tail -f .runtime/workers/cpu-01.logCada log de worker documenta arranque, tareas registradas, jobs recibidos, inicio, fin, duración, resultados y errores.
GET /api/health: estado de API y servidor Gearman configurado.GET /api/tasks: catálogo de tareas.POST /api/analyze: ejecutademo.analyze.POST /api/shard: ejecutademo.shard.POST /api/background-log: envíademo.bg_log.POST /api/pipeline: orquestademo.shardydemo.analyze.POST /api/worker-status: recibe reportes de estado de workers.GET /api/workers-status: lista el último estado reportado por cada worker.GET /api/jobs: historial local de jobs.GET /api/jobs/{local_job_id}: detalle de un job.GET /api/events: últimos eventos de ejecución.GET /api/jobs/{local_job_id}/events: eventos de un job específico.GET /api/report: reporte agregado.
- Ejecuta
python main.py. - Abre
http://127.0.0.1:8000/. - Entra al tab
Ejecutary ejecutaPipeline. - Vuelve a
Monitorpara ver la ocupación de workers y elCuadro de mando Gearman. - Entra a
Historialpara seleccionar el job y revisarResultado seleccionado. - Revisa
Resultados por workeren el mismo tab. - Entra a
Logspara ver los eventos filtrados por el job seleccionado. - Mira el archivo de un worker con
tail -f .runtime/workers/cpu-01.log.
Gearman server:
gearmand --listen=127.0.0.1 --port=4730 --verbose INFOAPI + web:
source .venv/bin/activate
python scripts/run_api.py --host 127.0.0.1 --port 4730 --api-host 127.0.0.1 --api-port 8000Worker:
source .venv/bin/activate
python scripts/run_worker.py --host 127.0.0.1 --port 4730 --worker-index 0 --worker-count 1 --worker-id cpu-01 --api-url http://127.0.0.1:8000Cliente CLI:
source .venv/bin/activate
python scripts/run_demo.py "Gearman es excelente, rápido y productivo, aunque a veces da error"source .venv/bin/activate
python -m unittest discover -s tests -vgearman3 requiere parches para Python moderno:
array.fromstringfue reemplazado porarray.frombytes.- Algunos nombres de tarea llegan desde Gearman como
bytesy se normalizan astr. - Los eventos y respuestas se sanitizan para ser JSON serializable.
Estos parches viven en src/gearman_demo/gearman/compat.py y están cubiertos por tests.
- Historial y estado agregado de workers viven en memoria del proceso API.
- Telemetría multiproceso se guarda en archivos locales bajo
.runtime/. - El pipeline ejecuta los análisis de shards de forma secuencial desde la API.
- Una versión más avanzada podría usar batch submit, persistencia externa y métricas Prometheus.
Ver también docs/architecture.md.