Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
roll back code for new user programs with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
BuzzTroll committed Jul 9, 2010
1 parent 939b5a1 commit e1f34ad
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 15 deletions.
8 changes: 4 additions & 4 deletions home/libexec/nimbus_edit_user.py
Expand Up @@ -84,7 +84,7 @@ def setup_options(argv):

return (o, args, parser)

def add_gridmap(o):
def add_gridmap(dn):
nimbus_home = get_nimbus_home()
configpath = os.path.join(nimbus_home, 'nimbus-setup.conf')
config = SafeConfigParser()
Expand All @@ -101,11 +101,11 @@ def add_gridmap(o):
if l == "":
continue
a = shlex.split(l)
if o.dn == a[0]:
if dn == a[0]:
print "WARNING! This dn is already in the gridmap file"
f.close()
return
f.write("\"%s\" not_a_real_account\n" % (o.dn))
f.write("\"%s\" not_a_real_account\n" % (dn))
f.close()

def remove_gridmap(dn):
Expand Down Expand Up @@ -193,7 +193,7 @@ def edit_user(o, db):
dnu.set_name(o.dn.strip())

remove_gridmap(old_dn)
add_gridmap(o)
add_gridmap(o.dn)

try:
remove_member(groupauthz_dir, old_dn)
Expand Down
46 changes: 39 additions & 7 deletions home/libexec/nimbus_new_user.py
Expand Up @@ -58,6 +58,38 @@ def get_dn(cert_file):
dn = autoca.getCertDN(cert_file, webdir, log)
return dn

def remove_gridmap(dn):
nimbus_home = get_nimbus_home()
configpath = os.path.join(nimbus_home, 'nimbus-setup.conf')
config = SafeConfigParser()
if not config.read(configpath):
raise CLIError('ENIMBUSHOME',
"Failed to read config from '%s'. Has Nimbus been configured?"
% configpath)
gmf = config.get('nimbussetup', 'gridmap')
gmf = os.path.join(nimbus_home, gmf)

found = False
f = open(gmf, 'r')
(nf, new_name) = tempfile.mkstemp(dir=nimbus_home+"/var", prefix="gridmap", text=True)
for l in f.readlines():
l = l.strip()
if l == "":
continue
a = shlex.split(l)
if dn == a[0]:
found = True
else:
os.write(nf, l)
os.write(nf, os.linesep)

if not found:
print "WARNING! user not found in %s" % (dn)
os.close(nf)
f.close()
os.unlink(gmf)
os.rename(new_name, gmf)

def generate_cert(o):
nimbus_home = get_nimbus_home()
webdir = os.path.join(nimbus_home, 'web/')
Expand Down Expand Up @@ -321,13 +353,10 @@ def do_group_bidnes(o):

nh = get_nimbus_home()
groupauthz_dir = os.path.join(nh, "services/etc/nimbus/workspace-service/group-authz/")
try:
if o.group:
add_member(groupauthz_dir, o.dn, int(o.group))
else:
add_member(groupauthz_dir, o.dn)
except Exception, ex:
print "WARNING %s" % (ex)
if o.group:
add_member(groupauthz_dir, o.dn, int(o.group))
else:
add_member(groupauthz_dir, o.dn)

def report_results(o, db):
user = User.get_user_by_friendly(db, o.emailaddr)
Expand Down Expand Up @@ -358,6 +387,9 @@ def main(argv=sys.argv[1:]):
report_results(o, db)
db.close()
except CLIError, clie:
if DEBUG:
traceback.print_exc(file=sys.stdout)

print clie
return clie.get_rc()

Expand Down
4 changes: 4 additions & 0 deletions home/libexec/nimbus_remove_user.py
Expand Up @@ -24,6 +24,7 @@
import shlex
from nimbusweb.setup.setuperrors import *
from nimbusweb.setup.groupauthz import *
import traceback

g_created_cert_files=False
g_report_options = ["cert", "key", "dn", "canonical_id", "access_id", "access_secret", "url", "web_id"]
Expand Down Expand Up @@ -127,6 +128,9 @@ def main(argv=sys.argv[1:]):
except CLIError, clie:
print clie
return clie.get_rc()
except:
traceback.print_exc(file=sys.stdout)

