Skip to content

Commit

Permalink
[#718] Try to avoid PL/pgSQL since we cannot guarantee that is is act…
Browse files Browse the repository at this point in the history
…ivated
  • Loading branch information
domoritz committed Mar 28, 2013
1 parent 26b45b6 commit c2e13e5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 41 deletions.
51 changes: 30 additions & 21 deletions ckanext/datastore/db.py
Expand Up @@ -32,9 +32,11 @@ def __init__(self, error_dict):
_engines = {}

# See http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
_pg_err_code = {
_PG_ERR_CODE = {
'unique_violation': 23505,
'query_canceled': 57014
'query_canceled': 57014,
'undefined_object': 42704,
'syntax_error': 42601
}

_date_formats = ['%Y-%m-%d',
Expand All @@ -46,10 +48,10 @@ def __init__(self, error_dict):
'%d-%m-%Y',
'%m-%d-%Y',
]
INSERT = 'insert'
UPSERT = 'upsert'
UPDATE = 'update'
_methods = [INSERT, UPSERT, UPDATE]

_INSERT = 'insert'
_UPSERT = 'upsert'
_UPDATE = 'update'


def _strip(input):
Expand Down Expand Up @@ -162,8 +164,15 @@ def _is_valid_pg_type(context, type_name):
return True
else:
connection = context['connection']
return connection.execute('SELECT is_valid_type(%s)',
type_name).first()[0]
try:
connection.execute('SELECT %s::regtype', type_name)
except ProgrammingError, e:
if int(e.orig.pgcode) in [_PG_ERR_CODE['undefined_object'],
_PG_ERR_CODE['syntax_error']]:
return False
raise
else:
return True


def _get_type(context, oid):
Expand Down Expand Up @@ -520,7 +529,7 @@ def alter_table(context, data_dict):


def insert_data(context, data_dict):
data_dict['method'] = INSERT
data_dict['method'] = _INSERT
return upsert_data(context, data_dict)


Expand All @@ -529,9 +538,9 @@ def upsert_data(context, data_dict):
if not data_dict.get('records'):
return

method = data_dict.get('method', UPSERT)
method = data_dict.get('method', _UPSERT)

if method not in _methods:
if method not in [_INSERT, _UPSERT, _UPDATE]:
raise ValidationError({
'method': [u'"{0}" is not defined'.format(method)]
})
Expand All @@ -542,7 +551,7 @@ def upsert_data(context, data_dict):
sql_columns = ", ".join(['"%s"' % name.replace('%', '%%') for name in field_names]
+ ['"_full_text"'])

if method == INSERT:
if method == _INSERT:
rows = []
for num, record in enumerate(records):
_validate_record(record, num, field_names)
Expand All @@ -565,7 +574,7 @@ def upsert_data(context, data_dict):

context['connection'].execute(sql_string, rows)

elif method in [UPDATE, UPSERT]:
elif method in [_UPDATE, _UPSERT]:
unique_keys = _get_unique_key(context, data_dict)
if len(unique_keys) < 1:
raise ValidationError({
Expand Down Expand Up @@ -607,7 +616,7 @@ def upsert_data(context, data_dict):

full_text = _to_full_text(fields, record)

if method == UPDATE:
if method == _UPDATE:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
Expand All @@ -628,7 +637,7 @@ def upsert_data(context, data_dict):
'key': [u'key "{0}" not found'.format(unique_values)]
})

elif method == UPSERT:
elif method == _UPSERT:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
Expand Down Expand Up @@ -962,7 +971,7 @@ def create(context, data_dict):
trans.commit()
return _unrename_json_field(data_dict)
except IntegrityError, e:
if int(e.orig.pgcode) == _pg_err_code['unique_violation']:
if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
raise ValidationError({
'constraints': ['Cannot insert records or create index because '
'of uniqueness constraint'],
Expand All @@ -972,7 +981,7 @@ def create(context, data_dict):
})
raise
except DBAPIError, e:
if int(e.orig.pgcode) == _pg_err_code['query_canceled']:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Query took too long']
})
Expand Down Expand Up @@ -1005,7 +1014,7 @@ def upsert(context, data_dict):
trans.commit()
return _unrename_json_field(data_dict)
except IntegrityError, e:
if int(e.orig.pgcode) == _pg_err_code['unique_violation']:
if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
raise ValidationError({
'constraints': ['Cannot insert records or create index because '
'of uniqueness constraint'],
Expand All @@ -1015,7 +1024,7 @@ def upsert(context, data_dict):
})
raise
except DBAPIError, e:
if int(e.orig.pgcode) == _pg_err_code['query_canceled']:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Query took too long']
})
Expand Down Expand Up @@ -1082,7 +1091,7 @@ def search(context, data_dict):
})
return search_data(context, data_dict)
except DBAPIError, e:
if int(e.orig.pgcode) == _pg_err_code['query_canceled']:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Search took too long']
})
Expand Down Expand Up @@ -1115,7 +1124,7 @@ def search_sql(context, data_dict):
}
})
except DBAPIError, e:
if int(e.orig.pgcode) == _pg_err_code['query_canceled']:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Query took too long']
})
Expand Down
20 changes: 0 additions & 20 deletions ckanext/datastore/plugin.py
Expand Up @@ -105,7 +105,6 @@ def new_resource_show(context, data_dict):
if not hasattr(resource_show, '_datastore_wrapped'):
new_resource_show._datastore_wrapped = True
logic._actions['resource_show'] = new_resource_show
self._add_is_valid_type_function()

def _is_read_only_database(self):
for url in [self.ckan_url, self.write_url, self.read_url]:
Expand Down Expand Up @@ -206,25 +205,6 @@ def _create_alias_table(self):
{'connection_url': pylons.config['ckan.datastore.write_url']}).connect()
connection.execute(create_alias_table_sql)

def _add_is_valid_type_function(self):
# syntax_error - may occur if someone provides a keyword as a type
# undefined_object - is raised if the type does not exist
create_func_sql = '''
CREATE OR REPLACE FUNCTION is_valid_type(v_type text)
RETURNS boolean
AS $$
BEGIN
PERFORM v_type::regtype;
RETURN true;
EXCEPTION WHEN undefined_object OR syntax_error THEN
RETURN false;
END;
$$ LANGUAGE plpgsql stable;
'''
connection = db._get_engine(None,
{'connection_url': pylons.config['ckan.datastore.write_url']}).connect()
connection.execute(create_func_sql)

def get_actions(self):
actions = {'datastore_create': action.datastore_create,
'datastore_upsert': action.datastore_upsert,
Expand Down

0 comments on commit c2e13e5

Please sign in to comment.