Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
Add unittest cases to test the business logic module.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmafc committed Oct 20, 2011
1 parent 9709fba commit 6f66c17
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 0 deletions.
Empty file added werkzeug/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions werkzeug/bl/film.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-


from psycopg2 import DatabaseError



class Film(object): class Film(object):
def __init__(self, id=0, title='', release_year=0): def __init__(self, id=0, title='', release_year=0):
Expand Down Expand Up @@ -41,6 +43,8 @@ def insert(self, db):
"(%(id)s, %(title)s, %(release_year)s)", self.__dict__) "(%(id)s, %(title)s, %(release_year)s)", self.__dict__)
except Exception as exc: except Exception as exc:
raise exc raise exc
if curs.rowcount != 1:
raise DatabaseError("Failed to add film with id %d" % self.id)
curs.close() curs.close()


def update(self, db): def update(self, db):
Expand All @@ -50,11 +54,15 @@ def update(self, db):
"WHERE id = %s", (self.title, self.release_year, self.id)) "WHERE id = %s", (self.title, self.release_year, self.id))
except Exception as exc: except Exception as exc:
raise exc raise exc
if curs.rowcount != 1:
raise DatabaseError("Failed to update film with id %d" % self.id)
curs.close() curs.close()


def delete(self, db): def delete(self, db):
try: try:
curs = db.execute("DELETE FROM film WHERE id = %s", (self.id,)) curs = db.execute("DELETE FROM film WHERE id = %s", (self.id,))
except Exception as exc: except Exception as exc:
raise exc raise exc
if curs.rowcount != 1:
raise DatabaseError("Failed to delete film with id %d" % self.id)
curs.close() curs.close()
124 changes: 124 additions & 0 deletions werkzeug/tests/bl/test_film.py
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
"""Test business logic"""

import unittest

from psycopg2 import IntegrityError, DatabaseError

from bl.film import Film

from utils import DbAppTestCase


class BLFilmTestCase(DbAppTestCase):
"""Test film business logic operations"""

def setUp(self):
self.db.execute_commit("TRUNCATE TABLE film")
self.key = 123

def insert_one(self):
self.db.execute_commit("INSERT INTO film VALUES (%s, %s, %s)",
(self.key, "A test movie", 1929))

def delete_one(self):
self.db.execute_commit("DELETE FROM film WHERE id = %s", (self.key,))

def get_one(self):
return self.db.fetchone("SELECT * FROM film WHERE id = %s",
(self.key,))

def test_insert(self):
"Insert a film"
newfilm = Film(self.key, "A test movie", 1929)
db = self.connection()
newfilm.insert(db)
db.commit()
row = self.get_one()
self.assertEqual(row['title'], newfilm.title)
self.assertEqual(row['release_year'], newfilm.release_year)

def test_dup_insert(self):
"Insert a duplicate"
self.insert_one()
newfilm = Film(self.key, "A test movie", 1929)
self.assertRaises(IntegrityError, newfilm.insert, self.connection())

def test_update(self):
"Update a film"
self.insert_one()
db = self.connection()
film = Film(self.key).get(db)
film.title = "A test movie - changed"
film.update(db)
db.commit()
row = self.get_one()
self.assertEqual(row['title'], film.title)

def test_update_missing(self):
"Update a film that has been deleted"
self.insert_one()
db = self.connection()
film = Film(self.key).get(db)
self.delete_one()
film.title = "A test movie - changed"
self.assertRaises(DatabaseError, film.update, db)

def test_delete(self):
"Delete a film"
self.insert_one()
db = self.connection()
film = Film(self.key).get(db)
film.delete(db)
db.commit()
row = self.get_one()
self.assertEqual(row, None)

def test_delete_missing(self):
"Delete a film that has already been deleted"
self.insert_one()
db = self.connection()
film = Film(self.key).get(db)
self.delete_one()
self.assertRaises(DatabaseError, film.delete, db)

def test_get_one(self):
"Get a single film"
self.insert_one()
db = self.connection()
film = Film(self.key).get(db)
self.assertEqual(film.title, "A test movie")
self.assertEqual(film.release_year, 1929)

def test_get_one_fail(self):
"Fail to get a single film"
db = self.connection()
film = Film(self.key).get(db)
self.assertEqual(film, None)

def test_get_several(self):
"Get several films"
self.db.execute_commit("INSERT INTO film VALUES (%s, %s, %s)",
(678, "A third movie", 1978))
self.db.execute_commit("INSERT INTO film VALUES (%s, %s, %s)",
(345, "A second movie", 1945))
self.insert_one()
db = self.connection()
films = Film().all(db)
self.assertEqual(len(films), 3)
self.assertEqual(films[0].title, "A test movie")
self.assertEqual(films[2].release_year, 1978)

def test_get_none(self):
"Get several films but find none"
db = self.connection()
films = Film().all(db)
self.assertEqual(len(films), 0)


