Browse files

PYCBC-25: Add support for multipule servers during create\nAdd suppor…

…t for multipule formats of the url config
  • Loading branch information...
1 parent f8270df commit dc7fe00a49f8d012d28286811ee8ac3ddbfd34e9 @nerdynick committed Mar 10, 2012
Showing with 72 additions and 19 deletions.
  1. +72 −19 couchbase/couchbaseclient.py
View
91 couchbase/couchbaseclient.py
@@ -31,6 +31,11 @@
from copy import deepcopy
from rest_client import RestHelper, RestConnection
+try:
+ from urlparse import urlparse
+except ImportError:
+ from urllib.parse import urlparse
+
class MemcachedConstants(object):
# Command constants
CMD_GET = 0
@@ -620,43 +625,93 @@ def deregister_tap_client(self, tap_name):
class VBucketAwareCouchbaseClient(object):
- #poll server every few seconds to see if the vbucket-map
- #has changes
- def __init__(self, url, bucket, password="", verbose=False):
+ """
+ A Couchbase client that maintains vBucket awareness via a working thread
+ that polls the server(s) every few seconds and updates the vBucket map.
+
+ @param server: Server config. In the form of a URL, Dict, or List/Tuple
+ @param bucket: Couchbase bucket you wish to manage
+ @param password: Password for the REST API to Admin bucket
+ @param verbose: Enable loggign
+
+ Examples:
+ client = VBucketAwareCouchbaseClient("http://127.0.0.1:8092", "default")
+ client = VBucketAwareCouchbaseClient("http://username:password@127.0.0.1:8092", "default")
+ client = VBucketAwareCouchbaseClient("http://127.0.0.1:8092", "default", password="password")
+
+ servers = ["http://127.0.0.1:8092", "http://127.0.0.2:8092", "http://127.0.0.3:8092"]
+ client = VBucketAwareCouchbaseClient(servers, "default")
+
+ server = {
+ "ip": "127.0.0.1",
+ "port": 8092,
+ "username": "Admin",
+ "password": "pass"
+ }
+ client = VBucketAwareCouchbaseClient(server, "default")
+ """
+
+ def __init__(self, server, bucket, password="", verbose=False):
self.log = logger.logger("VBucketAwareMemcachedClient")
self.bucket = bucket
+ #TODO: Why does this = bucket. If this is incorrect lets add a new arg.
self.rest_username = bucket
self.rest_password = password
self._memcacheds = {}
self._vBucketMap = {}
self._vBucketMap_lock = Lock()
self._vBucketMapFastForward = {}
self._vBucketMapFastForward_lock = Lock()
- #TODO: use regular expressions to parse the url
- server = {}
+
+ servers = []
if not bucket:
raise InvalidArgumentException("bucket can not be an empty string", parameters="bucket")
- if not url:
- raise InvalidArgumentException("url can not be an empty string", parameters="url")
- if url.find("http://") != -1 and url.rfind(":") != -1 and url.find("/pools/default") != -1:
- server["ip"] = url[url.find("http://") + len("http://"):url.rfind(":")]
- server["port"] = url[url.rfind(":") + 1:url.find("/pools/default")]
- server["username"] = self.rest_username
- server["password"] = self.rest_password
- self.servers = [server]
+ if not server:
+ raise InvalidArgumentException("server can not be an empty string", parameters="server")
+
+ if isinstance(server, str): #Handle URL Strings
+ srv = self.parse_url(server)
+ servers.append(srv)
+ elif isinstance(server, dict): #Handle Pre-Formated Dictionary
+ servers.append(server)
+ elif isinstance(server, list) or isinstance(server, tuple): #Handle a collection of servers
+ for srv in list(server):
+ if isinstance(srv, str):
+ temp = self.parse_url(srv)
+ servers.append(temp)
+ elif isinstance(srv, dict):
+ servers.append(srv)
+
+ self.servers = servers
self.servers_lock = Lock()
self.rest = RestConnection(server)
self.reconfig_vbucket_map()
self.init_vbucket_connections()
- self.dispatcher = CommandDispatcher(self)
+ self.dispatcher = CommandDispatcher(self, verbose=verbose)
self.dispatcher.daemon = True
self.dispatcher.start()
self.streaming_thread = Thread(name="streaming", target=self._start_streaming, args=())
self.streaming_thread.daemon = True
self.streaming_thread.start()
self.verbose = verbose
+
+ def parse_url(self, url):
+ srv = urlparse(url)
+ server = {
+ "ip":srv.hostname,
+ "port":srv.port,
+ "username": self.rest_username,
+ "password": self.rest_username
+ }
+ if srv.username is not None:
+ server["username"] = srv.username
+ self.rest_username = srv.username
+
+ if srv.password is not None:
+ server["password"] = srv.password
+ self.rest_password = srv.password
def _start_streaming(self):
# this will dynamically update vBucketMap, vBucketMapFastForward, servers
@@ -713,11 +768,9 @@ def _start_streaming(self):
nodes = data["nodes"]
for node in nodes:
if node["clusterMembership"] == "active" and node["status"] == "healthy":
- hostport = node["hostname"]
- new_servers.append({"ip":hostport.split(":")[0],
- "port":int(hostport.split(":")[1]),
- "username":self.rest_username,
- "password":self.rest_password})
+ hostport = self.parse_url(node["hostname"])
+ new_servers.append(hostport)
+
new_servers.sort()
self.servers_lock.acquire()
self.servers = deepcopy(new_servers)

1 comment on commit dc7fe00

@BigBlueHat

@nerdynick wrote you an e-mail about this, but I'd love to get these changes merged into the upstream couchbase-python-client. You up for signing a CLA to make that happen?

Thanks!

Please sign in to comment.