Permalink
Browse files

create a function to change the location of data files

  • Loading branch information...
BuzzTroll
BuzzTroll committed Nov 4, 2010
1 parent c36ffc1 commit a6df8f8ab12c44f876ef4370048022ad61255cf0
@@ -108,6 +108,12 @@ def get_all_user_files(self):
c = self.db_obj._run_fetch_iterator(s, data, _convert_object_row_to_UserFile, [self])
return c
+ def set_data_key(self, data_key):
+ s = "UPDATE objects set data_key = ? where id = ?"
+ data = (data_key,self.id,)
+ self.db_obj._run_no_fetch(s, data)
+ self.data_key = data_key
+
def get_file_from_db_id(db_obj, id):
s = "SELECT " + File.get_select_str() + """
FROM objects
@@ -214,6 +220,19 @@ def find_files(db_obj, pattern, object_type, parent=None):
find_files = staticmethod(find_files)
+ def find_files_from_data(db_obj, pattern):
+ # look it up
+ s = "SELECT " + File.get_select_str() + """
+ FROM objects
+ WHERE data_key LIKE ?"""
+ data = [pattern,]
+ c = db_obj._run_fetch_iterator(s, data, _convert_alias_row_to_File)
+ return c
+
+ find_files_from_data = staticmethod(find_files_from_data)
+
+
+
def __str__(self):
return self.name + ":" + self.object_type + ":" + str(self.parent)
@@ -10,3 +10,4 @@
from pynimbusauthz.tests.test_stat import *
from pynimbusauthz.tests.user_tests import *
from pynimbusauthz.tests.commit_tests import *
+from pynimbusauthz.tests.test_rebase import *
@@ -8,7 +8,7 @@
from pynimbusauthz.objects import File
import unittest
-class TestUser(unittest.TestCase):
+class TestFile(unittest.TestCase):
def setUp(self):
# os.environ['CUMULUS_AUTHZ_DDL'] = "/home/bresnaha/Dev/Nimbus/nimbus/cumulus/authz/etc/acl.sql"
@@ -85,3 +85,42 @@ def test_file_and_bucket(self):
self.assertNotEqual(f3.get_id(), f4.get_id())
self.db.commit()
+ def test_change_key(self):
+ user1 = User(self.db)
+ name = "/file/name"
+ old_base = "/old/path/base"
+ fname = "/etc/group"
+ new_base = "/new/base/location/dir"
+ f = File.create_file(self.db, name, user1, old_base + fname, pynimbusauthz.object_type_s3)
+
+ self.assertEqual(old_base + fname, f.get_data_key(), "old value not euqal")
+
+ new_key = new_base + fname
+ f.set_data_key(new_key)
+ self.db.commit()
+
+ tst_new_key = f.get_data_key()
+ self.assertEqual(tst_new_key, new_key, "%s should equal %s" % (tst_new_key, new_key))
+
+ f2 = File.get_file(self.db, name, pynimbusauthz.object_type_s3)
+
+ tst_new_key = f2.get_data_key()
+ self.assertEqual(tst_new_key, new_key, "%s should equal %s" % (tst_new_key, new_key))
+
+
+ def test_find_by_key(self):
+ user1 = User(self.db)
+ name = "/file/name"
+ key = "/old/path/base"
+ f = File.create_file(self.db, name, user1, key, pynimbusauthz.object_type_s3)
+ self.db.commit()
+
+ f2a = File.find_files_from_data(self.db, key)
+
+ found = False
+ for f2 in f2a:
+ tst_key = f2.get_data_key()
+ if tst_key == key:
+ found = True
+ self.assertTrue(found, "key not found")
+
@@ -1,48 +0,0 @@
-import sqlite3
-import os
-import sys
-import pynimbusauthz
-from pynimbusauthz.db import DB
-from pynimbusauthz.user import User
-from pynimbusauthz.user import UserAlias
-from pynimbusauthz.objects import File
-from pynimbusauthz.objects import UserFile
-import pynimbusauthz.ls
-import unittest
-import uuid
-import tempfile
-
-
-class TestLsCli(unittest.TestCase):
-
- def setUp(self):
- (osf, self.fname) = tempfile.mkstemp()
- os.close(osf)
-# os.environ['CUMULUS_AUTHZ_DDL'] = "/home/bresnaha/Dev/Nimbus/nimbus/cumulus/authz/etc/acl.sql"
- os.environ['NIMBUS_AUTHZ_DB'] = self.fname
- pynimbusauthz.db.make_test_database(self.fname)
- self.db = DB(con_str=self.fname)
- self.user1 = User(self.db)
- self.name = "/file/name"
- self.data = "/etc/group"
- self.file1 = File.create_file(self.db, self.name, self.user1, self.data, pynimbusauthz.object_type_s3)
- self.uf = UserFile(self.file1)
- self.db.commit()
-
- def tearDown(self):
- self.db.close()
- os.remove(self.fname)
-
- def test_basic_ls(self):
- rc = pynimbusauthz.ls.main([])
- self.assertEqual(rc, 0, "CLI should return success %d" % (rc))
- rc = pynimbusauthz.ls.main(["-t", pynimbusauthz.object_type_s3])
- self.assertEqual(rc, 0, "CLI should return success %d" % (rc))
- rc = pynimbusauthz.ls.main([self.name[0:-1]])
- self.assertEqual(rc, 0, "CLI should return success %d" % (rc))
-
-
- def test_fail_ls(self):
- rc = pynimbusauthz.ls.main(["-p", "nobucket", self.name[0:-1]])
- self.assertNotEqual(rc, 0, "CLI should return success %d" % (rc))
-
@@ -0,0 +1,107 @@
+import sqlite3
+import os
+import sys
+import pynimbusauthz
+from pynimbusauthz.db import DB
+from pynimbusauthz.user import User
+from pynimbusauthz.user import UserAlias
+from pynimbusauthz.objects import File
+from pynimbusauthz.objects import UserFile
+import pynimbusauthz.rebase
+import unittest
+import uuid
+import tempfile
+
+
+class TestRebaseCli(unittest.TestCase):
+
+ def setUp(self):
+ (osf, self.fname) = tempfile.mkstemp()
+ os.close(osf)
+# os.environ['CUMULUS_AUTHZ_DDL'] = "/home/bresnaha/Dev/Nimbus/nimbus/cumulus/authz/etc/acl.sql"
+ os.environ['NIMBUS_AUTHZ_DB'] = self.fname
+ pynimbusauthz.db.make_test_database(self.fname)
+ self.db = DB(con_str=self.fname)
+ self.user1 = User(self.db)
+ self.db.commit()
+
+ def tearDown(self):
+ self.db.close()
+ os.remove(self.fname)
+
+ def test_single_change(self):
+ name = "/file/name"
+ old_base = "/OLD"
+ new_base = "/NEW"
+ data = "/etc/group"
+
+ key = old_base + data
+ file1 = File.create_file(self.db, name, self.user1, key, pynimbusauthz.object_type_s3)
+ self.db.commit()
+
+ rc = pynimbusauthz.rebase.main([old_base, new_base])
+ self.assertEqual(rc, 0, "rc should be 0, is %d" % (rc))
+
+ f2a = File.find_files_from_data(self.db, key)
+ f2a = list(f2a)
+ self.assertEqual(len(f2a), 0, "should be no values with key %s len is %d" % (old_base, len(f2a)))
+ key = new_base + data
+ f2a = File.find_files_from_data(self.db, key)
+ f2a = list(f2a)
+ self.assertNotEqual(len(f2a), 0, "length should be greater than 0 is %d" % (len(f2a)))
+
+ found = False
+ for f2 in f2a:
+ tst_key = f2.get_data_key()
+ if tst_key == key:
+ found = True
+ self.assertTrue(found, "key not found")
+
+
+ def test_many_change(self):
+ name = "/file/name"
+ old_base = "/OLD"
+ new_base = "/NEW"
+ count = 10
+
+ for i in range(0, count):
+ keyname = str(uuid.uuid1())
+ oldkey = old_base + "/" + keyname
+ File.create_file(self.db, name+oldkey, self.user1, oldkey, pynimbusauthz.object_type_s3)
+ self.db.commit()
+
+ rc = pynimbusauthz.rebase.main([old_base, new_base])
+ self.assertEqual(rc, 0, "rc should be 0, is %d" % (rc))
+
+ f2a = File.find_files_from_data(self.db, new_base + "%")
+ f2a = list(f2a)
+ self.assertEqual(len(f2a), count, "length of the new items should be %d is %s" % (count, len(f2a)))
+
+
+ def test_many_change_but_not_all(self):
+ name = "/file/name"
+ old_base = "/OLD"
+ new_base = "/NEW"
+ other_base = "/NOTHERE"
+ count = 10
+
+ for i in range(0, count):
+ keyname = str(uuid.uuid1())
+ oldkey = old_base + "/" + keyname
+ File.create_file(self.db, name+oldkey, self.user1, oldkey, pynimbusauthz.object_type_s3)
+ for i in range(0, count*2):
+ keyname = str(uuid.uuid1())
+ oldkey = other_base + "/" + keyname
+ File.create_file(self.db, name+oldkey, self.user1, oldkey, pynimbusauthz.object_type_s3)
+ self.db.commit()
+
+ rc = pynimbusauthz.rebase.main([old_base, new_base])
+ self.assertEqual(rc, 0, "rc should be 0, is %d" % (rc))
+
+ f2a = File.find_files_from_data(self.db, new_base + "%")
+ f2a = list(f2a)
+ self.assertEqual(len(f2a), count, "length of the new items should be %d is %s" % (count, len(f2a)))
+
+
+
+

0 comments on commit a6df8f8

Please sign in to comment.