Skip to content

Commit

Permalink
enable datastore to run without ckan
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz authored and tobes committed Sep 19, 2012
1 parent e38468e commit 6ea18bb
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 36 deletions.
78 changes: 44 additions & 34 deletions ckanext/datastore/db.py
@@ -1,16 +1,26 @@
import json
import datetime
import shlex
import os
import logging
import pprint
import sqlalchemy
from sqlalchemy.exc import ProgrammingError, IntegrityError
from sqlalchemy import text
import psycopg2.extras
from paste.deploy.converters import asbool, aslist
import ckan.plugins as p

log = logging.getLogger(__name__)

if not os.environ.get('DATASTORE_LOAD'):
    # Normal mode: DATASTORE_LOAD is unset/empty, so use the helpers and the
    # ValidationError provided by paste and the CKAN plugin toolkit.
    from paste.deploy.converters import asbool, aslist
    from ckan.plugins import toolkit
    ValidationError = toolkit.ValidationError
else:
    # Standalone mode: DATASTORE_LOAD is set, so nothing from CKAN is
    # imported and a local stand-in for ValidationError is defined instead.
    log.warning("Running datastore without CKAN")

    class ValidationError(Exception):
        """Stand-in validation error used when running without CKAN.

        The error dictionary is pretty-printed on construction so that the
        problem is visible on stdout when no CKAN layer is present to
        render it.

        :param error_dict: mapping of field name to a list of messages
            describing what failed validation.
        """

        def __init__(self, error_dict):
            # Pass the payload on to Exception so str(e) / e.args are not
            # empty, and keep it on the instance so handlers can inspect it.
            # NOTE(review): the attribute name mirrors what CKAN's own
            # ValidationError is believed to expose -- confirm.
            super(ValidationError, self).__init__(error_dict)
            self.error_dict = error_dict
            pprint.pprint(error_dict)

_pg_types = {}
_type_names = set()
_engines = {}
Expand Down Expand Up @@ -75,7 +85,7 @@ def _validate_int(i, field_name):
try:
int(i)
except ValueError:
raise p.toolkit.ValidationError({
raise ValidationError({
'field_name': ['{0} is not an integer'.format(i)]
})

Expand Down Expand Up @@ -208,11 +218,11 @@ def check_fields(context, fields):
'Check if field types are valid.'
for field in fields:
if field.get('type') and not field['type'] in _type_names:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': ['{0} is not a valid field type'.format(field['type'])]
})
elif not _is_valid_field_name(field['id']):
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': ['{0} is not a valid field name'.format(field['id'])]
})

Expand Down Expand Up @@ -248,15 +258,15 @@ def create_table(context, data_dict):
for field in supplied_fields:
if 'type' not in field:
if not records or field['id'] not in records[0]:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': ['{0} type not guessable'.format(field['id'])]
})
field['type'] = _guess_type(records[0][field['id']])

