From e503ce9f4d04271b747b57789bba2ffd8463a594 Mon Sep 17 00:00:00 2001 From: John Krauss Date: Sat, 10 Mar 2012 21:09:00 -0500 Subject: [PATCH] passing db tests with jsongit --- caustic/database.py | 164 +++++++++++++++++++++++++++++ caustic/models.py | 1 + test/test_database.py | 235 +++++++++++++++++++++++++++--------------- 3 files changed, 317 insertions(+), 83 deletions(-) create mode 100644 caustic/database.py diff --git a/caustic/database.py b/caustic/database.py new file mode 100644 index 0000000..272831d --- /dev/null +++ b/caustic/database.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- + +""" +caustic.database +""" + +import pymongo +from pymongo.errors import DuplicateKeyError +from jsongit import signature +from caustic.models import User, InstructionDocument +from dictshield.base import ShieldException + +def get_db(server, port, name): + db = pymongo.Connection(server, port)[name] + db.safe = True + return db + + +class Users(object): + """Collection of users. Ensures uniqueness of non-deleted + names. + """ + + def __init__(self, db): + self.coll = db.users + self.coll.ensure_index('name', unique=True) + self.deleted = db.deleted_users + + def create(self, name): + """Create a new user. + + Returns the User, or None if the user has a duplicate name. + """ + try: + id = self.coll.insert(User(name=name).to_python()) + return self.get(id) + except DuplicateKeyError: + return None + + def get(self, id): + """Get a user by id. + + Returns the User or None. + """ + u = self.coll.find_one(id) + return User(**u) if u else None + + def find(self, name): + """Get a user by name. + + Returns the User or None. + """ + u = self.coll.find_one({'name': name}) + return User(**u) if u else None + + def delete(self, user): + """Delete a user. + """ + self.coll.remove(user.id) + self.deleted.save(user.to_python()) + + +class Instructions(object): + """Collection of instructions. Ensures uniquenss of + creator_id and name. Keeps git repo fresh. + """ + + def __init__(self, users, repo, db): + self.repo = repo + self.users = users + self.coll = db.instructions + self.coll.ensure_index([('creator_id', pymongo.ASCENDING), + ('name', pymongo.ASCENDING)], + unique=True) + + def _repo_key(self, creator, instruction): + """The key for the repo. + """ + return '/'.join([str(creator.id), str(instruction.id)]) + + def for_creator(self, creator_name): + """Find all instructions by a creator. + + Returns an array of InstructionDocuments, or None if + the creator_name does not exist. + """ + u = self.users.find(creator_name) + if u: + cursor = self.coll.find({'creator_id': u.id}) + return [InstructionDocument(**i) for i in cursor] + else: + return None + + def find(self, creator_name, name): + """Find an instruction by creator name and its own name. + + Returns the InstructionDocument or None. + """ + u = self.users.find(creator_name) + if u: + i = self.coll.find_one({'creator_id': u.id, 'name': name}) + return InstructionDocument(**i) if i else None + else: + return None + + def tagged(self, creator_name, tag): + """Find instructions by creator name and tag. + + Returns a list of InstructionDocuments, or None if + the creator doesn't exist. + """ + u = self.users.find(creator_name) + if u: + cursor = self.coll.find({'creator_id': u.id, 'tags': tag}) + return [InstructionDocument(**i) for i in cursor] + else: + return None + + def create(self, creator, name, instruction): + """Create an instruction for a creator. + + Returns the Instruction if it's created, or None if it + is a duplicate or invalid. + """ + try: + doc = InstructionDocument( + creator_id=creator.id, + name=name, + instruction=instruction) + doc.validate() + except ShieldException: + return None + + try: + id = self.coll.insert(doc.to_python()) + doc = InstructionDocument(**self.coll.find_one(id)) # grab ID + self.repo.create(self._repo_key(creator, doc), doc.instruction, + author=signature(creator.name, creator.name)) + return doc + except DuplicateKeyError: + return None + + def save(self, doc): + """Save an instruction. + + Returns None if the save was successful, a message explaining why it + failed otherwise. + """ + try: + doc.validate() + self.coll.save(doc.to_python()) + except ShieldException as e: + return str(e) + + creator = self.users.get(doc.creator_id) + self.repo.commit(self._repo_key(creator, doc), doc.instruction, + author=signature(creator.name, creator.name)) + + def delete(self, doc): + """Delete an instruction. + + Returns True if the deletion was successful. + """ + return self.coll.remove(doc.id)['n'] == 1 diff --git a/caustic/models.py b/caustic/models.py index fd1e6f9..59b648b 100644 --- a/caustic/models.py +++ b/caustic/models.py @@ -42,6 +42,7 @@ class InstructionDocument(Document): An Instruction Document has not just the instruction, but also a name, tags, and a creator ID. """ + id = ObjectIdField(id_field=True) creator_id = ObjectIdField(required=True) name = StringField(required=True) tags = ListField(StringField()) diff --git a/test/test_database.py b/test/test_database.py index 5a932a9..5fd3c7e 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -2,101 +2,170 @@ Test the database. Mongod must be running. """ -import pymongo import unittest -import bson -from caustic.models import User, InstructionDocument +import shutil +from caustic.database import get_db, Users, Instructions +from jsongit import JsonGitRepository -LOAD_GOOGLE = '{"load":"http://www.google.com/"}' +db = get_db('localhost', 27017, 'caustic_test') +REPO_DIR = 'tmp_git' +INSTRUCTION = {'load':'google'} # valid instruction for convenience -class TestCollection(unittest.TestCase): +class TestUsers(unittest.TestCase): def setUp(self): - """Provide access to collection in test_caustic database - """ - self.db = pymongo.Connection().test_caustic - self.collection = self.db.collection + self.users = Users(db) def tearDown(self): - """Drop collections. - """ - self.collection.drop() - + for name in set(db.collection_names()) - set([u'system.indexes']): + db[name].drop() -class TestUsers(TestCollection): - def test_save_user_assigns_id(self): + def test_create_user(self): """Saving a user should assign an id. """ - self.assertIsNotNone(self.collection.save(User(name="john").to_python())) + u = self.users.create('joe') + self.assertEquals('joe', u.name) def test_get_user_by_id(self): """Get a user by the assigned id. """ - id = self.collection.save(User(name="fred").to_python()) - u = User(**self.collection.find_one(id)) - self.assertEqual("fred", u.name) - - def test_get_user_by_name(self): - """Get a user by name - """ - self.collection.save(User(name="sally").to_python()) - self.assertIsNotNone(self.collection.find_one({'name': 'sally'})) - - -class TestInstructions(TestCollection): - - def test_find_instruction_by_name(self): - """Saving an instruction. - """ - cid = bson.objectid.ObjectId() - self.collection.save(InstructionDocument(creator_id=cid, name='google', - instruction=LOAD_GOOGLE).to_python()) - cursor = self.collection.find({'creator_id':cid, 'name': 'google'}) - self.assertEqual(1, cursor.count()) - self.assertDictContainsSubset({'name':'google', - 'instruction': LOAD_GOOGLE}, cursor[0]) - - # def test_get_instructions_by_owner_name(self): - # """ - # Get an owner's instructions. - # """ - # chris = User(name='chris') - # chris.instructions['first'] = LOAD_GOOGLE - # chris.instructions['second'] = LOAD_GOOGLE - # self.users.save(chris.to_python()) - # self.assertEqual(2, self.instructions.find({'owner.name': 'chris'}).count()) - - # def test_get_instructions_by_tag(self): - # """ - # Get instructions by their tag. - # """ - # tuten = User(name='tuten') - # mao = User(name='mao') - # self.instructions.save(Instruction(owner=tuten, - # name="fiction", - # tags=['long march'], - # json=LOAD_GOOGLE).to_python()) - # self.instructions.save(Instruction(owner=mao, - # name="works", - # tags=['long march'], - # json=LOAD_GOOGLE).to_python()) - # self.assertEqual(2, self.instructions.find({'tags':'long march'}).count()) - - # def test_get_instructions_by_tag_and_name(self): - # """ - # Get instructions by their tag and name. - # """ - # tuten = User(name='tuten') - # mao = User(name='mao') - # self.instructions.save(Instruction(owner=tuten, - # name="fiction", - # tags=['long march'], - # json=LOAD_GOOGLE).to_python()) - # self.instructions.save(Instruction(owner=mao, - # name="works", - # tags=['long march'], - # json=LOAD_GOOGLE).to_python()) - # self.assertEqual(1, self.instructions.find({'tags':'long march', 'owner.name': 'mao'}).count()) - # t = Instruction(**self.instructions.find_one({'tags':'long march', 'owner.name':'mao'})) - # self.assertEqual('works', t.name) + u = self.users.create('steve') + self.assertEquals('steve', self.users.get(u.id).name) + + def test_find_user_by_name(self): + """Find a user by name + """ + self.users.create('sally') + self.assertIsNotNone(self.users.find('sally')) + + def test_no_duplicate_names(self): + """Cannot create two users with same name. + """ + self.users.create('dave') + self.assertIsNone(self.users.create('dave')) + + def test_delete_user(self): + """Cannot find user by ID or name after being deleted. + """ + george = self.users.create('george') + self.users.delete(george) + self.assertIsNone(self.users.get(george.id)) + self.assertIsNone(self.users.find('george')) + + +class TestInstructions(unittest.TestCase): + + def setUp(self): + repo = JsonGitRepository(REPO_DIR) + self.users = Users(db) + self.creator = self.users.create('creator') + self.instructions = Instructions(self.users, repo, db) + + def tearDown(self): + for name in set(db.collection_names()) - set([u'system.indexes']): + db[name].drop() + shutil.rmtree(REPO_DIR) + + def test_create_instruction(self): + """Creating an instruction. + """ + doc = self.instructions.create(self.creator, 'google', INSTRUCTION) + self.assertIsNotNone(doc) + self.assertEquals('google', doc.name) + self.assertEquals({'load':'google'}, doc.instruction) + self.assertEquals([], doc.tags) + doc.validate() + + def test_duplicate_names_ok(self): + """Duplicate names are OK provided the creators are different. + """ + joe = self.users.create('joe') + bobo = self.users.create('bobo') + + self.assertIsNotNone(self.instructions.create(joe, 'name', + INSTRUCTION)) + self.assertIsNotNone(self.instructions.create(bobo, 'name', + INSTRUCTION)) + + def test_duplicate_creator_and_name_not_ok(self): + """Duplicate names forbidden if the creator is the same. + """ + self.instructions.create(self.creator, 'name', INSTRUCTION) + self.assertIsNone(self.instructions.create(self.creator, 'name', + INSTRUCTION)) + + def test_find_instruction(self): + """Find an instruction. + """ + self.instructions.create(self.creator, 'google', INSTRUCTION) + doc = self.instructions.find(self.creator.name, 'google') + self.assertEqual('google', doc.name) + self.assertEqual(INSTRUCTION, doc.instruction) + + def test_find_creator_instructions(self): + """Find instructions created by a name. + """ + self.instructions.create(self.creator, 'foo', INSTRUCTION) + self.instructions.create(self.creator, 'bar', INSTRUCTION) + self.instructions.create(self.creator, 'baz', INSTRUCTION) + docs = self.instructions.for_creator(self.creator.name) + self.assertEqual(3, len(docs)) + self.assertItemsEqual(['foo', 'bar', 'baz'], [d.name for d in docs]) + + def test_save_instruction_tags(self): + """Update the tags in an instruction. + """ + doc = self.instructions.create(self.creator, 'foo', INSTRUCTION) + doc.tags = ['bar', 'baz'] + self.assertIsNone(self.instructions.save(doc)) + def test_bad_instruction(self): + """Don't update bad instruction. + """ + doc = self.instructions.create(self.creator, 'foo', INSTRUCTION) + for bad in [{'foo':'bar'}, 7]: + doc.instruction = bad + self.assertIsNotNone(self.instructions.save(doc)) + + def test_valid_instructions(self): + """Instructions come in all shapes & sizes. + """ + doc = self.instructions.create(self.creator, 'foo', INSTRUCTION) + for valid in ['bare string', ['some', 'array'], {'load':'google'}]: + doc.instruction = valid + self.assertIsNone(self.instructions.save(doc)) + + def test_bad_tags(self): + """Don't update with bad tags. + """ + doc = self.instructions.create(self.creator, 'foo', INSTRUCTION) + for bad in [7, 'string', {}]: + doc.tags = bad + self.assertIsNotNone(self.instructions.save(doc)) + + def test_tagged_instructions(self): + """ Get instructions by their tag. + """ + roses = self.instructions.create(self.creator, 'roses', INSTRUCTION) + violets = self.instructions.create(self.creator, 'violets', INSTRUCTION) + lilacs = self.instructions.create(self.creator, 'lilacs', INSTRUCTION) + + roses.tags = ['red', 'white'] + violets.tags = ['blue', 'purple'] + lilacs.tags = ['purple', 'white'] + + for doc in [roses, violets, lilacs]: + self.assertIsNone(self.instructions.save(doc)) + + whites = self.instructions.tagged(self.creator.name, 'white') + self.assertEqual(2, len(whites)) + self.assertItemsEqual(['roses', 'lilacs'], [d.name for d in whites]) + + def test_delete(self): + """Deleting eliminates, allows creation of new with same name. + """ + doomed = self.instructions.create(self.creator, 'doomed', INSTRUCTION) + self.assertTrue(self.instructions.delete(doomed)) + self.assertFalse(self.instructions.delete(doomed)) + dup = self.instructions.create(self.creator, 'doomed', INSTRUCTION) + self.assertIsNotNone(dup)