Contexto
Hoy DataDrain::Engine usa un único where_clause que aplica simultáneamente a:
get_postgres_count (engine.rb:161-166) — contar filas a archivar
export_to_parquet (engine.rb:169-197) — select que se escribe a Parquet
verify_integrity (engine.rb:201-220) — count Parquet filtrado para comparar con @pg_count
purge_from_postgres → build_delete_sql (engine.rb:352-361) — DELETE en PG
Esto asume que "lo que archivo" == "lo que borro". No modela el caso común:
Archivar subconjunto (filas válidas), borrar superset (válidas + trash/orphans que no se respaldan).
Use case real
Ver cold_storage_service#10 y PR sequre/cold_storage_service#8.
Regla de producto: dejar últimos 6 meses en PG. Para filas más viejas:
isp_id NOT NULL → archivar a S3 + borrar de PG
isp_id IS NULL → borrar de PG sin archivar (trash)
Con la API actual hay que elegir:
where_clause: 'isp_id IS NOT NULL' → archiva + borra sólo NOT NULL. Orphans quedan en PG para siempre.
where_clause: nil + filtrar en la query del caller (Glue/DuckDB select) → Parquet sale filtrado, pero get_postgres_count y verify_integrity usan base_where_sql sin filtro → counts no matchean → integrity_failed cada corrida.
- Engine hace el subset, caller hace DELETE extra para orphans en un segundo statement → no aprovecha el batching/throttling/vacuum del purge_loop.
Ninguna es buena.
Propuesta
Agregar opción purge_where_clause opcional. Default al valor de where_clause (backwards compatible).
Cambios en lib/data_drain/engine.rb
initialize (~línea 40):
@where_clause = options[:where_clause]
@purge_where_clause = options.fetch(:purge_where_clause, @where_clause)
Nuevo método hermano de base_where_sql:
# @api private
# @return [String]
def purge_where_sql
sql = "created_at >= '#{@start_date.to_fs(:db)}' AND created_at < '#{@end_date.to_fs(:db)}'"
sql += " AND #{@purge_where_clause}" if @purge_where_clause && !@purge_where_clause.empty?
sql
end
Modificar build_delete_sql (línea 352-361):
def build_delete_sql
<<~SQL
DELETE FROM #{@table_name}
WHERE #{@primary_key} IN (
SELECT #{@primary_key} FROM #{@table_name}
WHERE #{purge_where_sql}
LIMIT #{@config.batch_size}
)
SQL
end
get_postgres_count y verify_integrity siguen usando base_where_sql (el where_clause "de archivo"). La integridad sigue verificando lo archivado, no lo purgado — por diseño.
Contrato a documentar (YARD)
En initialize:
# @option options [String] :where_clause (Opcional) Condición SQL extra
# que filtra export, count e integrity check. Define "qué se archiva".
# @option options [String] :purge_where_clause (Opcional) Condición SQL
# para el DELETE. Si se omite, usa :where_clause. Puede ser más amplia
# que :where_clause; filas que matchean :purge_where_clause pero no
# :where_clause se borran sin archivar ni verificar. Útil para limpieza
# de orphans/trash que no debe respaldarse.
README: ejemplo de uso con subset/superset.
Ejemplo consumer post-cambio
DataDrain::Engine.new(
bucket: 'my-bucket',
start_date: Date.new(2023, 1, 1),
end_date: Date.new(2023, 2, 1),
table_name: 'versions',
folder_name: 'cloud/versions',
partition_keys: [:year, :month],
primary_key: 'id',
where_clause: 'isp_id IS NOT NULL', # archiva + verifica
purge_where_clause: nil, # borra TODO el mes
skip_export: true
).call
Resultado:
- Export/verify: cuenta y compara sólo
isp_id IS NOT NULL.
- Purge: borra el mes completo (NULL + NOT NULL) con batching, throttling y vacuum del
purge_loop.
Tests a agregar
describe "purge_where_clause" do
it "defaults to where_clause when not provided" do
# comportamiento actual — backwards compat
end
it "purges superset when purge_where_clause is wider than where_clause" do
# seed: 10 filas isp_id NOT NULL + 3 filas isp_id NULL en el rango
engine = DataDrain::Engine.new(
where_clause: 'isp_id IS NOT NULL',
purge_where_clause: nil,
# ... resto
)
expect(engine.call).to be true
expect(pg_count_in_range).to eq(0) # mes vacío
expect(parquet_count_in_range).to eq(10) # sólo NOT NULL
end
it "integrity check ignores purge_where_clause" do
# seed: 10 NOT NULL + 3 NULL. Glue/export sólo archiva NOT NULL.
# Engine con where_clause='isp_id IS NOT NULL' + purge_where_clause=nil.
# pg_count = 10, parquet_count = 10 → pasa.
end
end
Agregar test de integración end-to-end con el escenario completo (dataset mixto, verify pasa, purge borra todo).
Semver
Cambio aditivo + backwards compatible. Default de purge_where_clause = where_clause preserva el comportamiento existente.
Bump: 0.5.x → 0.6.0.
Alternativas consideradas
skip_purge flag — obliga al consumer a reimplementar batching/throttling/vacuum por afuera. Fragmenta responsabilidad. Peor.
Quitar purge del engine — breaking change, muchos users actuales dependen de él.
purge_where_clause es minúsculo (~10 líneas de código), flexible y no rompe nada.
Referencias
- Issue aguas abajo: sequre/cold_storage_service#10
- PR aguas abajo bloqueado: sequre/cold_storage_service#8
- Review que disparó la discusión: sequre/cold_storage_service#8#issuecomment (quinta review)
Contexto
Hoy
DataDrain::Engineusa un únicowhere_clauseque aplica simultáneamente a:get_postgres_count(engine.rb:161-166) — contar filas a archivarexport_to_parquet(engine.rb:169-197) — select que se escribe a Parquetverify_integrity(engine.rb:201-220) — count Parquet filtrado para comparar con@pg_countpurge_from_postgres→build_delete_sql(engine.rb:352-361) — DELETE en PGEsto asume que "lo que archivo" == "lo que borro". No modela el caso común:
Use case real
Ver
cold_storage_service#10y PR sequre/cold_storage_service#8.Regla de producto: dejar últimos 6 meses en PG. Para filas más viejas:
isp_id NOT NULL→ archivar a S3 + borrar de PGisp_id IS NULL→ borrar de PG sin archivar (trash)Con la API actual hay que elegir:
where_clause: 'isp_id IS NOT NULL'→ archiva + borra sólo NOT NULL. Orphans quedan en PG para siempre.where_clause: nil+ filtrar en la query del caller (Glue/DuckDB select) → Parquet sale filtrado, peroget_postgres_countyverify_integrityusanbase_where_sqlsin filtro → counts no matchean →integrity_failedcada corrida.Ninguna es buena.
Propuesta
Agregar opción
purge_where_clauseopcional. Default al valor dewhere_clause(backwards compatible).Cambios en
lib/data_drain/engine.rbinitialize(~línea 40):Nuevo método hermano de
base_where_sql:Modificar
build_delete_sql(línea 352-361):get_postgres_countyverify_integritysiguen usandobase_where_sql(elwhere_clause"de archivo"). La integridad sigue verificando lo archivado, no lo purgado — por diseño.Contrato a documentar (YARD)
En
initialize:README: ejemplo de uso con subset/superset.
Ejemplo consumer post-cambio
Resultado:
isp_id IS NOT NULL.purge_loop.Tests a agregar
Agregar test de integración end-to-end con el escenario completo (dataset mixto, verify pasa, purge borra todo).
Semver
Cambio aditivo + backwards compatible. Default de
purge_where_clause=where_clausepreserva el comportamiento existente.Bump:
0.5.x→0.6.0.Alternativas consideradas
skip_purgeflag — obliga al consumer a reimplementar batching/throttling/vacuum por afuera. Fragmenta responsabilidad. Peor.Quitar purge del engine — breaking change, muchos users actuales dependen de él.
purge_where_clausees minúsculo (~10 líneas de código), flexible y no rompe nada.Referencias