Skip to content
Fetching contributors…
Cannot retrieve contributors at this time
1080 lines (893 sloc) 40.2 KB
# Copyright 2009-2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Tools for connecting to MongoDB.
.. seealso:: Module :mod:`~pymongo.master_slave_connection` for
connecting to master-slave clusters, and
:doc:`/examples/replica_set` for an example of how to connect to a
replica set.
To get a :class:`~pymongo.database.Database` instance from a
:class:`Connection` use either dictionary-style or attribute-style
access:
.. doctest::
>>> from pymongo import Connection
>>> c = Connection()
>>> c.test_database
Database(Connection('localhost', 27017), u'test_database')
>>> c['test-database']
Database(Connection('localhost', 27017), u'test-database')
"""
import datetime
import socket
import struct
import warnings
from bson.py3compat import b
from bson.son import SON
from pymongo import (common,
database,
helpers,
message,
pool,
uri_parser)
from pymongo.cursor_manager import CursorManager
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
DuplicateKeyError,
InvalidDocument,
InvalidURI,
OperationFailure)
EMPTY = b("")
def _partition_node(node):
"""Split a host:port string returned from mongod/s into
a (host, int(port)) pair needed for socket.connect().
"""
host = node
port = 27017
idx = node.rfind(':')
if idx != -1:
host, port = node[:idx], int(node[idx + 1:])
if host.startswith('['):
host = host[1:-1]
return host, port
class Connection(common.BaseObject):
"""Connection to MongoDB.
"""
HOST = "localhost"
PORT = 27017
__max_bson_size = 4 * 1024 * 1024
def __init__(self, host=None, port=None, max_pool_size=10,
network_timeout=None, document_class=dict,
tz_aware=False, _connect=True, **kwargs):
"""Create a new connection to a single MongoDB instance at *host:port*.
The resultant connection object has connection-pooling built
in. It also performs auto-reconnection when necessary. If an
operation fails because of a connection error,
:class:`~pymongo.errors.ConnectionFailure` is raised. If
auto-reconnection will be performed,
:class:`~pymongo.errors.AutoReconnect` will be
raised. Application code should handle this exception
(recognizing that the operation failed) and then continue to
execute.
Raises :class:`TypeError` if port is not an instance of
``int``. Raises :class:`~pymongo.errors.ConnectionFailure` if
the connection cannot be made.
The `host` parameter can be a full `mongodb URI
<http://dochub.mongodb.org/core/connections>`_, in addition to
a simple hostname. It can also be a list of hostnames or
URIs. Any port specified in the host string(s) will override
the `port` parameter. If multiple mongodb URIs containing
database or auth information are passed, the last database,
username, and password present will be used. For username and
passwords reserved characters like ':', '/', '+' and '@' must be
escaped following RFC 2396.
:Parameters:
- `host` (optional): hostname or IP address of the
instance to connect to, or a mongodb URI, or a list of
hostnames / mongodb URIs. If `host` is an IPv6 literal
it must be enclosed in '[' and ']' characters following
the RFC2732 URL syntax (e.g. '[::1]' for localhost)
- `port` (optional): port number on which to connect
- `max_pool_size` (optional): The maximum size limit for
the connection pool.
- `network_timeout` (optional): timeout (in seconds) to use
for socket operations - default is no timeout
- `document_class` (optional): default class to use for
documents returned from queries on this connection
- `tz_aware` (optional): if ``True``,
:class:`~datetime.datetime` instances returned as values
in a document by this :class:`Connection` will be timezone
aware (otherwise they will be naive)
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- `j` or `journal`: Block until write operations have been commited
to the journal. Ignored if the server is running without journaling.
Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
won't return until they have been replicated to the specified
number or tagged set of servers.
Implies safe=True.
- `wtimeout`: Used in conjunction with `j` and/or `w`. Wait this many
milliseconds for journal acknowledgement and/or write replication.
Implies safe=True.
- `fsync`: Force the database to fsync all files before returning
When used with `j` the server awaits the next group commit before
returning.
Implies safe=True.
- `replicaSet`: The name of the replica set to connect to. The driver
will verify that the replica set it connects to matches this name.
Implies that the hosts specified are a seed list and the driver should
attempt to find all members of the set.
- `socketTimeoutMS`: How long a send or receive on a socket can take
before timing out.
- `connectTimeoutMS`: How long a connection can take to be opened
before timing out.
- `ssl`: If True, create the connection to the server using SSL.
- `read_preference`: The read preference for this connection.
See :class:`~pymongo.ReadPreference` for available options.
- `auto_start_request`: If True (the default), each thread that
accesses this Connection has a socket allocated to it for the
thread's lifetime. This ensures consistent reads, even if you read
after an unsafe write.
- `use_greenlets` (optional): if ``True``, :meth:`start_request()`
will ensure that the current greenlet uses the same socket for all
operations until :meth:`end_request()`
- `slave_okay` or `slaveOk` (deprecated): Use `read_preference`
instead.
.. seealso:: :meth:`end_request`
.. versionchanged:: 2.1.1+
Added `auto_start_request` option back.
.. versionchanged:: 2.1
Support `w` = integer or string.
Added `ssl` option.
DEPRECATED slave_okay/slaveOk.
.. versionchanged:: 2.0
`slave_okay` is a pure keyword argument. Added support for safe,
and getlasterror options as keyword arguments.
.. versionchanged:: 1.11
Added `max_pool_size`. Completely removed previously deprecated
`pool_size`, `auto_start_request` and `timeout` parameters.
.. versionchanged:: 1.8
The `host` parameter can now be a full `mongodb URI
<http://dochub.mongodb.org/core/connections>`_, in addition
to a simple hostname. It can also be a list of hostnames or
URIs.
.. versionadded:: 1.8
The `tz_aware` parameter.
.. versionadded:: 1.7
The `document_class` parameter.
.. versionadded:: 1.1
The `network_timeout` parameter.
.. mongodoc:: connections
"""
if host is None:
host = self.HOST
if isinstance(host, basestring):
host = [host]
if port is None:
port = self.PORT
if not isinstance(port, int):
raise TypeError("port must be an instance of int")
nodes = set()
username = None
password = None
db = None
options = {}
for entity in host:
if "://" in entity:
if entity.startswith("mongodb://"):
res = uri_parser.parse_uri(entity, port)
nodes.update(res["nodelist"])
username = res["username"] or username
password = res["password"] or password
db = res["database"] or db
options = res["options"]
else:
idx = entity.find("://")
raise InvalidURI("Invalid URI scheme: "
"%s" % (entity[:idx],))
else:
nodes.update(uri_parser.split_hosts(entity, port))
if not nodes:
raise ConfigurationError("need to specify at least one host")
self.__nodes = nodes
self.__host = None
self.__port = None
for option, value in kwargs.iteritems():
option, value = common.validate(option, value)
options[option] = value
self.__max_pool_size = common.validate_positive_integer(
'max_pool_size', max_pool_size)
self.__cursor_manager = CursorManager(self)
self.__repl = options.get('replicaset')
if network_timeout is not None:
if (not isinstance(network_timeout, (int, float)) or
network_timeout <= 0):
raise ConfigurationError("network_timeout must "
"be a positive integer")
self.__net_timeout = (network_timeout or
options.get('sockettimeoutms'))
self.__conn_timeout = options.get('connecttimeoutms')
self.__use_ssl = options.get('ssl', False)
if self.__use_ssl and not pool.have_ssl:
raise ConfigurationError("The ssl module is not available. If you "
"are using a python version previous to "
"2.6 you must install the ssl package "
"from PyPI.")
if options.get('use_greenlets', False):
if not pool.have_greenlet:
raise ConfigurationError(
"The greenlet module is not available. "
"Install the greenlet package from PyPI."
)
self.pool_class = pool.GreenletPool
else:
self.pool_class = pool.Pool
self.__pool = self.pool_class(
None,
self.__max_pool_size,
self.__net_timeout,
self.__conn_timeout,
self.__use_ssl
)
self.__document_class = document_class
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__auto_start_request = options.get('auto_start_request', True)
# cache of existing indexes used by ensure_index ops
self.__index_cache = {}
self.__auth_credentials = {}
super(Connection, self).__init__(**options)
if self.slave_okay:
warnings.warn("slave_okay is deprecated. Please "
"use read_preference instead.", DeprecationWarning)
if _connect:
self.__find_node()
if db and username is None:
warnings.warn("must provide a username and password "
"to authenticate to %s" % (db,))
if username:
db = db or "admin"
if not self[db].authenticate(username, password):
raise ConfigurationError("authentication failed")
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
"""
cache = self.__index_cache
now = datetime.datetime.utcnow()
return (dbname in cache and
coll in cache[dbname] and
index in cache[dbname][coll] and
now < cache[dbname][coll][index])
def _cache_index(self, database, collection, index, ttl):
"""Add an index to the index cache for ensure_index operations.
"""
now = datetime.datetime.utcnow()
expire = datetime.timedelta(seconds=ttl) + now
if database not in self.__index_cache:
self.__index_cache[database] = {}
self.__index_cache[database][collection] = {}
self.__index_cache[database][collection][index] = expire
elif collection not in self.__index_cache[database]:
self.__index_cache[database][collection] = {}
self.__index_cache[database][collection][index] = expire
else:
self.__index_cache[database][collection][index] = expire
def _purge_index(self, database_name,
collection_name=None, index_name=None):
"""Purge an index from the index cache.
If `index_name` is None purge an entire collection.
If `collection_name` is None purge an entire database.
"""
if not database_name in self.__index_cache:
return
if collection_name is None:
del self.__index_cache[database_name]
return
if not collection_name in self.__index_cache[database_name]:
return
if index_name is None:
del self.__index_cache[database_name][collection_name]
return
if index_name in self.__index_cache[database_name][collection_name]:
del self.__index_cache[database_name][collection_name][index_name]
def _cache_credentials(self, db_name, username, password):
"""Add credentials to the database authentication cache
for automatic login when a socket is created.
If credentials are already cached for `db_name` they
will be replaced.
"""
self.__auth_credentials[db_name] = (username, password)
def _purge_credentials(self, db_name=None):
"""Purge credentials from the database authentication cache.
If `db_name` is None purge credentials for all databases.
"""
if db_name is None:
self.__auth_credentials.clear()
elif db_name in self.__auth_credentials:
del self.__auth_credentials[db_name]
def __check_auth(self, sock_info):
"""Authenticate using cached database credentials.
If credentials for the 'admin' database are available only
this database is authenticated, since this gives global access.
"""
authset = sock_info.authset
names = set(self.__auth_credentials.iterkeys())
# Logout from any databases no longer listed in the credentials cache.
for dbname in authset - names:
try:
self.__simple_command(sock_info, dbname, {'logout': 1})
# TODO: We used this socket to logout. Fix logout so we don't
# have to catch this.
except OperationFailure:
pass
authset.discard(dbname)
# Once logged into the admin database we can access anything.
if "admin" in authset:
return
if "admin" in self.__auth_credentials:
username, password = self.__auth_credentials["admin"]
self.__auth(sock_info, 'admin', username, password)
authset.add('admin')
else:
for db_name in names - authset:
user, pwd = self.__auth_credentials[db_name]
self.__auth(sock_info, db_name, user, pwd)
authset.add(db_name)
@property
def host(self):
"""Current connected host.
.. versionchanged:: 1.3
``host`` is now a property rather than a method.
"""
return self.__host
@property
def port(self):
"""Current connected port.
.. versionchanged:: 1.3
``port`` is now a property rather than a method.
"""
return self.__port
@property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
.. versionadded:: 1.11
"""
return self.__max_pool_size
@property
def nodes(self):
"""List of all known nodes.
Includes both nodes specified when the :class:`Connection` was
created, as well as nodes discovered through the replica set
discovery mechanism.
.. versionadded:: 1.8
"""
return self.__nodes
@property
def auto_start_request(self):
return self.__auto_start_request
def get_document_class(self):
return self.__document_class
def set_document_class(self, klass):
self.__document_class = klass
document_class = property(get_document_class, set_document_class,
doc="""Default class to use for documents
returned on this connection.
.. versionadded:: 1.7
""")
@property
def tz_aware(self):
"""Does this connection return timezone-aware datetimes?
See the `tz_aware` parameter to :meth:`Connection`.
.. versionadded:: 1.8
"""
return self.__tz_aware
@property
def max_bson_size(self):
"""Return the maximum size BSON object the connected server
accepts in bytes. Defaults to 4MB in server < 1.7.4.
.. versionadded:: 1.10
"""
return self.__max_bson_size
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
"""
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
sock_info.sock.sendall(msg)
response = self.__receive_message_on_socket(1, rqst_id, sock_info)
response = helpers._unpack_response(response)['data'][0]
msg = "command %r failed: %%s" % spec
helpers._check_command_response(response, None, msg)
return response
def __auth(self, sock_info, dbname, user, passwd):
"""Authenticate socket against database `dbname`.
"""
# Get a nonce
response = self.__simple_command(sock_info, dbname, {'getnonce': 1})
nonce = response['nonce']
key = helpers._auth_key(nonce, user, passwd)
# Actually authenticate
query = SON([('authenticate', 1),
('user', user), ('nonce', nonce), ('key', key)])
self.__simple_command(sock_info, dbname, query)
def __try_node(self, node):
"""Try to connect to this node and see if it works
for our connection type.
:Parameters:
- `node`: The (host, port) pair to try.
"""
self.disconnect()
self.__host, self.__port = node
response = self.admin.command("ismaster")
if "maxBsonObjectSize" in response:
self.__max_bson_size = response["maxBsonObjectSize"]
# Replica Set?
if len(self.__nodes) > 1 or self.__repl:
# Check that this host is part of the given replica set.
if self.__repl:
set_name = response.get('setName')
# The 'setName' field isn't returned by mongod before 1.6.2
# so we can't assume that if it's missing this host isn't in
# the specified set.
if set_name and set_name != self.__repl:
raise ConfigurationError("%s:%d is not a member of "
"replica set %s"
% (node[0], node[1], self.__repl))
if "hosts" in response:
self.__nodes.update([_partition_node(h)
for h in response["hosts"]])
if response["ismaster"]:
return node
elif "primary" in response:
candidate = _partition_node(response["primary"])
return self.__try_node(candidate)
# Explain why we aren't using this connection.
raise AutoReconnect('%s:%d is not primary or master' % node)
# Direct connection
if response.get("arbiterOnly", False):
raise ConfigurationError("%s:%d is an arbiter" % node)
return node
def __find_node(self):
"""Find a host, port pair suitable for our connection type.
If only one host was supplied to __init__ see if we can connect
to it. Don't check if the host is a master/primary so we can make
a direct connection to read from a slave.
If more than one host was supplied treat them as a seed list for
connecting to a replica set. Try to find the primary and fail if
we can't. Possibly updates any replSet information on success.
If the list of hosts is not a seed list for a replica set the
behavior is still the same. We iterate through the list trying
to find a host we can send write operations to.
In either case a connection to an arbiter will never succeed.
Sets __host and __port so that :attr:`host` and :attr:`port`
will return the address of the connected host.
"""
errors = []
# self.__nodes may change size as we iterate.
seeds = self.__nodes.copy()
for candidate in seeds:
try:
node = self.__try_node(candidate)
if node:
return node
except Exception, why:
errors.append(str(why))
# Try any hosts we discovered that were not in the seed list.
for candidate in self.__nodes - seeds:
try:
node = self.__try_node(candidate)
if node:
return node
except Exception, why:
errors.append(str(why))
# Couldn't find a suitable host.
self.disconnect()
raise AutoReconnect(', '.join(errors))
def __socket(self):
"""Get a SocketInfo from the pool.
"""
host, port = (self.__host, self.__port)
if host is None or port is None:
host, port = self.__find_node()
try:
if self.__auto_start_request:
# No effect if a request already started
self.start_request()
sock_info = self.__pool.get_socket((host, port))
except socket.error, why:
self.disconnect()
raise AutoReconnect("could not connect to "
"%s:%d: %s" % (host, port, str(why)))
if self.__auth_credentials:
self.__check_auth(sock_info)
return sock_info
def disconnect(self):
"""Disconnect from MongoDB.
Disconnecting will close all underlying sockets in the
connection pool. If the :class:`Connection` is used again it
will be automatically re-opened. Care should be taken to make
sure that :meth:`disconnect` is not called in the middle of a
sequence of operations in which ordering is important. This
could lead to unexpected results.
.. seealso:: :meth:`end_request`
.. versionadded:: 1.3
"""
self.__pool.reset()
self.__host = None
self.__port = None
def close(self):
"""Alias for :meth:`disconnect`
Disconnecting will close all underlying sockets in the
connection pool. If the :class:`Connection` is used again it
will be automatically re-opened. Care should be taken to make
sure that :meth:`disconnect` is not called in the middle of a
sequence of operations in which ordering is important. This
could lead to unexpected results.
.. seealso:: :meth:`end_request`
.. versionadded:: 2.1
"""
self.disconnect()
def set_cursor_manager(self, manager_class):
"""Set this connection's cursor manager.
Raises :class:`TypeError` if `manager_class` is not a subclass of
:class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
handles closing cursors. Different managers can implement different
policies in terms of when to actually kill a cursor that has
been closed.
:Parameters:
- `manager_class`: cursor manager to use
.. versionchanged:: 2.1+
Deprecated support for external cursor managers.
"""
warnings.warn("Support for external cursor managers is deprecated "
"and will be removed in PyMongo 3.0.", DeprecationWarning)
manager = manager_class(self)
if not isinstance(manager, CursorManager):
raise TypeError("manager_class must be a subclass of "
"CursorManager")
self.__cursor_manager = manager
def __check_response_to_last_error(self, response):
"""Check a response to a lastError message for errors.
`response` is a byte string representing a response to the message.
If it represents an error response we raise OperationFailure.
Return the response as a document.
"""
response = helpers._unpack_response(response)
assert response["number_returned"] == 1
error = response["data"][0]
helpers._check_command_response(error, self.disconnect)
error_msg = error.get("err", "")
if error_msg is None:
return error
if error_msg.startswith("not master"):
self.disconnect()
raise AutoReconnect(error_msg)
if "code" in error:
if error["code"] in [11000, 11001, 12582]:
raise DuplicateKeyError(error["err"])
else:
raise OperationFailure(error["err"], error["code"])
else:
raise OperationFailure(error["err"])
def __check_bson_size(self, message):
"""Make sure the message doesn't include BSON documents larger
than the connected server will accept.
:Parameters:
- `message`: message to check
"""
if len(message) == 3:
(request_id, data, max_doc_size) = message
if max_doc_size > self.__max_bson_size:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, self.__max_bson_size))
return (request_id, data)
else:
# get_more and kill_cursors messages
# don't include BSON documents.
return message
def _send_message(self, message, with_last_error=False):
"""Say something to Mongo.
Raises ConnectionFailure if the message cannot be sent. Raises
OperationFailure if `with_last_error` is ``True`` and the
response to the getLastError call returns an error. Return the
response from lastError, or ``None`` if `with_last_error`
is ``False``.
:Parameters:
- `message`: message to send
- `with_last_error`: check getLastError status after sending the
message
"""
sock_info = self.__socket()
try:
(request_id, data) = self.__check_bson_size(message)
sock_info.sock.sendall(data)
# Safe mode. We pack the message together with a lastError
# message and send both. We then get the response (to the
# lastError) and raise OperationFailure if it is an error
# response.
rv = None
if with_last_error:
response = self.__receive_message_on_socket(1, request_id,
sock_info)
rv = self.__check_response_to_last_error(response)
self.__pool.return_socket(sock_info)
return rv
except (ConnectionFailure, socket.error), e:
self.disconnect()
raise AutoReconnect(str(e))
def __receive_data_on_socket(self, length, sock_info):
"""Lowest level receive operation.
Takes length to receive and repeatedly calls recv until able to
return a buffer of that length, raising ConnectionFailure on error.
"""
chunks = []
while length:
try:
chunk = sock_info.sock.recv(length)
except:
# If recv was interrupted, discard the socket
# and re-raise the exception.
self.__pool.discard_socket(sock_info)
raise
if chunk == EMPTY:
raise ConnectionFailure("connection closed")
length -= len(chunk)
chunks.append(chunk)
return EMPTY.join(chunks)
def __receive_message_on_socket(self, operation, request_id, sock_info):
"""Receive a message in response to `request_id` on `sock`.
Returns the response data with the header removed.
"""
header = self.__receive_data_on_socket(16, sock_info)
length = struct.unpack("<i", header[:4])[0]
assert request_id == struct.unpack("<i", header[8:12])[0], \
"ids don't match %r %r" % (request_id,
struct.unpack("<i", header[8:12])[0])
assert operation == struct.unpack("<i", header[12:])[0]
return self.__receive_data_on_socket(length - 16, sock_info)
def __send_and_receive(self, message, sock_info):
"""Send a message on the given socket and return the response data.
"""
(request_id, data) = self.__check_bson_size(message)
sock_info.sock.sendall(data)
return self.__receive_message_on_socket(1, request_id, sock_info)
# we just ignore _must_use_master here: it's only relevant for
# MasterSlaveConnection instances.
def _send_message_with_response(self, message,
_must_use_master=False, **kwargs):
"""Send a message to Mongo and return the response.
Sends the given message and returns the response.
:Parameters:
- `message`: (request_id, data) pair making up the message to send
"""
sock_info = self.__socket()
try:
try:
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs["network_timeout"])
return self.__send_and_receive(message, sock_info)
except (ConnectionFailure, socket.error), e:
self.disconnect()
raise AutoReconnect(str(e))
finally:
if "network_timeout" in kwargs:
try:
# Restore the socket's original timeout and return it to
# the pool
sock_info.sock.settimeout(self.__net_timeout)
self.__pool.return_socket(sock_info)
except socket.error:
# There was an exception and we've closed the socket
pass
else:
self.__pool.return_socket(sock_info)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. This ensures consistent reads,
even if you read after an unsafe write.
In Python 2.6 and above, or in Python 2.5 with
"from __future__ import with_statement", :meth:`start_request` can be
used as a context manager:
>>> connection = pymongo.Connection(auto_start_request=False)
>>> db = connection.test
>>> _id = db.test_collection.insert({}, safe=True)
>>> with connection.start_request():
... for i in range(100):
... db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
...
... # Definitely read the document after the final update completes
... print db.test_collection.find({'_id': _id})
.. versionadded:: 2.1.1+
The :class:`~pymongo.pool.Request` return value.
:meth:`start_request` previously returned None
"""
self.__pool.start_request()
return pool.Request(self)
def in_request(self):
"""True if :meth:`start_request` has been called, but not
:meth:`end_request`, or if `auto_start_request` is True and
:meth:`end_request` has not been called in this thread or greenlet.
"""
return self.__pool.in_request()
def end_request(self):
"""Undo :meth:`start_request` and allow this thread's connection to
return to the pool.
Calling :meth:`end_request` allows the :class:`~socket.socket` that has
been reserved for this thread by :meth:`start_request` to be returned to
the pool. Other threads will then be able to re-use that
:class:`~socket.socket`. If your application uses many threads, or has
long-running threads that infrequently perform MongoDB operations, then
judicious use of this method can lead to performance gains. Care should
be taken, however, to make sure that :meth:`end_request` is not called
in the middle of a sequence of operations in which ordering is
important. This could lead to unexpected results.
"""
self.__pool.end_request()
def __eq__(self, other):
if isinstance(other, Connection):
us = (self.__host, self.__port)
them = (other.__host, other.__port)
return us == them
return NotImplemented
def __repr__(self):
if len(self.__nodes) == 1:
return "Connection(%r, %r)" % (self.__host, self.__port)
else:
return "Connection(%r)" % ["%s:%d" % n for n in self.__nodes]
def __getattr__(self, name):
"""Get a database by name.
Raises :class:`~pymongo.errors.InvalidName` if an invalid
database name is used.
:Parameters:
- `name`: the name of the database to get
"""
return database.Database(self, name)
def __getitem__(self, name):
"""Get a database by name.
Raises :class:`~pymongo.errors.InvalidName` if an invalid
database name is used.
:Parameters:
- `name`: the name of the database to get
"""
return self.__getattr__(name)
def close_cursor(self, cursor_id):
"""Close a single database cursor.
Raises :class:`TypeError` if `cursor_id` is not an instance of
``(int, long)``. What closing the cursor actually means
depends on this connection's cursor manager.
:Parameters:
- `cursor_id`: id of cursor to close
.. seealso:: :meth:`set_cursor_manager` and
the :mod:`~pymongo.cursor_manager` module
"""
if not isinstance(cursor_id, (int, long)):
raise TypeError("cursor_id must be an instance of (int, long)")
self.__cursor_manager.close(cursor_id)
def kill_cursors(self, cursor_ids):
"""Send a kill cursors message with the given ids.
Raises :class:`TypeError` if `cursor_ids` is not an instance of
``list``.
:Parameters:
- `cursor_ids`: list of cursor ids to kill
"""
if not isinstance(cursor_ids, list):
raise TypeError("cursor_ids must be a list")
return self._send_message(message.kill_cursors(cursor_ids))
def server_info(self):
"""Get information about the MongoDB server we're connected to.
"""
return self.admin.command("buildinfo")
def database_names(self):
"""Get a list of the names of all databases on the connected server.
"""
return [db["name"] for db in
self.admin.command("listDatabases")["databases"]]
def drop_database(self, name_or_database):
"""Drop a database.
Raises :class:`TypeError` if `name_or_database` is not an instance of
:class:`basestring` (:class:`str` in python 3) or Database.
:Parameters:
- `name_or_database`: the name of a database to drop, or a
:class:`~pymongo.database.Database` instance representing the
database to drop
"""
name = name_or_database
if isinstance(name, database.Database):
name = name.name
if not isinstance(name, basestring):
raise TypeError("name_or_database must be an instance of "
"%s or Database" % (basestring.__name__,))
self._purge_index(name)
self[name].command("dropDatabase")
def copy_database(self, from_name, to_name,
from_host=None, username=None, password=None):
"""Copy a database, potentially from another host.
Raises :class:`TypeError` if `from_name` or `to_name` is not
an instance of :class:`basestring` (:class:`str` in python 3).
Raises :class:`~pymongo.errors.InvalidName` if `to_name` is
not a valid database name.
If `from_host` is ``None`` the current host is used as the
source. Otherwise the database is copied from `from_host`.
If the source database requires authentication, `username` and
`password` must be specified.
:Parameters:
- `from_name`: the name of the source database
- `to_name`: the name of the target database
- `from_host` (optional): host name to copy from
- `username` (optional): username for source database
- `password` (optional): password for source database
.. note:: Specifying `username` and `password` requires server
version **>= 1.3.3+**.
.. versionadded:: 1.5
"""
if not isinstance(from_name, basestring):
raise TypeError("from_name must be an instance "
"of %s" % (basestring.__name__,))
if not isinstance(to_name, basestring):
raise TypeError("to_name must be an instance "
"of %s" % (basestring.__name__,))
database._check_name(to_name)
command = {"fromdb": from_name, "todb": to_name}
if from_host is not None:
command["fromhost"] = from_host
in_request = self.in_request()
try:
if not in_request:
self.start_request()
if username is not None:
nonce = self.admin.command("copydbgetnonce",
fromhost=from_host)["nonce"]
command["username"] = username
command["nonce"] = nonce
command["key"] = helpers._auth_key(nonce, username, password)
return self.admin.command("copydb", **command)
finally:
if not in_request:
self.end_request()
@property
def is_locked(self):
"""Is this server locked? While locked, all write operations
are blocked, although read operations may still be allowed.
Use :meth:`~pymongo.connection.Connection.unlock` to unlock.
.. versionadded:: 2.0
"""
ops = self.admin.current_op()
return bool(ops.get('fsyncLock', 0))
def fsync(self, **kwargs):
"""Flush all pending writes to datafiles.
:Parameters:
Optional parameters can be passed as keyword arguments:
- `lock`: If True lock the server to disallow writes.
- `async`: If True don't block while synchronizing.
.. warning:: `async` and `lock` can not be used together.
.. warning:: MongoDB does not support the `async` option
on Windows and will raise an exception on that
platform.
.. versionadded:: 2.0
"""
self.admin.command("fsync", **kwargs)
def unlock(self):
"""Unlock a previously locked server.
.. versionadded:: 2.0
"""
self.admin['$cmd'].sys.unlock.find_one()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
def __iter__(self):
return self
def next(self):
raise TypeError("'Connection' object is not iterable")
Jump to Line
Something went wrong with that request. Please try again.