Skip to content

Commit

Permalink
Gracefully handle the mongo server not supporting sessions (#112)
Browse files Browse the repository at this point in the history
* Gracefully handle the mongo server not supporting sessions

* Version bump and update changelog
  • Loading branch information
cosimon committed Feb 14, 2024
1 parent 752f83f commit ca5bf36
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 3.1.1
* Gracefully fallback to not using a session if sessions are not supported by the mongo server [#112](https://github.com/singer-io/tap-mongodb/pull/112)

## 3.1.0
* Updates to run on python 3.11.7 [#111](https://github.com/singer-io/tap-mongodb/pull/111)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-mongodb',
version='3.1.0',
version='3.1.1',
description='Singer.io tap for extracting data from MongoDB',
author='Stitch',
url='https://singer.io',
Expand Down
29 changes: 26 additions & 3 deletions tap_mongodb/sync_strategies/oplog.py
Expand Up @@ -3,10 +3,12 @@
import pymongo
import singer
from singer import metadata, utils
from pymongo.errors import ConfigurationError

from bson import timestamp
import tap_mongodb.sync_strategies.common as common


LOGGER = singer.get_logger()

SDC_DELETED_AT = "_sdc_deleted_at"
Expand Down Expand Up @@ -101,6 +103,25 @@ def flush_buffer(client, update_buffer, stream_projection, db_name, collection_n
for row in cursor:
yield row

class SessionNotAvailable():
def __enter__(self, *args):
pass
def __exit__(self, *args):
pass

def maybe_get_session(client):
'''
Try to get a session. If sessions are not available to us then return an object
that will work in the context manager
'''
try:
return client.start_session()
except ConfigurationError:
# log sessions not available
LOGGER.info('Unable to start session, without session')
# return an object that works with a 'with' block
return SessionNotAvailable()


# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def sync_collection(client, stream, state, stream_projection, max_oplog_ts=None):
Expand Down Expand Up @@ -146,7 +167,9 @@ def sync_collection(client, stream, state, stream_projection, max_oplog_ts=None)
session_refresh_time = time.time()

# Create a session so that we can periodically send a simple command to keep it alive
with client.start_session() as session:
with maybe_get_session(client) as session:

have_session = not isinstance(session, SessionNotAvailable)

# consider adding oplog_replay, but this would require removing the projection
# default behavior is a non_tailable cursor but we might want a tailable one
Expand All @@ -156,11 +179,11 @@ def sync_collection(client, stream, state, stream_projection, max_oplog_ts=None)
projection,
sort=[('$natural', pymongo.ASCENDING)],
oplog_replay=oplog_replay,
session=session,
session=session if have_session else None,
no_cursor_timeout=True
) as cursor:
# Refresh the session every 10 minutes to keep it alive
if time.time() - session_refresh_time > 600:
if have_session and time.time() - session_refresh_time > 600:
client.local.command('ping', session=session)
session_refresh_time = time.time()

Expand Down

0 comments on commit ca5bf36

Please sign in to comment.