
tongqqiu/filedge

Filedge


Files are the universal building block of data engineering. Whether data starts in Kafka, Stripe's API, a partner SFTP, or a CDC stream, every reliable pipeline eventually crystallizes it into a file before it touches the warehouse. Filedge is the load boundary built around that fact: atomic per-file ingestion, content-hash idempotency, and a full audit trail — into SQLite, PostgreSQL, BigQuery, Databricks, or DuckDB.

Why files?

Streams are continuous; files are discrete. That discreteness is what makes ingestion auditable: a file has a SHA-256, a row count, a state in the audit DB, and a row-level provenance trail in the destination. Every downstream question (Did we load this? Can we replay it? Where did this row come from?) has a deterministic anchor.
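The content hash is what gives each file a stable identity. As a rough illustration (not Filedge's own code), a streaming SHA-256 over a file's bytes can be computed with Python's standard hashlib; the function name file_sha256 and the 1 MiB chunk size are illustrative choices.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 of a file, reading it in fixed-size chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # Stream the file so large exports never need to fit in memory.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        a = Path(tmp) / "orders_2024-01-01.csv"
        b = Path(tmp) / "orders_copy.csv"
        a.write_bytes(b"order_id,amount\n1,9.99\n")
        b.write_bytes(a.read_bytes())
        # Same bytes under different names produce the same identity.
        print(file_sha256(a) == file_sha256(b))
```

Because the digest depends only on the bytes, a file re-delivered under a new name hashes to the same value, which is what lets dedupe ignore filenames.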

Filedge starts where the file lands and ends when its rows are committed. Upstream is your choice: dlt or vendor exporters for APIs, Kafka Connect or Vector for queues, rclone for SFTP. Downstream is your warehouse. The hard part in between (retry-safe commits, deduplication, bounded retries, lineage) is all Filedge does.

What it gives you that a hand-rolled DAG doesn't

| Failure mode | Typical pipeline | Filedge |
|---|---|---|
| Half-written tables after a crash | Manual cleanup | Per-file atomic commit, retry-safe by content hash |
| "Did we already load this file?" | Filename heuristics | SHA-256 dedupe at the entry point |
| "Where did this row come from?" | Grep logs | _source_file_hash + _ingested_at on every row |
| Stale lock from a killed worker | Page someone | Reclaimed automatically on next run |
| One bad file blocks the pipeline | Skip and forget | Bounded retry → terminal FAILED with audit |
| Schema drift in destination | Silent corruption | Loud failure with a clear diff |
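To make the provenance row concrete, here is a minimal sketch of stamping rows with the two columns named in the table. The helper with_provenance is hypothetical, and using one UTC timestamp per file (rather than per row) is an assumption about how _ingested_at is populated.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Iterable, Iterator


def with_provenance(
    rows: Iterable[dict[str, Any]],
    file_hash: str,
    ingested_at: datetime | None = None,
) -> Iterator[dict[str, Any]]:
    """Yield copies of rows stamped with the source file hash and load time.

    Hypothetical helper: the column names come from the README, but the
    one-timestamp-per-file policy is an assumption.
    """
    stamp = ingested_at or datetime.now(timezone.utc)
    for row in rows:
        # Copy the row so the caller's dicts are left untouched.
        yield {**row, "_source_file_hash": file_hash, "_ingested_at": stamp}
```

Answering "where did this row come from?" then becomes a lookup of _source_file_hash in the audit DB instead of a log search.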

How it differs from neighbors

  • vs Airbyte / Fivetran / dlt — those fetch (paginate APIs, manage cursors). Filedge lands — it takes whatever they produce as files and makes the write to the warehouse audit-grade. Use them as Fetchers in front of Filedge.
  • vs Kafka Connect / Flink / Spark Structured Streaming — streaming systems own continuous offsets and incremental state. Filedge owns the file as the unit of work — simpler to reason about, replay, and audit. Materialize queues to files, then ingest.
  • vs Airflow + custom Python loaders — same DAG shape, but with protection against partial-load corruption, plus lock reclaim, retry caps, idempotent CDC apply, and row provenance already wired in.
  • vs Iceberg / Delta tables — those are table formats. Filedge is what writes to them (or to plain BigQuery / Postgres / Databricks tables) with the per-file commit guarantee.

Quick start

Requires uv.

uv sync --extra dev                          # core (SQLite)
uv sync --extra dev --extra postgres         # + PostgreSQL
uv sync --extra dev --extra bigquery         # + BigQuery
uv sync --extra dev --extra databricks       # + Databricks
uv sync --extra dev --extra duckdb           # + DuckDB

Declare a pipeline:

# pipeline.yaml
format: csv
dest_table: orders
write_mode: append          # append | truncate | cdc
retry_cap: 3
batch_size: 1000

connector:
  type: sqlite
  url: sqlite:///orders.db

columns:
  - { source: order_id,   dest: order_id,   type: string,  required: true }
  - { source: amount,     dest: amount,     type: float,   required: true }
  - { source: order_date, dest: order_date, type: date }
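The columns: block above maps source fields to destination fields, casts types, and enforces required. The sketch below is a hypothetical interpretation of those keys, not Filedge's parser: it assumes empty strings count as missing, missing optional values become NULL, and date means ISO-8601 (YYYY-MM-DD).

```python
from __future__ import annotations

import csv
import io
from datetime import date
from typing import Any, Callable

# Hypothetical parsers for the type names used in pipeline.yaml above.
# Assumes ISO-8601 dates; Filedge's real parsing rules may differ.
PARSERS: dict[str, Callable[[str], Any]] = {
    "string": str,
    "float": float,
    "date": date.fromisoformat,
}

COLUMNS = [
    {"source": "order_id", "dest": "order_id", "type": "string", "required": True},
    {"source": "amount", "dest": "amount", "type": "float", "required": True},
    {"source": "order_date", "dest": "order_date", "type": "date"},
]


def map_row(raw: dict[str, Any], columns: list[dict[str, Any]]) -> dict[str, Any]:
    """Rename, type-cast, and validate one CSV row against a columns: spec."""
    out: dict[str, Any] = {}
    for col in columns:
        # Treat absent (None) and empty-string fields alike as missing.
        value = raw.get(col["source"]) or ""
        if value == "":
            if col.get("required", False):
                raise ValueError(f"missing required column {col['source']!r}")
            out[col["dest"]] = None  # optional and empty becomes NULL
            continue
        out[col["dest"]] = PARSERS[col["type"]](value)
    return out


if __name__ == "__main__":
    data = "order_id,amount,order_date\nA1,9.99,2024-01-31\nA2,5,\n"
    for raw in csv.DictReader(io.StringIO(data)):
        print(map_row(raw, COLUMNS))
```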

Run it:

filedge run --dir ./incoming --config pipeline.yaml --audit-db-url sqlite:///filedge.db
# Committed: 3  Failed: 0  Skipped: 0  New: 3  Reclaimed: 0  Retried: 0

filedge status --audit-db-url sqlite:///filedge.db
# PENDING: 0  PROCESSING: 0  COMMITTED: 3  FAILED: 0

Don't know the schema yet? filedge inspect data.csv samples the file and prints a columns: block with confidence tiers ready to paste.

Connectors

The destination is configured via a connector: block in pipeline.yaml. Built-ins:

| Destination | Extra | Notes |
|---|---|---|
| SQLite | (core) | Default for local dev; configure with type: sqlite and a url |
| PostgreSQL | postgres | COPY bulk load; idempotent via per-hash DELETE |
| BigQuery | bigquery | NDJSON staging + load job; job-ID-keyed idempotency (7-day window) |
| Databricks | databricks | Unity Catalog volume staging |
| DuckDB | duckdb | File-based; single-writer, fails fast if locked |
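The "idempotent via per-hash DELETE" note for PostgreSQL describes a pattern that is easy to show in miniature. This sketch uses Python's built-in sqlite3 as a stand-in (plain INSERTs instead of COPY) and assumes each destination row carries _source_file_hash. Deleting the file's previous rows and inserting the new ones inside one transaction means a retry replaces rows instead of duplicating them.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def load_file(conn: sqlite3.Connection, file_hash: str, rows: list[tuple[Any, ...]]) -> None:
    """Replace any rows from this file, then insert it, in one transaction.

    Re-running with the same file_hash leaves exactly one copy of its rows.
    """
    with conn:  # commits on success, rolls back (undoing the DELETE) on error
        conn.execute("DELETE FROM orders WHERE _source_file_hash = ?", (file_hash,))
        conn.executemany(
            "INSERT INTO orders (order_id, amount, _source_file_hash) VALUES (?, ?, ?)",
            [(*row, file_hash) for row in rows],
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id TEXT, amount REAL, _source_file_hash TEXT)")
    rows = [("A1", 9.99), ("A2", 5.0)]
    load_file(conn, "abc123", rows)
    load_file(conn, "abc123", rows)  # simulated retry after a crash
    print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])
```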

See docs/guides/run.md for full connector config, credentials, and live-integration test setup.
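Job-ID-keyed idempotency on BigQuery relies on job IDs being unique within a project: if the ID is derived deterministically from the file, a retried submission reuses it and is rejected as already existing instead of loading the rows twice. The ID layout below (a filedge_ prefix, the sanitized table name, and the file hash) is a hypothetical example, not the format Filedge uses.

```python
import re

# BigQuery job IDs may contain only letters, digits, underscores and dashes.
# The "filedge_" prefix and overall layout are hypothetical.
_INVALID = re.compile(r"[^A-Za-z0-9_-]")


def load_job_id(dest_table: str, file_hash: str) -> str:
    """Derive a deterministic load-job ID from the destination and file hash."""
    # Replace separators such as "." or ":" that job IDs do not allow.
    safe_table = _INVALID.sub("_", dest_table)
    return f"filedge_{safe_table}_{file_hash}"
```

The same file always maps to the same ID, so "already submitted" is detected by BigQuery itself rather than by a separate bookkeeping table.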

How a run works

filedge run
├── Reset FAILED below retry_cap → PENDING
├── Reclaim stale PROCESSING locks → PENDING
├── Connector: ensure destination table exists
├── Hash files in watched dir; enqueue new hashes as PENDING
└── For each PENDING file:
    ├── Audit DB: mark PROCESSING        (distributed lock)
    ├── Connector: stream rows → commit  (idempotent per file_hash)
    └── Audit DB: mark COMMITTED / FAILED
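The run order above can be modeled as a small state machine. This is an in-memory sketch under stated assumptions, not Filedge's implementation: the audit table is a dict keyed by file hash, commit stands in for the connector, retry_cap counts failed attempts, and a PROCESSING lock counts as stale once it is older than a hypothetical lock_timeout.

```python
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class AuditRecord:
    state: str = "PENDING"  # PENDING | PROCESSING | COMMITTED | FAILED
    attempts: int = 0  # failed attempts so far (assumed meaning of retry_cap)
    locked_at: float | None = None


def run(
    audit: dict[str, AuditRecord],
    file_hashes: Iterable[str],
    commit: Callable[[str], None],
    retry_cap: int,
    now: float,
    lock_timeout: float,
) -> Counter[str]:
    """One pass of the run loop over an in-memory audit table."""
    stats: Counter[str] = Counter()
    for rec in audit.values():
        # Reset FAILED files that are still below retry_cap.
        if rec.state == "FAILED" and rec.attempts < retry_cap:
            rec.state = "PENDING"
            stats["retried"] += 1
        # Reclaim PROCESSING locks older than lock_timeout.
        elif rec.state == "PROCESSING" and now - (rec.locked_at or 0.0) > lock_timeout:
            rec.state = "PENDING"
            stats["reclaimed"] += 1
    # (Ensuring the destination table exists is omitted in this sketch.)
    # Enqueue hashes never seen before; known hashes keep their state.
    for h in file_hashes:
        if h not in audit:
            audit[h] = AuditRecord()
            stats["new"] += 1
    # Lock, commit, and record the outcome for each PENDING file.
    for h, rec in audit.items():
        if rec.state != "PENDING":
            continue
        rec.state, rec.locked_at = "PROCESSING", now
        try:
            commit(h)  # assumed idempotent per file hash
            rec.state = "COMMITTED"
            stats["committed"] += 1
        except Exception:
            rec.attempts += 1
            rec.state = "FAILED"
            stats["failed"] += 1
    return stats
```

With retry_cap=3 in this sketch, a file that keeps failing is attempted three times in total and then stays FAILED, which is the bounded-retry behavior from the comparison table.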

The audit DB and the destination are separate systems. A crash between connector commit and audit mark leaves the file PROCESSING — the next run reclaims it, and the connector's per-hash idempotency guarantees no duplicate rows.
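The distributed lock and stale-lock reclaim can both be expressed as conditional UPDATEs against the audit DB, so that only one worker's update matches a given row. The sketch below uses sqlite3 and a hypothetical files table with a locked_at timestamp; the schema, column names, and timeout rule are assumptions, though the same conditional-UPDATE pattern carries over to other SQL databases.

```python
import sqlite3

# Hypothetical audit schema for this sketch.
SCHEMA = """
CREATE TABLE IF NOT EXISTS files (
    file_hash TEXT PRIMARY KEY,
    state     TEXT NOT NULL,
    locked_at REAL
)
"""


def try_claim(conn: sqlite3.Connection, file_hash: str, now: float) -> bool:
    """Atomically move one file PENDING -> PROCESSING; True if this worker won."""
    with conn:
        cur = conn.execute(
            "UPDATE files SET state = 'PROCESSING', locked_at = ? "
            "WHERE file_hash = ? AND state = 'PENDING'",
            (now, file_hash),
        )
    # Zero rows updated means another worker already holds the lock.
    return cur.rowcount == 1


def reclaim_stale(conn: sqlite3.Connection, now: float, lock_timeout: float) -> int:
    """Return PROCESSING rows whose lock is older than lock_timeout to PENDING."""
    with conn:
        cur = conn.execute(
            "UPDATE files SET state = 'PENDING', locked_at = NULL "
            "WHERE state = 'PROCESSING' AND locked_at < ?",
            (now - lock_timeout,),
        )
    return cur.rowcount
```

Paired with a connector that is idempotent per file hash, this covers the crash case described above: the reclaimed file is committed again, and the connector replaces rather than duplicates its rows.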


License

Apache 2.0 — see LICENSE.
