Skip to content

Commit

Permalink
[#1775,#1776] Add more tests for tag_create() logic action function
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean Hammond committed Feb 8, 2012
1 parent 6f7b965 commit 8853b04
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 8 deletions.
13 changes: 11 additions & 2 deletions ckan/logic/schema.py
Expand Up @@ -36,7 +36,8 @@
vocabulary_id_exists,
user_id_exists,
object_id_validator,
activity_type_exists)
activity_type_exists,
tag_not_in_vocabulary)
from formencode.validators import OneOf
import ckan.model

Expand Down Expand Up @@ -75,7 +76,8 @@ def default_update_resource_schema():

def default_tags_schema():
schema = {
'name': [not_empty,
'name': [not_missing,
not_empty,
unicode,
tag_length_validator,
tag_name_validator,
Expand All @@ -88,6 +90,13 @@ def default_tags_schema():

def default_create_tag_schema():
schema = default_tags_schema()
# When creating a tag via the create_tag() logic action function, a
# vocabulary_id _must_ be given (you cannot create free tags via this
# function).
schema['vocabulary_id'] = [not_missing, not_empty, unicode,
vocabulary_id_exists, tag_not_in_vocabulary]
# You're not allowed to specify your own ID when creating a tag.
schema['id'] = [empty]
return schema

def default_package_schema():
Expand Down
20 changes: 20 additions & 0 deletions ckan/logic/validators.py
Expand Up @@ -446,3 +446,23 @@ def tag_in_vocabulary_validator(value, context):
raise Invalid(_('Tag %s does not belong to vocabulary %s') % (value, vocabulary.name))
return value

def tag_not_in_vocabulary(key, tag_dict, errors, context):
tag_name = tag_dict[('name',)]
if not tag_name:
raise Invalid(_('No tag name'))
if tag_dict.has_key(('vocabulary_id',)):
vocabulary_id = tag_dict[('vocabulary_id',)]
else:
vocabulary_id = None
model = context['model']
session = context['session']

query = session.query(model.Tag)
query = query.filter(model.Tag.vocabulary_id==vocabulary_id)
query = query.filter(model.Tag.name==tag_name)
count = query.count()
if count > 0:
raise Invalid(_('Tag %s already belongs to vocabulary %s') %
(tag_name, vocabulary_id))
else:
return
142 changes: 136 additions & 6 deletions ckan/tests/functional/api/model/test_vocabulary.py
Expand Up @@ -174,7 +174,7 @@ def test_vocabulary_create(self):
def test_vocabulary_create_id(self):
'''Test error response when user tries to supply their own ID when
creating a vocabulary.
'''
params = {'id': 'xxx', 'name': 'foobar'}
param_string = json.dumps(params)
Expand Down Expand Up @@ -337,11 +337,141 @@ def test_vocabulary_delete_not_authorized(self):
assert response.json['success'] == False

def test_add_tag_to_vocab(self):
vocab = self._create_vocabulary(vocab_name="Musical Genres",
user=self.sysadmin_user)
'''Test that a tag can be added to and then retrieved from a vocab.'''
vocab = self.genre_vocab
tags_before = self._list_tags(vocab)
assert len(tags_before) == 0, tags_before
tag_created = self._create_tag(self.sysadmin_user, 'noise', vocab)
tags_after = self._list_tags(vocab)
assert len(tags_after) == 1
assert tag_created['name'] in tags_after
new_tag_names = [tag_name for tag_name in tags_after if tag_name not in
tags_before]
assert len(new_tag_names) == 1
assert tag_created['name'] in new_tag_names

def test_add_tag_no_vocab(self):
'''Test the error response when a user tries to create a tab without
specifying a vocab.
'''
tag_dict = {'name': 'noise'}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_vocab_not_exists(self):
'''Test the error response when a user tries to add a tag to a vocab
that doesn't exist.
'''
tag_dict = {'name': 'noise', 'vocabulary_id': 'does not exist'}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_already_added(self):
'''Test the error response when a user tries to add a tag to a vocab
that already has a tag with the same name.
'''
self.test_add_tag_to_vocab()
vocab = self.genre_vocab
tag_dict = {'name': 'noise', 'vocabulary_id': vocab['id']}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_with_id(self):
'''Test the error response when a user tries to specify the tag ID when
adding a tag to a vocab.
'''
tag_dict = {
'id': 'dsagdsgsgsd',
'name': 'noise',
'vocabulary_id': self.genre_vocab['id']
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_without_name(self):
'''Test the error response when a user tries to create a tag without a
name.
'''
tag_dict = {
'vocabulary_id': self.genre_vocab['id']
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_invalid_name(self):
for name in ('Not a valid tag name!', '', None):
tag_dict = {
'name': name,
'vocabulary_id': self.genre_vocab['id']
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_invalid_vocab_id(self):
tag_dict = {
'name': 'noise',
'vocabulary_id': 'xxcxzczxczxc',
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.sysadmin_user.apikey)},
status=409)
assert response.json['success'] == False

def test_add_tag_not_logged_in(self):
tag_dict = {
'name': 'noise',
'vocabulary_id': self.genre_vocab['id']
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
status=403)
assert response.json['success'] == False

def test_add_tag_not_authorized(self):
tag_dict = {
'name': 'noise',
'vocabulary_id': self.genre_vocab['id']
}
tag_string = json.dumps(tag_dict)
response = self.app.post('/api/action/tag_create',
params=tag_string,
extra_environ = {'Authorization':
str(self.normal_user.apikey)},
status=403)
assert response.json['success'] == False

0 comments on commit 8853b04

Please sign in to comment.