Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Adding support for Replica Sets.

* Supporting for multiple hosts and mongo uri's (using uri_parse)
  - Client('foo', host='mongodb://localhost:27017,localhost:27018/test')
  - Client('foo', host=['localhost:27017', 'localhost:27018'], dbname='test')
  and keeping the old:
  - Client('foo', host='localhost', port=27017, dbname='test')
* Increasing required pymongo to 2.0 for uri_parse
* Creating a new Connection() is now asynchronous
  - must be passed a callback which is called with the connected connection
* Updating Connection's to replSets to behave the same way as pymongo
* Adding Pool.command to run db commands
* Making .gitignore more vim friendly
  • Loading branch information...
commit 7c2af310b85e7d92410a18cb1ebbebaa4011802c 1 parent b59201c
authored September 19, 2011
4  .gitignore
... ...
@@ -1,3 +1,7 @@
1 1
 *.pyc
2 2
 build
3 3
 dist
  4
+*~
  5
+*.swp
  6
+tags
  7
+PYSMELLTAGS
2  README.md
Source Rendered
@@ -49,7 +49,7 @@ Requirements
49 49
 ------------
50 50
 The following two python libraries are required
51 51
 
52  
-* [pymongo](http://github.com/mongodb/mongo-python-driver) version 1.9+ for bson library
  52
+* [pymongo](http://github.com/mongodb/mongo-python-driver) version 2.0+ for bson library and uri_parse
53 53
 * [tornado](http://github.com/facebook/tornado)
54 54
 
55 55
 Issues
190  asyncmongo/connection.py
@@ -27,7 +27,12 @@
27 27
 import helpers
28 28
 import struct
29 29
 import logging
  30
+import functools
  31
+import message
  32
+import contextlib
30 33
 
  34
+from bson.son import SON
  35
+from tornado.stack_context import StackContext
31 36
 from errors import ProgrammingError, IntegrityError, InterfaceError
32 37
 
33 38
 class Connection(object):
@@ -35,25 +40,48 @@ class Connection(object):
35 40
     :Parameters:
36 41
       - `host`: hostname or ip of mongo host
37 42
       - `port`: port to connect to
  43
+      - `create_callback`: callback to be called with the connected self
38 44
       - `autoreconnect` (optional): auto reconnect on interface errors
39 45
       
40 46
     """
41  
-    def __init__(self, host, port, autoreconnect=True, pool=None):
42  
-        assert isinstance(host, (str, unicode))
43  
-        assert isinstance(port, int)
  47
+    def __init__(self,
  48
+                 nodes,
  49
+                 slave_okay=True,
  50
+                 autoreconnect=True,
  51
+                 create_callback=None,
  52
+                 pool=None):
  53
+        assert isinstance(nodes, set)
  54
+        assert isinstance(slave_okay, bool)
44 55
         assert isinstance(autoreconnect, bool)
  56
+        assert callable(create_callback)
45 57
         assert pool
46  
-        self.__host = host
47  
-        self.__port = port
  58
+        self.__nodes = nodes
  59
+        self.__host = None
  60
+        self.__port = None
48 61
         self.__stream = None
49 62
         self.__callback = None
50 63
         self.__alive = False
51  
-        self.__connect()
  64
+        self.__slave_okay = slave_okay
52 65
         self.__autoreconnect = autoreconnect
53 66
         self.__pool = pool
  67
+        self.__repl = None
54 68
         self.usage_count = 0
  69
+        self.__connect(callback=create_callback)
  70
+
  71
+    def __connect(self, callback):
  72
+        """Begin the connection process, sets up connection state
  73
+        and associated stack context.
  74
+
  75
+        :Parameters:
  76
+         - `callback`: called when connected
  77
+        """
  78
+        connection_state = _ConnectionState(self.__nodes)
  79
+        connection_manager = functools.partial(self.__connection_manager,
  80
+                state=connection_state, callback=callback)
  81
+        with StackContext(connection_manager):
  82
+            self.__find_node(connection_state)
55 83
     
56  
-    def __connect(self):
  84
+    def __socket_connect(self):
57 85
         try:
58 86
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
59 87
             s.connect((self.__host, self.__port))
@@ -62,13 +90,121 @@ def __connect(self):
62 90
             self.__alive = True
63 91
         except socket.error, error:
64 92
             raise InterfaceError(error)
65  
-    
  93
+
  94
+    def __try_node(self, node):
  95
+        """Try to connect to this node and see if it works
  96
+        for our connection type.
  97
+
  98
+        :Parameters:
  99
+         - `node`: The (host, port) pair to try.
  100
+
  101
+        Based on pymongo.Connection.__try_node
  102
+        """
  103
+        if self.__alive:
  104
+            self.close()
  105
+        self.__host, self.__port = node
  106
+        self.__socket_connect()
  107
+
  108
+        command = message.query(
  109
+                options=0,
  110
+                collection_name='admin.$cmd',
  111
+                num_to_skip=0,
  112
+                num_to_return=-1,
  113
+                query=SON([('ismaster', 1)]))
  114
+        self.send_message(command,
  115
+            callback=functools.partial(self.__handle_ismaster, node=node))
  116
+
  117
+    def __handle_ismaster(self, result, error=None, node=None):
  118
+        if error:
  119
+            raise error
  120
+
  121
+        if len(result['data']) == 1:
  122
+            response = result['data'][0]
  123
+        else:
  124
+            raise InterfaceError('Invalid response returned: %s' %
  125
+                    result['data'])
  126
+
  127
+        # Replica Set?
  128
+        if len(self.__nodes) > 1 or self.__repl:
  129
+            # Check that this host is part of the given replica set.
  130
+            if self.__repl:
  131
+                set_name = response.get('setName')
  132
+                # The 'setName' field isn't returned by mongod before 1.6.2
  133
+                # so we can't assume that if it's missing this host isn't in
  134
+                # the specified set.
  135
+                if set_name and set_name != self.__repl:
  136
+                    raise InterfaceError("%s:%d is not a member of "
  137
+                            "replica set %s" % (node[0], node[1], self.__repl))
  138
+            if "hosts" in response:
  139
+                self.__nodes.update([_partition_node(h)
  140
+                                     for h in response["hosts"]])
  141
+            if response["ismaster"]:
  142
+                raise _NodeFound(node)
  143
+            elif "primary" in response:
  144
+                candidate = _partition_node(response["primary"])
  145
+                return self.__try_node(candidate)
  146
+
  147
+            # Explain why we aren't using this connection.
  148
+            raise InterfaceError('%s:%d is not primary' % node)
  149
+
  150
+        # Direct connection
  151
+        else:
  152
+            if response.get("arbiterOnly", False):
  153
+                raise ProgrammingError("%s:%d is an arbiter" % node)
  154
+            raise _NodeFound(node)
  155
+
  156
+    def __find_node(self, state):
  157
+        """Find a host, port pair suitable for our connection type.
  158
+
  159
+        If only one host was supplied to __init__ see if we can connect
  160
+        to it. Don't check if the host is a master/primary so we can make
  161
+        a direct connection to read from a slave.
  162
+
  163
+        If more than one host was supplied treat them as a seed list for
  164
+        connecting to a replica set. Try to find the primary and fail if
  165
+        we can't. Possibly updates any replSet information on success.
  166
+
  167
+        If the list of hosts is not a seed list for a replica set the
  168
+        behavior is still the same. We iterate through the list trying
  169
+        to find a host we can send write operations to.
  170
+
  171
+        In either case a connection to an arbiter will never succeed.
  172
+
  173
+        Based on pymongo.Connection.__find_node
  174
+        """
  175
+        try:
  176
+            node = state.remaining.pop()
  177
+        except KeyError:
  178
+            if state.tested_all_seeds:
  179
+                # We have failed to find a node...
  180
+                error = InterfaceError(', '.join(state.errors))
  181
+                self.__create_callback(None, error=error)
  182
+            else:
  183
+                # No luck with seeds; let's see if we discovered a new node
  184
+                state.tested_all_seeds = True
  185
+                state.remaining = self.__nodes.copy() - state.seeds
  186
+                self.__find_node(state)
  187
+        else:
  188
+            self.__try_node(node)
  189
+
  190
+    @contextlib.contextmanager
  191
+    def __connection_manager(self, state, callback):
  192
+        try:
  193
+            yield
  194
+        except _NodeFound:
  195
+            callback(self)
  196
+        except Exception, why:
  197
+            state.errors.append(str(why))
  198
+            self.__find_node(state)
  199
+
66 200
     def _socket_close(self):
67 201
         """cleanup after the socket is closed by the other end"""
68 202
         if self.__callback:
69 203
             self.__callback(None, InterfaceError('connection closed'))
70 204
         self.__callback = None
71 205
         self.__alive = False
  206
+        self.__host = None
  207
+        self.__port = None
72 208
         self.__pool.cache(self)
73 209
     
74 210
     def _close(self):
@@ -77,6 +213,8 @@ def _close(self):
77 213
             self.__callback(None, InterfaceError('connection closed'))
78 214
         self.__callback = None
79 215
         self.__alive = False
  216
+        self.__host = None
  217
+        self.__port = None
80 218
         self.__stream._close_callback = None
81 219
         self.__stream.close()
82 220
     
@@ -95,7 +233,10 @@ def send_message(self, message, callback):
95 233
         
96 234
         if not self.__alive:
97 235
             if self.__autoreconnect:
98  
-                self.__connect()
  236
+                # logging.warn('connection lost, reconnecting')
  237
+                self.__connect(functools.partial(Connection.send_message,
  238
+                    message=message, callback=callback))
  239
+                return
99 240
             else:
100 241
                 raise InterfaceError('connection invalid. autoreconnect=False')
101 242
         
@@ -156,3 +297,34 @@ def _parse_response(self, response):
156 297
         # logging.info('response: %s' % response)
157 298
         callback(response)
158 299
 
  300
+
  301
+class _ConnectionState(object):
  302
+    def __init__(self, nodes):
  303
+        self.errors = []
  304
+        self.node_found = False
  305
+        self.tested_all_seeds = False
  306
+        self.nodes = nodes
  307
+        self.seeds = nodes.copy()
  308
+        self.remaining = nodes.copy()
  309
+
  310
+
  311
+class _NodeFound(StandardError):
  312
+    def __init__(self, node):
  313
+        super(_NodeFound, self).__init__('Node %s:%d' % node)
  314
+        self.node = node
  315
+
  316
+
  317
+def _partition_node(node):
  318
+    """Split a host:port string returned from mongod/s into
  319
+    a (host, int(port)) pair needed for socket.connect().
  320
+
  321
+    From pymongo.connection._partition_node
  322
+    """
  323
+    host = node
  324
+    port = 27017
  325
+    idx = node.rfind(':')
  326
+    if idx != -1:
  327
+        host, port = node[:idx], int(node[idx + 1:])
  328
+    if host.startswith('['):
  329
+        host = host[1:-1]
  330
+    return host, port
76  asyncmongo/cursor.py
@@ -19,7 +19,7 @@
19 19
 from bson.son import SON
20 20
 
21 21
 import helpers
22  
-import message
  22
+import message as message_factory
23 23
 import functools
24 24
 
25 25
 _QUERY_OPTIONS = {
@@ -114,15 +114,12 @@ def insert(self, doc_or_docs,
114 114
         if callback:
115 115
             callback = functools.partial(self._handle_response, orig_callback=callback)
116 116
 
117  
-        connection = self.__pool.connection()
118  
-        try:
119  
-            connection.send_message(
120  
-                message.insert(self.full_collection_name, docs,
121  
-                    check_keys, safe, kwargs), callback=callback)
122  
-        except:
123  
-            connection.close()
124  
-            raise
125  
-    
  117
+        message = message_factory.insert(self.full_collection_name, docs,
  118
+                    check_keys, safe, kwargs)
  119
+        connection_callback = functools.partial(self._send_message,
  120
+                message=message, callback=callback)
  121
+        self.__pool.connection(connection_callback)
  122
+
126 123
     def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
127 124
         if not isinstance(safe, bool):
128 125
             raise TypeError("safe must be an instance of bool")
@@ -144,16 +141,12 @@ def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
144 141
         if callback:
145 142
             callback = functools.partial(self._handle_response, orig_callback=callback)
146 143
 
147  
-        connection = self.__pool.connection()
148  
-        try:
149  
-            connection.send_message(
150  
-                message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
151  
-                    callback=callback)
152  
-        except:
153  
-            connection.close()
154  
-            raise
  144
+        message = message_factory.delete(self.full_collection_name,
  145
+                spec_or_id, safe, kwargs)
  146
+        connection_callback = functools.partial(self._send_message,
  147
+                message=message, callback=callback)
  148
+        self.__pool.connection(connection_callback)
155 149
 
156  
-    
157 150
     def update(self, spec, document, upsert=False, manipulate=False,
158 151
                safe=True, multi=False, callback=None, **kwargs):
159 152
         """Update a document(s) in this collection.
@@ -230,32 +223,28 @@ def update(self, spec, document, upsert=False, manipulate=False,
230 223
         # TODO: apply SON manipulators
231 224
         # if upsert and manipulate:
232 225
         #     document = self.__database._fix_incoming(document, self)
233  
-        
  226
+
234 227
         if kwargs:
235 228
             safe = True
236  
-        
  229
+
237 230
         if safe and not callable(callback):
238 231
             raise TypeError("callback must be callable")
239 232
         if not safe and callback is not None:
240 233
             raise TypeError("callback can not be used with safe=False")
241  
-        
  234
+
242 235
         if callback:
243 236
             callback = functools.partial(self._handle_response, orig_callback=callback)
244 237
 
245 238
         self.__limit = None
246  
-        connection = self.__pool.connection()
247  
-        try:
248  
-            connection.send_message(
249  
-                message.update(self.full_collection_name, upsert, multi,
250  
-                    spec, document, safe, kwargs), callback=callback)
251  
-        except:
252  
-            connection.close()
253  
-            raise
  239
+        message = message_factory.update(self.full_collection_name, upsert,
  240
+                multi, spec, document, safe, kwargs)
  241
+        connection_callback = functools.partial(self._send_message,
  242
+                message=message, callback=callback)
  243
+        self.__pool.connection(connection_callback)
254 244
 
255  
-    
256 245
     def find_one(self, spec_or_id, **kwargs):
257 246
         """Get a single document from the database.
258  
-        
  247
+
259 248
         All arguments to :meth:`find` are also valid arguments for
260 249
         :meth:`find_one`, although any `limit` argument will be
261 250
         ignored. Returns a single document, or ``None`` if no matching
@@ -358,7 +347,7 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
358 347
         self.__skip = skip
359 348
         self.__limit = limit
360 349
         self.__batch_size = 0
361  
-        
  350
+
362 351
         self.__timeout = timeout
363 352
         self.__tailable = tailable
364 353
         self.__snapshot = snapshot
@@ -371,14 +360,21 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
371 360
         self.__tz_aware = False #collection.database.connection.tz_aware
372 361
         self.__must_use_master = _must_use_master
373 362
         self.__is_command = _is_command
374  
-        
375  
-        connection = self.__pool.connection()
  363
+
  364
+        message = message_factory.query(self.__query_options(),
  365
+                self.full_collection_name, self.__skip, self.__limit,
  366
+                self.__query_spec(), self.__fields)
  367
+        message_callback = functools.partial(self._handle_response,
  368
+                orig_callback=callback)
  369
+        connection_callback = functools.partial(self._send_message,
  370
+                message=message, callback=message_callback)
  371
+        self.__pool.connection(connection_callback)
  372
+
  373
+    def _send_message(self, connection, message, callback=None, error=None):
  374
+        if error:
  375
+            raise error
376 376
         try:
377  
-            connection.send_message(
378  
-                message.query(self.__query_options(),
379  
-                              self.full_collection_name,
380  
-                              self.__skip, self.__limit,
381  
-                              self.__query_spec(), self.__fields), callback=functools.partial(self._handle_response, orig_callback=callback))
  377
+            connection.send_message(message, callback)
382 378
         except:
383 379
             connection.close()
384 380
     
70  asyncmongo/pool.py
@@ -18,6 +18,9 @@
18 18
 import logging
19 19
 from errors import TooManyConnections, ProgrammingError
20 20
 from connection import Connection
  21
+from pymongo import uri_parser
  22
+from bson.son import SON
  23
+
21 24
 
22 25
 class ConnectionPools(object):
23 26
     """ singleton to keep track of named connection pools """
@@ -61,6 +64,8 @@ class ConnectionPool(object):
61 64
     
62 65
     """
63 66
     def __init__(self, 
  67
+                host=None, 
  68
+                port=27017, 
64 69
                 mincached=0, 
65 70
                 maxcached=0, 
66 71
                 maxconnections=0, 
@@ -68,12 +73,19 @@ def __init__(self,
68 73
                 dbname=None, 
69 74
                 slave_okay=False, 
70 75
                 *args, **kwargs):
  76
+
  77
+        if isinstance(host, basestring):
  78
+            host = [host]
  79
+        else:
  80
+            assert isinstance(host, list)
  81
+        assert isinstance(port, int)
71 82
         assert isinstance(mincached, int)
72 83
         assert isinstance(maxcached, int)
73 84
         assert isinstance(maxconnections, int)
74 85
         assert isinstance(maxusage, int)
75 86
         assert isinstance(dbname, (str, unicode, None.__class__))
76 87
         assert isinstance(slave_okay, bool)
  88
+
77 89
         if mincached and maxcached:
78 90
             assert mincached <= maxcached
79 91
         if maxconnections:
@@ -86,36 +98,62 @@ def __init__(self,
86 98
         self._maxconnections = maxconnections
87 99
         self._idle_cache = [] # the actual connections that can be used
88 100
         self._condition = Condition()
89  
-        self._dbname = dbname
90  
-        self._slave_okay = slave_okay
  101
+        self._kwargs['slave_okay'] = self._slave_okay = slave_okay
91 102
         self._connections = 0
92  
-        
  103
+
  104
+        nodes = set()
  105
+        username = None  # TODO: username/password ignored for now
  106
+        password = None
  107
+        for entity in host:
  108
+            if "://" in entity:
  109
+                if entity.startswith("mongodb://"):
  110
+                    res = uri_parser.parse_uri(entity, port)
  111
+                    nodes.update(res["nodelist"])
  112
+                    username = res["username"] or username
  113
+                    password = res["password"] or password
  114
+                    dbname = res["database"] or dbname
  115
+                else:
  116
+                    idx = entity.find("://")
  117
+                    raise ProgrammingError("Invalid URI scheme: "
  118
+                                     "%s" % (entity[:idx],))
  119
+            else:
  120
+                nodes.update(uri_parser.split_hosts(entity, port))
  121
+        if not nodes:
  122
+            raise ProgrammingError("Need to specify at least one host")
  123
+        self._nodes = nodes
  124
+        self._dbname = dbname
  125
+
93 126
         # Establish an initial number of idle database connections:
94 127
         idle = [self.connection() for i in range(mincached)]
95 128
         while idle:
96 129
             self.cache(idle.pop())
97  
-    
98  
-    def new_connection(self):
  130
+
  131
+    def new_connection(self, callback):
99 132
         kwargs = self._kwargs
100 133
         kwargs['pool'] = self
101  
-        return Connection(*self._args, **kwargs)
102  
-    
103  
-    def connection(self):
  134
+        return Connection(*self._args, nodes=self._nodes,
  135
+                create_callback=callback, **kwargs)
  136
+
  137
+    def connection(self, callback):
104 138
         """ get a cached connection from the pool """
105  
-        
  139
+
  140
+        con = None
106 141
         self._condition.acquire()
107 142
         try:
108 143
             if (self._maxconnections and self._connections >= self._maxconnections):
109  
-                raise TooManyConnections("%d connections are active greater than max: %d" % (self._connections, self._maxconnections))
  144
+                raise TooManyConnections("%d connections are active greater "
  145
+                        "than max: %d" % (self._connections, self._maxconnections))
110 146
             # connection limit not reached, get a dedicated connection
111 147
             try: # first try to get it from the idle cache
112 148
                 con = self._idle_cache.pop(0)
113  
-            except IndexError: # else get a fresh connection
114  
-                con = self.new_connection()
  149
+            except IndexError: # else get a fresh connection, async
  150
+                self.new_connection(callback)
115 151
             self._connections += 1
116 152
         finally:
117 153
             self._condition.release()
118  
-        return con
  154
+        # reusing a connection, so send it to the callback
  155
+        if con:
  156
+            callback(con)
119 157
 
120 158
     def cache(self, con):
121 159
         """Put a dedicated connection back into the idle cache."""
@@ -167,3 +205,9 @@ def __set_slave_okay(self, value):
167 205
         self._slave_okay = value
168 206
 
169 207
     slave_okay = property(__get_slave_okay, __set_slave_okay)
  208
+
  209
+    def command(self, command, value=1, **kwargs):
  210
+        if isinstance(command, basestring):
  211
+            command = SON([(command, value)])
  212
+
  213
+        self["$cmd"].find_one(command, _is_command=True, **kwargs)
4  setup.py
@@ -18,7 +18,7 @@
18 18
         "License :: OSI Approved :: Apache Software License",
19 19
     ],
20 20
     packages=['asyncmongo'],
21  
-    install_requires=['pymongo>=1.9', 'tornado'],
22  
-    requires=['pymongo (>=1.9)', 'tornado'],
  21
+    install_requires=['pymongo>=2.0', 'tornado'],
  22
+    requires=['pymongo (>=2.0)', 'tornado'],
23 23
     download_url="http://github.com/downloads/bitly/asyncmongo/asyncmongo-%s.tar.gz" % version,
24 24
 )

0 notes on commit 7c2af31

Please sign in to comment.
Something went wrong with that request. Please try again.