Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix audit trail duplicated records #59

Merged

Conversation

DownstreamDataTeam
Copy link
Contributor

Description of change

Just for audit_trail stream:

  • added number_last_occurrence value in bookmark state and use this value to remove duplicates
  • changed the sorting order from descending to ascending

As it is right now, the records with the occurred_at = <bookmarked_field_value> get duplicated due to using >= instead of > when filtering the records.

This PR solves this problem by saving the last occurrence number of the bookmarked value and use it as offset in the next API call.

Risks

  • if the number_last_occurrence > 9500 the API will return 400 as it's stated in the Mambu's Docs.

Test

Json snippet without any code changes (last 6 records) - bad output
State used: {"bookmarks": {"audit_trail": "2021-10-27T10:24:13.016Z"}}

{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.437000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.288000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.201000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.108000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.016000Z", "response_code": 200.0}} <----- duplicate
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.016000Z", "response_code": 200.0}} <----- duplicate

The last 2 records are duplicate not because they have the same information, but because they have the same occurred_at as the bookmarked value, meaning that these two records have already been fetched before.

Json snippet with the code changes (first 6 records, because of the sorting change) - desired output
State used: {"bookmarks": {"audit_trail": ["2021-10-27T10:24:13.016Z", 2]}}

{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.108000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.201000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.288000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.437000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.582000Z", "response_code": 200.0}}
{"type": "RECORD", "stream": "audit_trail", "record": {"occurred_at": "2021-10-27T10:24:13.701000Z", "response_code": 200.0}}

Rollback steps

  • revert this branch

 - bookmarked number_last_occurrence and used it to avoid duplicates
 - changed sort_order from desc to asc
@cmerrick
Copy link
Contributor

cmerrick commented Nov 5, 2021

Hi @DownstreamDataTeam, thanks for your contribution!

In order for us to evaluate and accept your PR, we ask that you sign a contribution license agreement. It's all electronic and will take just minutes.

if stream_name == 'audit_trail':
next_max_date = static_params['occurred_at[lte]']
number_last_occurrence = 0
next_max_date = static_params['occurred_at[gte]'] if stream_name == 'audit_trail' else None

while record_count == limit: # break out of loop when record_count < limit (or not data returned)
params = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small readability improvement

Suggested change
params = {
if stream_name == 'audit_trails':
params = {
'from': number_last_occurance,
'size': limit,
'occured_at[gte]': next_max_date,
**static_params
}
else:
params = {
'offset': offset,
'limit': limit,
**static_params
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion!
Do you think that these changes are relevant for this PR scope? We are also considering doing a bit of refactoring in the code and we think that your suggestions might go very well there, if they are not urgent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay to do in the context of a larger refactor. It's always easiest for review when the code changes in a PR are associated with a narrow scope.

if bookmark_type == 'integer':
last_integer = get_bookmark(state, stream_name, sub_type, 0)
max_bookmark_value = last_integer
else:
last_datetime = get_bookmark(state, stream_name, sub_type, start_date)
if stream_name == 'audit_trail':
audit_trail_bookmark = get_bookmark(state, stream_name, sub_type, [start_date, 0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it might be more extensible to use a dictionary instead of a list here. I imagine a scenario where this stream evolves further and other sorts of state transformations might be required, a dictionary could be easier to reason about in that case and keep that option manageable for future changes.

Maybe something like this?

{"last_datetime": "<datetime_value>", "number_last_occurrence": 123}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! We considered this case in the first place, but we opted for the simpler implementation for now, because it was a quick fix strictly for the audit stream. We do, however, plan to include this change in one of our future PRs; PR that will have more refactoring, including keeping the bookmark state format extensible for more streams, not just audit trail.

@cosimon cosimon merged commit ac7b7dc into singer-io:master Nov 29, 2021
@singer-bot
Copy link

You did it @DownstreamDataTeam!

Thank you for signing the Singer Contribution License Agreement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants