Permalink
Browse files

added copy functionality to cumulus.

only 1 single simple test.  more tests needed.  specifically we need tests
that verify security.
  • Loading branch information...
BuzzTroll
BuzzTroll committed Oct 5, 2010
1 parent 398444e commit 5157d6fdabaf627d19e6c6915dd26b7149d47323
@@ -9,7 +9,7 @@
from ConfigParser import SafeConfigParser
from pycb.cbAuthzSecurity import cbAuthzUser
from pycb.cbAuthzSecurity import cbAuthzSec
-
+import random
from optparse import OptionParser
import hmac
try:
@@ -107,6 +107,8 @@ def default_settings(self):
self.https_cert = None
self.use_https = False
self.block_size = 1024*512
+ self.lb_file = None
+ self.lb_max = 0
def get_contact(self):
return (self.hostname, self.port)
@@ -191,6 +193,13 @@ def load_settings(self):
except:
pass
+ try:
+ self.lb_file = s.get("load_balanced", "hostfile")
+ self.lb_max = int(s.get("load_balanced", "max"))
+
+ except:
+ pass
+
def parse_cmdline(self, argv):
global Version
@@ -233,3 +242,27 @@ def get_auth_hash(key, method, path, headers, uri):
return auth_hash
config = CBConfig()
+
+# this is not a long term solution
+def get_next_host():
+    """Pick a random peer host to which a request can be redirected.
+
+    Reads the host list from config.lb_file (one "host:port" entry per
+    line) and returns a randomly chosen entry other than this server's own
+    "hostname:port".
+
+    Returns None when load balancing is not configured, when the file
+    lists no other host, or when the file cannot be read (the error is
+    logged).
+    """
+    if not config.lb_file:
+        return None
+
+    try:
+        f = open(config.lb_file, "r")
+        try:
+            # skip blank lines so an empty string is never offered as a host
+            hosts = [line.strip() for line in f if line.strip()]
+        finally:
+            # close the file even when reading fails
+            f.close()
+
+        my_host = "%s:%d" % (config.hostname, config.port)
+
+        # never hand back our own address: redirecting a client to the
+        # server it is already talking to cannot relieve any load
+        candidates = [h for h in hosts if h != my_host]
+        if not candidates:
+            return None
+        return random.choice(candidates)
+    except Exception, ex:
+        log(logging.ERROR, "get next host error %s" % (str(ex)))
+        return None
@@ -326,6 +326,7 @@ class cbException(Exception):
def __init__(self, code, ex=None):
self.code = code
self.ex = ex
+ self.custom_xml = {}
try:
self.httpCode = cbException.errorsHttpCode[code]
self.httpDesc = cbException.errorsHttpMsg[code]
@@ -338,6 +339,9 @@ def __init__(self, code, ex=None):
def getCode(self):
return self.code
+ def add_custom_xml(self, k, v):
+ """Register an extra element for the error XML: tag name k with text v."""
+ self.custom_xml[k] = v
+
def make_xml_string(self, path, requestId):
doc = Document()
@@ -365,6 +369,13 @@ def make_xml_string(self, path, requestId):
xRIdText = doc.createTextNode(str(requestId))
xRId.appendChild(xRIdText)
xError.appendChild(xRId)
+
+ for k in self.custom_xml.keys():
+ xId = doc.createElement(k)
+ xText = doc.createTextNode(self.custom_xml[k])
+ xId.appendChild(xText)
+ xError.appendChild(xId)
+
return doc.toxml()
@@ -368,10 +368,6 @@ def list_bucket(self):
xNameText = doc.createTextNode(str(self.bucketName))
xName.appendChild(xNameText)
-# XXX add options to headers
-# if 'max-keys' in self.request.args.keys():
-#
-
xIsTruncated = doc.createElement("IsTruncated")
xList.appendChild(xIsTruncated)
xIsTText = doc.createTextNode('false')
@@ -655,6 +651,9 @@ def endGet(self, dataObj):
gdEx = cbException('InvalidArgument')
gdEx.sendErrorResponse(self.request, self.requestId)
+ # receive object looks strange because twisted has already received
+ # the entire file and put it in a temp location. we now just have
+ # to recognize that we have it all
def recvObject(self, request, dataObj):
self.set_common_headers()
self.setHeader(request, 'Connection', 'close')
@@ -704,3 +703,125 @@ def work(self):
self.setResponseCode(self.request, 200, 'OK')
self.finish(self.request)
+class cbCopyObject(cbRequest):
+    """Copy an existing object to a destination bucket/key.
+
+    work() checks permissions and quota, then hands the byte copy to a
+    thread-pool thread (copy_file).  When the copy is complete the response
+    is produced by end_copy, which is scheduled back onto the reactor.
+    """
+
+    def __init__(self, request, user, requestId, bucketIface, srcBucket, srcObject, dstBucket, dstObject):
+        """Record the source and destination names of the copy.
+
+        Raises cbException('InvalidURI') when the destination key contains
+        the string "cumulus:/".
+        """
+        cbRequest.__init__(self, request, user, requestId, bucketIface)
+
+        ndx = dstObject.find("cumulus:/")
+        if ndx >= 0:
+            pycb.log(logging.ERROR, "someone tried to make a key named cumulus://... why would someone do that? %d" % (ndx))
+            raise cbException('InvalidURI')
+
+        self.dstBucketName = dstBucket
+        self.dstObjectName = dstObject
+        self.srcBucketName = srcBucket
+        self.srcObjectName = srcObject
+
+    def check_permissions(self):
+        """Verify the copy is allowed and open the source and destination.
+
+        On success sets self.src_file, self.dst_file, self.src_ctm and
+        self.src_md5.
+
+        Raises cbException with code:
+          'NoSuchKey'      - the source object does not exist
+          'AccessDenied'   - no read permission on the source, or no write
+                             permission on the destination bucket or on an
+                             already existing destination object
+          'AccountProblem' - the remaining quota cannot hold the copy
+        """
+        if not self.user.exists(self.srcBucketName, self.srcObjectName):
+            raise cbException('NoSuchKey')
+        (perms, src_data_key) = self.user.get_perms(self.srcBucketName, self.srcObjectName)
+        # make sure that we can read the source
+        if "r" not in perms:
+            raise cbException('AccessDenied')
+
+        # make sure we can write to the destination
+        dstExists = self.user.exists(self.dstBucketName, self.dstObjectName)
+
+        (bperms, bdata_key) = self.user.get_perms(self.dstBucketName)
+        if "w" not in bperms:
+            raise cbException('AccessDenied')
+
+        dst_data_key = None
+        dst_size = 0
+        if dstExists:
+            (perms, dst_data_key) = self.user.get_perms(self.dstBucketName, self.dstObjectName)
+            if "w" not in perms:
+                raise cbException('AccessDenied')
+
+            (dst_size, ctm, md5) = self.user.get_info(self.dstBucketName, self.dstObjectName)
+        (src_size, self.src_ctm, self.src_md5) = self.user.get_info(self.srcBucketName, self.srcObjectName)
+
+        # check the quota; overwriting an existing object only costs the
+        # difference between the new and the old size
+        remaining_quota = self.user.get_remaining_quota()
+        if remaining_quota != User.UNLIMITED:
+            if remaining_quota < src_size - dst_size:
+                pycb.log(logging.INFO, "user %s did not pass quota. file size %d quota %d" % (self.user, src_size, remaining_quota))
+                raise cbException('AccountProblem')
+
+        # if we get to here we are allowed to do the copy
+        if dst_data_key is None:
+            self.dst_file = self.bucketIface.put_object(self.dstBucketName, self.dstObjectName)
+        else:
+            self.dst_file = self.bucketIface.get_object(dst_data_key)
+        # NOTE(review): applied to both branches because copy_file() clears
+        # the flag unconditionally on success -- confirm against the
+        # original indentation.
+        self.dst_file.set_delete_on_close(True)
+        self.src_file = self.bucketIface.get_object(src_data_key)
+
+    def copy_file(self):
+        """Copy the source data into the destination file.
+
+        Runs in a thread-pool thread (see work()).  Anything that touches
+        the request is therefore handed to the reactor thread with
+        reactor.callFromThread instead of being called directly.
+        """
+        try:
+            try:
+                while True:
+                    b = self.src_file.read()
+                    if len(b) == 0:
+                        break
+                    self.dst_file.write(b)
+            finally:
+                # the source is no longer needed, whether or not the copy
+                # succeeded
+                self.src_file.close()
+
+            # the copy is complete: keep the destination data on close
+            self.dst_file.set_delete_on_close(False)
+            self.dst_file.close()
+            reactor.callFromThread(self.end_copy)
+        except cbException, ex:
+            traceback.print_exc(file=sys.stdout)
+            reactor.callFromThread(ex.sendErrorResponse, self.request, self.requestId)
+        except:
+            # last line of defence in the worker thread: always answer the
+            # client, whatever went wrong
+            traceback.print_exc(file=sys.stdout)
+            gdEx = cbException('InvalidArgument')
+            reactor.callFromThread(gdEx.sendErrorResponse, self.request, self.requestId)
+
+    def end_copy(self):
+        """Register the copied object and send the CopyObjectResult XML.
+
+        Scheduled by copy_file() through reactor.callFromThread once the
+        data has been written.  LastModified and ETag are taken from the
+        source object's info.
+        """
+        try:
+            self.user.put_object(self.dst_file, self.dstBucketName, self.dstObjectName)
+
+            doc = Document()
+            cor = doc.createElement("CopyObjectResult")
+            doc.appendChild(cor)
+
+            lm = doc.createElement("LastModified")
+            cor.appendChild(lm)
+            lm.appendChild(doc.createTextNode(str(self.src_ctm)))
+
+            etag = doc.createElement("ETag")
+            cor.appendChild(etag)
+            etag.appendChild(doc.createTextNode(str(self.src_md5)))
+
+            x = doc.toxml()
+            self.setHeader(self.request, 'x-amz-copy-source-version-id', "1")
+            self.setHeader(self.request, 'x-amz-version-id', "1")
+            self.send_xml(x)
+            self.request.finish()
+
+        except cbException, ex:
+            ex.sendErrorResponse(self.request, self.requestId)
+            traceback.print_exc(file=sys.stdout)
+        except:
+            traceback.print_exc(file=sys.stdout)
+            gdEx = cbException('InvalidArgument')
+            gdEx.sendErrorResponse(self.request, self.requestId)
+
+    def work(self):
+        """Validate the request, then run the copy in a thread-pool thread."""
+        self.check_permissions()
+
+        reactor.callInThread(self.copy_file)
View
@@ -14,6 +14,7 @@
from pycb.cbRequest import cbPutBucket
from pycb.cbRequest import cbPutObject
from pycb.cbRequest import cbHeadObject
+from pycb.cbRequest import cbCopyObject
from datetime import date, datetime
from xml.dom.minidom import Document
import uuid
@@ -26,7 +27,39 @@
import pycb
import threading
import tempfile
+import threading
+
+# Serializes every access to g_connection_count (cb_expired, check_load
+# and allHeadersReceived all take it before touching the counter).
+count_lock = threading.Lock()
+# Requests currently counted as in progress: incremented in
+# allHeadersReceived, decremented in cb_expired, and compared with
+# pycb.config.lb_max in check_load.
+g_connection_count = 0
+def cb_expired(fail=None):
+    """Count one request as finished.
+
+    Attached with addBoth() to a request's notifyFinish() deferred, so it
+    is invoked with either a result or a failure; the argument is ignored.
+    """
+    global g_connection_count
+    count_lock.acquire()
+    try:
+        g_connection_count -= 1
+    finally:
+        count_lock.release()
+
+def check_load(req, bucketName, objectName):
+    """Redirect the request to a peer host when this server is over its limit.
+
+    Asks pycb.get_next_host() for a peer; when it returns None the request
+    is served locally.  Otherwise, if the number of counted connections
+    exceeds pycb.config.lb_max, a 'location' header is set on req and a
+    'TemporaryRedirect' cbException carrying the bucket name and the chosen
+    endpoint is raised.
+
+    objectName is accepted but currently unused.
+    """
+    next_host = pycb.get_next_host()
+    if next_host is None:
+        return
+
+    # count_lock and g_connection_count are only read here, so no "global"
+    # declarations are required.
+    count_lock.acquire()
+    try:
+        pycb.log(logging.INFO, "REDIRECT check 2 %d %d %s" % (g_connection_count, pycb.config.lb_max, next_host))
+        if g_connection_count > pycb.config.lb_max:
+            pycb.log(logging.INFO, "REDIRECT %s" % (next_host))
+            ex = cbException('TemporaryRedirect')
+            # NOTE(review): the Location URL names only the bucket (the
+            # object key is dropped) and always uses http:// -- confirm
+            # this is what clients following the redirect should get.
+            req.setHeader('location', "http://%s/%s" % (next_host, bucketName))
+            ex.add_custom_xml("Bucket", bucketName)
+            ex.add_custom_xml("Endpoint", next_host)
+            raise ex
+    finally:
+        count_lock.release()
def path_to_bucket_object(path):
if path == "/":
@@ -54,12 +87,6 @@ def createPath(headers, path):
if len(h_a) > 0:
host = h_a[0]
-# if host == pycb.config.hostname:
-# return path
-
-# b = host.split('.', 1)[0]
-# path = '/' + b + path
-
return path
def authorize(headers, message_type, path, uri):
@@ -98,9 +125,10 @@ def next_request_id(self):
return str(uuid.uuid1()).replace("-", "")
# figure out if the operation is targeted at a service, bucket, or
+
+
# object
def request_object_factory(self, request, user, path, requestId):
-
pycb.log(logging.INFO, "path %s" % (path))
# handle the one service operation
if path == "/":
@@ -110,6 +138,7 @@ def request_object_factory(self, request, user, path, requestId):
raise cbException('InvalidArgument')
(bucketName, objectName) = path_to_bucket_object(path)
+ check_load(request, bucketName, objectName)
pycb.log(logging.INFO, "path %s bucket %s object %s" % (path, bucketName, str(objectName)))
if request.method == 'GET':
@@ -122,7 +151,12 @@ def request_object_factory(self, request, user, path, requestId):
if objectName == None:
cbR = cbPutBucket(request, user, bucketName, requestId, pycb.config.bucket)
else:
- cbR = cbPutObject(request, user, bucketName, objectName, requestId, pycb.config.bucket)
+ args = request.getAllHeaders()
+ if 'x-amz-copy-source' in args:
+ (srcBucketName, srcObjectName) = path_to_bucket_object(args['x-amz-copy-source'])
+ cbR = cbCopyObject(request, user, requestId, pycb.config.bucket, srcBucketName, srcObjectName, bucketName, objectName)
+ else:
+ cbR = cbPutObject(request, user, bucketName, objectName, requestId, pycb.config.bucket)
return cbR
elif request.method == 'POST':
pycb.log(logging.ERROR, "Nothing to handle POST")
@@ -212,11 +246,24 @@ def send_access_error(self):
self.transport.write(e_msg)
self.transport.loseConnection()
+    def send_redirect(self, ex=None):
+        """Write a redirect error response straight to the transport and close it.
+
+        ex is the cbException to report; when omitted a 'TemporaryRedirect'
+        exception is created.  Returns the exception that was sent.
+        """
+        if ex is None:
+            ex = cbException('TemporaryRedirect')
+        # one id for both the header and the XML body so they agree
+        request_id = str(uuid.uuid1())
+
+        self.transport.write("HTTP/1.1 %s %s\r\n" % (ex.httpCode, ex.httpDesc))
+        self.transport.write("%s: %s\r\n" % ('x-amz-request-id', request_id))
+        # the empty line terminates the header section; without it the
+        # body would be read as more header lines
+        self.transport.write('content-type: text/html\r\n\r\n')
+        self.transport.write(ex.make_xml_string(self._path, request_id))
+        self.transport.loseConnection()
+        return ex
+
+
# intercept the key event
def allHeadersReceived(self):
http.HTTPChannel.allHeadersReceived(self)
req = self.requests[-1]
+ req._cumulus_killed = None
h = self.getAllHeaders(req)
# we can check the authorization here
rPath = self._path
@@ -227,15 +274,24 @@ def allHeadersReceived(self):
try:
user = authorize(h, self._command, rPath, self._path)
except:
- # if there is an exception set this up to send all
- # arrving data to /dev/null
self.send_access_error()
return
if 'expect' in h:
if h['expect'].lower() == '100-continue':
self.transport.write("HTTP/1.1 100 Continue\r\n\r\n")
+ global count_locks
+ global g_connection_count
+ count_lock.acquire()
+ try:
+ g_connection_count = g_connection_count + 1
+ d = req.notifyFinish()
+ d.addBoth(cb_expired)
+ finally:
+ count_lock.release()
+
+
(bucketName, objectName) = path_to_bucket_object(rPath)
# if we are putting an object
if objectName != None:
Oops, something went wrong.

0 comments on commit 5157d6f

Please sign in to comment.