Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pymongo/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ def _execute_command(self, generator, write_concern, session,
if retryable and not self.started_retryable_write:
session._start_retryable_write()
self.started_retryable_write = True
session._apply_to(cmd, retryable, ReadPreference.PRIMARY)
session._apply_to(cmd, retryable, ReadPreference.PRIMARY,
sock_info)
sock_info.send_cluster_time(cmd, session, client)
ops = islice(run.ops, run.idx_offset, None)
# Run as many ops as possible in one command.
Expand Down
9 changes: 6 additions & 3 deletions pymongo/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ def _txn_read_preference(self):
return self._transaction.opts.read_preference
return None

def _apply_to(self, command, is_retryable, read_preference):
def _apply_to(self, command, is_retryable, read_preference, sock_info):
self._check_ended()

self._server_session.last_use = time.monotonic()
Expand All @@ -891,7 +891,7 @@ def _apply_to(self, command, is_retryable, read_preference):
rc = self._transaction.opts.read_concern.document
if rc:
command['readConcern'] = rc
self._update_read_concern(command)
self._update_read_concern(command, sock_info)

command['txnNumber'] = self._server_session.transaction_id
command['autocommit'] = False
Expand All @@ -900,12 +900,15 @@ def _start_retryable_write(self):
self._check_ended()
self._server_session.inc_transaction_id()

def _update_read_concern(self, cmd):
def _update_read_concern(self, cmd, sock_info):
if (self.options.causal_consistency
and self.operation_time is not None):
cmd.setdefault('readConcern', {})[
'afterClusterTime'] = self.operation_time
if self.options.snapshot:
if sock_info.max_wire_version < 13:
raise ConfigurationError(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be InvalidOperation or ConfigurationError. We seem to use both in this file for various "this thing is not supported" errors. Do you have a preference?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigurationError makes more sense to me.

'Snapshot reads require MongoDB 5.0 or later')
rc = cmd.setdefault('readConcern', {})
rc['level'] = 'snapshot'
if self._snapshot_time is not None:
Expand Down
6 changes: 3 additions & 3 deletions pymongo/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,10 @@ def as_command(self, sock_info):
session = self.session
sock_info.add_server_api(cmd)
if session:
session._apply_to(cmd, False, self.read_preference)
session._apply_to(cmd, False, self.read_preference, sock_info)
# Explain does not support readConcern.
if not explain and not session.in_transaction:
session._update_read_concern(cmd)
session._update_read_concern(cmd, sock_info)
sock_info.send_cluster_time(cmd, session, self.client)
# Support auto encryption
client = self.client
Expand Down Expand Up @@ -418,7 +418,7 @@ def as_command(self, sock_info):
self.max_await_time_ms)

if self.session:
self.session._apply_to(cmd, False, self.read_preference)
self.session._apply_to(cmd, False, self.read_preference, sock_info)
sock_info.add_server_api(cmd)
sock_info.send_cluster_time(cmd, self.session, self.client)
# Support auto encryption
Expand Down
2 changes: 1 addition & 1 deletion pymongo/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def command(sock_info, dbname, spec, slave_ok, is_mongos,
if read_concern.level:
spec['readConcern'] = read_concern.document
if session:
session._update_read_concern(spec)
session._update_read_concern(spec, sock_info)
if collation is not None:
spec['collation'] = collation

Expand Down
3 changes: 2 additions & 1 deletion pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ def command(self, dbname, spec, slave_ok=False,

self.add_server_api(spec)
if session:
session._apply_to(spec, retryable_write, read_preference)
session._apply_to(spec, retryable_write, read_preference,
self)
self.send_cluster_time(spec, session, client)
listeners = self.listeners if publish_events else None
unacknowledged = write_concern and not write_concern.acknowledged
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"description": "snapshot-sessions-not-supported-client-error",
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"maxServerVersion": "4.4.99"
}
],
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent",
"commandFailedEvent"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
},
{
"session": {
"id": "session0",
"client": "client0",
"sessionOptions": {
"snapshot": true
}
}
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": [
{
"_id": 1,
"x": 11
}
]
}
],
"tests": [
{
"description": "Client error on find with snapshot",
"operations": [
{
"name": "find",
"object": "collection0",
"arguments": {
"session": "session0",
"filter": {}
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
},
{
"description": "Client error on aggregate with snapshot",
"operations": [
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"session": "session0",
"pipeline": []
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
},
{
"description": "Client error on distinct with snapshot",
"operations": [
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectError": {
"isClientError": true,
"errorContains": "Snapshot reads require MongoDB 5.0 or later"
}
}
],
"expectEvents": []
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
"schemaVersion": "1.0",
"runOnRequirements": [
{
"minServerVersion": "3.6",
"maxServerVersion": "4.4.99"
},
{
"minServerVersion": "3.6",
"minServerVersion": "5.0",
"topologies": [
"single"
]
Expand All @@ -20,11 +16,6 @@
"observeEvents": [
"commandStartedEvent",
"commandFailedEvent"
],
"ignoreCommandMonitoringEvents": [
"findAndModify",
"insert",
"update"
]
}
},
Expand Down Expand Up @@ -106,6 +97,91 @@
]
}
]
},
{
"description": "Server returns an error on aggregate with snapshot",
"operations": [
{
"name": "aggregate",
"object": "collection0",
"arguments": {
"session": "session0",
"pipeline": []
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"aggregate": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandFailedEvent": {
"commandName": "aggregate"
}
}
]
}
]
},
{
"description": "Server returns an error on distinct with snapshot",
"operations": [
{
"name": "distinct",
"object": "collection0",
"arguments": {
"fieldName": "x",
"filter": {},
"session": "session0"
},
"expectError": {
"isError": true,
"isClientError": false
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"distinct": "collection0",
"readConcern": {
"level": "snapshot",
"atClusterTime": {
"$$exists": false
}
}
}
}
},
{
"commandFailedEvent": {
"commandName": "distinct"
}
}
]
}
]
}
]
}