Skip to content

Commit

Permalink
[#1724] Add tests for tag_search() logic action function
Browse files Browse the repository at this point in the history
And fix a couple of bugs found in tag_search() and tag_list() in the process,
and factor out some code repetition, and add some docstrings.
  • Loading branch information
Sean Hammond committed Feb 14, 2012
1 parent a08f209 commit 6ec1e64
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 30 deletions.
79 changes: 53 additions & 26 deletions ckan/logic/action/get.py
Expand Up @@ -203,23 +203,13 @@ def tag_list(context, data_dict):
model = context['model']
user = context['user']

all_fields = data_dict.get('all_fields',None)
all_fields = data_dict.get('all_fields', None)

check_access('tag_list',context, data_dict)

q = data_dict.get('q','')
if q:
limit = data_dict.get('limit',25)
offset = data_dict.get('offset',0)
return_objects = data_dict.get('return_objects',True)

query = query_for(model.Tag)
query.run(query=q,
limit=limit,
offset=offset,
return_objects=return_objects,
username=user)
tags = query.results
tags = _tag_search(context, data_dict)
else:
tags = model.Session.query(model.Tag).all()

Expand Down Expand Up @@ -607,15 +597,6 @@ def package_autocomplete(context, data_dict):

return pkg_list

def tag_autocomplete(context, data_dict):
'''Returns tags containing the provided string'''
check_access('tag_autocomplete', context, data_dict)
matching_tags = tag_search(context, data_dict)
if matching_tags:
return [tag.name for tag in matching_tags['results']]
else:
return []

def format_autocomplete(context, data_dict):
'''Returns formats containing the provided string'''
model = context['model']
Expand Down Expand Up @@ -803,10 +784,22 @@ def resource_search(context, data_dict):

return {'count': count, 'results': results}

def tag_search(context, data_dict):
def _tag_search(context, data_dict):
'''Return a list of tag objects that contain the given string.
The query string should be provided in the data_dict with key 'query' or
'q'.
By default only free tags (tags that don't belong to a vocabulary) are
searched. If a 'vocabulary_name' is provided in the data_dict then tags
belonging to the named vocabulary will be searched instead.
'''
model = context['model']

query = data_dict.get('query') or data_dict.get('q')
if query:
query = query.strip()
terms = [query] if query else []

fields = data_dict.get('fields', {})
Expand Down Expand Up @@ -834,17 +827,51 @@ def tag_search(context, data_dict):
terms.append(value)

if not len(terms):
return
return []

for term in terms:
escaped_term = misc.escape_sql_like_special_characters(term, escape='\\')
q = q.filter(model.Tag.name.ilike('%' + escaped_term + '%'))

count = q.count()
q = q.offset(offset)
q = q.limit(limit)
results = [r for r in q]
return {'count': count, 'results': results}
return q.all()

def tag_search(context, data_dict):
'''Return a list of tag dictionaries that contain the given string.
The query string should be provided in the data_dict with key 'query' or
'q'.
By default only free tags (tags that don't belong to a vocabulary) are
searched. If a 'vocabulary_name' is provided in the data_dict then tags
belonging to the named vocabulary will be searched instead.
Returns a dictionary with keys 'count' (the number of tags in the result)
and 'results' (the list of tag dicts).
'''
tags = _tag_search(context, data_dict)
return {'count': len(tags),
'results': [table_dictize(tag, context) for tag in tags]}

def tag_autocomplete(context, data_dict):
'''Return a list of tag names that contain the given string.
The query string should be provided in the data_dict with key 'query' or
'q'.
By default only free tags (tags that don't belong to a vocabulary) are
searched. If a 'vocabulary_name' is provided in the data_dict then tags
belonging to the named vocabulary will be searched instead.
'''
check_access('tag_autocomplete', context, data_dict)
matching_tags = _tag_search(context, data_dict)
if matching_tags:
return [tag.name for tag in matching_tags]
else:
return []

def task_status_show(context, data_dict):
model = context['model']
Expand Down
112 changes: 108 additions & 4 deletions ckan/tests/logic/test_action.py
Expand Up @@ -51,6 +51,19 @@ def make_some_vocab_tags(cls):
composers_vocab = model.Vocabulary(u'composers')
model.Session.add(composers_vocab)

# Create some additional free tags for tag search tests.
tolkien_tag = model.Tag(name="tolkien")
model.Session.add(tolkien_tag)
toledo_tag = model.Tag(name="toledo")
model.Session.add(toledo_tag)
tolerance_tag = model.Tag(name="tolerance")
model.Session.add(tolerance_tag)
tollbooth_tag = model.Tag(name="tollbooth")
model.Session.add(tollbooth_tag)
# We have to add free tags to a package or they won't show up in tag results.
model.Package.get('warandpeace').add_tags((tolkien_tag, toledo_tag,
tolerance_tag, tollbooth_tag))

# Create some tags that belong to vocabularies.
sonata_tag = model.Tag(name=u'sonata', vocabulary_id=genre_vocab.id)
model.Session.add(sonata_tag)
Expand Down Expand Up @@ -322,7 +335,8 @@ def test_06a_tag_list(self):
json.loads(res.body),
{'help': 'Returns a list of tags',
'success': True,
'result': ['russian', 'tolstoy', u'Flexible \u30a1']})
'result': ['russian', 'tolstoy', u'Flexible \u30a1', 'tollbooth',
'tolkien', 'toledo', 'tolerance']})
#Get all fields
postparams = '%s=1' % json.dumps({'all_fields':True})
res = self.app.post('/api/action/tag_list', params=postparams)
Expand Down Expand Up @@ -709,6 +723,96 @@ def test_14_group_show(self):
'success': False
}

