
ElasticSearch input via generalized polling #215

@zxqfd555

Description


Is your feature request related to a problem? Please describe.

Pathway has pw.io.elasticsearch.write but no corresponding pw.io.elasticsearch.read. ElasticSearch was originally requested as a data source for Pathway pipelines, but reading is currently impossible without a custom Python connector. ElasticSearch does not expose a CDC API (unlike PostgreSQL, MongoDB, or MS SQL Server), so traditional log-based streaming is not available — polling is the only option.

Polling is a last resort, but ElasticSearch is not unique in lacking native CDC. Rather than implementing a one-off polling connector for ElasticSearch alone, the right approach is to build a generalized polling-based input mechanism that can be reused across any source that lacks CDC, and then wire ElasticSearch as the first consumer.

Describe the solution you'd like

Part 1: Generalized polling connector

The mechanism takes three parameters:

  • timestamp_column — name of the column that records when a row was last written/updated.
  • id_column — name of the column with a unique row identifier.
  • max_transaction_duration — the maximum time window within which concurrent writers may still be writing rows with a timestamp older than the current watermark (i.e. the maximum clock skew / transaction duration to tolerate).

The polling algorithm is as follows:

At each poll cycle, the connector maintains persistent state consisting of:

  • watermark — the highest timestamp_column value among all rows that have been fully committed to Pathway (i.e. are safe to never re-read).
  • pending_ids — the set of id_column values that were read in the last overlapping window and must be deduplicated on the next read.

Poll cycle logic:

  1. Query the source for all rows where timestamp_column >= watermark, ordered by timestamp_column ascending.
  2. Let max_ts be the maximum timestamp_column value in the result set.
  3. If now - max_ts > max_transaction_duration (i.e. the source has advanced far enough that no concurrent writer could still be adding rows at or before max_ts):
    • All fetched rows not already in pending_ids are safe: emit them (the condition above guarantees that even rows at exactly max_ts are complete).
    • Advance watermark to max_ts and set pending_ids to the ids of the rows at exactly max_ts; the next >= query will re-read those rows, so they must be suppressed rather than re-emitted.
  4. If now - max_ts <= max_transaction_duration (the source is near the live edge — concurrent writers may still be adding rows at timestamps ≤ max_ts):
    • Emit only rows whose id_column is not in pending_ids; rows already in pending_ids were emitted in a previous cycle.
    • Update pending_ids to the full set of id_column values read in this cycle.
    • Do not advance watermark — the next query will overlap with the current one.
  5. On restart, reload (watermark, pending_ids) from Pathway persistence and resume from step 1. Rows already in pending_ids will be re-queried but deduplicated before emission.
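The cycle above can be sketched in a few lines of Python. This is a minimal sketch, not the proposed implementation (which would live in the Rust connector layer); the fetch(watermark) callback and all names are illustrative. fetch is assumed to return (id, timestamp, row) triples sorted by timestamp ascending.

```python
import time

class PollingState:
    """Persistent connector state: the (watermark, pending_ids) pair."""
    def __init__(self, watermark=0.0, pending_ids=()):
        self.watermark = watermark
        self.pending_ids = set(pending_ids)

def poll_cycle(fetch, state, max_transaction_duration, now=None):
    """Run one poll cycle; return the rows that are safe to emit downstream.

    `fetch(watermark)` must return (row_id, timestamp, row) triples for all
    rows with timestamp >= watermark, ordered by timestamp ascending.
    """
    now = time.time() if now is None else now
    rows = fetch(state.watermark)
    if not rows:
        return []
    max_ts = rows[-1][1]
    # In this sketch, rows at max_ts are emitted immediately and suppressed
    # on the overlapping re-read via pending_ids, rather than held back.
    emitted = [row for rid, _, row in rows if rid not in state.pending_ids]
    if now - max_ts > max_transaction_duration:
        # Closed window: no concurrent writer can still add rows at <= max_ts.
        # Advance the watermark; the next `>=` query re-reads only the rows
        # at exactly max_ts, so keep just their ids for deduplication.
        state.watermark = max_ts
        state.pending_ids = {rid for rid, ts, _ in rows if ts == max_ts}
    else:
        # Live edge: keep the watermark; remember everything read so the
        # fully overlapping next query does not re-emit these rows.
        state.pending_ids = {rid for rid, _, _ in rows}
    return emitted
```

On restart, reconstructing PollingState from the persisted pair and calling poll_cycle again gives the resume behavior of step 5: overlapping rows are re-fetched but filtered out by pending_ids.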

This guarantees:

  • No missed rows: any row written by a slow concurrent transaction within max_transaction_duration of the watermark will be caught by the overlapping re-read.
  • No duplicates delivered downstream: pending_ids deduplicates rows seen in the overlap window.
  • Correct resume after restart: the persisted (watermark, pending_ids) pair is the complete state needed to resume safely. Rows at or above watermark are re-queried; rows already in pending_ids are suppressed.
  • Bounded overlap: once the source advances beyond max_transaction_duration from the watermark, the window closes and state is compacted.

The mechanism should be implemented as a reusable Rust trait or struct that source-specific connectors implement by supplying a fetch(watermark, limit) method returning (id, timestamp, row) triples. State serialization, deduplication, and the overlap logic live entirely in the generic layer.
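The issue proposes a Rust trait; purely as a shape sketch of the contract between the generic layer and a source-specific connector, a Python analogue (all names hypothetical) could look like:

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, Tuple

class PollingSource(ABC):
    """Hypothetical interface a source-specific connector implements.

    The generic layer owns the watermark, pending_ids, persistence, and
    deduplication; the source only knows how to run one range query.
    """

    @abstractmethod
    def fetch(self, watermark: float, limit: int) -> Iterable[Tuple[Any, float, Any]]:
        """Return up to `limit` (id, timestamp, row) triples with
        timestamp >= watermark, ordered by timestamp ascending."""
```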

Part 2: ElasticSearch input connector

Wire the generalized polling mechanism to ElasticSearch, exposed as pw.io.elasticsearch.read. The underlying query uses the ElasticSearch Search API with a range filter on timestamp_column and is implemented using the existing crate already used by pw.io.elasticsearch.write (verify license compatibility at implementation time).
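The query body would presumably be a range filter plus a deterministic sort, along these lines (a sketch only; the column names and page size are placeholders, not a confirmed schema):

```python
def build_poll_query(timestamp_column: str, id_column: str,
                     watermark, page_size: int) -> dict:
    """Build a Search API body fetching all documents with
    timestamp_column >= watermark, in the ascending order the
    polling layer expects; id_column breaks timestamp ties."""
    return {
        "size": page_size,
        "query": {"range": {timestamp_column: {"gte": watermark}}},
        "sort": [{timestamp_column: "asc"}, {id_column: "asc"}],
    }
```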

The Python API should mirror pw.io.elasticsearch.write in style:

pw.io.elasticsearch.read(
    hosts,
    index_name,
    schema,
    *,
    timestamp_column,
    id_column,
    max_transaction_duration,
    poll_interval=...,
    name=None,
)

Describe alternatives you've considered

ElasticSearch offers EQL (a query language for event sequences) and the Scroll API for paginating large result sets; neither provides CDC semantics. The Scroll API is deprecated in favor of search_after pagination, which could be used inside the fetch implementation for large result pages but does not change the overall polling design.
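Inside fetch, search_after pagination could loop roughly as follows. The `search` callable stands in for the actual ES client call, and all names are assumptions; the only ES-specific facts relied on are that each hit carries a `sort` array usable as the next cursor and its document under `_source`.

```python
def fetch_pages(search, timestamp_column, id_column, watermark, page_size=1000):
    """Yield all (id, timestamp, row) triples with timestamp >= watermark,
    page by page, using search_after as the cursor.

    `search(body)` is assumed to execute one Search API call and return
    the parsed response dict.
    """
    cursor = None
    while True:
        body = {
            "size": page_size,
            "query": {"range": {timestamp_column: {"gte": watermark}}},
            "sort": [{timestamp_column: "asc"}, {id_column: "asc"}],
        }
        if cursor is not None:
            # Resume strictly after the last hit of the previous page.
            body["search_after"] = cursor
        hits = search(body)["hits"]["hits"]
        if not hits:
            return
        for hit in hits:
            src = hit["_source"]
            yield src[id_column], src[timestamp_column], src
        cursor = hits[-1]["sort"]
```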

Additional context

Testing: integration tests should follow the same pattern as existing database connector tests and live in the Docker Compose-based integration test suite with an ElasticSearch container. Coverage should include:

  • Basic read in polling mode (static data, verify all rows received).
  • Read with concurrent writes arriving within max_transaction_duration (verify no rows are missed and no duplicates are emitted).
  • Restart/resume: write some rows, start the connector, checkpoint, restart, write more rows, verify correct incremental delivery with no missed or duplicate rows.
  • Field parsing round-trip tests for all Pathway types supported by the ElasticSearch connector.

The generalized polling mechanism itself should have unit tests covering the overlap logic and deduplication independently of ElasticSearch.
