Skip to content

Refactor streaming sync architecture, preview flow, and packaging#3

Merged
park-peter merged 7 commits into
mainfrom
feature/refactor
Mar 12, 2026
Merged

Refactor streaming sync architecture, preview flow, and packaging#3
park-peter merged 7 commits into
mainfrom
feature/refactor

Conversation

@park-peter
Copy link
Copy Markdown
Owner

Summary

This PR consolidates the recent refactor work across the brickbyte sync pipeline, writer architecture, packaging, preview flow, tests, and CI.

It includes the latest review-driven fixes along with the preceding foundation work that introduced the newer streaming, state, deduplication, and credential-management behavior.

Included in this PR

Core architecture

  • Refactor the client and writer foundation to support the current streaming-oriented sync model
  • Expand credential resolution and configuration handling
  • Add and refine schema utilities, sanitization, and writer abstractions
  • Improve state handling, deduplication flow, and overwrite behavior

Preview, tests, and docs

  • Rework preview type detection and preview behavior
  • Rewrite and expand tests across sync modes, concurrency, preview, deduplication, credentials, and writer behavior
  • Refresh notebooks and documentation to match the current architecture and usage model

Bug fixes and review follow-up

  • Fix the parallel sync path so streams are processed in isolated workers without preloading entire streams into memory
  • Eliminate the parallel scheduling deadlock when selected streams exceed the configured worker count
  • Rework SQL staging to generate parquet locally, upload into the configured Unity Catalog Volume, and COPY INTO from the uploaded volume path
  • Remove metadata enrichment entirely, including the public API surface and tests
  • Simplify preview output so it reports sampled source records, target counts, and inferred schema changes instead of implying exact diff totals

CI and release behavior

  • Update CI publishing behavior so publishing happens only when tagged

Why

This PR moves brickbyte toward a cleaner and more reliable ingestion-focused design.

The main goals are:

  • make streaming behavior actually bounded-memory in practice
  • remove brittle parallel execution behavior
  • align the SQL path with a more realistic staging model
  • simplify the product surface by removing low-value enrichment behavior
  • make preview and docs accurately reflect what the library really computes
  • tighten tests and release flow around the current architecture

Testing

  • uv run pytest tests/ -v -m 'not integration'
  • uv run ruff check src tests

Notes

  • This PR targets park-peter/brickbyte fork main
  • SyncResult no longer includes enriched_tables
  • Preview is now explicitly sample-based
  • Metadata enrichment has been removed from the codebase

Phase 1: Rewrite package API (Brickbyte -> client()), clean logging,
pin dependencies, update CI matrix, replace Makefile with uv targets.

Phase 2: Canonical schema (record_id/extracted_at/data/run_id),
stream name sanitization with collision detection, SQL identifier
validation, backtick-quoted table names, UTC-aware timestamps,
fixed buffer size estimation.

Phase 3: Safe overwrite via staging tables with atomic INSERT OVERWRITE,
writer close in finally block, fatal error handling fix, SQL writer
hardening (Volume guard, flatten DDL, deterministic filenames, proper
cleanup), cooperative timeout, cleanup=False default.

Phase 4: Fix credential scope resolution bug for explicit scope/key,
add dotted-key nested mapping, warning logs for unresolved secrets.

Phase 5: Incremental sync state manager, deduplication with positional
_dk_N columns, concurrent stream processing with isolated per-thread
writers, progress reporting with callback and tqdm support.
Preview: add Python-to-Spark type mapping for real type change
detection in compare_schemas(). Log skipped records instead of
silent pass.

Enrichment: add SQLSemanticEnricher for SQL-based enrichment without
Spark. Flatten mode now uses COMMENT ON COLUMN for column-level
descriptions. Spark enricher guards for active session with
actionable error message. JSON parse errors are logged per-row.
Rewrite all 9 existing test files to use brickbyte.client() API,
canonical schema (record_id/extracted_at/data/run_id), run_id param,
safe overwrite assertions, and UTC-aware timestamps.

Add 7 new test files:
- test_sanitize.py: stream name sanitization, identifier validation
- test_safe_overwrite.py: staging table lifecycle, atomic overwrite,
  schema alignment, incompatible type rejection
- test_incremental.py: state table CRUD
- test_concurrent.py: parallel writer isolation, error propagation
- test_dedup.py: key normalization, _dk_N extraction, _dk_missing,
  null keys, validation errors, MERGE execution
- test_progress.py: callback invocation, event correctness
- test_enrichment_sql.py: SQL enrichment path, column comments

Update all 10 notebooks to use import brickbyte / brickbyte.client(),
lowercase naming, pinned dependencies in _setup.py.

Rewrite README.md with Lakeflow positioning, v2 API, canonical schema,
and documentation for all new features.

Fix ruff lint issues (unused imports, import sorting).
Dedup:
- List[str] dedup_keys (__all__ sentinel) now expanded to per-stream
  dict after selected streams are known, fixing silent no-op.
- Dedup runs per-stream with the owning writer as executor, fixing
  crash in parallel + SQL mode where executor was None.
- _execute_sql(None) raises immediately instead of silently falling
  back to Spark.
- Key column identifiers validated through validate_identifier() at
  both normalization and MERGE-build time; unsafe chars rejected early.
- _run_dedup_for_stream always routes through positional _dk_N columns
  in both flatten and raw modes.

Overwrite:
- SQL _atomic_overwrite_sql now checks types: rejects incompatible
  changes, CASTs safe narrower→wider, ALTERs target column when
  staging is wider.
- Spark _atomic_overwrite uses raw dataType objects (not str()) with
  _type_name()/_sql_type() helpers to normalize parenthesized type
  strings (e.g. IntegerType()) and emit correct SQL type names for
  CAST and ALTER COLUMN TYPE.  Reverse-widening now widens the target
  instead of narrowing the staging data.

Preview:
- get_table_name() sanitizes and backtick-quotes stream names,
  matching writer table naming.
- get_target_schema() strips trailing "()" from Spark type strings so
  _PYTHON_TO_SPARK comparisons work correctly.

Incremental:
- StateManager supports SQL connector (not just Spark) via
  _get_connection() with staging_volume/warehouse_id params.
- State loaded before sync via _apply_incremental_state(), which
  probes set_stream_state / set_state_for_stream / set_state on the
  source.  Raises NotImplementedError when saved state exists but the
  connector lacks a state API.
- State saved per-stream on success via _save_incremental_state(),
  which extracts connector-emitted state when available and falls back
  to a run_id+records dict.

Progress:
- record_processed() called every 5 000 records in both sequential
  and parallel-oversized paths.
- stream_completed() called in parallel future collection.
- ProgressReporter.close() called in finally block; errors suppressed.

Enrichment:
- enrich_table() in sync() now passes sanitized stream name.
@park-peter park-peter merged commit 2958017 into main Mar 12, 2026
4 checks passed
@park-peter park-peter deleted the feature/refactor branch March 12, 2026 15:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant