Update docs about deduplication #1577

Merged 2 commits on Feb 25, 2021
17 changes: 7 additions & 10 deletions docs/architecture/decoder_service_specification.md
@@ -6,6 +6,8 @@ in the Structured Ingestion pipeline.
## Data Flow

1. Consume messages from Google Cloud PubSub raw topic
+1. Deduplicate message by `uri` (which generally contains `docId`)
+   - Disabled for stub-installer, which does not include a UUID in the URI
1. Perform GeoIP lookup and drop `x_forwarded_for` and `remote_addr` and
optionally `geo_city` based on population
1. Parse the `uri` attribute to determine document type, etc.
@@ -16,16 +18,12 @@ in the Structured Ingestion pipeline.
1. Validate the schema of the body
1. Extract user agent information and drop `user_agent`
1. Add metadata fields to message
-1. Deduplicate message by `docId`
-   - Generate `docId` for submission types that don't have one
1. Write message to PubSub decoded topic based on `namespace` and `docType`
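
To make the dedup key concrete: the docId, when present, appears as a UUID path segment in the submission `uri` (for example `/submit/<namespace>/<docType>/<docVersion>/<docId>`), while stub-installer URIs carry no such segment and are passed through without deduplication. A minimal sketch of extracting that UUID, with the exact path layout treated as an assumption rather than a specification:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: recover the document UUID embedded in a submission uri, if any. */
public class UriDocIdSketch {

  // Assumption: the docId, when present, is a standard UUID path segment.
  private static final Pattern UUID_SEGMENT = Pattern.compile(
      "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");

  /** Returns the UUID found in the uri, or empty for URIs without one (e.g. stub-installer). */
  public static Optional<String> extractDocId(String uri) {
    Matcher m = UUID_SEGMENT.matcher(uri);
    return m.find() ? Optional.of(m.group().toLowerCase()) : Optional.empty();
  }

  public static void main(String[] args) {
    // Hypothetical telemetry-style uri; the exact path layout is an assumption here.
    System.out.println(extractDocId(
        "/submit/telemetry/6c49ec73-4350-45a0-9c8a-6c8f5aded0cf/main/Firefox/88.0/release/20210401000000"));
  }
}
```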

### Implementation

The above steps will be executed as a single Apache Beam job that can accept
either a streaming input from PubSub or a batch input from Cloud Storage.
-Message deduplication will be done by checking for the presence of ids as keys
-in Cloud Memory Store (managed Redis).
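
For orientation, a minimal sketch of that single-job shape with the Beam Java SDK; the subscription and topic names are placeholders and the `Decode` step stands in for the full list of transforms above, so this is not the actual ingestion-beam code:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/** Sketch of the single-job shape; the real Decoder lives in ingestion-beam. */
public class DecoderJobSketch {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Streaming input from Pub/Sub; a batch run would instead read the same
    // messages from files in Cloud Storage.
    PCollection<PubsubMessage> raw = pipeline.apply("ReadRawTopic",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription("projects/example-project/subscriptions/raw"));

    PCollection<PubsubMessage> decoded = raw
        .apply("Decode", ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Placeholder for the steps listed above: geo lookup, uri parsing,
            // decompression, schema validation, user agent parsing, metadata.
            c.output(c.element());
          }
        }))
        .setCoder(PubsubMessageWithAttributesCoder.of());

    decoded.apply("WriteDecodedTopic",
        PubsubIO.writeMessages().to("projects/example-project/topics/decoded"));

    pipeline.run();
  }
}
```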

### Decoding Errors

@@ -118,13 +116,12 @@ method, to ensure ack'd messages are fully delivered.

### Deduplication

-Each `docId` will be allowed through "at least once", and only be
+Each `uri` will be allowed through "at least once", and only be
rejected as a duplicate if we have completed delivery of a message with the
-same `docId`. Duplicates will be considered errors and sent to the error topic.
+same `uri`. We assume that each `uri` contains a UUID that uniquely identifies
+the document.
"Exactly once" semantics can be applied to derived data sets using SQL in
BigQuery, and GroupByKey in Beam and Spark.
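
As an illustration of the GroupByKey approach, here is a sketch for a bounded (batch) collection keyed on the `document_id` attribute; the attribute handling and coder choice are assumptions, not the pipeline's actual code:

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Sketch: derive an "exactly once" view by keeping one message per document_id. */
public class ExactlyOnceSketch {

  static PCollection<PubsubMessage> dedupe(PCollection<PubsubMessage> messages) {
    return messages
        // Key each message by its document_id attribute (empty string if absent).
        .apply("KeyByDocId", WithKeys.<String, PubsubMessage>of(
                m -> m.getAttribute("document_id") == null ? "" : m.getAttribute("document_id"))
            .withKeyType(TypeDescriptors.strings()))
        // Collect all deliveries of the same document together...
        .apply("GroupByDocId", GroupByKey.<String, PubsubMessage>create())
        // ...and emit a single representative per key, dropping the duplicates.
        .apply("TakeFirst", ParDo.of(
            new DoFn<KV<String, Iterable<PubsubMessage>>, PubsubMessage>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(c.element().getValue().iterator().next());
              }
            }))
        .setCoder(PubsubMessageWithAttributesCoder.of());
  }
}
```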

-Note that deduplication is only provided with a "best effort" quality of service.
-In the ideal case, we hold 24 hours of history for seen document IDs, but that
-buffer is allowed to degrade to a shorter time window when the pipeline is under
-high load.
+Note that deduplication is only provided with a "best effort" quality of service
+using a 10 minute window.
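
One way to obtain that best-effort behavior is to key the Pub/Sub read on the `uri` attribute via `withIdAttribute`; a sketch, with a placeholder subscription name and the retention window left to the runner (on the order of the 10 minutes noted above):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

/** Sketch: best-effort dedup at read time, keyed on the uri attribute. */
public class BestEffortDedupSketch {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // withIdAttribute asks the runner to drop messages whose "uri" attribute
    // matches one already seen, but only within a bounded retention window,
    // so overall delivery remains "at least once".
    PCollection<PubsubMessage> raw = pipeline.apply("ReadRawTopic",
        PubsubIO.readMessagesWithAttributes()
            .withIdAttribute("uri")
            .fromSubscription("projects/example-project/subscriptions/raw"));

    // Downstream decode steps would consume `raw` here.
    pipeline.run();
  }
}
```

Anything redelivered outside that window flows through again, which is why downstream consumers still need the exactly-once treatment described above.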
8 changes: 1 addition & 7 deletions docs/architecture/overview.md
@@ -15,8 +15,6 @@ This document specifies the architecture for GCP Ingestion as a whole.
`BigQuery`
- The Dataflow `Decoder` job decodes messages from the PubSub `Raw Topic` to
the PubSub `Decoded Topic`
-  - Also checks for existence of `document_id`s in
-    `Cloud Memorystore` in order to deduplicate messages
- The Dataflow `AET Decoder` job provides all the functionality of the `Decoder`
with additional decryption handling for Account Ecosystem Telemetry pings
- The Dataflow `Republisher` job reads messages from the PubSub `Decoded Topic`,
@@ -70,12 +68,8 @@ This document specifies the architecture for GCP Ingestion as a whole.
1. Produce `normalized_` variants of select attributes
1. Inject `normalized_` attributes at the top level and other select
attributes into a nested `metadata` top level key in `payload`
-- Should deduplicate messages based on the `document_id` attribute using
-  `Cloud MemoryStore`
+- Should deduplicate messages based on the `uri` attribute
- Must ensure at least once delivery, so deduplication is only "best effort"
-- Should delay deduplication to the latest possible stage of the pipeline
-  to minimize the time window between an ID being marked as seen in
-  `Republisher` and it being checked in `Decoder`
- Must send messages rejected by transforms to a configurable error destination
- Must allow error destination in BigQuery
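
To illustrate the error-destination requirement, here is a sketch of a multi-output `ParDo` that separates successfully decoded messages from rejected ones; the tag names and the failing step are illustrative, and the error output would feed whatever destination is configured (for example a BigQuery error table):

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

/** Sketch: route messages rejected by a transform to a separate error output. */
public class ErrorRoutingSketch {

  static final TupleTag<PubsubMessage> DECODED = new TupleTag<PubsubMessage>() {};
  static final TupleTag<PubsubMessage> ERRORS = new TupleTag<PubsubMessage>() {};

  static PCollectionTuple decodeOrError(PCollection<PubsubMessage> input) {
    PCollectionTuple result = input.apply("Decode", ParDo
        .of(new DoFn<PubsubMessage, PubsubMessage>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            try {
              // Placeholder for a decode step (parsing, schema validation, ...).
              c.output(c.element());
            } catch (RuntimeException e) {
              // Rejected messages go to the error output, to be written to the
              // configured error destination (for example a BigQuery error table).
              c.output(ERRORS, c.element());
            }
          }
        })
        .withOutputTags(DECODED, TupleTagList.of(ERRORS)));
    result.get(DECODED).setCoder(PubsubMessageWithAttributesCoder.of());
    result.get(ERRORS).setCoder(PubsubMessageWithAttributesCoder.of());
    return result;
  }
}
```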

4 changes: 2 additions & 2 deletions docs/architecture/pain_points.md
@@ -45,8 +45,8 @@ is to specify mapping at template creation time.
Does not use standard client library.

Does not expose an output of delivered messages, which is needed for at least
-once delivery with deduplication. Current workaround is to get delivered
-messages from a subscription to the output PubSub topic.
+once delivery with deduplication. Current workaround is to use the deduplication
+available via `PubsubIO.read()`.

Uses HTTPS JSON API, which increases message payload size vs protobuf by 25%
for base64 encoding and causes some messages to exceed the 10MB request size
11 changes: 0 additions & 11 deletions docs/ingestion-beam/republisher-job.md
@@ -6,19 +6,8 @@ The primary intention is to produce smaller derived Pub/Sub topics so
that consumers that only need a specific subset of messages don't incur
the cost of reading the entire stream of decoded messages.
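
As a sketch of what such a derived topic might look like with the Beam Java SDK, the following filters on a hypothetical `document_namespace` attribute and republishes the matching subset; the names are placeholders, not the actual Republisher configuration:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;

/** Sketch: republish only one namespace's messages to a smaller derived topic. */
public class RepublisherSketch {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply("ReadDecoded", PubsubIO.readMessagesWithAttributes()
            .fromSubscription("projects/example-project/subscriptions/decoded"))
        // Keep only messages for a single (placeholder) namespace...
        .apply("FilterNamespace", Filter.by((PubsubMessage m) ->
            "example-namespace".equals(m.getAttribute("document_namespace"))))
        // ...and republish them to a smaller derived topic.
        .apply("WriteDerivedTopic", PubsubIO.writeMessages()
            .to("projects/example-project/topics/example-namespace-only"));

    pipeline.run();
  }
}
```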

-The Republisher has the additional responsibility of marking messages as seen
-in `Cloud MemoryStore` for deduplication purposes. That functionality exists
-here to avoid the expense of an additional separate consumer of the full
-decoded topic.

## Capabilities

-### Marking Messages As Seen
-
-The job needs to connect to Redis in order to mark `document_id`s of consumed
-messages as seen. The Decoder is able to use that information to drop duplicate
-messages flowing through the pipeline.

### Debug Republishing

If `--enableDebugDestination` is set, messages containing an `x_debug_id`