project to practice Java without frameworks
Estado: ✔️ Completado
Cobertura mínima garantizada: ≥ 70 % instrucciones (JaCoCo)
Build:
Modelar las entidades centrales del sistema aplicando buenas prácticas OO:
- Encapsulación e inmutabilidad
- Contratos coherentes de
equals / hashCode / toString
- Patrones de diseño apropiados (Builder)
- Cobertura automática de pruebas ≥ 70 %
classDiagram
direction TB
class User {
+UUID id
+String name
+String email
+UserRole role
+Instant createdAt
+promoteTo()
}
class DataFile {
+UUID id
+String originalFilename
+String storagePath
+long sizeBytes
+String checksumSha256
+Instant uploadedAt
+User uploadedBy
}
class BatchJobConfig {
+UUID id
+String name
+String description
+int chunkSize
+ReaderType readerType
+WriterType writerType
+boolean allowRestart
+Instant createdAt
+boolean active
+Builder builder()
}
class ProcessingRequest {
+UUID id
+String title
+DataFile dataFile
+Map\<String,String> parameters
+RequestStatus status
+Instant createdAt
+User requestedBy
+BatchJobConfig batchJobConfig
+markRunning()/markCompleted()/markFailed()
}
class JobExecution {
+UUID id
+ProcessingRequest processingRequest
+Instant startTime
+Instant endTime
+ExecutionStatus exitStatus
+long readCount/writeCount/skipCount
+String errorMessage
+finish()
}
class Report {
+UUID id
+ProcessingRequest processingRequest
+String storagePath
+String summaryJson
+Instant generatedAt
+User generatedBy
}
User "1" --> "many" DataFile : uploadedBy
User "1" --> "many" ProcessingRequest : requestedBy
DataFile "1" --> "many" ProcessingRequest : dataFile
BatchJobConfig "1" --> "many" ProcessingRequest : template
ProcessingRequest "1" --> "many" JobExecution : retries
ProcessingRequest "1" --> "0..1" Report : result
Área | Decisión |
---|---|
Identidad | Todas las entidades usan UUID id ; igualdad y hash se basan solo en ese campo. |
Inmutabilidad | DataFile y Report son records 100 % inmutables.User , ProcessingRequest , JobExecution exponen solo los campos estrictamente mutables (role , status , métricas). |
Validaciones | Reglas de negocio comprobadas en constructores y métodos de transición (assertThrows cubierto en tests). |
Cobertura | JaCoCo con umbral ≥ 70 % (INSTRUCTION). El reporte HTML se publica como artefacto en GitHub Actions. |
Patrón | Propósito principal | Pros clave | Contras clave | Dónde se aplica |
---|---|---|---|---|
Builder | Construir objetos con muchos parámetros opcionales manteniendo legibilidad. | Lectura fluida, evita telescoping constructors, facilita valores por defecto. | Algo de boilerplate adicional. | BatchJobConfig (builder(String) + clase estática Builder ). |
Factory | Ocultar o centralizar la lógica de creación cuando existen varias implementaciones o decisiones condicionales. | Aísla la complejidad de instanciación; favorece SRP. | Puede dispersarse en múltiples métodos si crecen variantes. | Previsto para futuras estrategias de ReaderType / WriterType (no implementado aún, documentado para epic E2). |
my-app
├── pom.xml # POM raíz (packaging = pom)
├── core # módulo de dominio (épica E1)
│ ├── pom.xml # dependencias JaCoCo, EqualsVerifier, etc.
│ └── src
│ ├── main
│ │ └── java
│ │ └── com/practice/domain/...
│ └── test
│ └── java
│ └── com/practice/domain/...
└── lib # reservado para la épica E2 (utilidades)
├── pom.xml
└── src/...
# Compilar + tests + reporte de cobertura
./mvnw clean verify
# Abrir el reporte
open core/target/site/jacoco/index.html
import com.utils.cache.LruCache;
import java.util.Optional;
import java.util.UUID;
LruCache<UUID, User> cache = new LruCache<>(1_000);
cache.put(user.getId(), user);
Optional<User> maybe = cache.get(user.getId());
System.out.println("hits=" + cache.hitCount() + ", miss=" + cache.missCount());
Si insertas 1 001 usuarios, el más antiguo se descarta automáticamente.
Componente | Elección | Razón |
---|---|---|
Contenedor base | LinkedHashMap access-order |
Reordenamiento automático y removeEldestEntry para expulsión |
Concurrencia | ReentrantReadWriteLock |
Muchos lectores, un escritor |
Métricas | volatile long hitCount/missCount |
Lectura coherente sin bloqueo |
Caso de uso | Beneficio |
---|---|
Resolver User por UUID miles de veces en un job | Reduce I/O a BD o estructuras de gran tamaño |
Guardar configuraciones repetidamente leídas | Evita parseo / IO redundante |
Estrategia | Ventaja principal | Uso recomendado |
---|---|---|
LRU | Mantiene en memoria los elementos usados recientemente | Lecturas muy frecuentes con cache de tamaño fijo |
TTL | Expira elementos tras X tiempo, sin importar el uso | Configuraciones que cambian periódicamente |
(Se incluye TtlCache<K,V> como referencia, aún no productiva.)
Comando ejecutado
java -jar lib/target/benchmarks.jar \
-rf CSV -rff bench.csv \
-tu ms -f 1 -wi 2 -i 3 -w 2s -r 2s
Operación (dataset = 1 000 000) | ArrayList (ms / op) |
LinkedList (ms / op) |
Ganador |
---|---|---|---|
addLast list.add(x) |
0.000020 ± 0.000120 | 0.000138 ± 0.000025 | ArrayList |
addFirst list.add(0,x) |
0.078022 ± 0.061059 | 0.000134 ± 0.000031 | LinkedList |
random get list.get(rnd) |
0.000025 ± 0.000039 | 0.625891 ± 2.060440 | ArrayList |
full iteration for-each |
1.582198 ± 4.929957 | 3.701233 ± 29.944820 | ArrayList |
Interpretación rápida
ArrayList domina en acceso aleatorio (get), inserción al final y recorrido secuencial.
LinkedList solo gana en inserción al inicio de la lista (addFirst) con colecciones muy grandes.
Para la mayoría de casos de lectura y escritura al final, ArrayList es la opción recomendada.
Método | Firma | PECS aplicado |
---|---|---|
copy |
<T> List<T> copy(List<? extends T> src) |
Producer Extends |
addAll |
<T> void addAll(Collection<? super T> dst, Collection<? extends T> src) |
Consumer Super / Producer Extends |
deepUnmodifiable |
<K,V> Map<K,V> deepUnmodifiable(Map<? extends K,? extends V> src) |
Ambos “extends” |
Regla PECS: Producer Extends (fuentes producen objetos → ? extends
), Consumer Super (destinos consumen objetos → ? super
).
Esto permite una API flexible y segura, sin casts ni raw types.
classDiagram
Exception <|-- DataflowException
DataflowException <|-- DomainException
RuntimeException <|-- InfraException
DomainException <|-- InvalidFileFormatException
DomainException <|-- UserNotFoundException
InfraException <|-- DatabaseUnavailableException
[DB_DOWN] DatabaseUnavailableException: db down
com.dataflowhub.core.exception.DatabaseUnavailableException: db down
at com.dataflowhub.core.repository.JobRepository.save(JobRepository.java:42)
at com.dataflowhub.core.service.JobService.create(JobService.java:57)
at ...
Prefijo [CODE]
permite dashboards rápidos en Kibana / Grafana.
Opción verbose=false limita a 5 líneas de stack para logs limpios.
Método bajo prueba | Excepción esperada | Test |
---|---|---|
CsvParser.parse() |
InvalidFileFormatException |
CsvParserStubTest |
TtlCache.put() (cuando falla executor) |
InfraException |
TtlCacheFailureTest |
ProcessingRequest constructor |
NullPointerException / IllegalArgumentException |
ProcessingRequestValidationTest |
Cada excepción declarada cuenta con al menos un test; la cobertura supera el 75 %.
La CI fallará si se cambia la excepción lanzada o se reduce la cobertura.
NotificationService svc = new NotificationService(); // 4 threads
Notification n = new Notification(user.getEmail(), "Job done");
Future<Boolean> ok = svc.send(n);
if (ok.get()) log.info("Email sent!");
svc.shutdown();
svc.awaitTermination(5, SECONDS);
Criterio de aceptación | Resultado |
---|---|
100 envíos en paralelo | ✔️ completan < 5 s sin RejectedExecutionException |
shutdown() ordenado |
✔️ termina < 2 s |
Cobertura playground | 80 % en paquete concurrent.notification |
try (WorkQueue workQueue = new WorkQueue()) {
workQueue.startWorkers(3);
jobs.forEach(job -> workQueue.submit(() -> process(job)));
} // auto-close ⇒ stop()
Ventaja | Detalle |
---|---|
Distribuye carga | Productores delegan a consumidores concurrentes. |
Back-pressure | Si limitas la capacidad, submit() bloquea al llenar la cola. |
Shutdown limpio | stop() envía POISON PILL + join() sin InterruptedException . |
Versión | Primitiva | Resultado | Rendimiento |
---|---|---|---|
Buggy | int sin sincronización |
Pierde incrementos | Rápido pero incorrecto |
Fix #1 | AtomicInteger |
Correcto | Mejor que lock bajo contención alta |
Fix #2 | ReentrantLock |
Correcto | Latencia mayor, pero permite operaciones compuestas |
Modelo de memoria (simplificado)
- Escribir en un
int
no es atómico → dos hilos pueden leer-modificar-escribir simultáneamente. AtomicInteger
ofrece operación CAS -> happens-before y visibilidad.ReentrantLock
establece un monitor → exclusión mutua + semántica happens-before enunlock()
/lock()
.
ReportAggregator ra = new ReportAggregator();
ra.generate("id-123")
.thenAccept(r -> log.info("Ready: {}", r.summary()))
.join(); // bloquea en demo; en producción, se encadena
Ventaja | Detalle |
---|---|
Paralelismo | supplyAsync lanza tareas A, B, C en el commonPool; total ≤ máx(tareas). |
Composición | thenCombine + thenApply fusionan resultados sin callback hell. |
Manejo de errores | exceptionally registra con ErrorHandler y propaga causa unificada. |
total = transactions.stream() // fuente
.filter(t -> t.status() == VALID) // interm. 1
.collect(groupingBy( // interm. 2 + terminal
Transaction::user, summingDouble(Transaction::amount)))
.entrySet().stream() // nuevo stream
.sorted(comparingByValue().reversed())// interm. 3
.collect(toMap(..., LinkedHashMap::new));
Operación Stream | Tipo | Complejidad |
---|---|---|
filter |
intermedia | O(n) |
groupingBy + sum |
terminal (con cola intermedia) | O(n) |
sorted |
intermedia | O(n log n) |
collect(toMap) |
terminal | O(n) |
Complejidad total: O(n log n) debido a la fase de ordenación.
Operación | Dataset | Secuencial (ms/op) | Paralelo (ms/op) | Speed-up |
---|---|---|---|---|
Suma de 10 000 000 doubles | 10 M elementos | 30.554 ± 3.189 | 2.764 ± 0.184 | × 11 ≈ |
Map + reduce 100 000 JobExecution | 100 k elementos | 0.076 ± 0.014 | 0.083 ± 0.008 | × 0.92 (peor) |
Tiempos promedio (modo AverageTime) tras 2 warm-ups + 3 mediciones; unidad = ms/op.
-
Cálculo numérico masivo
La suma de 10 M doubles se acelera ≈ 11 × gracias al fork-join: cada hilo procesa unos 2,5 M elementos y la sobrecarga de división/combina se amortiza. -
Datasets medianos o pipelines ligeros
Map-reduce sobre 100 k objetos empeora en paralelo (× 0.92).
Cuando la operación por elemento es muy barata, la sobrecarga de fork-join y la fusión de resultados supera al trabajo útil. -
Regla práctica
- Usa
parallelStream()
para colecciones muy grandes (≈ > 1 M) o tareas CPU-bound costosas. - Evítalo en datasets pequeños, operaciones I/O-bound o servidores donde el commonPool ya está saturado.
- Usa
-
Fork-join pool
parallelStream()
utiliza el ForkJoinPool.commonPool (≈ nº de núcleos).
Puedes ajustar su tamaño conSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
o emplear tu propio ForkJoinPool si necesitas aislar cargas.
Instant start = job.getStartTime(); // almacenado en UTC
Duration d = TimeUtil.between(start, TimeUtil.nowUtc());
log.info("Duración: {} s", d.getSeconds());
Almacena en UTC (Instant) y convierte a zona de usuario en la vista.
No se usan Date / Calendar; la API moderna es inmutable y thread-safe.
Complejidad O(1) en todas las utilidades; solo cálculos aritméticos o acceso a campos.
✅ DO | ❌ DON’T |
---|---|
Retornar Optional<T> en vez de null . |
Encadenar Optional.get() sin isPresent() . |
Usar map , flatMap , orElse , orElseThrow . |
Aceptar Optional como argumento público (mejor @Nullable parámetro o sobrecarga). |
Emplear utilidades como OptionalUtil.firstPresent(...) para evitar cascadas de ifPresent . |
Usar Optional en campos de entidad (incrementa coste de serialización). |
Nota: Guarda valores ausentes como
Optional.empty()
, no comonull
dentro delOptional
.
Ejemplo práctico:
Optional<Report> maybe = reportRepo.findByRequestId(id);
String path = maybe.map(Report::getFilePath)
.orElse("/placeholder.txt");
User u = …;
String json = JsonSerializer.toJson(u); // pretty-printed, null-safe
User copy = JsonSerializer.fromJson(json, User.class);
assert u.equals(copy);
Ventaja Gson core | Limitación vs Jackson |
---|---|
≈ 240 kB JAR, sin reflection module opener | No soporta filtros, mix-ins o @JsonView |
Tolerancia a campos desconocidos (forward-compat) | Sin autodetección de records en versiones < 17 |
Rendimiento suficiente (≈ 50 MB/s) | Sin streaming “pull” de bajo nivel |
Regla adoptada: guardar JSON siempre en UTF-8, sin dependencias de Spring; los modelos evolucionan manteniendo compatibilidad porque los campos extra se ignoran.
Métrica sobre 10 000 objetos | JSON (Gson) | Externalizable | Ventaja |
---|---|---|---|
Tiempo serializar (promedio) | 4,01 ms/op | 2,63 ms/op | ≈ 1,5 × más rápido |
Tiempo calcular tamaño ( size_* benchmark) |
0,20 ms/op | 2,69 ms/op | JSON obtiene longitud de String ; llamada binaria necesita copiar buffer |
Tamaño total en disco (previo) | 250 KB | 120 KB | 2,1 × más compacto |
Resultados obtenidos en la misma JVM y máquina (cuatro núcleos),
3 warm-ups + 3 mediciones (jmh
modo AverageTime, unidad = ms/op).
- Externalizable continúa duplicando la compresión (120 KB vs 250 KB) y ahora es ~50 % más veloz al serializar.
- El benchmark
size_*
sólo mide el coste de calcular el tamaño, no el tamaño en sí; por eso JSON es más rápido ahí (simpleString.length
). - En deserialización (no mostrado) la tendencia es similar: binario evita parseo de texto.
Externalizable (binario) | JSON (texto legible) |
---|---|
✔ Tamaño más pequeño y latencia menor | ✔ Humano-legible, diff-friendly |
✔ Controlas qué campos escribes (versión 100 % manual) | ✔ Portabilidad entre lenguajes |
❌ No legible / requiere versión explícita | ❌ Mayor tamaño y parseo más costoso |
Regla práctica Utiliza serialización binaria sólo en caminos «hot-path» controlados enteramente por Java; mantén JSON para integración, logs o configuración donde la legibilidad y portabilidad pesan más.
Path csv = Path.of("requests.csv");
CsvUtil.writeRequests(csv, list, UTF_8, ';'); // guardar
List<ProcessingRequest> back =
CsvUtil.readRequests(csv, UTF_8, ';'); // leer
java.nio (Files.newBufferedReader/Writer) evita librerías pesadas.
Manejo correcto de salto de línea (Windows/Linux) gracias a BufferedWriter.newLine().
Charset configurable; por defecto usamos UTF-8 para compatibilidad.
Lógica O(n) simple: dividir cadena + join. Para CSV complejo (citas, escapes) considerar OpenCSV / Univocity.
Herramienta | Propósito | Severidad que rompe build |
---|---|---|
SpotBugs | Detecta bugs potenciales (NPE, concurrencia) | High o superior |
Checkstyle | Consistencia de estilo (sangría, nombres) | error |
PMD | Code smells, complejidad, duplicados | error |
- SpotBugs: añade un bloque
<Match>
enconfig/quality/spotbugs-exclude.xml
. - PMD / Checkstyle: usa la anotación
@SuppressWarnings("PMD.RuleName")
o comentario// CHECKSTYLE:OFF ... ON
.
Regla: justificar la supresión en el PR; no silenciar globalmente.
Los reportes HTML se generan en target/quality-reports/index.html
para cada módulo.
La documentación de la API pública (≥ 80 % cubierta) está disponible en
➡️ docs/javadoc/index.html
- ☑️ Setup multi-módulo (
core
,lib
) con CI y cobertura ≥ 75 %. - ☑️ Dominio modelado (User, DataFile, ProcessingRequest, …) aplicando OOP sólido.
- ☑️ Colecciones & Genéricos:
PagedResult<T>
,InMemoryCache
, micro-benchmark ArrayList vs LinkedList. - ☑️ Concurrencia básica:
NotificationService
,WorkQueue
, demo de race-condition yCompletableFuture
. - ☑️ Streams & Lambdas:
KpiCalculator
, benchmarkparallelStream()
. - ☑️ I/O & Serialización: JSON “ligero”,
Externalizable
,CsvUtil
conjava.nio
. - ☑️ Calidad transversal: análisis estático (SpotBugs, Checkstyle, PMD), JMH, Javadoc 80 %.
- OOP (encapsulación, inmutabilidad, patrón Builder/Factory).
- Streams & Collectors (
groupingBy
,mapping
, paralelos). - Generics avanzados (wildcards
? extends / super
). - Concurrencia (
ExecutorService
,BlockingQueue
,AtomicInteger
,ReentrantLock
). - java.time (
Instant
,LocalDate
, zonas). - Optional (best practices, utilidades).
- I/O / NIO.2 (Paths, Files, charsets).
- Benchmarking (JMH).
- Calidad (JaCoCo, SpotBugs, Checkstyle, PMD).
- Javadoc generada y publicada.
- Trade-offs en estructuras de datos – benchmark mostró cuándo
LinkedList
vence aArrayList
solo en inserciones al inicio con datasets grandes. - Overhead de
parallelStream()
– speed-up real sólo con > 1 M elementos CPU-bound. - Externalizable vs JSON – binario 2× más pequeño y rápido, pero pierde portabilidad/legibilidad.
- Evitar data races –
AtomicInteger
ofrece CAS barato; locks útiles para operaciones compuestas. - Calidad automatizada – build falla temprano; menor tiempo de revisión manual.
- Necesidad de profundizar en JMH avanzado (profiler, warm-up adecuado).
- PMD marcó complejidad > 15 en algunos métodos; pendiente refactorizar.
- Cobertura JaCoCo de benchmarks excluida; explorar integración con
jacoco:agent
para medición real. - Falta prueba de integración end-to-end (JSON ↔ CSV ↔ Report).
Área | Acción | Enlace |
---|---|---|
Refactor | Reducir complejidad de ProcessingRequest.Builder . |
#REF-TICKET-123 |
Performance | Añadir benchmarks para I/O (Buffered vs NIO channels). | perf-board |
Observabilidad | Integrar logs estructurados + metrics Micrometer. | board-fase-2 |
Persistencia | Prototipo con Spring Batch + H2. | board-fase-2 |
Automatización | Publicar GitHub Pages con Javadoc y JaCoCo badge. | PR #XYZ |
Cobertura actual: 78 % (core) • 85 % (lib)
Benchmarks clave: externalizable vs JSON (×1.5 speed-up) / parallelStream suma (×11 speed-up).
Fase 1 establecida como base sólida; lista para escalar a integración, observabilidad y optimización avanzada en la siguiente etapa.
Boot Setup & Tooling Estado: ✅ F2-01 • ✅ F2-02 · ✅ F2-03
Este documento resume los cambios y lineamientos aplicados en la Fase 2 para preparar el andamiaje de Spring Boot y conectar la librería de la Fase 1 con un servicio API.
DataFlowHubLibraryJava/
├─ pom.xml # POM padre (packaging = pom)
├─ lib/ # utilidades/demos (Fase 1)
│ └─ pom.xml
├─ core/ # lógica reusable de dominio (Fase 1)
│ └─ pom.xml
└─ api-service/ # NUEVO: servicio REST Spring Boot 3
└─ pom.xml
Claves del POM padre
- Propiedad centralizada
spring.boot.version
. - Import del BOM de Spring Boot en
<dependencyManagement>
. - Versión del spring-boot-maven-plugin fijada en
<pluginManagement>
para herencia en todos los módulos.
Objetivo: Inicializar un servicio web mínimo con Spring Boot 3 (Web + Actuator) que compile y arranque.
Entregables
- Módulo
api-service
creado. - Dependencias:
spring-boot-starter-web
,spring-boot-starter-actuator
. - Configuración de versión de Boot mediante BOM en el POM padre.
Criterios de aceptación
mvn -pl api-service test
compila sin errores.- La aplicación arranca localmente.
Comandos
mvn -pl api-service test
mvn -pl api-service spring-boot:run
¿Qué se hizo?
-
Se importó el módulo
core
(librería de Fase 1) dentro deapi-service
:<dependency> <groupId>com.practice</groupId> <artifactId>core</artifactId> <version>${project.version}</version> </dependency>
-
Se habilitó component scan cruzado desde la app web para detectar beans del
core
:@SpringBootApplication(scanBasePackages = "com.practice") public class ApiServiceApplication { }
-
Se validó la inyección de un bean de la librería (ej.
ErrorHandler
) exponiendo un endpoint de prueba:@RestController @RequiredArgsConstructor class HealthExtraController { private final ErrorHandler errorHandler; // viene de core @GetMapping("/ping") public String ping() { return "pong"; } }
Criterios de aceptación
mvn -pl api-service test
pasa sin errores.ErrorHandler
(u otro bean decore
) se inyecta y funciona en un endpoint expuesto.
Probar rápidamente
mvn clean install
java -jar api-service/target/api-service-*.jar
# En otra terminal
curl http://localhost:8080/actuator/health
curl http://localhost:8080/ping # → "pong"
POM padre
-
Define:
java.version
spring.boot.version
-
Importa el BOM de Spring Boot en
dependencyManagement
. -
Fija
spring-boot-maven-plugin
enpluginManagement
(heredado por los módulos). -
Orden de módulos para un build determinista:
<modules> <module>lib</module> <module>core</module> <module>api-service</module> </modules>
Compilación completa
mvn clean install
Compilar solo api-service
(con sus dependencias aguas arriba)
mvn -pl api-service test -am
Ejecutar
java -jar api-service/target/api-service-*.jar
Endpoints
GET /actuator/health
GET /ping
Objetivo: definir configuración por ambiente en
api-service
, aislando datasources y niveles de log, y documentar cómo activar perfiles.
api-service/src/main/resources/application.yml
(base)api-service/src/main/resources/application-dev.yml
api-service/src/main/resources/application-test.yml
api-service/src/main/resources/application-prod.yml
Dependencias relevantes (en api-service/pom.xml
): spring-boot-starter-jdbc
, com.h2database:h2
(runtime), org.postgresql:postgresql
(runtime).
Tests ejecutan con perfil test
de forma automática (Surefire: spring.profiles.active=test
).
spring:
application:
name: api-service
profiles:
default: dev # si no se especifica, arranca en dev
server:
port: 8080
management:
endpoints:
web:
exposure:
include: health,info
spring:
datasource:
url: jdbc:h2:mem:dataflow;DB_CLOSE_DELAY=-1;MODE=PostgreSQL
driver-class-name: org.h2.Driver
username: sa
password:
h2:
console:
enabled: true
logging:
level:
root: INFO
com.practice: DEBUG
spring:
datasource:
url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;MODE=PostgreSQL
driver-class-name: org.h2.Driver
username: sa
password:
jpa:
hibernate:
ddl-auto: create-drop
logging:
level:
root: WARN
org.springframework: WARN
spring:
datasource:
url: ${DB_URL:jdbc:postgresql://db:5432/dataflow}
username: ${DB_USER:postgres}
password: ${DB_PASSWORD:postgres}
driver-class-name: org.postgresql.Driver
jpa:
hibernate:
ddl-auto: validate
server:
port: ${PORT:8080}
logging:
level:
root: INFO
org.springframework: INFO
file:
name: ${LOG_FILE:logs/api-service.log}
-
Por variable de entorno
# Linux/Mac SPRING_PROFILES_ACTIVE=prod mvn -pl api-service spring-boot:run # Windows PowerShell $Env:SPRING_PROFILES_ACTIVE="prod"; mvn -pl api-service spring-boot:run
-
Por línea de comandos al ejecutar el JAR
java -Dspring.profiles.active=dev -jar api-service/target/api-service-*.jar
-
Tests: el plugin Surefire fija
spring.profiles.active=test
(no requiere anotaciones en los tests).
mvn clean install
java -jar api-service/target/api-service-*.jar
# En otra terminal
curl http://localhost:8080/actuator/health
curl http://localhost:8080/ping # si está habilitado el endpoint de ejemplo
Smoke test (resumen): se valida que con test
el DataSource
sea H2 en memoria y que el nivel de log sea WARN
.
- Cambiar perfil modifica datasource y nivel de log.
- Tests se ejecutan con perfil
test
automáticamente (@ActiveProfiles("test")
). - README sección “Perfiles” explica variables y ejemplos.
Objetivo: exponer un endpoint que registre metadatos de un archivo (NO guarda binarios), valide con Bean Validation y responda 201 Created con
Location
y un DTO de salida.
api-service/src/main/java/.../files/FileController.java
api-service/src/main/java/.../files/FileUploadRequest.java
api-service/src/main/java/.../files/FileUploadResponse.java
api-service/src/main/java/.../common/ApiErrorHandler.java
api-service/src/main/java/.../files/FileTooLargeException.java
api-service/src/test/java/.../files/FileControllerTest.java
Dependencias (en api-service/pom.xml
)
spring-boot-starter-validation
(activación de Bean Validation)
Ruta: POST /files
Request Body (JSON):
{
"originalFilename": "ventas_julio.csv",
"sizeBytes": 1048576,
"storagePath": "/data/in/ventas_julio.csv",
"checksumSha256": "9f2c...abcd", // OPCIONAL
"uploadedByUserId": "8d3b6c3f-64db-4c1e-9d51-3b6f3d8e2a11"
}
Validaciones (Bean Validation):
originalFilename
→@NotBlank
y@Size(max=255)
sizeBytes
→@Positive
y máx 50 MB (50 * 1024 * 1024
)checksumSha256
→ opcional, regex^[a-fA-F0-9]{64}$
storagePath
→@NotBlank
uploadedByUserId
→@NotNull
(UUID)
Response (201 Created):
- Headers:
Location: /files/{id}
- Body:
{
"id": "4c16cf2a-9a11-4a9b-a1e5-0c7b2a7d1234",
"originalFilename": "ventas_julio.csv"
}
Errores (400 Bad Request):
- Cuerpo estándar:
{ "code": string, "message": string, "fields": [ {"field": string, "message": string} ] }
- Tamaño excedido ⇒
code = "FILE_TOO_LARGE"
- Violaciones de Bean Validation ⇒
code = "VALIDATION_ERROR"
confields[]
- Se reutiliza el
DataFile
existente encore/domain
. - Creación recomendada:
DataFile.createForUpload(...)
(fábrica en el dominio que generaid
/uploadedAt
y resuelveuploadedBy
a partir deuploadedByUserId
). - Si no existe la fábrica, puede instanciarse el
DataFile
directamente respetando las invariantes actuales del dominio.
# Éxito (201)
curl -i -X POST http://localhost:8080/files \
-H 'Content-Type: application/json' \
-d '{
"originalFilename":"ventas_julio.csv",
"sizeBytes":1048576,
"storagePath":"/data/in/ventas_julio.csv",
"uploadedByUserId":"8d3b6c3f-64db-4c1e-9d51-3b6f3d8e2a11"
}'
# Error (400 FILE_TOO_LARGE)
curl -i -X POST http://localhost:8080/files \
-H 'Content-Type: application/json' \
-d '{
"originalFilename":"big.bin",
"sizeBytes":52428801,
"storagePath":"/tmp/big.bin",
"uploadedByUserId":"8d3b6c3f-64db-4c1e-9d51-3b6f3d8e2a11"
}'
# Error (400 VALIDATION_ERROR) por filename vacío + checksum inválido
curl -i -X POST http://localhost:8080/files \
-H 'Content-Type: application/json' \
-d '{
"originalFilename":" ",
"sizeBytes":100,
"checksumSha256":"XYZ",
"storagePath":"/data/in/a.csv",
"uploadedByUserId":"8d3b6c3f-64db-4c1e-9d51-3b6f3d8e2a11"
}'
- Happy path: 201 +
Location
+ body{id, originalFilename}
. - Tamaño > 50MB: 400 con
code=FILE_TOO_LARGE
. - Payload inválido: 400
VALIDATION_ERROR
, lista defields
conoriginalFilename
ychecksumSha256
.
Los tests fuerzan la validación antes de llegar al dominio usando
@Valid
+BindingResult
.
- 202 con Location válido al crear correctamente.
- 400 con lista de errores de validación cuando falten campos.
title
recorta espacios; longitud > 140 → 400.- Tests de mapeo DTO→dominio verifican campos obligatorios y opcionales.
- El almacenamiento real del binario no se implementa en esta HU; solo se registran metadatos.
- En
prod
el comportamiento de logging y perfiles se hereda de la HU F2-03.
Objetivo: aceptar la solicitud de procesamiento y devolver un acuse (ACK) con
status=PENDING
, sin instanciar aún el agregado de dominio ni persistir. La validación de existencia deDataFile
,User
yBatchJobConfig
se realizará en la siguiente HU/épica.
api-service/src/main/java/.../processings/ProcessingController.java
api-service/src/main/java/.../processings/dto/CreateProcessingRequest.java
api-service/src/main/java/.../processings/dto/ProcessingCreatedResponse.java
api-service/src/main/java/.../config/AppBatchProps.java
api-service/src/test/java/.../processings/ProcessingControllerTest.java
api-service/src/main/resources/application.yml
→ propiedadapp.batch.default-config-id
Notas
- Se reutiliza el
ApiErrorHandler
existente (badRequestFrom(BindingResult)
) para responder 400 VALIDATION_ERROR. - No se construyen objetos de dominio (
ProcessingRequest
,DataFile
,User
,BatchJobConfig
) en esta HU.
Ruta: POST /processings
Request Body (JSON):
{
"title": "ETL Ventas Julio",
"dataFileId": "e4b7b32e-f93b-47b1-8a5d-6a0c3c8f1b0b",
"requestedByUserId": "1b2b4d6e-9fa1-4f0f-8b12-33d4c9a0e111",
"batchJobConfigId": "00000000-0000-0000-0000-000000000001", // OPCIONAL
"parameters": { "delimiter": ";" } // OPCIONAL
}
Validaciones:
title
→ se aplicatrim()
y luego se exige longitud 1..140. (El DTO puede permitir hasta 400 para entrada, pero el controller recorta y valida el límite efectivo.)dataFileId
→ requerido (UUID)requestedByUserId
→ requerido (UUID)batchJobConfigId
→ opcional; si no llega se usaapp.batch.default-config-id
parameters
→ opcional; si viene,Map<@NotBlank String, @NotBlank String>
Response (202 Accepted):
- Headers:
Location: /processings/{id}
- Body:
{ "id": "4c16cf2a-9a11-4a9b-a1e5-0c7b2a7d1234", "status": "PENDING" }
Errores (400 Bad Request):
- Cuerpo estándar:
{ "code": "VALIDATION_ERROR", "message": "Invalid payload", "fields": [ {"field": "...", "message": "..."} ] }
- Casos cubiertos:
title
vacío trastrim
,title
> 140,dataFileId
/requestedByUserId
nulos, claves/valores inválidos enparameters
.
application.yml
(o application-dev.yml
):
app:
batch:
default-config-id: "00000000-0000-0000-0000-000000000001"
Habilitar properties:
@EnableConfigurationProperties(AppBatchProps.class)
# Éxito (202)
curl -i -X POST http://localhost:8080/processings \
-H 'Content-Type: application/json' \
-d '{
"title":" ETL Ventas Julio ",
"dataFileId":"e4b7b32e-f93b-47b1-8a5d-6a0c3c8f1b0b",
"requestedByUserId":"1b2b4d6e-9fa1-4f0f-8b12-33d4c9a0e111",
"parameters": {"delimiter":";"}
}'
# Error (400 VALIDATION_ERROR) por título inválido
d='{"title":" ","dataFileId":"'$(uuidgen)'","requestedByUserId":"'$(uuidgen)'"}'
curl -i -X POST http://localhost:8080/processings -H 'Content-Type: application/json' -d "$d"
- Happy path: 202 +
Location
+ body{id, "PENDING"}
. - Uso de default: cuando
batchJobConfigId
no viene, se leeapp.batch.default-config-id
. - Validación:
title
(trim y longitud) y requeridos → 400VALIDATION_ERROR
confields
.
- 202 con Location válido al crear correctamente.
- 400 con lista de errores de validación cuando falten campos.
title
recorta espacios; longitud > 140 → 400.- Tests de mapeo DTO→dominio verifican campos obligatorios y opcionales.
- Esta HU no verifica la existencia de DataFile, User ni BatchJobConfig; tampoco instancia ProcessingRequest. Ese wiring (lookups/repos) se aborda en la siguiente HU/épica.
Objetivo: exponer el estado de un processing combinando el
ProcessingRequest
y su **última **JobExecution
(si existe). Devuelve un DTO estable para el front y 404 si el id no existe.
api-service/src/main/java/.../processings/ProcessingQueryController.java
api-service/src/main/java/.../processings/dto/ProcessingStatusResponse.java
api-service/src/main/java/.../processings/query/ProcessingStatusFinder.java
(puerto de lectura)api-service/src/main/java/.../processings/query/InMemoryProcessingStatusFinder.java
(adaptador in-memory)api-service/src/main/java/.../utils/error/ResourceNotFoundException.java
(404)api-service/src/test/java/.../processings/ProcessingQueryControllerTest.java
- (opcional demo)
api-service/src/main/java/.../config/DemoData.java
para “sembrar” un registro al arrancar.
Dependencias: se reutilizan las de api-service
. No se modifican core
ni lib
.
Ruta: GET /processings/{id}
Response 200 (JSON):
{
"id": "7e2a1d7c-39bb-4f1a-8a55-9a2f14c47788",
"title": "ETL Ventas Julio",
"status": "RUNNING",
"createdAt": "2025-08-07T03:10:21Z",
"dataFileId": "4c16cf2a-9a11-4a9b-a1e5-0c7b2a7d1234",
"metrics": { "readCount": 12345, "writeCount": 12280, "skipCount": 145 },
"lastExecution": {
"startTime": "2025-08-07T03:10:22Z",
"endTime": null,
"exitStatus": null,
"errorMessage": null
}
}
Response 404:
{ "code": "NOT_FOUND", "message": "processing id not found", "fields": [] }
Notas de serialización:
- Tiempos (
Instant
) en formato ISO‑8601 UTC (Z
). lastExecution
esnull
si nunca se ha ejecutado.- Si existe al menos una ejecución,
metrics
refleja la última (read/write/skip
).
public record ProcessingStatusResponse(
UUID id,
String title,
String status, // RequestStatus del dominio → String
Instant createdAt,
UUID dataFileId,
Metrics metrics,
LastExecution lastExecution
) {
public record Metrics(long readCount, long writeCount, long skipCount) {}
public record LastExecution(Instant startTime, Instant endTime, String exitStatus, String errorMessage) {}
}
-
Se usa el puerto
ProcessingStatusFinder
que expone:findRequest(UUID id)
→Optional<ProcessingRequest>
findLastExecution(UUID processingId)
→Optional<JobExecution>
-
Implementación por defecto:
InMemoryProcessingStatusFinder
(mapas concurrentes) para permitir tests y demos sin persistencia. -
Mapeo a DTO:
id
,title
,status
,createdAt
ydataFileId
vienen deProcessingRequest
.- Si hay última
JobExecution
: rellenametrics
ylastExecution
(conexitStatus.name()
), si no,metrics
=0/0/0
ylastExecution=null
.
# Existe y está corriendo (200)
curl -s http://localhost:8080/processings/7e2a1d7c-39bb-4f1a-8a55-9a2f14c47788 | jq
# No existe (404)
curl -i http://localhost:8080/processings/00000000-0000-0000-0000-000000000000
- 200 sin ejecución:
lastExecution
no presente ymetrics
en0
. - 200 con ejecución:
status
y métricas correctas;lastExecution.startTime
en ISO‑8601. - 404 inexistente: error con
code=NOT_FOUND
.
En los tests se mockea
ProcessingStatusFinder
y se construyen objetos de dominio reales (ProcessingRequest
,JobExecution
) para validar el mapeo.
- 200 con DTO completo; 404 cuando
id
no existe. - No exponer stacktraces; errores van por
RestExceptionHandler
(B3). - Campos de fecha en ISO‑8601 UTC.
- Tests cubren los tres escenarios.
- Este read-model es in-memory y sirve como contrato estable. En épicas B5/B6 se conectará con Spring Batch/Actuator para poblar las ejecuciones reales y métricas.
Objetivo: configurar PostgreSQL en dev y tests con migraciones Flyway. En tests se usa Testcontainers (PG real en Docker). No se modifica
core
nilib
.
api-service/pom.xml
→ dependencias JPA, PostgreSQL, Flyway y Testcontainersapi-service/src/main/resources/application-dev.yml
→ datasource PostgreSQL (por variables de entorno)api-service/src/main/resources/application-test.yml
→ sin datasource; JPA/Flyway mínimos para Testcontainersapi-service/src/test/java/.../db/DbSmokeTest.java
→ prueba de arranque y conexión a BD- (opcional)
api-service/docker-compose.yml
→ servicio PostgreSQL local para desarrollo - (opcional)
src/main/resources/db/migration/*.sql
→ scripts de Flyway
Dependencias añadidas (POM api-service
):
spring-boot-starter-data-jpa
org.postgresql:postgresql
(runtime)org.flywaydb:flyway-core
org.flywaydb:flyway-database-postgresql
(requerido para PG 16+)org.testcontainers:junit-jupiter
(test)org.testcontainers:postgresql
(test)
Si tu BOM de Spring Boot no trae Flyway 10, fija
flyway.version
a 10.x o usa Testcontainers conpostgres:15-alpine
(ver más abajo).
spring:
datasource:
url: ${DB_URL:jdbc:postgresql://localhost:5432/dataflow}
username: ${DB_USER:app}
password: ${DB_PASSWORD:secret}
driver-class-name: org.postgresql.Driver
jpa:
hibernate:
ddl-auto: validate
properties:
hibernate.jdbc.time_zone: UTC
flyway:
enabled: true
locations: classpath:db/migration
baseline-on-migrate: true
spring:
jpa:
hibernate:
ddl-auto: none
flyway:
enabled: true
No declares
spring.datasource.*
aquí: Testcontainers inyecta URL/usuario/clave automáticamente.
@Testcontainers
@SpringBootTest
class DbSmokeTest {
@Container
@ServiceConnection // Spring Boot 3.1+: autoconfigura el DataSource desde el contenedor
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16-alpine");
// Fallback si tu Boot < 3.1
@DynamicPropertySource
static void pgProps(DynamicPropertyRegistry r) {
r.add("spring.datasource.url", postgres::getJdbcUrl);
r.add("spring.datasource.username", postgres::getUsername);
r.add("spring.datasource.password", postgres::getPassword);
}
@Autowired javax.sql.DataSource ds;
@Test void contextLoads_andDatabaseIsReachable() throws Exception {
try (var c = ds.getConnection()) { assert c.isValid(2); }
}
}
Nota Flyway: Para PostgreSQL 16, agrega flyway-database-postgresql
. Si usas Flyway < 10, cambia el contenedor a postgres:15-alpine
.
export DB_URL=jdbc:postgresql://localhost:5432/dataflow
export DB_USER=app
export DB_PASSWORD=secret
export SPRING_PROFILES_ACTIVE=dev
mvn -pl api-service spring-boot:run
$Env:DB_URL = "jdbc:postgresql://localhost:5432/dataflow"
$Env:DB_USER = "app"
$Env:DB_PASSWORD = "secret"
$Env:SPRING_PROFILES_ACTIVE = "dev"
mvn -pl api-service spring-boot:run
(Opcional) levanta PostgreSQL con Docker:
# api-service/docker-compose.yml
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: dataflow
POSTGRES_USER: app
POSTGRES_PASSWORD: secret
ports: ["5432:5432"]
volumes: ["pgdata:/var/lib/postgresql/data"]
volumes:
pgdata:
- Coloca scripts en
src/main/resources/db/migration
con prefijosV1__*.sql
,V2__*.sql
, etc. baseline-on-migrate: true
permite inicializar una BD vacía sin errores.
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: '17' # o 21 si tu POM lo requiere
cache: maven
- run: mvn -B -DskipTests=false clean verify
Testcontainers usa Docker del runner automáticamente; no requieres servicios extra.
- La app arranca en
dev
y ejecuta migraciones Flyway sin errores. - En
test
, el contexto usa Postgres de Testcontainers automáticamente. - Logs muestran zona horaria UTC y
ddl-auto=validate
.
Objetivo: persistir los agregados del dominio sin anotar ********
core
, creando entidades JPA y repos Spring Data enapi-service
(capa infra). Guardarparameters
como JSONB y exponer consultas básicas necesarias para B2.
api-service/src/main/java/.../infra/db/converter/MapToJsonConverter.java
api-service/src/main/java/.../infra/db/entity/
UserEntity.java
,DataFileEntity.java
,BatchJobConfigEntity.java
,ProcessingRequestEntity.java
,JobExecutionEntity.java
,ReportEntity.java
api-service/src/main/java/.../infra/db/repo/
UserRepository.java
,DataFileRepository.java
,BatchJobConfigRepository.java
,ProcessingRequestRepository.java
,JobExecutionRepository.java
,ReportRepository.java
- (opcional)
api-service/src/main/java/.../infra/db/mapper/
(MapStruct)UserMapper.java
,DataFileMapper.java
,ProcessingRequestMapper.java
api-service/src/test/java/.../infra/db/RepositoryIT.java
(tests de integración con Testcontainers)
Dependencias: se reutilizan las de HU F2-07 (JPA, PostgreSQL, Testcontainers). Si usas MapStruct, añade mapstruct
+ mapstruct-processor
(annotation processor) en api-service/pom.xml
.
api-service/
└─ src/main/java/com/practice/apiservice/infra/db/
├─ converter/MapToJsonConverter.java
├─ entity/
│ ├─ UserEntity.java
│ ├─ DataFileEntity.java
│ ├─ BatchJobConfigEntity.java
│ ├─ ProcessingRequestEntity.java
│ ├─ JobExecutionEntity.java
│ └─ ReportEntity.java
├─ repo/
│ ├─ UserRepository.java
│ ├─ DataFileRepository.java
│ ├─ BatchJobConfigRepository.java
│ ├─ ProcessingRequestRepository.java
│ ├─ JobExecutionRepository.java
│ └─ ReportRepository.java
└─ mapper/ (opcional)
Todo cuelga de
com.practice.apiservice
para que component-scan los detecte sin@EnableJpaRepositories/@EntityScan
.
- UserEntity → tabla
users
(id: uuid
,name
opcional). - DataFileEntity →
data_files
con:originalFilename
,storagePath
,sizeBytes
,checksumSha256
,uploadedAt
,uploadedBy (FK → users)
. - BatchJobConfigEntity →
batch_job_configs
con:name
,description
,chunkSize
,readerType
,writerType
,allowRestart
,createdAt
,active
. - ProcessingRequestEntity →
processing_requests
con:title
,dataFile (FK)
,parameters: jsonb
,status (enum)
,createdAt
,requestedBy (FK)
,batchJobConfig (FK)
. - JobExecutionEntity →
job_executions
con:processingRequest (FK)
,startTime
,endTime
,exitStatus (enum)
,readCount
,writeCount
,skipCount
,errorMessage
. - ReportEntity →
reports
con:processingRequest (FK)
,storagePath
,summaryJson (text)
,generatedAt
,generatedBy (FK)
.
Todas las
@Id
sonUUID
(columnDefinition = "uuid"
).parameters
usa JSONB con el converter de abajo.
MapToJsonConverter
serializa Map<String,String>
⇄ jsonb
usando Jackson. Se aplica en ProcessingRequestEntity.parameters
con @Convert(converter = MapToJsonConverter.class)
y @Column(columnDefinition = "jsonb")
.
No es obligatorio anotar con
@Repository
al extenderJpaRepository
(Spring Data registra el bean automáticamente). Puedes anotarlo si quieres hacerlo explícito.
-
UserRepository extends JpaRepository<UserEntity, UUID>
-
DataFileRepository extends JpaRepository<DataFileEntity, UUID>
-
BatchJobConfigRepository extends JpaRepository<BatchJobConfigEntity, UUID>
-
ProcessingRequestRepository extends JpaRepository<ProcessingRequestEntity, UUID>
Page<ProcessingRequestEntity> findByStatus(RequestStatus status, Pageable pageable)
-
JobExecutionRepository extends JpaRepository<JobExecutionEntity, UUID>
Optional<JobExecutionEntity> findTop1ByProcessingRequestIdOrderByStartTimeDesc(UUID processingRequestId)
-
ReportRepository extends JpaRepository<ReportEntity, UUID>
Si necesitas llevar entidades a objetos de dominio (p. ej. para B1/F2-06):
UserMapper
→User.ofId(entity.getId())
DataFileMapper
→ construyeDataFile
del core con sus camposProcessingRequestMapper
→ creaProcessingRequest
del core y ajustastatus
aplicando sus transiciones (markInProgress
,markCompleted
,markFailed
) según el enum almacenado.
BatchJobConfig
en el core usa builder conlogicalName
y genera ID; si solo necesitas el id en consultas, puedes omitir su reconstrucción completa hasta B5/B6.
-
Tipo:
@DataJpaTest
+ Testcontainers (reutiliza lo de F2-07). -
Esquema: para esta HU,
spring.jpa.hibernate.ddl-auto=create-drop
yspring.flyway.enabled=false
en el test. (En F2-09 migraremos a Flyway.) -
Cobertura:
- Persistir
ProcessingRequestEntity
conparameters
y leerlos de vuelta. - Crear 3
JobExecutionEntity
constartTime
distintos y comprobar quefindTop1ByProcessingRequestIdOrderByStartTimeDesc(...)
devuelve la última. - Consulta paginada
findByStatus(...)
.
- Persistir
Ejemplo de aserción de “última ejecución”
var last = execs.findTop1ByProcessingRequestIdOrderByStartTimeDesc(prId).orElseThrow();
assertThat(last.getStartTime()).isEqualTo(t2).isAfter(t1).isAfter(t3);
- Mantén
application-test.yml
sinspring.datasource.*
(Testcontainers inyecta). - Si tu Boot < 3.1, añade
@DynamicPropertySource
en el test para mapear las props del contenedor.
- CRUD básico para
User
,DataFile
,ProcessingRequest
pasa en tests de integración. - Query
findTopByProcessingRequestIdOrderByStartTimeDesc
devuelve la ejecución correcta. - Conversor JSONB funciona: inserta/lee
parameters
sin pérdida.
@Repository
en interfaces de Spring Data es opcional.- En B2-09 se introducirá Flyway para el esquema; por ahora los tests generan tablas con Hibernate (
create-drop
).
Objetivo: crear el esquema inicial con Flyway, mapear
JSONB
correctamente (sin acoplarcore
) y asegurar que los tests no ejecuten SQL de PostgreSQL en H2.
-
api-service/src/main/resources/db/migration/postgresql/V1__baseline.sql
(migración base vendorizada) -
api-service/src/main/resources/db/migration/postgresql/V2__alter_checksum_to_varchar.sql
(ajuste de tipos) -
api-service/src/main/resources/application-test.yml
(Flyway + validate, sin datasource) -
Entidades (ajustes) en
api-service
:ProcessingRequestEntity.parameters
→@JdbcTypeCode(SqlTypes.JSON)
+columnDefinition = "jsonb"
UserEntity.role
→@Enumerated(EnumType.STRING)
;createdAt
con@PrePersist
para default
-
Tests: desactivar Flyway solo en smokes que no usan BD o vendorizar migraciones
V1__baseline.sql
(ubicación: db/migration/postgresql/
):
-
Extensiones/Tipos Postgres
-
Tablas y claves foráneas:
users (id uuid PK, name varchar(140), email varchar(140) UNIQUE NOT NULL, role varchar(32) NOT NULL, created_at timestamptz NOT NULL DEFAULT now())
data_files
(checksum_sha256 varchar(64)
, etc.)batch_job_configs
processing_requests (parameters jsonb NOT NULL DEFAULT '{}'::jsonb)
job_executions
reports
-
Índices:
idx_pr_status_created (status, created_at DESC)
idx_je_pr_start_desc (processing_request_id, start_time DESC)
V2__alter_checksum_to_varchar.sql
:
ALTER TABLE data_files
ALTER COLUMN checksum_sha256 TYPE varchar(64);
Nota
users
: se actualizó para reflejar tuUserEntity
(role
,created_at
).
En ProcessingRequestEntity
:
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
@JdbcTypeCode(SqlTypes.JSON)
@Column(columnDefinition = "jsonb", nullable = false)
private Map<String, String> parameters;
El campo
parameters
se persiste como jsonb nativo; no hace faltaAttributeConverter
niPGobject
.
application-test.yml
(tests con Testcontainers + Flyway):
spring:
jpa:
hibernate:
ddl-auto: validate
open-in-view: false
properties:
hibernate.jdbc.time_zone: UTC
flyway:
enabled: true
locations: classpath:db/migration/{vendor}
baseline-on-migrate: false
sql:
init:
mode: never
logging:
level:
root: WARN
org.springframework: WARN
org.flywaydb: INFO
application-dev.yml
(resumen):
spring:
datasource:
url: ${DB_URL:jdbc:postgresql://localhost:5432/dataflow}
username: ${DB_USER:app}
password: ${DB_PASSWORD:secret}
driver-class-name: org.postgresql.Driver
jpa:
hibernate:
ddl-auto: validate
properties:
hibernate.jdbc.time_zone: UTC
flyway:
enabled: true
locations: classpath:db/migration/{vendor}
baseline-on-migrate: true
Usar
{vendor}
permite que Flyway carguepostgresql/
con Postgres y (si lo necesitas)h2/
con H2.
-
Integración JPA/Repos:
@DataJpaTest
+ Testcontainers (PostgreSQL 16) con Flyway habilitado (ddl-auto=validate
).- Verifica: persistencia de
parameters
(jsonb), query de últimaJobExecution
, paginación porstatus
.
-
Smokes sin BD (web/actuator/config):
-
Deshabilitar Flyway en el test:
@SpringBootTest(properties = { "spring.flyway.enabled=false", "spring.jpa.hibernate.ddl-auto=none" }) class HealthExtraControllerIT { }
-
-
Opcional: si necesitas H2, crea migraciones en
db/migration/h2/
(sinCREATE EXTENSION
nijsonb
) y dejalocations: classpath:db/migration/{vendor}
.
-
Duplicated version (
Found more than one migration with version 1
):- Evita tener
V1__*.sql
ensrc/main/resources
ysrc/test/resources
a la vez. - Mantén todas las versionadas en
main
y usaR__*.sql
para seeds de test.
- Evita tener
-
H2 ejecutando SQL PG (
CREATE EXTENSION
,jsonb
):- Vendoriza migraciones (
{vendor}
) o desactiva Flyway en smokes.
- Vendoriza migraciones (
-
Mismatch de tipos (
char(64)
vsvarchar(64)
):- Aplica
V2__alter_checksum_to_varchar.sql
o ajusta@Column(columnDefinition = "char(64)")
(no recomendado).
- Aplica
Desarrollo (dev):
export SPRING_PROFILES_ACTIVE=dev DB_URL=jdbc:postgresql://localhost:5432/dataflow DB_USER=app DB_PASSWORD=secret
mvn -pl api-service spring-boot:run
Tests (PostgreSQL real):
mvn -pl api-service -Dtest=RepositoryIT,DbSmokeTest test
Debes ver en logs: Successfully applied V1
(+ V2
si procede) y ddl-auto=validate
sin errores.
- Migraciones Flyway aplican limpias en PostgreSQL (V1 + V2).
ProcessingRequestEntity.parameters
persiste/lee como jsonb.- Tests de integración pasan con Testcontainers; smokes no fallan por SQL de Postgres.
users
reflejaemail UNIQUE
,role
(string) ycreated_at
.
Objetivo: unificar el formato JSON de errores en toda la API con un
@RestControllerAdvice
, mapear correctamente las excepciones comunes (validación, negocio, infraestructura) y cubrirlo con tests de MVC.
api-service/src/main/java/.../exception/RestExceptionHandler.java
(nuevo handler global)api-service/src/test/java/.../RestExceptionHandlerTest.java
(tests de mapeo 400/404/503/500)- Elimina/sustituye
ApiErrorHandler
previo para evitar dos@RestControllerAdvice
en el contexto.
Dependencias: se reutilizan las de api-service
(incl. spring-boot-starter-validation
). No se tocan core
ni lib
.
{
"timestamp": "2025-08-11T03:15:29.123Z",
"path": "/ruta",
"code": "VALIDATION_ERROR",
"message": "Invalid request",
"fields": [ {"field": "title", "message": "must not be blank"} ],
"traceId": "..." // opcional si hay MDC/observabilidad
}
Tipos auxiliares:
FieldItem { field, message }
ErrorResponse { timestamp, path, code, message, fields[], traceId? }
HTTP | code | Excepción manejada |
---|---|---|
400 | VALIDATION_ERROR |
MethodArgumentNotValidException , BindException , ConstraintViolationException , HttpMessageNotReadableException (→ MALFORMED_JSON ) |
400 | BUSINESS_RULE_VIOLATION |
IllegalArgumentException (mensajes de dominio mapeados a fields cuando aplica) |
400 | FILE_TOO_LARGE |
FileTooLargeException |
404 | NOT_FOUND |
ResourceNotFoundException |
503 | SERVICE_UNAVAILABLE |
DataAccessException (problemas de BD/infra) |
500 | UNEXPECTED_ERROR |
Exception (catch‑all) |
Nota: los métodos
@ExceptionHandler
son públicos. El handler obtienetraceId
deMDC
si está disponible.
- No requieren cambios: cualquier excepción de las listadas será serializada con el formato anterior.
- Para reglas de dominio que hoy lanzan
IllegalArgumentException
, el handler responde 400 BUSINESS_RULE_VIOLATION y, si reconoce el mensaje, rellenafields
(por ejemplo:originalFilename is blank
→fields[originalFilename]
).
Registra explícitamente el controller de prueba y el advice:
MockMvc mvc = MockMvcBuilders
.standaloneSetup(new DummyController())
.setControllerAdvice(new RestExceptionHandler())
.build();
Casos cubiertos en RestExceptionHandlerTest
:
- 400 con
fields[]
(Bean Validation) - 404
NOT_FOUND
- 503
SERVICE_UNAVAILABLE
(simulandoDataAccessException
) - 500
UNEXPECTED_ERROR
@WebMvcTest(controllers = DummyController.class)
@Import(RestExceptionHandler.class)
@AutoConfigureMockMvc(addFilters = false) // si seguridad interfiere
Si ves 500 en lugar de 400/404/503: confirma que el advice está importado en el slice y que los
@ExceptionHandler
son public.
- 500 en tests de validación: faltaba importar el advice en
@WebMvcTest
o los@ExceptionHandler
no eranpublic
. HttpMessageNotReadableException
por JSON malformado: se devuelvecode="MALFORMED_JSON"
con 400.- Tests que no usan BD fallan por Flyway/H2: deshabilitar Flyway en esos tests (
spring.flyway.enabled=false
) o vendorizar migraciones (ver HU F2-09).
- Todas las respuestas de error comparten el mismo shape y
Content-Type: application/json
. - Validación → 400 con
fields[]
. - Recurso inexistente → 404 NOT_FOUND.
- Problemas de BD → 503 SERVICE_UNAVAILABLE.
- Errores no controlados → 500 UNEXPECTED_ERROR.
- Tests de MVC cubren los cuatro escenarios anteriores.
Objetivo: verificar la configuración de seguridad de la API mediante pruebas de integración con usuarios en memoria y autenticación básica.
api-service/src/test/java/.../SecurityIntegrationTest.java
(nuevo test de integración)api-service/src/main/java/.../config/SecurityConfig.java
(configuración de seguridad con usuarios en memoria y reglas de autorización)
Dependencias: se utilizan las de api-service
junto con H2 en memoria y spring-security-test
para soporte de MockMvc
con autenticación.
- Endpoints públicos (por ejemplo,
/ping
,/actuator/health
) accesibles sin autenticación → 200 OK. - Endpoints protegidos requieren credenciales válidas → 401 Unauthorized sin autenticación.
- Control de acceso por rol funciona correctamente (por ejemplo, endpoints solo para ADMIN → 403 Forbidden para usuarios USER).
- Autenticación exitosa pero recurso inexistente → 404 Not Found.
- Autenticación exitosa pero petición inválida → 400 Bad Request.
- Frameworks:
SpringBootTest
,MockMvc
,JUnit 5
. - Autenticación: HTTP Basic con usuarios en memoria (
user
/user123
,admin
/admin123
). - Base de datos: H2 en memoria configurada para el perfil de pruebas, Flyway deshabilitado para acelerar la ejecución.
mvn test -Dtest=SecurityIntegrationTest
/actuator/health
responde 200 sin auth;/processings/**
devuelve 401 sin credenciales.- Con
user/user123
se accede aGET /processings/{id}
; conadmin/admin123
se accede además a endpointsADMIN
si los hay. - Tests de seguridad cubren: 401, 403 y acceso válido.
- Para desarrollo local, CORS está habilitado para
http://localhost:3000
. - Las contraseñas se almacenan usando
BCryptPasswordEncoder
. - La configuración está definida en
SecurityConfig
y aplica tanto a entornos de producción como de prueba. - En los tests de integración se usan códigos de estado 404 o 400 para confirmar que la autenticación pasó, aunque la lógica de negocio no encuentre el recurso o el body sea inválido.
Objetivo: documentar ventajas y desventajas de Basic Auth y JWT (Bearer) en la API, y proponer una estrategia de migración para fases siguientes.
docs/security-authn.md
(este documento)- (Opcional PoC)
api-service/src/main/resources/application-jwt.yml
(perfil de pruebas para resource server JWT)
Dependencias (PoC JWT): spring-boot-starter-oauth2-resource-server
.
- Fase actual (F2): Basic Auth — simple, rápido para equipos internos y pruebas.
- Fase siguiente (F3+): JWT (Bearer) — mejor para microservicios, propagación entre servicios y escalabilidad.
Eje | Basic Auth | JWT (Bearer) |
---|---|---|
Estado | Envía credenciales en cada request. | Token firmado, stateless, con expiración. |
Rotación/Revocación | Cambiar contraseña del usuario. | Compleja: listas negras, tokens short‑lived, refresh tokens. |
Tamaño/latencia | Ligero, pero puede requerir lookups (si hay user service). | Token más pesado (claims), evita lookups por request. |
Microservicios | Cada salto debe revalidar credenciales. | Facilita propagación entre servicios con el mismo token. |
Seguridad | Requiere HTTPS; credenciales sensibles en tránsito con cada request. | HTTPS + gestión de llaves (JWK/Issuer) y relojes sincronizados. |
CSRF | Puede requerir protección si se usa desde navegador. | APIs stateless con Bearer suelen quedar fuera de riesgo de CSRF. |
Observabilidad | Sin claims; menos contexto. | Claims (sub, scope, roles, tenant) mejoran trazabilidad. |
Operación | Sin gestión de llaves. | Gestión de JWKS, rotación de firmas, reloj del cluster. |
-
Mantener Basic Auth en F2 (interno + pruebas) por rapidez y menor superficie de cambio.
-
Planificar migración a JWT en F3+ usando
spring-boot-starter-oauth2-resource-server
con validación de tokens desde un Issuer (Keycloak/Okta/issuer propio).- Tokens cortos (5–15 min).
- Rotación de llaves (JWKS).
- Scopes/roles en claims según dominios de negocio.
- Boundary de seguridad: validar el token en el edge (API gateway o cada microservicio) y propagar solo claims necesarios.
- Autorización: usar anotaciones (
@PreAuthorize
) basadas en authorities/scopes extraídos del token, o reglas en filtros. - Clock skew: configurar tolerancia (±60s).
- Errores: mapear 401 por token inválido/expirado y 403 por falta de scopes/roles.
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
spring:
security:
oauth2:
resourceserver:
jwt:
# Elige uno de los dos enfoques
# 1) Validación por Issuer (OpenID Connect/Keycloak/Okta)
issuer-uri: http://localhost:9000/realms/dataflow
# 2) Validación por clave pública (RSA) local
# jwk-set-uri: http://localhost:9000/realms/dataflow/protocol/openid-connect/certs
# Activar este perfil solo en el PoC para no interferir con Basic
spring:
profiles:
active: jwt
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
</dependency>
@Bean
SecurityFilterChain jwtFilterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(a -> a
.requestMatchers("/actuator/health", "/ping").permitAll()
.anyRequest().authenticated())
.oauth2ResourceServer(o -> o.jwt());
return http.build();
}
- 200 con token válido (firma OK y no expirado).
- 401 con token sin firma válida o expirado.
- 403 con token válido pero sin el scope/rol requerido.
- Documento
docs/security-authn.md
con tabla comparativa y recomendación clara (Basic ahora, JWT luego). - (Si se realiza PoC) existe
application-jwt.yml
y un test que demuestra 200 con token válido y 401 con token inválido. - Configuración separada por perfil para no interferir con Basic en
dev
.
- Para producción, planificar rotación de llaves, tiempos de expiración cortos y estrategia de refresh tokens (si aplica a clientes de confianza).
- Considerar un Issuer central (Keycloak/Okta/Cognito) para estandarizar claims, scopes y rotación de llaves.
- La migración debe ser compatible: convivir temporalmente Basic (interno) y JWT (clientes externos) detrás de paths o perfiles distintos.
- Documentar error codes de autenticación/autorización para los clientes (401 vs 403).
Objetivo: habilitar y verificar métricas de Actuator, incluyendo métricas personalizadas para el conteo de ejecuciones de Job y pruebas de su exposición.
api-service/src/main/resources/application.yml
(configuración de exposición de endpoints Actuator y etiquetas de métricas)api-service/src/main/java/.../metrics/JobMetrics.java
(registro de métricas personalizadas con Micrometer)api-service/src/test/java/.../HealthActuatorIT.java
(test de integración para /actuator/health)api-service/src/test/java/.../MetricsActuatorIT.java
(test de integración para /actuator/metrics y métricas personalizadas)
Dependencias:
spring-boot-starter-actuator
(endpoints de observabilidad)io.micrometer:micrometer-registry-prometheus
(opcional para exposición en Prometheus)
/actuator/health
expuesto públicamente y retorna 200 OK con estadoUP
./actuator/metrics
accesible para usuarios autenticados.- Métrica personalizada
dataflow.job.executions.total
presente con tags configurados (result=success|fail
). - Incremento del contador al simular la finalización de un Job.
- Validación opcional de
/actuator/prometheus
si está habilitado.
-
Frameworks:
SpringBootTest
,MockMvc
,JUnit 5
. -
Base de datos: H2 en memoria para pruebas, Flyway deshabilitado.
-
Flujo de test:
- Llamar a
/actuator/health
y verificar estadoUP
. - Consultar
/actuator/metrics/dataflow.job.executions.total
y validar presencia demeasurements
. - Simular incremento de contador y verificar cambio.
- Llamar a
mvn test -Dtest=HealthActuatorIT,MetricsActuatorIT
/actuator/metrics
y/actuator/health
expuestos según perfil.dataflow.job.executions.total
visible y se incrementa en pruebas./actuator/prometheus
devuelve series con prefijodataflow_
si está habilitado.- Documentación breve en README sobre consulta y significado de métricas.
- En desarrollo, configurar cuidadosamente Actuator para no exponer información sensible.
- Tag
application
definido comodataflowhub-api
para dashboards. - Con Prometheus habilitado,
/actuator/prometheus
puede ser consumido por Prometheus o APM compatible.
Objetivo: habilitar logs en formato JSON que incluyan
traceId
para mejorar la trazabilidad y facilitar la integración con herramientas de análisis de logs.
api-service/src/main/resources/logback-spring.xml
(configuración para salida JSON con traceId/spandId en cada evento)api-service/src/test/java/.../StructuredLoggingIT.java
(test de integración para validar formato y campos clave)- Configuración por perfil en
application.yml
yapplication-dev.yml
para niveles de logging.
Dependencias:
- Opción A:
net.logstash.logback:logstash-logback-encoder
para JSON estructurado. - Opción B: Patrón JSON manual con
%mdc
(sin encoder externo).
- Logs en formato JSON en entornos dev y prod.
- Inclusión automática de
traceId
yspanId
en cada evento. - Niveles de log configurables por perfil (
DEBUG
en dev,INFO
en prod). - Integración opcional con Micrometer Tracing para poblar automáticamente el
traceId
. - Filtro
OncePerRequestFilter
para generar y propagartraceId
cuando no exista.
{
"timestamp": "2025-08-11T05:25:10.111Z",
"level": "INFO",
"logger": "com.dataflowhub.api.web.ProcessingController",
"traceId": "f9f1a2b3c4d5e67fa",
"message": "ProcessingRequest created",
"requestId": "7e2a1d7c-3b9b-4f1a-8a55-9a2f1e4c7788",
"user": "user@acme.com"
}
mvn test -Dtest=StructuredLoggingIT
- Logs en JSON en dev/prod con
traceId
presente. - Control de niveles por perfil (
DEBUG
en dev,INFO
en prod). - Tests validan presencia de campos clave (
traceId
,message
). - README de "Observabilidad" documenta cómo visualizar
/actuator/*
y ejemplo de log JSON.
- No incluir datos sensibles en los logs.
- Se recomienda incluir campos de contexto como:
requestId
,user
,endpoint
. - Niveles:
INFO
normal,WARN
recuperable,ERROR
con stacktrace.
Objetivo: permitir la ejecución bajo demanda de un Job de Spring Batch vía un endpoint REST seguro y restringido a administradores.
api-service/src/main/java/.../rest/JobController.java
(nuevo controlador para ejecutar Jobs vía REST)api-service/src/test/java/.../JobControllerTest.java
(tests unitarios)- Configuración de Spring Batch en perfiles
dev
,test
yprod
(application-*.yml
).
Dependencias: spring-boot-starter-batch
(solo en api-service
).
-
Endpoint
POST /jobs/{configId}/run
protegido con rol ADMIN. -
Construcción de
JobParameters
con:processingRequestId
(UUID como String)configId
(desde el path)requestTime
(Instant en ISO-8601 o epoch para asegurar unicidad)
-
Uso de
JobLauncher.run(job, params)
para iniciar el Job y retorno 202 Accepted con:- Header
Location
apuntando a/processings/{processingRequestId}
- Body con
jobInstanceId
yjobExecutionId
.
- Header
-
Control de concurrencia: si existe una ejecución
RUNNING
con los mismos parámetros clave, responde 409 Conflict con códigoJOB_ALREADY_RUNNING
. -
Registro de métricas personalizadas (
dataflow.job.executions.total
). -
Manejo de errores:
- Config desconocido → 404 CONFIG_NOT_FOUND
- Error de infraestructura → 503
- Unitarias: Mock de
JobLauncher
para verificar construcción deJobParameters
y respuesta 202. - Integración: con Spring Batch activo, ejecuta un Job dummy y verifica registros en tablas
BATCH_*
.
mvn test -Dtest=JobControllerTest
Request (mínimo):
POST /jobs/csv_to_ipa_v1/run
Content-Type: application/json
{
"processingRequestId": "7e2a1d7c-3b9b-4f1a-8a55-9a2f1e4c7788",
"parameters": { "delimiter": ";" }
}
Response (202):
{
"jobInstanceId": 123,
"jobExecutionId": 456
}
Conflicto si ya corre → 409:
{
"code": "JOB_ALREADY_RUNNING"
}
- Endpoint protegido (ADMIN) retorna 202 y
Location
válido. - Si existe ejecución RUNNING con mismos parámetros clave → 409.
- Spring Batch crea registros en tablas
BATCH_*
o mock verificado en unit test. - Métrica
dataflow.job.executions.total
incrementa al aceptar.
- Perfiles: en
dev
ytest
usar H2; enprod
conectar a base persistente. - Persistencia: esquema JDBC de Spring Batch se autoinicializa en
dev/test
; enprod
incluir en Flyway. - Seguridad: restringido a rol
ADMIN
según configuración deSecurityConfig
. - Métricas: visibles vía
/actuator/metrics/dataflow.job.executions.total
.
Objetivo: Automatizar la ejecución diaria de Jobs Batch a las 02:00 en el entorno de producción, con control de concurrencia, métricas y logs estructurados.
api-service/src/main/java/.../BatchScheduler.java
(nuevo scheduler)application-prod.yml
(propiedadscheduling.enabled=true
)application-dev.yml
yapplication-test.yml
(propiedadscheduling.enabled=false
)
Dependencias: uso de spring-context
para @EnableScheduling
y @Scheduled
.
- El scheduler se activa únicamente en perfil
prod
gracias a la propiedadscheduling.enabled=true
. - A las 02:00 se ejecuta el método planificado (
@Scheduled(cron = "0 0 2 * * *")
). - Consulta en la base de datos todas las solicitudes de procesamiento (
ProcessingRequest
) con estado PENDING (máx. 50 por ciclo). - Para cada solicitud, invoca internamente el mismo servicio que expone el endpoint
/jobs/{configId}/run
de la HU F2-16. - Verifica mediante
JobExplorer
que no exista ya una ejecución RUNNING para el mismoprocessingRequestId
. - Implementa control de back-pressure: si hay más de
N
ejecuciones en estado RUNNING, pospone el resto. - Registra en métricas (
dataflow.scheduler.trigger{result=launched|skipped}
) y logs (traceId
) el resultado de cada ejecución.
- Prod: activo a las 02:00.
- Dev/Test: desactivado (
scheduling.enabled=false
).
application-prod.yml
scheduling:
enabled: true
Log esperado (JSON)
{
"message": "Scheduler launched 17 jobs",
"result": "launched",
"pending": 23,
"launched": 17,
"skipped": 6,
"traceId": "f91fa2b3c4d5e67a"
}
- Con
scheduling.enabled=true
en prod, el método se ejecuta a las 02:00. - No lanza un job si ya hay una ejecución RUNNING para el mismo
processingRequestId
. - Métrica
dataflow.scheduler.trigger
visible en/actuator/metrics
. - Tests unitarios verifican el comportamiento en ambos perfiles.
- En entornos locales, mantener
scheduling.enabled=false
para evitar ejecuciones involuntarias. - La implementación actual está preparada para integrarse con el flujo real de ejecución de jobs definido en la HU F2-16.
- Los logs incluyen
traceId
para permitir trazabilidad completa de ejecuciones.
Objetivo: preparar la aplicación
api-service
para ser empaquetada y ejecutada en contenedores Docker, incluyendo configuración para PostgreSQL y perfiles de ejecución.
Dockerfile
(nuevo) — Construcción multi-stage (JDK para compilación, JRE para ejecución) con empaquetado optimizado.docker-compose.yml
(nuevo) — Orquestación deapi-service
y base de datos PostgreSQL.- Configuración en
application.properties
para soportar variables de entorno y perfiles (prod
,dev
,test
).
Dependencias:
- Imagen base
eclipse-temurin
(Java 21). - PostgreSQL 16-alpine.
-
Construcción del JAR mediante multi-stage build:
- Etapa de compilación con Maven y JDK 21.
- Etapa final con JRE 21 y el
app.jar
listo para ejecutar.
-
Ejecución en contenedor con variables de entorno para DB y perfil activo.
-
Orquestación con
docker-compose
que levanta la app y PostgreSQL en red compartida. -
Posibilidad de usar
host.docker.internal
para conectar a PostgreSQL local en entornos sin Compose.
docker build -t dataflowhub/api-service .
docker run --rm -p 8080:8080 \
-e SPRING_PROFILES_ACTIVE=prod \
-e SPRING_FLYWAY_ENABLED=false \
-e SPRING_DATASOURCE_URL='jdbc:h2:mem:testdb;MODE=PostgreSQL' \
dataflowhub/api-service
docker compose up --build
Esto levanta PostgreSQL (db
) y api-service
conectados en la misma red Docker.
docker run --rm -p 8080:8080 \
-e SPRING_DATASOURCE_URL='jdbc:postgresql://host.docker.internal:5432/dataflow' \
-e SPRING_DATASOURCE_USERNAME=app \
-e SPRING_DATASOURCE_PASSWORD=secret \
dataflowhub/api-service
services:
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: dataflow
POSTGRES_USER: app
POSTGRES_PASSWORD: secret
ports:
- "5432:5432"
api:
build: .
environment:
SPRING_PROFILES_ACTIVE: prod
SPRING_DATASOURCE_URL: jdbc:postgresql://db:5432/dataflow
SPRING_DATASOURCE_USERNAME: app
SPRING_DATASOURCE_PASSWORD: secret
ports:
- "8080:8080"
depends_on:
- db
- Se puede construir la imagen del servicio con
docker build
sin errores. - La aplicación inicia correctamente en contenedor usando DB en Compose.
- Se soporta conexión a DB local o en otro contenedor usando variables de entorno.
- El empaquetado multi-stage reduce el tamaño final de la imagen.
- En Windows/Mac,
host.docker.internal
apunta al host; en Linux puede requerir configuración adicional. - Flyway está habilitado en
prod
por defecto; deshabilitar (SPRING_FLYWAY_ENABLED=false
) si no se desea migrar en contenedor. - Para desarrollo, se recomienda mapear
application.properties
externos como volumen para ajustes rápidos sin reconstruir la imagen.
Configurar un entorno reproducible para levantar la base de datos PostgreSQL y la aplicación de forma conjunta mediante docker-compose, facilitando el arranque local y en CI/CD.
-
Creación de
docker-compose.yml
con dos servicios:- db: imagen oficial de PostgreSQL (v16-alpine) configurada con credenciales y base de datos
dataflow
. - api: imagen generada de la app (
dataflowhub/api-service:latest
).
- db: imagen oficial de PostgreSQL (v16-alpine) configurada con credenciales y base de datos
-
Configuración de variables de entorno para la conexión a la base de datos y perfil
prod
. -
Implementación de healthchecks en ambos servicios para garantizar que
api
solo inicie tras confirmarse quedb
está saludable. -
Persistencia de datos mediante volumen
pgdata
asociado a/var/lib/postgresql/data
. -
Parámetros adicionales:
- Puerto
5432
expuesto para acceso local a la base de datos. - Puerto
8080
expuesto para acceso a la API. - Variable
SCHEDULING_ENABLED
configurada afalse
para evitar ejecuciones involuntarias del scheduler en local/compose.
- Puerto
# Levantar la base de datos y la app en segundo plano
docker compose up -d
# Verificar estado de la API
curl http://localhost:8080/actuator/health
- Usar archivo
.env
para credenciales en local y variables distintas en CI/Prod. - Si se usa Flyway, confirmar que
V1__baseline.sql
se aplica correctamente al inicio. - Para logs JSON, asegurarse de incluir
logback-spring.xml
(B5) en la imagen.
docker compose up -d
levantadb
saludable y luegoapi
.curl http://localhost:8080/actuator/health
devuelveUP
.- La app se conecta a PostgreSQL y aplica migraciones Flyway sin errores.
- Variables de entorno controlan correctamente el perfil y el scheduler.
- El healthcheck de la base de datos usa
pg_isready -U app -d dataflow
. - El healthcheck de la API consulta
/actuator/health
y verifica estadoUP
. - Compatible con entornos locales y CI/CD sin cambios en configuración.
Agregar en api-service/pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
- Desactivar ejecución automática de jobs:
spring.batch.job.enabled=false
- Desactivar auto‑init de Spring Batch (usar Flyway):
spring.batch.jdbc.initialize-schema=never
- Desactivar DDL automático de Hibernate:
spring.jpa.hibernate.ddl-auto=none
- Configurar datasource (ejemplo PostgreSQL UTC):
spring.datasource.url=jdbc:postgresql://localhost:5432/appdb
spring.datasource.username=app
spring.datasource.password=app
spring.jpa.properties.hibernate.jdbc.time_zone=UTC
- Crear archivo
src/main/resources/db/migration/V1__spring_batch_schema.sql
con el script oficial de Spring Batch para PostgreSQL (schema-postgresql.sql
). - Si el esquema ya tiene tablas y no existe
flyway_schema_history
, establecer en el perfil correspondiente:
spring:
flyway:
baseline-on-migrate: true
baseline-version: 1
Esto evita que Flyway intente recrear tablas existentes.
- Iniciar la aplicación y comprobar en logs que las migraciones Flyway se aplicaron correctamente.
- Consultar en la base:
SELECT count(*) FROM BATCH_JOB_INSTANCE;
SELECT count(*) FROM BATCH_JOB_EXECUTION;
SELECT count(*) FROM BATCH_STEP_EXECUTION;
- Confirmar que existe la tabla
flyway_schema_history
.
- La app inicia sin errores relacionados a tablas batch.
spring.batch.job.enabled=false
activo en todos los perfiles.- Tablas
BATCH_*
visibles en la base configurada. - Migraciones registradas en
flyway_schema_history
.
- En dev/test se puede limpiar el esquema para aplicar
V1__...
desde cero. - En prod nunca usar
spring.batch.jdbc.initialize-schema=always
; siempre controlar el esquema con Flyway.
Configurar la infraestructura base de Spring Batch para exponer JobRepository
, JobLauncher
y JobRegistry
de forma controlada, permitiendo registrar y lanzar jobs por nombre.
- Extender
DefaultBatchConfiguration
para personalizarDataSource
yPlatformTransactionManager
. - Declarar un único
JobRegistry
y registrador (JobRegistrySmartInitializingSingleton
) para evitar duplicados. - Definir un job de prueba (
demoJob
) y un step (demoStep
) que utilice explícitamente elPlatformTransactionManager
.
@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}
@Bean
public JobRegistrySmartInitializingSingleton jobRegistryInitializer(JobRegistry jobRegistry) {
JobRegistrySmartInitializingSingleton init = new JobRegistrySmartInitializingSingleton();
init.setJobRegistry(jobRegistry);
return init;
}
@Bean
public Job demoJob(JobRepository jobRepository, Step demoStep) {
return new JobBuilder("demoJob", jobRepository)
.start(demoStep)
.build();
}
@Bean
public Step demoStep(JobRepository jobRepository, PlatformTransactionManager tx) {
return new StepBuilder("demoStep", jobRepository)
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, tx)
.build();
}
- Ejecutar un test de integración que obtenga
demoJob
delJobRegistry
y lo lance conJobLauncher
, validando que el estado seaCOMPLETED
. - Revisar en logs el registro del job (
Registering job: demoJob
). - Confirmar que no se generan errores
DuplicateJobException
.
- Beans
JobRepository
,JobLauncher
yJobRegistry
presentes en el contexto. - Un único registrador de jobs activo.
- Job de prueba ejecutable y con estado
COMPLETED
.
- Evitar coexistencia de
JobRegistryBeanPostProcessor
yJobRegistrySmartInitializingSingleton
. - Usar
PlatformTransactionManager
explícitamente en todos los steps (tasklet
ochunk
).
Implementar listeners reutilizables para todos los jobs y steps que registren logs estructurados y publiquen métricas operativas vía Micrometer.
-
LoggingJobExecutionListener
: implementaJobExecutionListener
para:-
Loguear inicio y fin de cada job (
jobName
,executionId
, parámetros, estado y duración). -
Publicar métricas:
dataflow.job.executions.total{job, status}
(counter)dataflow.job.duration{job}
(timer)
-
-
MetricsStepListener
: implementaStepExecutionListener
para:-
Capturar tiempos de ejecución de cada step.
-
Publicar contadores de
readCount
,writeCount
,skipCount
. -
Publicar métrica:
dataflow.step.duration{job, step}
(timer)dataflow.step.reads{job, step}
(counter)dataflow.step.writes{job, step}
(counter)dataflow.step.skips{job, step}
(counter)
-
- Registrar
LoggingJobExecutionListener
a nivel de job. - Registrar
MetricsStepListener
a nivel de step. - Ejemplo:
@Bean
public Job demoJob(JobRepository jobRepository,
Step demoStep,
LoggingJobExecutionListener jobListener) {
return new JobBuilder("demoJob", jobRepository)
.listener(jobListener)
.start(demoStep)
.build();
}
@Bean
public Step demoStep(JobRepository jobRepository,
PlatformTransactionManager tx,
MetricsStepListener stepListener) {
return new StepBuilder("demoStep", jobRepository)
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED, tx)
.listener(stepListener)
.build();
}
{"event":"JOB_END","job":"csvToJpaJob","executionId":456,"status":"COMPLETED","durationMs":8421}
- Al ejecutar un job, revisar en logs el inicio y fin con
executionId
y duración. - Consultar en
/actuator/metrics
que las métricas anteriores estén disponibles con etiquetasjob
ystep
.
- Cualquier job nuevo hereda estos listeners sin código adicional.
- Logs incluyen
executionId
y duración. - Métricas visibles en Actuator y Prometheus (si está configurado).
- Requiere dependencia
micrometer-core
y configuración de Actuator. - Evitar registrar múltiples listeners del mismo tipo para no duplicar métricas o logs.
Contrato canónico para el job principal (csvToJpaJob
). Define claves, tipos, si son identificantes de JobInstance
, obligatoriedad, validaciones y descripción.
Clave | Tipo Batch | Identif. | Oblig. | Validación / Dominio | Descripción |
---|---|---|---|---|---|
processingRequestId |
String | Sí | Sí | UUID válido | Id del pedido/lote a procesar. Fija la identidad del JobInstance. |
configId |
String | Sí | Sí | BatchJobConfig existente |
Plantilla/config de job a usar. |
storagePath |
String | No | Sí | Ruta legible/existente | Origen del archivo a procesar. |
delimiter |
String | No | No | Uno de , ; |
Separador CSV (default , ). |
requestTime |
Date | Sí* | No | ISO‑8601 o epoch ms | Marca única para forzar nueva instancia (ver reglas). |
chunkSize |
Long | No | No | Rango 100..10_000 |
Tamaño del chunk (tuning). |
Identificante = participa en la identidad de
JobInstance
. Dos ejecuciones con los mismos parámetros identificantes pertenecen al mismoJobInstance
(útil pararestart
). Si cambia alguno, se crea otroJobInstance
(re‑ejecución/otra versión).
processingRequestId
yconfigId
→ siempre identificantes.requestTime
→ identificante solo si se suministra: úsalo para forzar nueva instancia (HU F3‑06).storagePath
ydelimiter
→ no identificantes (pueden variar sin cambiar la identidad si son parte del mismo pedido).- Documentar toda decisión de identidad; cualquier cambio requiere migración/rollback plan.
Spring Batch permite tipos: String, Long, Double, Date. Ejemplo de payload REST y cómo se mapea:
{
"processingRequestId": "7e2a7c1e-9a37-49a0-9b2e-6c4a3a2f8f10",
"configId": "csv_to_jpa_v1",
"storagePath": "/data/in/ventas_julio.csv",
"delimiter": ";",
"chunkSize": 500,
"requestTime": "2025-08-12T00:15:00Z"
}
---
### HU F3-05 – JobParametersValidator por job
#### 1. Objetivo
Implementar validación centralizada de parámetros de job mediante `JobParametersValidator` específico para cada job productivo.
#### 2. Implementación principal
* Crear clase `CsvToJpaJobParametersValidator` que verifique:
* Presencia de `processingRequestId`, `configId` y `storagePath`.
* Formato UUID válido en `processingRequestId`.
* Valores permitidos para `delimiter` y rango válido para `chunkSize`.
* (Opcional) Validar existencia del archivo en `storagePath` si el entorno lo permite, o diferir validación al `ItemReader`.
* Registrar el validador en la definición del job:
```java
@Bean
public Job csvToJpaJob(JobRepository jobRepository,
Step csvToJpaStep,
CsvToJpaJobParametersValidator validator) {
return new JobBuilder("csvToJpaJob", jobRepository)
.validator(validator)
.start(csvToJpaStep)
.build();
}
-
Implementar tests unitarios que:
- Pasen con parámetros válidos.
- Llamen a
validate(...)
y confirmen que se lanzaJobParametersInvalidException
para faltantes, formato inválido o valores fuera de rango.
Parámetros válidos:
processingRequestId=7e2a... (UUID)
configId=csv_to_jpa_v1
storagePath=/data/in/ventas.csv
delimiter=; (opcional)
chunkSize=1000 (opcional)
Parámetros inválidos (lanza JobParametersInvalidException):
processingRequestId=not-a-uuid
configId=csv_to_jpa_v1
- Ejecutar un job con parámetros válidos → pasa validación y arranca.
- Ejecutar un job con parámetros faltantes o inválidos → falla antes de iniciar steps y muestra mensaje descriptivo.
- Todos los jobs productivos tienen
validator()
asociado. - Tests cubren al menos: OK, falta clave, formato inválido, valor fuera de rango.
- Mensajes de excepción son descriptivos e incluyen el nombre de la clave inválida.
Permitir re‑ejecutar un job como nueva JobInstance (no restart) controlando la identidad mediante un JobParametersIncrementer
seleccionable por propiedad.
-
RunIdIncrementer (por defecto)
- Añade/actualiza
run.id
(Long, identificante) → cada ejecución crea nueva JobInstance. - Útil para re‑procesos genéricos sin semántica adicional.
- Añade/actualiza
-
RequestTimeIncrementer (custom)
- Añade/actualiza
requestTime
(Date, identificante) conInstant.now()
. - Útil cuando se desea trazar “cuándo” se pidió el re‑proceso.
- Consistente con F3‑04:
requestTime
es identificante solo si está presente.
- Añade/actualiza
Restart vs Re‑ejecución:
- Restart: mismos parámetros identificantes y
JobOperator.restart(executionId)
→ mismaJobInstance
.- Re‑ejecución: altera al menos un identificante (p. ej.,
run.id
orequestTime
) → nuevaJobInstance
.
Beans de incrementer (config seleccionable):
@Configuration
public class IncrementerConfig {
@Bean
@ConditionalOnProperty(name = "batch.incrementer", havingValue = "runId", matchIfMissing = true)
public JobParametersIncrementer runIdIncrementer() { return new RunIdIncrementer(); }
@Bean
@ConditionalOnProperty(name = "batch.incrementer", havingValue = "requestTime")
public JobParametersIncrementer requestTimeIncrementer() { return new RequestTimeIncrementer(); }
}
Asociación al job:
@Bean
public Job demoJob(JobRepository jobRepository,
Step demoStep,
JobParametersIncrementer incrementer,
CsvToJpaJobParametersValidator validator,
LoggingJobExecutionListener jobListener) {
return new JobBuilder("demoJob", jobRepository)
.validator(validator)
.listener(jobListener)
.incrementer(incrementer)
.start(demoStep)
.build();
}
Propiedades (por perfil):
# Default: nueva instancia con run.id
batch.incrementer=runId
# Alternativa semántica:
# batch.incrementer=requestTime
- Usar H2 embebida y
spring.batch.jdbc.initialize-schema=always
. - Desactivar Flyway y auto‑lanzamiento de jobs.
- Registrar un
MeterRegistry
simple si hay listeners con métricas. - Importante:
JobLauncherTestUtils.launchJob(params)
no aplica el incrementer. Para el segundo lanzamiento, aplícalo manualmente o usaJobOperator.startNextInstance(jobName)
.
Snippet base:
JobParameters base = new JobParametersBuilder()
.addString("processingRequestId", UUID.randomUUID().toString())
.addString("configId", "csv_to_jpa_v1")
.addString("storagePath", "/tmp/input.csv")
.toJobParameters();
JobExecution first = utils.launchJob(base);
JobParameters next = incrementer.getNext(base); // run.id o requestTime
JobExecution second = utils.launchJob(next);
Properties típicas de test:
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
spring.flyway.enabled=false
# H2 en memoria, si el test usa Spring Boot Test
spring.datasource.url=jdbc:h2:mem:batchtest;MODE=PostgreSQL;DB_CLOSE_DELAY=-1
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
- Dos lanzamientos consecutivos con el incrementer producen JobInstance distintas (
BATCH_JOB_INSTANCE
crece +1). - Ambos
JobExecution
finalizan conExitStatus.COMPLETED
. - En logs/metrics se distinguen ambas ejecuciones (listeners de F3‑03).
- La estrategia de incrementer se puede elegir por propiedad (
batch.incrementer
). - El job aplica
incrementer(...)
y crea nuevaJobInstance
cuando corresponde. - Existen pruebas que demuestran: (a) re‑ejecución crea nueva instancia, (b) restart mantiene la misma instancia.
- README y
docs/JobParametersSpec.md
actualizados (relación conrequestTime
).
- JobInstanceAlreadyComplete: estás re‑lanzando con los mismos identificantes. Aplica
incrementer.getNext(base)
o usastartNextInstance
. - Contexto de test no levanta: usa contexto mínimo, registra
MeterRegistry
y@ComponentScan
para listeners. - Duplicado de registradores: no mezclar
JobRegistryBeanPostProcessor
yJobRegistrySmartInitializingSingleton
(ver F3‑02).
Exponer un ItemReader streaming para CSV que se parametriza en tiempo de ejecución con storagePath
y delimiter
, mapea cada línea a un POJO y valida el header.
-
POJO (
ImportRecord
):externalId:String
,userEmail:String
,amount:BigDecimal
,eventTime:Instant
(ymeta:Map<String,String>
opcional). -
Reader:
FlatFileItemReader<ImportRecord>
@StepScope con parámetros inyectados:@Value("#{jobParameters['storagePath']}") String storagePath
@Value("#{jobParameters['delimiter']?:','}") String delimiter
-
Header:
linesToSkip=1
+SkippedLinesCallback
que verifica el encabezado esperado. -
Tokenizer:
DelimitedLineTokenizer
consetNames("external_id","user_email","amount","event_time")
ysetDelimiter(delimiter)
. -
Mapper de campos:
ImportRecordFieldSetMapper
(parseaBigDecimal
eInstant
ISO‑8601). -
Strict mode:
reader.setStrict(true)
para fallar si el archivo no existe (si prefieres validar en F3‑05, ponfalse
). -
El
FlatFileItemReader
lee en streaming (no carga todo a memoria).
Bean del reader
@Bean
@StepScope
public FlatFileItemReader<ImportRecord> importRecordReader(
@Value("#{jobParameters['storagePath']}") String storagePath,
@Value("#{jobParameters['delimiter']?:','}") String delimiter) {
var reader = new FlatFileItemReader<ImportRecord>();
reader.setName("importRecordReader");
reader.setResource(new FileSystemResource(storagePath));
reader.setStrict(true);
reader.setLinesToSkip(1);
reader.setSkippedLinesCallback(line -> {
String expected = "external_id,user_email,amount,event_time";
if (!expected.equals(line)) throw new FlatFileParseException(
"Invalid header. Expected: " + expected + " but was: " + line, line, 1);
});
var tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter(delimiter);
tokenizer.setQuoteCharacter('"');
tokenizer.setStrict(true);
tokenizer.setNames("external_id","user_email","amount","event_time");
var mapper = new DefaultLineMapper<ImportRecord>();
mapper.setLineTokenizer(tokenizer);
mapper.setFieldSetMapper(new ImportRecordFieldSetMapper());
mapper.afterPropertiesSet();
reader.setLineMapper(mapper);
return reader;
}
Mapper de campos (extracto)
public class ImportRecordFieldSetMapper implements FieldSetMapper<ImportRecord> {
public ImportRecord mapFieldSet(FieldSet fs) {
var amount = Optional.ofNullable(fs.readString("amount"))
.filter(s -> !s.isBlank()).map(BigDecimal::new).orElse(null);
var time = Optional.ofNullable(fs.readString("event_time"))
.filter(s -> !s.isBlank()).map(Instant::parse).orElse(null);
return new ImportRecord(
fs.readString("external_id"), fs.readString("user_email"), amount, time);
}
}
Test recomendado
- Crear archivo temporal con header esperado y 2–3 filas.
- Inyectar parámetros con
StepScopeTestUtils.doInStepScope(...)
y leer conreader.read()
hastanull
.
JobParameters params = new JobParametersBuilder()
.addString("storagePath", tmpFile.toString())
.addString("delimiter", ",")
.toJobParameters();
StepScopeTestUtils.doInStepScope(MetaDataInstanceFactory.createStepExecution(params), () -> {
reader.open(new ExecutionContext());
var r1 = reader.read(); var r2 = reader.read(); var r3 = reader.read();
reader.close();
assertThat(r1.getExternalId()).isEqualTo("A1");
assertThat(r2).isNotNull();
assertThat(r3).isNull();
return null;
});
- Con un CSV de muestra, el reader lee >100k filas en tiempo aceptable (definir objetivo) sin picos de memoria.
- Si el archivo no existe o el header es inválido → error claro en logs.
- El delimiter es configurable: por defecto
,
; si se pasa;
en parámetros, se parsea correctamente.
- Bean
FlatFileItemReader<ImportRecord>
anotado con @StepScope y parámetros inyectados. - Header validado y mapeo correcto a
ImportRecord
(incluyeBigDecimal
/Instant
). - Lectura streaming (sin OOM) en archivos grandes.
- Test de integración del reader en verde.
- Si el CSV tiene comillas/escapes complejos, considera usar OpenCSV o Commons CSV dentro del
FieldSetMapper
(se mantiene elFlatFileItemReader
). - Integración con F3‑05: el validator puede verificar existencia de
storagePath
y dominio dedelimiter
antes de abrir el reader. - Para trabajar con Windows, valida encoding/line endings si el header no coincide.
Centralizar reglas de validación y normalización antes del writer. Soporta dos estrategias:
- filter: devuelve
null
y el registro se descarta (no cuenta como skip). - exception: lanza
RecordValidationException
; luego el Step (F3‑10) lo contará con.skip(...)
.
-
Bean:
ImportRecordProcessor
(@StepScope
,ItemProcessor<ImportRecord, ImportRecord>
). -
Propiedades:
batch.processor.validation-mode=exception|filter
batch.processor.event-window-years=2
(ventana válida paraeventTime
).
-
Validaciones (ejemplos):
externalId
: requerido y único en el chunk (usaSet
en el processor).userEmail
: requerido + regex de email.amount
: requerido y>= 0
.eventTime
: requerido, no futuro y dentro de la ventana.
-
Transformaciones:
userEmail
→ minúsculas.amount
→setScale(2, HALF_UP)
.eventTime
ya esInstant
(UTC).
Excepción de validación
public class RecordValidationException extends RuntimeException {
public static final class FieldError {
private final String field, reason;
public FieldError(String field, String reason) { this.field = field; this.reason = reason; }
public String getField() { return field; }
public String getReason() { return reason; }
}
private final long rowNumber; private final List<FieldError> errors;
public RecordValidationException(long rowNumber, List<FieldError> errors) {
super("row=" + rowNumber + " errors=" + errors);
this.rowNumber = rowNumber; this.errors = errors;
}
public long getRowNumber() { return rowNumber; }
public List<FieldError> getErrors() { return errors; }
}
Processor @StepScope (extracto)
@Component
@StepScope
public class ImportRecordProcessor implements ItemProcessor<ImportRecord, ImportRecord> {
private final boolean throwOnValidation; // exception|filter
private final int windowYears; private final Clock clock = Clock.systemUTC();
private final Set<String> seenIds = new HashSet<>(); private long row = 0;
public ImportRecordProcessor(
@Value("${batch.processor.validation-mode:exception}") String mode,
@Value("${batch.processor.event-window-years:2}") int windowYears) {
this.throwOnValidation = !"filter".equalsIgnoreCase(mode);
this.windowYears = windowYears;
}
@Override
public ImportRecord process(ImportRecord in) {
row++;
var errs = new ArrayList<RecordValidationException.FieldError>();
// validaciones (id requerido/único, email válido, amount >= 0, fecha no futura/en ventana)
if (!errs.isEmpty()) {
if (throwOnValidation) throw new RecordValidationException(row, errs);
return null; // filtrado
}
in.setUserEmail(in.getUserEmail().toLowerCase());
in.setAmount(in.getAmount().setScale(2, RoundingMode.HALF_UP));
return in;
}
}
Listener opcional
@Component
public class LoggingProcessListener implements ItemProcessListener<ImportRecord, ImportRecord> {
public void afterProcess(ImportRecord in, ImportRecord out) {
if (out == null) log.debug("Filtered record: externalId={}", in.getExternalId());
}
}
- Entrada:
{ externalId:"A-001", userEmail:"ANA@ACME.COM", amount:"12.5", event_time:"2025-07-01T12:00:00Z" }
- Salida:
{ externalId:"A-001", userEmail:"ana@acme.com", amount:12.50, eventTime:"2025-07-01T12:00:00Z" }
- Inválido:
{ amount:-3 } → RecordValidationException(field="amount", reason="NEGATIVE")
(modoexception
) onull
(modofilter
).
-
Dataset de 5–8 filas mezclando válidas/ inválidas.
-
Aserciones:
- Normalizaciones: email minúscula, escala
2
en amount. - Duplicado de
externalId
dentro de chunk detectado. - Comportamiento según
validation-mode
.
- Normalizaciones: email minúscula, escala
- Reglas aplicadas y documentadas; válidos avanzan al writer.
- Inválidos se filtran o generan skip (según estrategia).
- Tests cubren casos borde (email raro, montos con 3 decimales, fechas futuras/fuera de ventana).
- Processor puro (sin I/O ni DB) para rendimiento.
DUPLICATED_IN_CHUNK
: revisa el tamaño de chunk según negocio.- Para skip en C3, configura el step con
.faultTolerant().skip(RecordValidationException.class).skipLimit(N)
. - Importa desde el JDK:
Clock
,Instant
,ChronoUnit
,RoundingMode
(si el IDE los marca en rojo, revisa SDK/bytecode y cache del IDE).
Persistir los registros válidos del job en una tabla de staging mediante inserciones por lotes y dentro de transacciones por chunk, evitando duplicados en re‑ejecuciones.
Crear tabla import_records
y restricciones para idempotencia.
-- V3__create_import_records.sql
create table if not exists import_records (
id uuid primary key,
processing_request_id uuid not null,
external_id varchar(64) not null,
user_email varchar(140) not null,
amount numeric(18,2) not null,
event_time timestamptz not null,
created_at timestamptz not null default now()
);
create unique index if not exists ux_import_records_req_ext
on import_records (processing_request_id, external_id);
create index if not exists ix_import_records_event_time
on import_records (event_time);
Si usas otro esquema, antepón el nombre (p. ej.
data.import_records
).
JdbcBatchItemWriter<ImportRecord>
con NamedParameterJdbcTemplate
y ON CONFLICT DO NOTHING
(PostgreSQL) para idempotencia.
@Bean
@StepScope
public JdbcBatchItemWriter<ImportRecord> importRecordWriter(
DataSource dataSource,
@Value("#{jobParameters['processingRequestId']}") String processingRequestId) {
var npjt = new NamedParameterJdbcTemplate(dataSource);
String sql = """
insert into import_records (
id, processing_request_id, external_id, user_email, amount, event_time
) values (
:id, :processingRequestId, :externalId, :userEmail, :amount, :eventTime
)
on conflict (processing_request_id, external_id) do nothing
""";
return new JdbcBatchItemWriterBuilder<ImportRecord>()
.namedParametersJdbcTemplate(npjt)
.sql(sql)
.assertUpdates(false) // con ON CONFLICT, algunas filas devuelven 0 updates
.itemSqlParameterSourceProvider(item -> new MapSqlParameterSource()
.addValue("id", UUID.randomUUID())
.addValue("processingRequestId", UUID.fromString(processingRequestId))
.addValue("externalId", item.getExternalId())
.addValue("userEmail", item.getUserEmail())
.addValue("amount", item.getAmount())
.addValue("eventTime", Timestamp.from(item.getEventTime())))
.build();
}
Transacciones y lotes
chunkSize
configurable (p. ej.,500
).- El
PlatformTransactionManager
del Step asegura commit por chunk.
@Bean
public Step csvToJpaStep(JobRepository jobRepository,
PlatformTransactionManager tx,
FlatFileItemReader<ImportRecord> reader,
ImportRecordProcessor processor,
JdbcBatchItemWriter<ImportRecord> writer,
MetricsStepListener stepListener) {
int chunkSize = 500; // o desde JobParameters
return new StepBuilder("csvToJpaStep", jobRepository)
.<ImportRecord, ImportRecord>chunk(chunkSize, tx)
.reader(reader)
.processor(processor)
.writer(writer)
.listener(stepListener)
// .faultTolerant().skip(RecordValidationException.class).skipLimit(1000)
.build();
}
- Con 10k filas válidas y
chunkSize=500
⇒ ~20 commits.writeCount
≈ 10000. - Re‑ejecutar con el mismo
processingRequestId
y mismas filas ⇒writeCount
≈ 0 gracias aON CONFLICT
+ índice único.
- Inserción por lotes funcionando;
writeCount
coincide con registros procesados. - Idempotencia: re‑ejecuciones con mismo
processingRequestId
no duplican. - Migración aplicada sin romper esquema existente.
- JPA alternativa:
JpaItemWriter
+hibernate.jdbc.batch_size
(más cómodo, menor throughput). - Ajusta
fetchSize
/batch_size
si cambias a JPA. - En entornos con PK/UK distintos, sustituye
ON CONFLICT
porMERGE
o equivalente del motor.