def suite():
tests = unittest.TestLoader().loadTestsFromTestCase(BLFilmTestCase)
return tests

if __name__ == '__main__':
unittest.main(defaultTest='suite')
156 changes: 156 additions & 0 deletions werkzeug/tests/bl/utils.py
@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
"""Utility functions and classes for testing business logic"""

import os
from unittest import TestCase

from psycopg2 import connect
from psycopg2.extras import DictConnection
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from dblib import DbConnection


def pgconnect(dbname, user, host, port):
"Connect to a Postgres database using psycopg2"
if host is None or host == '127.0.0.1' or host == 'localhost':
host = ''
else:
host = 'host=%s ' % host
if port is None or port == 5432:
port = ''
else:
port = "port=%d " % port
return connect("%s%sdbname=%s user=%s" % (
host, port, dbname, user), connection_factory=DictConnection)


def pgexecute(dbconn, query, args=None):
"Execute a query using a cursor"
curs = dbconn.cursor()
try:
curs.execute(query, args)
except:
curs.close()
dbconn.rollback()
raise
return curs


def pgexecute_auto(dbconn, query):
"Execute a query using a cursor with autocommit enabled"
isolation_level = dbconn.isolation_level
dbconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
curs = pgexecute(dbconn, query)
dbconn.set_isolation_level(isolation_level)
return curs


TEST_DBNAME = os.environ.get("DBAPP_TEST_DB", 'dbapp_testdb')
TEST_USER = os.environ.get("DBAPP_TEST_USER", os.getenv("USER"))
TEST_HOST = os.environ.get("DBAPP_TEST_HOST", None)
TEST_PORT = os.environ.get("DBAPP_TEST_PORT", None)
ADMIN_DB = os.environ.get("DBAPP_ADMIN_DB", 'postgres')
CREATE_DDL = "CREATE DATABASE %s TEMPLATE = template0"
YAML_SPEC = os.path.normpath(os.path.join(os.getcwd(), '../../../film.yaml'))


class PostgresDb(object):
"""A PostgreSQL database connection
This is separate from the one used by DbConnection, because the
tests need to create and drop databases and other objects,
independently.
"""
def __init__(self, name, user, host, port):
self.name = name
self.user = user
self.host = host
self.port = port and int(port)
self.conn = None

def connect(self):
"""Connect to the database
If we're not already connected we first connect to the admin
database and see if the given database exists. If it doesn't,
we create and then connect to it.
"""
if not self.conn:
conn = pgconnect(ADMIN_DB, self.user, self.host, self.port)
curs = pgexecute(conn,
"SELECT 1 FROM pg_database WHERE datname = '%s'" %
self.name)
row = curs.fetchone()
if not row:
curs.close()
curs = pgexecute_auto(conn, CREATE_DDL % self.name)
curs.close()
conn.close()
self.conn = pgconnect(self.name, self.user, self.host, self.port)

def close(self):
"Close the connection if still open"
if not self.conn:
return ValueError
self.conn.close()

def create(self):
"Drop the database if it exists and re-create it"
conn = pgconnect(ADMIN_DB, self.user, self.host, self.port)
curs = pgexecute_auto(conn, "DROP DATABASE IF EXISTS %s" % self.name)
curs = pgexecute_auto(conn, CREATE_DDL % self.name)
curs.close()
conn.close()

def drop(self):
"Drop the database"
conn = pgconnect(ADMIN_DB, self.user, self.host, self.port)
curs = pgexecute_auto(conn, "DROP DATABASE %s" % self.name)
curs.close()
conn.close()

def execute(self, stmt, args=None):
"Execute a statement"
curs = pgexecute(self.conn, stmt, args)
curs.close()

def execute_commit(self, stmt, args=None):
"Execute a statement and commit"
self.execute(stmt, args)
self.conn.commit()

def fetchone(self, query, args=None):
"Execute a query and return one row"
try:
curs = pgexecute(self.conn, query, args)
except Exception as exc:
raise exc
row = curs.fetchone()
curs.close()
return row


class DbAppTestCase(TestCase):
"""Base class for most test cases"""

@classmethod
def setUpClass(cls):
import yaml
from pyrseas.database import Database
from pyrseas.dbconn import DbConnection as PyrDbConn

cls.db = PostgresDb(TEST_DBNAME, TEST_USER, TEST_HOST, TEST_PORT)
cls.db.connect()
db = Database(PyrDbConn(TEST_DBNAME, TEST_USER, TEST_HOST, TEST_PORT))
stmts = db.diff_map(yaml.load(open(YAML_SPEC)))
for stmt in stmts:
cls.db.execute_commit(stmt)

@classmethod
def tearDownClass(cls):
cls.db.close()

def connection(self):
return DbConnection(self.db.name, self.db.user, self.db.host,
self.db.port)

0 comments on commit 6f66c17

Please sign in to comment.