Commit 6f8f706

Merge branch '718-datastore-depends-on-localisation-for-2.0' into release-v2.0
amercader committed Apr 15, 2013
2 parents 0e1fdbf + 09b23dd commit 6f8f706
Showing 1 changed file with 54 additions and 43 deletions.
ckanext/datastore/db.py
@@ -11,7 +11,7 @@
import logging
import pprint
import sqlalchemy
from sqlalchemy.exc import ProgrammingError, IntegrityError
from sqlalchemy.exc import ProgrammingError, IntegrityError, DBAPIError
import psycopg2.extras

log = logging.getLogger(__name__)
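
For context, and not part of the diff itself: SQLAlchemy's ProgrammingError and IntegrityError are both subclasses of DBAPIError, and when the driver is psycopg2 the wrapped exception exposes the PostgreSQL SQLSTATE as e.orig.pgcode. A minimal sketch of the mechanism the rest of this diff builds on (connection URL hypothetical):

    import sqlalchemy
    from sqlalchemy.exc import DBAPIError

    engine = sqlalchemy.create_engine('postgresql://user:pass@localhost/db')
    try:
        engine.execute('SELECT no_such_column')
    except DBAPIError, e:
        # e.orig is the raw psycopg2 error; pgcode carries the SQLSTATE,
        # here '42703' (undefined_column).
        print e.orig.pgcode
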
@@ -31,19 +31,27 @@ def __init__(self, error_dict):
_type_names = set()
_engines = {}

# See http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
_PG_ERR_CODE = {
'unique_violation': 23505,
'query_canceled': 57014,
'undefined_object': 42704,
'syntax_error': 42601
}

_date_formats = ['%Y-%m-%d',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%d/%m/%Y',
'%m/%d/%Y',
'%d-%m-%Y',
'%m-%d-%Y',
]
INSERT = 'insert'
UPSERT = 'upsert'
UPDATE = 'update'
_methods = [INSERT, UPSERT, UPDATE]
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%d/%m/%Y',
'%m/%d/%Y',
'%d-%m-%Y',
'%m-%d-%Y',
]

_INSERT = 'insert'
_UPSERT = 'upsert'
_UPDATE = 'update'


def _strip(input):
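
The motivation for matching on error codes rather than message strings is issue 718 from the branch name: PostgreSQL localises its error messages (lc_messages), so substring tests such as 'does not exist' in str(e) silently fail on non-English servers, while the SQLSTATE in pgcode is locale-independent. A hedged sketch of the lookup (the handler name is invented):

    # Sketch: match on the SQLSTATE, never on the translated message text.
    if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
        handle_duplicate_key(e)  # hypothetical handler
    else:
        raise
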
@@ -159,10 +167,10 @@ def _is_valid_pg_type(context, type_name):
try:
connection.execute('SELECT %s::regtype', type_name)
except ProgrammingError, e:
if 'invalid type name' in str(e) or 'does not exist' in str(e):
if int(e.orig.pgcode) in [_PG_ERR_CODE['undefined_object'],
_PG_ERR_CODE['syntax_error']]:
return False
else:
raise
raise
else:
return True
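
A hedged usage sketch of the rewritten check: casting to regtype asks PostgreSQL itself whether the name denotes a known type, so unknown names (undefined_object, 42704) and unparseable ones (syntax_error, 42601) both map to False, and anything else still propagates. Assuming context carries a live connection, as elsewhere in this module:

    assert _is_valid_pg_type(context, 'integer')          # a real built-in type
    assert not _is_valid_pg_type(context, 'no_such_type') # pgcode 42704
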

@@ -520,7 +528,7 @@ def alter_table(context, data_dict):


def insert_data(context, data_dict):
data_dict['method'] = INSERT
data_dict['method'] = _INSERT
return upsert_data(context, data_dict)


@@ -529,9 +537,9 @@ def upsert_data(context, data_dict):
if not data_dict.get('records'):
return

method = data_dict.get('method', UPSERT)
method = data_dict.get('method', _UPSERT)

if method not in _methods:
if method not in [_INSERT, _UPSERT, _UPDATE]:
raise ValidationError({
'method': [u'"{0}" is not defined'.format(method)]
})
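
With the constants now module-private, the old public INSERT/UPSERT/UPDATE names and the _methods list are gone; callers pick a method through data_dict, which the guard above validates. A hypothetical call (resource id and records invented):

    data_dict = {
        'resource_id': 'some-resource-id',       # hypothetical
        'records': [{'id': 1, 'name': 'foo'}],
        'method': 'update',                      # one of insert / upsert / update
    }
    upsert_data(context, data_dict)  # any other method raises ValidationError
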
@@ -542,7 +550,7 @@ def upsert_data(context, data_dict):
sql_columns = ", ".join(['"%s"' % name.replace('%', '%%') for name in field_names]
+ ['"_full_text"'])

if method == INSERT:
if method == _INSERT:
rows = []
for num, record in enumerate(records):
_validate_record(record, num, field_names)
@@ -565,7 +573,7 @@

context['connection'].execute(sql_string, rows)

elif method in [UPDATE, UPSERT]:
elif method in [_UPDATE, _UPSERT]:
unique_keys = _get_unique_key(context, data_dict)
if len(unique_keys) < 1:
raise ValidationError({
@@ -607,7 +615,7 @@ def upsert_data(context, data_dict):

full_text = _to_full_text(fields, record)

if method == UPDATE:
if method == _UPDATE:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
@@ -628,7 +636,7 @@ def upsert_data(context, data_dict):
'key': [u'key "{0}" not found'.format(unique_values)]
})

elif method == UPSERT:
elif method == _UPSERT:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
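
The _UPSERT branch continues below the fold. On PostgreSQL of this era (before 9.5, so no INSERT ... ON CONFLICT), an upsert is conventionally an UPDATE keyed on the unique columns followed by an INSERT when no row matched; a generic sketch of that pattern, not the exact hidden code:

    result = connection.execute(update_sql, params)  # names hypothetical
    if result.rowcount == 0:
        # no existing row matched the unique key, so insert instead
        connection.execute(insert_sql, params)
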
@@ -961,23 +969,24 @@ def create(context, data_dict):
trans.commit()
return _unrename_json_field(data_dict)
except IntegrityError, e:
if ('duplicate key value violates unique constraint' in str(e)
or 'could not create unique index' in str(e)):
if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
raise ValidationError({
'constraints': ['Cannot insert records or create index because of uniqueness constraint'],
'constraints': ['Cannot insert records or create index because '
'of uniqueness constraint'],
'info': {
'details': str(e)
}
})
else:
raise
except Exception, e:
trans.rollback()
if 'due to statement timeout' in str(e):
raise
except DBAPIError, e:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Query took too long']
})
raise
except Exception, e:
trans.rollback()
raise
finally:
context['connection'].close()
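
For reference, the query_canceled path (57014) fires when PostgreSQL kills a statement that overruns statement_timeout; the old code could only detect this by grepping the localisable message 'due to statement timeout'. A hedged way to watch the code appear:

    from sqlalchemy.exc import DBAPIError  # imported at the top of this module

    connection.execute('SET statement_timeout TO 10')  # milliseconds
    try:
        connection.execute('SELECT pg_sleep(1)')       # outlives the timeout
    except DBAPIError, e:
        assert int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']  # 57014
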

@@ -1003,22 +1012,24 @@ def upsert(context, data_dict):
trans.commit()
return _unrename_json_field(data_dict)
except IntegrityError, e:
if 'duplicate key value violates unique constraint' in str(e):
if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
raise ValidationError({
'constraints': ['Cannot insert records because of uniqueness constraint'],
'constraints': ['Cannot insert records or create index because '
'of uniqueness constraint'],
'info': {
'details': str(e)
}
})
else:
raise
except Exception, e:
trans.rollback()
if 'due to statement timeout' in str(e):
raise
except DBAPIError, e:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Query took too long']
})
raise
except Exception, e:
trans.rollback()
raise
finally:
context['connection'].close()

@@ -1077,8 +1088,8 @@ def search(context, data_dict):
data_dict['resource_id'])]
})
return search_data(context, data_dict)
except Exception, e:
if 'due to statement timeout' in str(e):
except DBAPIError, e:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Search took too long']
})
@@ -1110,10 +1121,10 @@ def search_sql(context, data_dict):
'orig': [str(e.orig)]
}
})
except Exception, e:
if 'due to statement timeout' in str(e):
except DBAPIError, e:
if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
raise ValidationError({
'query': ['Search took too long']
'query': ['Query took too long']
})
raise
finally:
