Permalink
Browse files

adding a redirector abstraction

  • Loading branch information...
1 parent 29a9e04 commit e730abc3ff85f7371c0683cc12868169dbc9b0ab BuzzTroll committed Oct 11, 2010
Showing with 87 additions and 44 deletions.
  1. +7 −0 cumulus/cb/pycb/__init__.py
  2. +65 −0 cumulus/cb/pycb/cbRedirector.py
  3. +15 −44 cumulus/cb/pycb/cumulus.py
View
7 cumulus/cb/pycb/__init__.py
@@ -109,6 +109,7 @@ def default_settings(self):
self.block_size = 1024*512
self.lb_file = None
self.lb_max = 0
+ self.redirector = cbRedirectorIface()
def get_contact(self):
return (self.hostname, self.port)
@@ -196,7 +197,13 @@ def load_settings(self):
try:
self.lb_file = s.get("load_balanced", "hostfile")
self.lb_max = int(s.get("load_balanced", "max"))
+ except:
+ pass
+ try:
+ redirector_name = s.get("redirector", "type")
+ if redirector_name == "basic":
+ self.redirector = cbBasicRedirector(s)
except:
pass
View
65 cumulus/cb/pycb/cbRedirector.py
@@ -0,0 +1,65 @@
+import os
+import sys
+from pycb.cbException import cbException
+import pycb
+import stat
+import urllib
+import glob
+import errno
+import logging
+import threading
+import tempfile
+import hashlib
+import traceback
+import time
+import pycb
+
+class cbRedirectorIface(object):
+
+ # return new host direction or None
+ def new_connection(self, request):
+ return None
+
+ # called when a connection is closed
+ def end_connection(self, request):
+ pass
+
+
+class cbBasicRedirector(object):
+
+ def __init__(self, parser):
+ self.max = max_connections
+ self.connection_count = 0
+ self.host_file = parser.get("load_balanced", "hostfile")
+ self.max = int(parser.get("load_balanced", "max"))
+
+ def new_connection(self, request):
+ h = None
+ self.connection_count = self.connection_count + 1
+ if g_connection_count >= self.max:
+ h = self.get_next_host()
+ return h
+
+ def end_connection(self, request):
+ self.connection_count = self.connection_count - 1
+
+ def get_next_host():
+ try:
+ hosts = []
+ f = open(self.host_file, "r")
+ for l in f.readlines():
+ hosts.append(l.strip())
+ f.close()
+
+ my_host = "%s:%d" % (pycb.config.hostname, pycb.config.port)
+
+ for i in range(0, 10):
+ ndx = random.randint(0, len(hosts)-1)
+ h = hosts[ndx]
+ if h != my_host:
+ return h
+ return h
+ except Exception, ex:
+ log(logging.ERROR, "get next host error %s" % (str(ex)))
+ return None
+
View
59 cumulus/cb/pycb/cumulus.py
@@ -29,48 +29,20 @@
import tempfile
import threading
-count_lock = threading.Lock()
-g_connection_count = 0
-
-def connection_count_inc(req):
- global count_locks
- global g_connection_count
- count_lock.acquire()
- try:
- g_connection_count = g_connection_count + 1
- d = req.notifyFinish()
- d.addBoth(cb_expired)
- finally:
- count_lock.release()
-
-def cb_expired(fail=None):
- global count_lock
- global g_connection_count
- count_lock.acquire()
- try:
- g_connection_count = g_connection_count - 1
- finally:
- count_lock.release()
-
-def check_load(req, bucketName, objectName):
- next_host = pycb.get_next_host()
- if next_host == None:
- return
-
- global count_locks
- global g_connection_count
- count_lock.acquire()
- try:
- pycb.log(logging.INFO, "REDIRECT check 2 %d %d %s" % (g_connection_count, pycb.config.lb_max, next_host))
- if g_connection_count > pycb.config.lb_max:
- pycb.log(logging.INFO, "REDIRECT %s" % (next_host))
- ex = cbException('TemporaryRedirect')
- req.setHeader('location', "http://%s%s" % (next_host, req.uri))
- ex.add_custom_xml("Bucket", bucketName)
- ex.add_custom_xml("Endpoint", next_host)
- raise ex
- finally:
- count_lock.release()
+def end_redirector(request):
+ self.redirector.end_connection()
+
+def init_redirector(req, bucketName, objectName):
+ redir_host = self.redirector.new_connection(request)
+ req.notifyFinish().addBoth(end_redirector, req)
+
+ if redir_host:
+ pycb.log(logging.INFO, "REDIRECT %s" % (next_host))
+ ex = cbException('TemporaryRedirect')
+ req.setHeader('location', "http://%s%s" % (next_host, req.uri))
+ ex.add_custom_xml("Bucket", bucketName)
+ ex.add_custom_xml("Endpoint", next_host)
+ raise ex
def path_to_bucket_object(path):
if path == "/":
@@ -140,7 +112,6 @@ def next_request_id(self):
# object
def request_object_factory(self, request, user, path, requestId):
- connection_count_inc(request)
pycb.log(logging.INFO, "path %s" % (path))
# handle the one service operation
if path == "/":
@@ -150,7 +121,7 @@ def request_object_factory(self, request, user, path, requestId):
raise cbException('InvalidArgument')
(bucketName, objectName) = path_to_bucket_object(path)
- check_load(request, bucketName, objectName)
+ init_redirector(request, bucketName, objectName)
pycb.log(logging.INFO, "path %s bucket %s object %s" % (path, bucketName, str(objectName)))
if request.method == 'GET':

0 comments on commit e730abc

Please sign in to comment.