Include collection name in oplog query #78

Merged 5 commits on Apr 15, 2022
Changes from 3 commits
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

+## 2.1.0
+* Optimize oplog extractions to only query for the selected tables [#78](https://github.com/singer-io/tap-mongodb/pull/78)
+
## 2.0.1
* Modify `get_databases` function to return a unique list of databases [#58](https://github.com/singer-io/tap-mongodb/pull/58)

@@ -78,7 +81,7 @@

## 0.1.0
* Added key-based incremental sync [commit](https://github.com/singer-io/tap-mongodb/commit/b618b11d91e111680f70b402c6e94c9bf40c7b8f)

## 0.0.5
* Fixed bug in oplog projections [commit](https://github.com/singer-io/tap-mongodb/commit/b400836678440499d4a15fb7d5b0a40a13e3342e)

2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-mongodb',
-version='2.0.1',
+version='2.1.0',
description='Singer.io tap for extracting data from MongoDB',
author='Stitch',
url='https://singer.io',
5 changes: 3 additions & 2 deletions tap_mongodb/__init__.py
@@ -310,6 +310,8 @@ def sync_stream(client, stream, state):
LOGGER.info("Clearing state because Oplog has aged out")
state.get('bookmarks', {}).pop(tap_stream_id)

+collection_oplog_ts = oplog.get_latest_ts(client)
+
# make sure initial full table sync has been completed
if not singer.get_bookmark(state, tap_stream_id, 'initial_full_table_complete'):
msg = 'Must complete full table sync before starting oplog replication for %s'
@@ -318,12 +320,11 @@
# only mark current ts in oplog on first sync so tap has a
# starting point after the full table sync
if singer.get_bookmark(state, tap_stream_id, 'version') is None:
-collection_oplog_ts = oplog.get_latest_ts(client)
oplog.update_bookmarks(state, tap_stream_id, collection_oplog_ts)

full_table.sync_collection(client, stream, state, stream_projection)

-oplog.sync_collection(client, stream, state, stream_projection)
+oplog.sync_collection(client, stream, state, stream_projection, collection_oplog_ts)

elif replication_method == 'FULL_TABLE':
full_table.sync_collection(client, stream, state, stream_projection)
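`oplog.get_latest_ts` itself is not part of this diff; for reference, a minimal sketch of that lookup, matching the query the new test below performs against `local.oplog.rs`, might look like:

```python
import pymongo

def get_latest_ts(client):
    # Newest oplog entry, read in reverse natural order; its 'ts' field
    # is a bson.Timestamp marking the current head of the oplog.
    row = client.local.oplog.rs.find_one(sort=[('$natural', pymongo.DESCENDING)])
    return row.get('ts')
```

Capturing this value up front gives `sync_collection` a safe upper bound to bookmark for streams that see no changes during the sync.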
16 changes: 12 additions & 4 deletions tap_mongodb/sync_strategies/oplog.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-import copy
import time
import pymongo
import singer
@@ -104,7 +103,7 @@ def flush_buffer(client, update_buffer, stream_projection, db_name, collection_n


# pylint: disable=too-many-locals, too-many-branches, too-many-statements
-def sync_collection(client, stream, state, stream_projection):
+def sync_collection(client, stream, state, stream_projection, max_oplog_ts=None):
tap_stream_id = stream['tap_stream_id']
LOGGER.info('Starting oplog sync for %s', tap_stream_id)

@@ -129,7 +128,8 @@ def sync_collection(client, stream, state, stream_projection):
start_time = time.time()

oplog_query = {
-'ts': {'$gte': oplog_ts}
+'ts': {'$gte': oplog_ts},
+'ns': {'$eq' : '{}.{}'.format(database_name, collection_name)}
}

projection = transform_projection(stream_projection)
@@ -239,7 +239,7 @@ def sync_collection(client, stream, state, stream_projection):
update_buffer = set()

# write state
-singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
+singer.write_message(singer.StateMessage(value=state))

# flush buffer if finished with oplog
for buffered_row in flush_buffer(client,
@@ -256,6 +256,14 @@
singer.write_message(record_message)
rows_saved += 1

+
+if rows_saved == 0:
+    # This stream saw no updates during the entire sync, so we can safely
+    # advance its bookmark to the max position in the oplog
+    state = update_bookmarks(state,
+                             tap_stream_id,
+                             max_oplog_ts)
+    singer.write_message(singer.StateMessage(value=state))
+
common.COUNTS[tap_stream_id] += rows_saved
common.TIMES[tap_stream_id] += time.time()-start_time
LOGGER.info('Syncd %s records for %s', rows_saved, tap_stream_id)
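Taken together, these changes narrow the oplog scan to a single collection's namespace and still advance a quiet stream's bookmark. A minimal standalone sketch of the filtered query (the helper names here are illustrative, not the tap's own, and the cursor options the real tap uses are omitted):

```python
import pymongo

def build_oplog_query(oplog_ts, database_name, collection_name):
    # 'ns' holds the entry's namespace, '<database>.<collection>', so
    # filtering on it server-side skips entries for unselected
    # collections instead of discarding them client-side.
    return {
        'ts': {'$gte': oplog_ts},
        'ns': {'$eq': '{}.{}'.format(database_name, collection_name)},
    }

def scan_oplog(client, oplog_query):
    # The oplog is a capped collection; read it in natural (insertion) order.
    return client.local.oplog.rs.find(oplog_query,
                                      sort=[('$natural', pymongo.ASCENDING)])
```

Because the scan is namespace-filtered, a sync can now legitimately return zero rows for a quiet collection; the new `rows_saved == 0` branch advances that stream's bookmark to `max_oplog_ts` (captured in `sync_stream` before the scan begins) so the next run does not rescan the same oplog range.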
240 changes: 240 additions & 0 deletions tests/test_mongodb_oplog_bookmarks.py
@@ -0,0 +1,240 @@
import tap_tester.connections as connections
import tap_tester.menagerie as menagerie
import tap_tester.runner as runner
import os
import datetime
import unittest
import pymongo
import string
import random
import time
import re
import pprint
import bson
from bson import ObjectId
import singer
from functools import reduce
from singer import utils, metadata
from mongodb_common import drop_all_collections, get_test_connection, ensure_environment_variables_set
import decimal


RECORD_COUNT = {}


def random_string_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for x in range(size))

def generate_simple_coll_docs(num_docs):
docs = []
for int_value in range(num_docs):
docs.append({"int_field": int_value, "string_field": random_string_generator()})
return docs

class MongoDBOplogBookmarks(unittest.TestCase):
def setUp(self):

ensure_environment_variables_set()

with get_test_connection() as client:
drop_all_collections(client)

# simple_coll_1 has 50 documents
client["simple_db"]["simple_coll_1"].insert_many(generate_simple_coll_docs(50))

# simple_coll_2 has 100 documents
client["simple_db"]["simple_coll_2"].insert_many(generate_simple_coll_docs(100))


def expected_check_streams(self):
return {
'simple_db-simple_coll_1',
'simple_db-simple_coll_2',
}

def expected_pks(self):
return {
'simple_coll_1': {'_id'},
'simple_coll_2': {'_id'},
}

def expected_row_counts(self):
return {
'simple_coll_1': 50,
'simple_coll_2': 100,
}


def expected_sync_streams(self):
return {
'simple_coll_1',
'simple_coll_2',
}

def name(self):
return "tap_tester_mongodb_oplog_bookmarks"

def tap_name(self):
return "tap-mongodb"

def get_type(self):
return "platform.mongodb"

def get_credentials(self):
return {'password': os.getenv('TAP_MONGODB_PASSWORD')}

def get_properties(self):
return {
'host' : os.getenv('TAP_MONGODB_HOST'),
'port' : os.getenv('TAP_MONGODB_PORT'),
'user' : os.getenv('TAP_MONGODB_USER'),
'database' : os.getenv('TAP_MONGODB_DBNAME')
}


def test_run(self):

conn_id = connections.ensure_connection(self)

# -------------------------------
# ----------- Discovery ----------
# -------------------------------

# run in discovery mode
check_job_name = runner.run_check_mode(self, conn_id)

# verify check exit codes
exit_status = menagerie.get_exit_status(conn_id, check_job_name)
menagerie.verify_check_exit_status(self, exit_status, check_job_name)

# verify the tap discovered the right streams
found_catalogs = menagerie.get_catalogs(conn_id)

# assert we find the correct streams
self.assertEqual(self.expected_check_streams(),
{c['tap_stream_id'] for c in found_catalogs})

for tap_stream_id in self.expected_check_streams():
found_stream = [c for c in found_catalogs if c['tap_stream_id'] == tap_stream_id][0]

# assert that the pks are correct
self.assertEqual(self.expected_pks()[found_stream['stream_name']],
set(found_stream.get('metadata', {}).get('table-key-properties')))

# assert that the row counts are correct
self.assertEqual(self.expected_row_counts()[found_stream['stream_name']],
found_stream.get('metadata', {}).get('row-count'))

# -----------------------------------
# ----------- Initial Full Table ---------
# -----------------------------------
# Select simple_coll_1 stream and add replication method metadata
Review comment (Contributor): Looks like only collection 1 is selected. Can you update this comment.
Reply (Contributor): Done

additional_md = [{ "breadcrumb" : [],
"metadata" : {'replication-method' : 'LOG_BASED'}}]
for stream_catalog in found_catalogs:
if stream_catalog['tap_stream_id'] == 'simple_db-simple_coll_1':
annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id'])
selected_metadata = connections.select_catalog_and_fields_via_metadata(conn_id,
stream_catalog,
annotated_schema,
additional_md)

# Run sync
sync_job_name = runner.run_sync_mode(self, conn_id)

exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)


# verify the persisted schema was correct
records_by_stream = runner.get_records_from_target_output()

# assert that each of the streams that we synced are the ones that we expect to see
record_count_by_stream = runner.examine_target_output_file(self,
conn_id,
self.expected_sync_streams(),
self.expected_pks())

# Verify that the full table was synced
tap_stream_id = 'simple_db-simple_coll_1'
self.assertGreaterEqual(record_count_by_stream['simple_coll_1'],
self.expected_row_counts()['simple_coll_1'])

# Verify that we have 'initial_full_table_complete' bookmark
state = menagerie.get_state(conn_id)
first_versions = {}

# assert that the state has an initial_full_table_complete == True
self.assertTrue(state['bookmarks'][tap_stream_id]['initial_full_table_complete'])
# assert that there is a version bookmark in state
first_versions[tap_stream_id] = state['bookmarks'][tap_stream_id]['version']
self.assertIsNotNone(first_versions[tap_stream_id])
# Verify that we have an oplog_ts_time and oplog_ts_inc bookmark
self.assertIsNotNone(state['bookmarks'][tap_stream_id]['oplog_ts_time'])
self.assertIsNotNone(state['bookmarks'][tap_stream_id]['oplog_ts_inc'])


changed_ids = set()
with get_test_connection() as client:
# Make changes to not selected collection
changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 0})[0]['_id'])
client["simple_db"]["simple_coll_2"].delete_one({'int_field': 0})

changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 1})[0]['_id'])
client["simple_db"]["simple_coll_2"].delete_one({'int_field': 1})

changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 98})[0]['_id'])
client["simple_db"]["simple_coll_2"].update_one({'int_field': 98},{'$set': {'int_field': -1}})

changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 99})[0]['_id'])
client["simple_db"]["simple_coll_2"].update_one({'int_field': 99},{'$set': {'int_field': -1}})

client["simple_db"]["simple_coll_2"].insert_one({"int_field": 100, "string_field": random_string_generator()})
changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 100})[0]['_id'])

client["simple_db"]["simple_coll_2"].insert_one({"int_field": 101, "string_field": random_string_generator()})
changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 101})[0]['_id'])

Review comment (Contributor): This scenario is good. I think it would be good to also update simple_coll_1 and verify that a record comes through rather than no records.
Reply (Contributor): This can be ignored. I misunderstood the change that was being covered here.

# -----------------------------------
# ----------- Subsequent Oplog Sync ---------
# -----------------------------------

# Run sync
sync_job_name = runner.run_sync_mode(self, conn_id)

exit_status = menagerie.get_exit_status(conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)

# verify the persisted schema was correct
messages_by_stream = runner.get_records_from_target_output()
records_by_stream = {
'simple_coll_1': [x
for x in messages_by_stream['simple_coll_1']['messages']
if x.get('action') == 'upsert']
}

# assert that each of the streams that we synced are the ones that we expect to see
record_count_by_stream = runner.examine_target_output_file(self,
conn_id,
self.expected_sync_streams(),
self.expected_pks())

# Verify that no records were replicated for simple_coll_1, since only simple_coll_2 changed
Review comment (Contributor): Can you fix this comment
Reply (Contributor): Done

self.assertEqual(0, record_count_by_stream['simple_coll_1'])


final_state = menagerie.get_state(conn_id)

with get_test_connection() as client:
row = client.local.oplog.rs.find_one(sort=[('$natural', pymongo.DESCENDING)])
latest_oplog_ts = row.get('ts')

self.assertEqual(
(latest_oplog_ts.time, latest_oplog_ts.inc),
(final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_time'],
final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_inc'])
)
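The final assertion compares the bookmark against the oplog head using the two components of a BSON timestamp, which `bson.Timestamp` exposes directly:

```python
from bson.timestamp import Timestamp

# A BSON timestamp is (seconds since the epoch, ordinal within that second).
ts = Timestamp(1650000000, 5)
assert (ts.time, ts.inc) == (1650000000, 5)
```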