def test_15a_tag_search_with_empty_query(self):
for q in ('missing', None, '', ' '):
paramd = {}
if q != 'missing':
paramd['q'] = q
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 0
assert res.json['result']['results'] == []

def test_15a_tag_search_with_no_matches(self):
paramd = {'q': 'no matches' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 0
assert res.json['result']['results'] == []

def test_15a_tag_search_with_one_match(self):
paramd = {'q': 'russ' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 1
tag_dicts = res.json['result']['results']
assert len(tag_dicts) == 1
assert tag_dicts[0]['name'] == 'russian'

def test_15a_tag_search_with_many_matches(self):
paramd = {'q': 'tol' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 5
tag_dicts = res.json['result']['results']
assert ([tag['name'] for tag in tag_dicts] ==
sorted(['tolkien', 'toledo', 'tolerance', 'tollbooth', 'tolstoy']))

def test_15a_tag_search_with_vocab_and_empty_query(self):
for q in ('missing', None, '', ' '):
paramd = {'vocabulary_name': 'genre'}
if q != 'missing':
paramd['q'] = q
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 0
assert res.json['result']['results'] == []

def test_15a_tag_search_with_vocab_and_one_match(self):
paramd = {'q': 'son', 'vocabulary_name': 'genre' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 1
tag_dicts = res.json['result']['results']
assert len(tag_dicts) == 1
assert tag_dicts[0]['name'] == 'sonata'

def test_15a_tag_search_with_vocab_and_multiple_matches(self):
paramd = {'q': 'neo', 'vocabulary_name': 'genre' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 6
tag_dicts = res.json['result']['results']
assert [tag['name'] for tag in tag_dicts] == sorted(('neoclassical',
'neofolk', 'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))

def test_15a_tag_search_with_vocab_and_no_matches(self):
paramd = {'q': 'xxxxxxx', 'vocabulary_name': 'genre' }
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_search', params=params)
assert res.json['success'] is True
assert res.json['result']['count'] == 0
tag_dicts = res.json['result']['results']
assert tag_dicts == []

def test_15a_tag_search_with_vocab_that_does_not_exist(self):
paramd = {'q': 'neo', 'vocabulary_name': 'xxxxxx' }
params = json.dumps(paramd)
self.app.post('/api/action/tag_search', params=params, status=404)

def test_15a_tag_search_with_invalid_vocab(self):
for vocab_name in (None, '', 'a', 'e'*200):
paramd = {'q': 'neo', 'vocabulary_name': vocab_name }
params = json.dumps(paramd)
self.app.post('/api/action/tag_search', params=params, status=404)

def test_15_tag_autocomplete(self):
#Empty query
postparams = '%s=1' % json.dumps({})
Expand All @@ -726,7 +830,7 @@ def test_15_tag_autocomplete(self):
res_obj = json.loads(res.body)
assert res_obj == {
'help': 'Returns tags containing the provided string',
'result': ['russian'],
'result': ['russian', 'tolerance'],
'success': True
}

Expand Down Expand Up @@ -895,8 +999,8 @@ def test_15_tag_autocomplete_with_vocab_and_multiple_matches(self):
params = json.dumps(paramd)
res = self.app.post('/api/action/tag_autocomplete', params=params)
assert res.json['success'] is True
assert res.json['result'] == ['neoclassical', 'neofolk', 'neomedieval',
'neoprog', 'neopsychedelia', 'neosoul'], res.json['result']
assert res.json['result'] == sorted(('neoclassical', 'neofolk',
'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))

def test_15_tag_autocomplete_with_vocab_and_no_matches(self):
paramd = {'vocabulary_name': 'composers', 'q': 'Jonny Greenwood'}
Expand Down

0 comments on commit 6ec1e64

Please sign in to comment.