Permalink
Browse files

Merge pull request #1911 from selenamarie/bug976088-upsert-for-proces…

…sed_crashes

Fixes bug 976088 - add upsert for processed_crashes storage
  • Loading branch information...
2 parents c1025be + 5be75a5 commit f7d4fc43754d079c716d20b5be9a79ab6504698a @selenamarie selenamarie committed Feb 27, 2014
Showing with 184 additions and 184 deletions.
  1. +65 −52 socorro/external/postgresql/crashstorage.py
  2. +119 −132 socorro/unittest/external/postgresql/test_crashstorage.py
@@ -36,7 +36,7 @@ class PostgreSQLCrashStorage(CrashStorageBase):
required_config.add_option(
'transaction_executor_class',
default="socorro.database.transaction_executor."
- "TransactionExecutorWithInfiniteBackoff",
+ "TransactionExecutorWithInfiniteBackoff",
doc='a class that will manage transactions',
from_string_converter=class_converter,
reference_value_from='resource.postgresql',
@@ -111,7 +111,7 @@ def save_raw_crash(self, raw_crash, dumps, crash_id):
#--------------------------------------------------------------------------
def _save_raw_crash_transaction(self, connection, raw_crash, crash_id):
raw_crash_table_name = (
- 'raw_crashes_%s' % self._table_suffix_for_crash_id(crash_id)
+ 'raw_crashes_%s' % self._table_suffix_for_crash_id(crash_id)
)
insert_sql = """insert into %s (uuid, raw_crash, date_processed) values
(%%s, %%s, %%s)""" % raw_crash_table_name
@@ -125,23 +125,23 @@ def _save_raw_crash_transaction(self, connection, raw_crash, crash_id):
try:
execute_no_results(connection, insert_sql, value_list)
execute_no_results(
- connection,
- "release savepoint %s" % savepoint_name
+ connection,
+ "release savepoint %s" % savepoint_name
)
except self.config.database_class.IntegrityError:
# report already exists
execute_no_results(
- connection,
- "rollback to savepoint %s" % savepoint_name
+ connection,
+ "rollback to savepoint %s" % savepoint_name
)
execute_no_results(
- connection,
- "release savepoint %s" % savepoint_name
+ connection,
+ "release savepoint %s" % savepoint_name
)
execute_no_results(
- connection,
- "delete from %s where uuid = %%s" % raw_crash_table_name,
- (crash_id,)
+ connection,
+ "delete from %s where uuid = %%s" % raw_crash_table_name,
+ (crash_id,)
)
execute_no_results(connection, insert_sql, value_list)
@@ -159,7 +159,7 @@ def get_raw_crash(self, crash_id):
#--------------------------------------------------------------------------
def _get_raw_crash_transaction(self, connection, crash_id):
raw_crash_table_name = (
- 'raw_crash_%s' % self._table_suffix_for_crash_id(crash_id)
+ 'raw_crash_%s' % self._table_suffix_for_crash_id(crash_id)
)
fetch_sql = 'select raw_crash from %s where uuid = %ss' % \
raw_crash_table_name
@@ -177,34 +177,48 @@ def _save_processed_transaction(self, connection, processed_crash):
report_id = self._save_processed_report(connection, processed_crash)
self._save_plugins(connection, processed_crash, report_id)
self._save_extensions(connection, processed_crash, report_id)
+ self._save_processed_crash(connection, processed_crash)
+ def _save_processed_crash(self, connection, processed_crash):
crash_id = processed_crash['uuid']
processed_crashes_table_name = (
- 'processed_crashes_%s' % self._table_suffix_for_crash_id(crash_id)
+ 'processed_crashes_%s' % self._table_suffix_for_crash_id(crash_id)
)
- insert_sql = """insert into %s (uuid, processed_crash, date_processed) values
- (%%s, %%s, %%s)""" % processed_crashes_table_name
-
- savepoint_name = threading.currentThread().getName().replace('-', '')
- value_list = (
- crash_id,
- json.dumps(processed_crash, cls=JsonDTEncoder),
- processed_crash["date_processed"]
- )
- execute_no_results(connection, "savepoint %s" % savepoint_name)
- try:
- execute_no_results(connection, insert_sql, value_list)
- execute_no_results(
- connection,
- "release savepoint %s" % savepoint_name
- )
- except self.config.database_class.IntegrityError:
- # report already exists
- execute_no_results(
- connection,
- "rollback to savepoint %s" % savepoint_name
+ upsert_sql = """
+ WITH
+ update_processed_crash AS (
+ UPDATE %(table)s SET
+ processed_crash = %%(processed_json)s,
+ date_processed = %%(date_processed)s
+ WHERE uuid = %%(uuid)s
+ RETURNING 1
+ ),
+ insert_processed_crash AS (
+ INSERT INTO %(table)s (uuid, processed_crash, date_processed)
+ ( SELECT
+ %%(uuid)s as uuid,
+ %%(processed_json)s as processed_crash,
+ %%(date_processed)s as date_processed
+ WHERE NOT EXISTS (
+ SELECT uuid from %(table)s
+ WHERE
+ uuid = %%(uuid)s
+ LIMIT 1
+ )
)
+ RETURNING 2
+ )
+ SELECT * from update_processed_crash
+ UNION ALL
+ SELECT * from insert_processed_crash
+ """ % {'table': processed_crashes_table_name, 'uuid': crash_id}
+ values = {
+ 'processed_json': json.dumps(processed_crash, cls=JsonDTEncoder),
+ 'date_processed': processed_crash["date_processed"],
+ 'uuid': crash_id
+ }
+ execute_no_results(connection, upsert_sql, values)
#--------------------------------------------------------------------------
def _save_processed_report(self, connection, processed_crash):
@@ -217,7 +231,7 @@ def _save_processed_report(self, connection, processed_crash):
value_list.append(processed_crash[pro_crash_name])
crash_id = processed_crash['uuid']
reports_table_name = (
- 'reports_%s' % self._table_suffix_for_crash_id(crash_id)
+ 'reports_%s' % self._table_suffix_for_crash_id(crash_id)
)
insert_sql = "insert into %s (%s) values (%s) returning id" % (
reports_table_name,
@@ -236,23 +250,23 @@ def _save_processed_report(self, connection, processed_crash):
try:
report_id = single_value_sql(connection, insert_sql, value_list)
execute_no_results(
- connection,
- "release savepoint %s" % savepoint_name
+ connection,
+ "release savepoint %s" % savepoint_name
)
except self.config.database_class.IntegrityError:
# report already exists
execute_no_results(
- connection,
- "rollback to savepoint %s" % savepoint_name
+ connection,
+ "rollback to savepoint %s" % savepoint_name
)
execute_no_results(
- connection,
- "release savepoint %s" % savepoint_name
+ connection,
+ "release savepoint %s" % savepoint_name
)
execute_no_results(
- connection,
- "delete from %s where uuid = %%s" % reports_table_name,
- (processed_crash.uuid,)
+ connection,
+ "delete from %s where uuid = %%s" % reports_table_name,
+ (processed_crash.uuid,)
)
report_id = single_value_sql(connection, insert_sql, value_list)
return report_id
@@ -322,10 +336,10 @@ def _save_extensions(self, connection, processed_crash, report_id):
table_suffix = self._table_suffix_for_crash_id(crash_id)
extensions_table_name = 'extensions_%s' % table_suffix
extensions_insert_sql = (
- "insert into %s "
- " (report_id, date_processed, extension_key, extension_id, "
- " extension_version)"
- "values (%%s, %%s, %%s, %%s, %%s)" % extensions_table_name
+ "insert into %s "
+ " (report_id, date_processed, extension_key, extension_id, "
+ " extension_version)"
+ "values (%%s, %%s, %%s, %%s, %%s)" % extensions_table_name
)
for i, x in enumerate(extensions):
try:
@@ -337,8 +351,8 @@ def _save_extensions(self, connection, processed_crash, report_id):
x[1]))
except IndexError:
self.config.logger.warning(
- '"%s" is deficient as a name and version for an addon',
- str(x[0])
+ '"%s" is deficient as a name and version for an addon',
+ str(x[0])
)
#--------------------------------------------------------------------------
@@ -347,9 +361,8 @@ def _table_suffix_for_crash_id(crash_id):
"""given an crash_id, return the name of its storage table"""
crash_id_date = uuid_to_date(crash_id)
previous_monday_date = (
- crash_id_date + datetime.timedelta(days=-crash_id_date.weekday())
+ crash_id_date + datetime.timedelta(days=-crash_id_date.weekday())
)
return '%4d%02d%02d' % (previous_monday_date.year,
previous_monday_date.month,
previous_monday_date.day)
-
Oops, something went wrong.

0 comments on commit f7d4fc4

Please sign in to comment.