Scala Hadoop Shred: deduplicate event_ids with different event_fingerprints (synthetic duplicates) #24

Closed
alexanderdean opened this issue Aug 15, 2012 · 12 comments

@alexanderdean
Member

When you observe two events in your event stream with the same event_id, one of three things could be happening:

| category | cause | payload matches? | probable time between duplicates | fix |
| --- | --- | --- | --- | --- |
| ID collision | huge event volumes / algorithm flaws | no | far apart | give one event a new ID |
| synthetic copy | browser pre-cachers, anti-virus software, adult content screeners, web scrapers | partially (most or all client-sent fields) | close by logical time | either a) delete synthetic copy or b) give it a new ID & preserve relationship to "parent" event |
| natural copy | at-least-once processing | yes | close by ETL time | delete all but one event |

Thinking about this further, a simple de-duplication algorithm would be:

  1. If the payload matches exactly, then delete all but one copy
  2. If the payload differs in any way, then give one event a new ID and preserve its relationship to "parent" event

With this approach, distinguishing between ID collisions and synthetic copies can still be done (if needed) at analysis time.
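
A minimal Scala sketch of those two rules, assuming a simplified `Event(id, payload)` model with exact string comparison of payloads (the names here are illustrative, not the real Snowplow event model):

```scala
import java.util.UUID
import scala.collection.mutable

case class Event(id: String, payload: String, originalId: Option[String] = None)

def dedupe(events: Seq[Event]): Seq[Event] = {
  val seenPairs = mutable.Set.empty[(String, String)] // (event_id, payload) pairs already kept
  val seenIds   = mutable.Set.empty[String]           // event_ids already kept
  events.flatMap { e =>
    if (seenPairs.contains((e.id, e.payload)))
      None // rule 1: payload matches exactly, so delete all but one copy
    else if (seenIds.contains(e.id)) {
      seenPairs += ((e.id, e.payload))
      // rule 2: same ID but different payload, so mint a new ID and keep the "parent" link
      Some(e.copy(id = UUID.randomUUID.toString, originalId = Some(e.id)))
    } else {
      seenIds += e.id
      seenPairs += ((e.id, e.payload))
      Some(e)
    }
  }
}
```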

Could this be done using bloom filters? (http://webdocs.cs.ualberta.ca/~drafiei/papers/DupDet06Sigmod.pdf) Not directly:

unless event_id **definitely not in set** of bloom[event_ids]:
    if hash(event) **definitely not in set** of bloom[event hashes]:
        assign new event_id and store old event_id in original_event_id field
    else:
        delete event # Deletes some false positives!

Rather than delete some false positives (causing data loss), it would be safer to err on the side of caution:

unless event_id **definitely not in set** of bloom[event_ids]:
    assign new event_id and store old event_id in original_event_id field

But this approach will still cause inflated counts with at-least-once processing systems. So a hybrid model could use a KV cache of the last N days of event_ids and another KV cache of the last N days of event hashes:

if event_id in n_days_cache[event_ids]:
    if hash(event) in n_days_cache[event hashes]:
        delete event
    else:
        assign new event_id and store old event_id in original_event_id field
else:
    unless event_id **definitely not in set** of bloom[event_ids]:
        assign new event_id and store old event_id in original_event_id field

This prevents natural copies within an N-day window and safely renames ID collisions and synthetic copies all the way back through time.
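
A rough Scala sketch of this hybrid model, with in-memory sets standing in for the N-day KV caches and a minimal trait standing in for the Bloom filter (all names here are illustrative):

```scala
import java.util.UUID
import scala.collection.mutable

// Stand-in for a real Bloom filter over all event_ids seen so far
trait BloomFilter {
  def mightContain(key: String): Boolean // false means "definitely not in set"
  def add(key: String): Unit
}

case class Event(id: String, fingerprint: String, originalId: Option[String] = None)

class HybridDeduper(idBloom: BloomFilter) {
  private val recentIds          = mutable.Set.empty[String] // n_days_cache[event_ids]
  private val recentFingerprints = mutable.Set.empty[String] // n_days_cache[event hashes]

  /** Returns None for a natural copy (deleted); otherwise the (possibly re-identified) event. */
  def process(e: Event): Option[Event] = {
    val result =
      if (recentIds.contains(e.id)) {
        if (recentFingerprints.contains(e.fingerprint))
          None // natural copy within the N-day window: delete
        else
          Some(e.copy(id = UUID.randomUUID.toString, originalId = Some(e.id)))
      } else if (idBloom.mightContain(e.id)) {
        // event_id possibly seen before the N-day window: rename rather than risk deleting
        Some(e.copy(id = UUID.randomUUID.toString, originalId = Some(e.id)))
      } else {
        Some(e)
      }
    recentIds += e.id
    recentFingerprints += e.fingerprint
    idBloom.add(e.id)
    result
  }
}
```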

Title was: Occasionally CloudFront registers an event twice

Content was:

Presumably because it hits two separate nodes at the same time.

Update the ETL to dedupe this - the algorithm would be to hash the querystring and check for duplicates within an X-minute timeframe. (Hashing the full raw querystring would implicitly include txn_id, the random JavaScript-side identifier, in the uniqueness check.)
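
For illustration, a minimal sketch of the hashing step, assuming the raw querystring is available as a String (the choice of SHA-256 is arbitrary here):

```scala
import java.security.MessageDigest

// Hash the full raw querystring; the hex digest becomes the key for the
// X-minute duplicate check.
def querystringHash(rawQuerystring: String): String = {
  val digest = MessageDigest.getInstance("SHA-256")
  digest.digest(rawQuerystring.getBytes("UTF-8")).map("%02x".format(_)).mkString
}
```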

@alexanderdean
Member Author

Note that achieving this isn't currently possible with our Hive SerDe-based row-level deserialisation. We would need either:

  1. A stream-based ETL process which cached querystring hashes for X minutes to check for uniqueness, or:
  2. An aggregation step following our current ETL process which dropped the duplicates

/cc @yalisassoon

@yalisassoon
Member

I was imagining the following alternative, which I was hoping would work with Cascading / Scalding (so not strictly speaking stream-based, rather batch-based), but which is in essence a streaming approach:

  1. Maintaining a list of the last X hours' worth of processed lines of data in a lookup table. (The value of X would depend on how regularly the ETL batch process runs, but I think we need to move from a daily mode to an hourly mode at the very least.)
  2. Cross-referencing every new line of data with the lines stored in the lookup table and applying some de-duping logic.
  3. Writing the processed data from the lookup table to S3 after an X-hour delay.

We will need to have a number of lookup tables referenced as part of a Scalding ETL, especially as we build out an OLAP-friendly version of the events table. I imagined it would make sense to use Amazon SimpleDB or RDS for the tables for simplicity, but this is probably worth discussing on a dedicated thread...
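
A rough sketch of the windowed lookup described in the list above, with an in-memory map standing in for the SimpleDB/RDS table and line hashes plus millisecond timestamps as keys (all illustrative):

```scala
import scala.collection.mutable

class WindowedLookup(windowMs: Long) {
  private val firstSeen = mutable.Map.empty[String, Long] // line hash -> first-seen timestamp

  /** True if this line hash was already seen within the window. */
  def isDuplicate(lineHash: String, timestampMs: Long): Boolean = {
    // Expire entries that have fallen out of the X-hour window
    val expired = firstSeen.keys.filter(k => timestampMs - firstSeen(k) > windowMs).toList
    expired.foreach(firstSeen.remove)
    val dup = firstSeen.contains(lineHash)
    if (!dup) firstSeen.update(lineHash, timestampMs)
    dup
  }
}
```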

@alexanderdean
Member Author

Yep that could well work... Hopefully once we have built the caches required by the OLAP cube, the approach to deduplication will become clearer.

One thing we should probably do at some stage is move from storing the txn_id for a row to storing the generated event_hash (which will include the txn_id as well as everything else on the querystring).

@alexanderdean
Member Author

To handle dupes cleanly, we need to store the hashes for all messages from the last batch, which could be duplicated in our time period. We can store these in a table in HBase:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hbase.html

@alexanderdean
Member Author

One of our oldest open tickets now.

In a stream processing world, you have to make an assumption that all dupes will come from the same shard key (IP address?), and then keep all event_ids in your shard's local storage for N minutes. (Otherwise you would have to re-partition based on incoming event_id, pretty painful.)

There could be an interesting abstraction in Scala Common Enrich where, as well as an EnrichmentRegistry, it is supplied with read/write access to a KV store. The specifics of the KV store would be abstracted (Samza's KV store impl? Storehaus?), but Common Enrich can use it to manage dupes, sessionization etc.

/cc @yalisassoon @fblundun
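
As a hypothetical sketch of that abstraction (the trait and function below are invented for illustration, not an existing Common Enrich API):

```scala
// The concrete store (Samza's KV store, Storehaus, ...) would be wired in by the
// host application; the dedupe logic only depends on this minimal interface.
trait DedupeKVStore {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
}

// Example: record the event_id on first sight, flag it on any later sighting
def seenBefore(store: DedupeKVStore, eventId: String, etlTstamp: String): Boolean =
  store.get(eventId) match {
    case Some(_) => true
    case None    => store.put(eventId, etlTstamp); false
  }
```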

@alexanderdean alexanderdean changed the title Occasionally CloudFront registers an event twice Scala Common Enrich: handle duplicate event_ids Nov 8, 2014
@alexanderdean alexanderdean changed the title Scala Common Enrich: handle duplicate event_ids New module: handle duplicate event_ids May 28, 2015
@alexanderdean
Member Author

Renaming from Scala Common Enrich: to Unnamed new module:, because in a stream processing world, any stage can introduce natural copies. So we have to embed this module in each storage sink that cares about duplicate event_ids.

@alexanderdean alexanderdean changed the title New module: handle duplicate event_ids Unnamed new module: handle duplicate event_ids May 28, 2015
@alexanderdean
Member Author

We have done some further brainstorming on this today. First we drew out some different event scenarios:

[Whiteboard photo of the event scenarios: img_20150818_114840]

@alexanderdean
Member Author

Then we came up with this taxonomy:

| event id | ur event | possible explanations | strategy |
| --- | --- | --- | --- |
| same | same | at-least-once processing; 2 trackers, 1 event; robots | Delete all but first |
| same | different | user reusing event ID; event ID algo collision; robot | Assign new event ID to all but first, preserving original event ID as 'parent event ID' |
| different | same | 2 trackers, 1 event; behavioral collision | Do nothing |
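
For illustration, the taxonomy above expressed as a small Scala decision function (the types and names are invented here, not existing code):

```scala
sealed trait Strategy
case object DeleteAllButFirst extends Strategy
case object AssignNewEventId  extends Strategy // preserving the original as 'parent event ID'
case object DoNothing         extends Strategy

def strategyFor(sameEventId: Boolean, sameUrEvent: Boolean): Strategy =
  (sameEventId, sameUrEvent) match {
    case (true, true)  => DeleteAllButFirst
    case (true, false) => AssignNewEventId
    case (false, _)    => DoNothing
  }
```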

@alexanderdean
Member Author

We quickly realized that it is essential to define the ur event (ur meaning most primitive) correctly. @yalisassoon to add his whiteboard photo...

@alexanderdean alexanderdean changed the title Unnamed new module: handle duplicate event_ids Spark Dedupe: handle duplicate event_ids Aug 18, 2015
@yalisassoon
Member

See below:

[Whiteboard photo: img_20150818_120812]

@alexanderdean alexanderdean changed the title Spark Dedupe: handle duplicate event_ids Scala Hadoop Shred: deduplicate event_ids with different event_fingerprints (synthetic duplicates) Sep 3, 2016
@alexanderdean alexanderdean added this to the R8x [HAD] Synthetic dedupe milestone Sep 3, 2016
@alexanderdean alexanderdean assigned chuwy and unassigned alexanderdean Nov 9, 2016
@alexanderdean
Member Author

For existing code: 8c09ade

chuwy added a commit that referenced this issue Nov 28, 2016
alexanderdean pushed a commit that referenced this issue Dec 4, 2016
alexanderdean pushed a commit that referenced this issue Dec 19, 2016
chuwy added a commit to snowplow/snowplow-rdb-loader that referenced this issue Sep 5, 2017