if records:
# check record for sanity
if not isinstance(records[0], dict):
raise p.toolkit.ValidationError({
raise ValidationError({
'records': ['The first row is not a json object']
})
supplied_field_ids = records[0].keys()
Expand All @@ -281,7 +291,7 @@ def create_table(context, data_dict):

def _get_aliases(context, data_dict):
res_id = data_dict['resource_id']
alias_sql = text(u'SELECT name FROM "_table_metadata" WHERE alias_of = :id')
alias_sql = sqlalchemy.text(u'SELECT name FROM "_table_metadata" WHERE alias_of = :id')
results = context['connection'].execute(alias_sql, id=res_id).fetchall()
return [x[0] for x in results]

Expand Down Expand Up @@ -327,7 +337,7 @@ def create_indexes(context, data_dict):
fields = _get_list(index)
for field in fields:
if field not in field_ids:
raise p.toolkit.ValidationError({
raise ValidationError({
'index': [('The field {0} is not a valid column name.').format(
index)]
})
Expand All @@ -346,7 +356,7 @@ def create_indexes(context, data_dict):
# create unique index
for field in primary_key:
if field not in field_ids:
raise p.toolkit.ValidationError({
raise ValidationError({
'primary_key': [('The field {0} is not a valid column name.').format(
field)]
})
Expand Down Expand Up @@ -398,7 +408,7 @@ def alter_table(context, data_dict):
# extension of current fields
if num < len(current_fields):
if field['id'] != current_fields[num]['id']:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': [('Supplied field "{0}" not '
'present or in wrong order').format(field['id'])]
})
Expand All @@ -407,7 +417,7 @@ def alter_table(context, data_dict):

if 'type' not in field:
if not records or field['id'] not in records[0]:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': ['{0} type not guessable'.format(field['id'])]
})
field['type'] = _guess_type(records[0][field['id']])
Expand All @@ -416,7 +426,7 @@ def alter_table(context, data_dict):
if records:
# check record for sanity
if not isinstance(records[0], dict):
raise p.toolkit.ValidationError({
raise ValidationError({
'records': ['The first row is not a json object']
})
supplied_field_ids = records[0].keys()
Expand Down Expand Up @@ -448,7 +458,7 @@ def upsert_data(context, data_dict):
method = data_dict.get('method', UPSERT)

if method not in _methods:
raise p.toolkit.ValidationError({
raise ValidationError({
'method': [u'{0} is not defined'.format(method)]
})

Expand Down Expand Up @@ -484,7 +494,7 @@ def upsert_data(context, data_dict):
elif method in [UPDATE, UPSERT]:
unique_keys = _get_unique_key(context, data_dict)
if len(unique_keys) < 1:
raise p.toolkit.ValidationError({
raise ValidationError({
'table': [u'table does not have a unique key defined']
})

Expand All @@ -493,7 +503,7 @@ def upsert_data(context, data_dict):
missing_fields = [field for field in unique_keys
if field not in record]
if missing_fields:
raise p.toolkit.ValidationError({
raise ValidationError({
'key': [u'fields "{0}" are missing but needed as key'.format(
', '.join(missing_fields))]
})
Expand All @@ -506,7 +516,7 @@ def upsert_data(context, data_dict):
non_existing_filed_names = [field for field in used_field_names
if field not in field_names]
if non_existing_filed_names:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': [u'fields "{0}" do not exist'.format(
', '.join(missing_fields))]
})
Expand All @@ -528,7 +538,7 @@ def upsert_data(context, data_dict):

# validate that exactly one row has been updated
if results.rowcount != 1:
raise p.toolkit.ValidationError({
raise ValidationError({
'key': [u'key "{0}" not found'.format(unique_values)]
})

Expand Down Expand Up @@ -576,14 +586,14 @@ def _get_unique_key(context, data_dict):
def _validate_record(record, num, field_names):
# check record for sanity
if not isinstance(record, dict):
raise p.toolkit.ValidationError({
raise ValidationError({
'records': [u'row {0} is not a json object'.format(num)]
})
## check for extra fields in data
extra_keys = set(record.keys()) - set(field_names)

if extra_keys:
raise p.toolkit.ValidationError({
raise ValidationError({
'records': [u'row {0} has extra keys "{1}"'.format(
num + 1,
', '.join(list(extra_keys))
Expand All @@ -607,7 +617,7 @@ def _where(field_ids, data_dict):
filters = data_dict.get('filters', {})

if not isinstance(filters, dict):
raise p.toolkit.ValidationError({
raise ValidationError({
'filters': ['Not a json object']}
)

Expand All @@ -616,7 +626,7 @@ def _where(field_ids, data_dict):

for field, value in filters.iteritems():
if field not in field_ids:
raise p.toolkit.ValidationError({
raise ValidationError({
'filters': ['field "{0}" not in table'.format(field)]}
)
where_clauses.append(u'"{0}" = %s'.format(field))
Expand Down Expand Up @@ -666,18 +676,18 @@ def _sort(context, data_dict, field_ids):
elif len(clause_parts) == 2:
field, sort = clause_parts
else:
raise p.toolkit.ValidationError({
raise ValidationError({
'sort': ['not valid syntax for sort clause']
})
field, sort = unicode(field, 'utf-8'), unicode(sort, 'utf-8')

if field not in field_ids:
raise p.toolkit.ValidationError({
raise ValidationError({
'sort': [u'field {0} not it table'.format(
unicode(field, 'utf-8'))]
})
if sort.lower() not in ('asc', 'desc'):
raise p.toolkit.ValidationError({
raise ValidationError({
'sort': ['sorting can only be asc or desc']
})
clause_parsed.append(u'"{0}" {1}'.format(
Expand Down Expand Up @@ -714,7 +724,7 @@ def search_data(context, data_dict):

for field in field_ids:
if not field in all_field_ids:
raise p.toolkit.ValidationError({
raise ValidationError({
'fields': [u'field "{0}" not in table'.format(field)]}
)
else:
Expand Down Expand Up @@ -823,7 +833,7 @@ def create(context, data_dict):
except IntegrityError, e:
if ('duplicate key value violates unique constraint' in str(e)
or 'could not create unique index' in str(e)):
raise p.toolkit.ValidationError({
raise ValidationError({
'constraints': ['Cannot insert records because of uniqueness constraint'],
'info': {
'details': str(e)
Expand All @@ -833,7 +843,7 @@ def create(context, data_dict):
raise
except Exception, e:
if 'due to statement timeout' in str(e):
raise p.toolkit.ValidationError({
raise ValidationError({
'query': ['Query took too long']
})
raise
Expand Down Expand Up @@ -872,7 +882,7 @@ def delete(context, data_dict):
data_dict['resource_id']
).fetchone()
if not result:
raise p.toolkit.ValidationError({
raise ValidationError({
'resource_id': [u'table for resource {0} does not exist'.format(
data_dict['resource_id'])]
})
Expand Down Expand Up @@ -908,14 +918,14 @@ def search(context, data_dict):
u"(SELECT 1 FROM pg_views where viewname = '{0}')".format(id)
).fetchone()
if not result:
raise p.toolkit.ValidationError({
raise ValidationError({
'resource_id': [u'table for resource {0} does not exist'.format(
data_dict['resource_id'])]
})
return search_data(context, data_dict)
except Exception, e:
if 'due to statement timeout' in str(e):
raise p.toolkit.ValidationError({
raise ValidationError({
'query': ['Search took too long']
})
raise
Expand All @@ -938,7 +948,7 @@ def search_sql(context, data_dict):
return format_results(context, results, data_dict)

except ProgrammingError, e:
raise p.toolkit.ValidationError({
raise ValidationError({
'query': [str(e)],
'info': {
'statement': [e.statement],
Expand All @@ -948,7 +958,7 @@ def search_sql(context, data_dict):
})
except Exception, e:
if 'due to statement timeout' in str(e):
raise p.toolkit.ValidationError({
raise ValidationError({
'query': ['Search took too long']
})
raise
Expand Down
4 changes: 2 additions & 2 deletions ckanext/datastore/logic/action.py
Expand Up @@ -3,7 +3,7 @@
import ckan.logic as logic
import ckan.plugins as p
import ckanext.datastore.db as db
from sqlalchemy import text
import sqlalchemy

log = logging.getLogger(__name__)
_get_or_bust = logic.get_or_bust
Expand Down Expand Up @@ -162,7 +162,7 @@ def datastore_search(context, data_dict):

data_dict['connection_url'] = pylons.config['ckan.datastore_read_url']

resources_sql = text(u'SELECT 1 FROM "_table_metadata" WHERE name = :id')
resources_sql = sqlalchemy.text(u'SELECT 1 FROM "_table_metadata" WHERE name = :id')
results = db._get_engine(None, data_dict).execute(resources_sql, id=res_id)
res_exists = results.rowcount > 0

Expand Down

0 comments on commit 6ea18bb

Please sign in to comment.