return 0

if __name__ == "__main__":
Expand Down
6 changes: 2 additions & 4 deletions tests/user-test.sh
Expand Up @@ -18,8 +18,6 @@ export PYTHONPATH
DJANGO_SETTINGS_MODULE="nimbusweb.portal.settings"
export DJANGO_SETTINGS_MODULE

cd $NIMBUS_HOME/
./bin/nimbusctl restart

source $NIMBUS_HOME/ve/bin/activate
cd $NIMBUS_HOME/libexec
nosetests ../tests/user_tests.py ../tests/ec2_test.py
nosetests ../tests/user_tests.py ../tests/user_failures_tests.py
100 changes: 100 additions & 0 deletions tests/user_failures_tests.py
@@ -0,0 +1,100 @@
import string
import random
import os
import sys
import nose.tools
import boto
from boto.ec2.connection import EC2Connection
import boto.ec2
import sys
from ConfigParser import SafeConfigParser
import time
import unittest
import tempfile
import filecmp
import pycb
import pynimbusauthz
from pynimbusauthz.db import *
from pynimbusauthz.user import *
import pycb.test_common
from boto.s3.connection import OrdinaryCallingFormat
from boto.s3.connection import S3Connection
import random
import nimbus_remove_user
import nimbus_new_user
import nimbus_list_users
import nimbus_edit_user

class TestUsersFailures(unittest.TestCase):


def setUp(self):
self.users = []
self.nh = os.environ['NIMBUS_HOME']
os.environ['NIMBUS_HOME'] = "/not/such place"

def tearDown(self):
os.environ['NIMBUS_HOME'] = self.nh
for f in self.users:
nimbus_remove_user.main([f])

def get_user_name(self, friendly_name=None):
if friendly_name == None:
friendly_name = str(uuid.uuid1())
self.users.append(friendly_name)
return friendly_name

def find_in_file(self, fname, needle):
found = False
f = open(fname)
l = f.readline()
while l:
print "#### " + l
x = l.find(needle)
if x >= 0:
found = True
l = f.readline()
f.close()
return found


def test_new_user(self):
friendly_name = self.get_user_name()

(tmpFD, outFileName) = tempfile.mkstemp("cumulustests")
os.close(tmpFD)

rc = nimbus_new_user.main([friendly_name])
self.assertNotEqual(rc, 0, "should not be 0 %d" % (rc))

# make sure the user was not added
os.environ['NIMBUS_HOME'] = self.nh
rc = nimbus_list_users.main(["-b", "-r", "display_name", "-O", outFileName, friendly_name])
rc = self.find_in_file(outFileName, friendly_name)
self.assertFalse(rc)

def test_remove_user(self):
friendly_name = self.get_user_name()

(tmpFD, outFileName) = tempfile.mkstemp("cumulustests")
os.close(tmpFD)

# add a good user
os.environ['NIMBUS_HOME'] = self.nh
rc = nimbus_new_user.main([friendly_name])
self.assertEqual(rc, 0, "should be 0 %d" % (rc))

os.environ['NIMBUS_HOME'] = "/nope"
# remove with an error
rc = nimbus_remove_user.main([friendly_name])
self.assertNotEqual(rc, 0, "should not be 0 %d" % (rc))

# relist to see user is still there
os.environ['NIMBUS_HOME'] = self.nh
rc = nimbus_list_users.main(["-b", "-r", "display_name", "-O", outFileName, friendly_name])
self.assertEqual(rc, 0, "should not be 0 %d" % (rc))
rc = self.find_in_file(outFileName, friendly_name)
self.assertTrue(rc)



1 change: 1 addition & 0 deletions tests/user_tests.py
Expand Up @@ -209,3 +209,4 @@ def test_db_commit_user(self):
rc = nimbus_new_user.main([friendly_name])
self.assertEqual(rc, 0, "but then this clarification should succeed %d" % (rc))


0 comments on commit e1f34ad

Please sign in to comment.