Skip to content

Commit

Permalink
Merge pull request #2360 from twobraids/clearer-than-upsert
Browse files Browse the repository at this point in the history
fixes Bug 1066316 - fix PG extension table insert for reprocessing jobs
  • Loading branch information
twobraids committed Sep 12, 2014
2 parents 849858f + 2c449ec commit 1b9caba
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
18 changes: 18 additions & 0 deletions socorro/external/postgresql/crashstorage.py
Expand Up @@ -183,6 +183,7 @@ def _save_processed_transaction(self, connection, processed_crash):
self._save_extensions(connection, processed_crash, report_id)
self._save_processed_crash(connection, processed_crash)

#--------------------------------------------------------------------------
def _save_processed_crash(self, connection, processed_crash):
crash_id = processed_crash['uuid']
processed_crashes_table_name = (
Expand Down Expand Up @@ -371,6 +372,8 @@ def _save_plugins(self, connection, processed_crash, report_id):
#--------------------------------------------------------------------------
def _save_extensions(self, connection, processed_crash, report_id):
extensions = processed_crash['addons']
if not extensions:
return
crash_id = processed_crash['uuid']
table_suffix = self._table_suffix_for_crash_id(crash_id)
extensions_table_name = 'extensions_%s' % table_suffix
Expand All @@ -380,6 +383,21 @@ def _save_extensions(self, connection, processed_crash, report_id):
" extension_version)"
"values (%%s, %%s, %%s, %%s, %%s)" % extensions_table_name
)
# why are we deleting first? This might be a reprocessing job and
# the extensions data might already be in the table: a straight insert
# might fail. Why not check to see if there is data already there
# and then just not insert if data is there? We may be reprocessing
# to deal with missing extensions data, so just because there is
# already data there doesn't mean that we can skip this.
# What about using "upsert" sql - that would be fine and result in one
# fewer round trip between client and database, but "upsert" sql is
# opaque and not easy to understand at a glance. This was faster to
# implement. What about using "transaction check points"?
# too many round trips between the client and the server.
clear_extensions_sql = (
"delete from %s where report_id = %%s" % extensions_table_name
)
execute_no_results(connection, clear_extensions_sql, (report_id,))
for i, x in enumerate(extensions):
try:
execute_no_results(connection, extensions_insert_sql,
Expand Down
18 changes: 12 additions & 6 deletions socorro/unittest/external/postgresql/test_crashstorage.py
Expand Up @@ -390,9 +390,9 @@ def fetch_all_func(*args):
database = crashstorage.database.return_value = m
m.cursor.return_value.fetchall.side_effect = fetch_all_func
crashstorage.save_processed(a_processed_crash)
eq_(m.cursor.call_count, 5)
eq_(m.cursor.call_count, 6)
eq_(m.cursor().fetchall.call_count, 2)
eq_(m.cursor().execute.call_count, 5)
eq_(m.cursor().execute.call_count, 6)

expected_execute_args = (
(('WITH update_report AS (UPDATE reports_20120402 SET addons_checked=%s, address=%s, app_notes=%s, build=%s, client_crash_date=%s, completed_datetime=%s, cpu_info=%s, cpu_name=%s, date_processed=%s, distributor=%s, distributor_version=%s, email=%s, exploitability=%s, flash_version=%s, hangid=%s, install_age=%s, last_crash=%s, os_name=%s, os_version=%s, processor_notes=%s, process_type=%s, product=%s, productid=%s, reason=%s, release_channel=%s, signature=%s, started_datetime=%s, success=%s, topmost_filenames=%s, truncated=%s, uptime=%s, user_comments=%s, user_id=%s, url=%s, uuid=%s, version=%s WHERE uuid=%s RETURNING id), insert_report AS (INSERT INTO reports_20120402(addons_checked, address, app_notes, build, client_crash_date, completed_datetime, cpu_info, cpu_name, date_processed, distributor, distributor_version, email, exploitability, flash_version, hangid, install_age, last_crash, os_name, os_version, processor_notes, process_type, product, productid, reason, release_channel, signature, started_datetime, success, topmost_filenames, truncated, uptime, user_comments, user_id, url, uuid, version)(SELECT%s as addons_checked, %s as address, %s as app_notes, %s as build, %s as client_crash_date, %s as completed_datetime, %s as cpu_info, %s as cpu_name, %s as date_processed, %s as distributor, %s as distributor_version, %s as email, %s as exploitability, %s as flash_version, %s as hangid, %s as install_age, %s as last_crash, %s as os_name, %s as os_version, %s as processor_notes, %s as process_type, %s as product, %s as productid, %s as reason, %s as release_channel, %s as signature, %s as started_datetime, %s as success, %s as topmost_filenames, %s as truncated, %s as uptime, %s as user_comments, %s as user_id, %s as url, %s as uuid, %s as version WHERE NOT EXISTS(SELECT uuid from reports_20120402 WHERE uuid=%s LIMIT 1)) RETURNING id) SELECT * from update_report UNION ALL SELECT * from insert_report',
Expand All @@ -402,6 +402,8 @@ def fetch_all_func(*args):
('dwight.txt', 'wilma')),),
(('insert into plugins_reports_20120402 (report_id, plugin_id, date_processed, version) values (%s, %s, %s, %s)',
(666, 23, '2012-04-08 10:56:41.558922', '69')),),
(('delete from extensions_20120402 where report_id = %s',
(666, )),),
(('insert into extensions_20120402 (report_id, date_processed, extension_key, extension_id, extension_version)values (%s, %s, %s, %s, %s)',
(666, '2012-04-08 10:56:41.558922', 0, '{1a5dabbd-0e74-41da-b532-a364bb552cab}', '1.0.4.1')),),
(("""WITH update_processed_crash AS ( UPDATE processed_crashes_20120402 SET processed_crash = %(processed_json)s, date_processed = %(date_processed)s WHERE uuid = %(uuid)s RETURNING 1), insert_processed_crash AS ( INSERT INTO processed_crashes_20120402 (uuid, processed_crash, date_processed) ( SELECT %(uuid)s as uuid, %(processed_json)s as processed_crash, %(date_processed)s as date_processed WHERE NOT EXISTS ( SELECT uuid from processed_crashes_20120402 WHERE uuid = %(uuid)s LIMIT 1)) RETURNING 2) SELECT * from update_processed_crash UNION ALL SELECT * from insert_processed_crash """,
Expand Down Expand Up @@ -453,9 +455,9 @@ def fetch_all_func(*args):
database = crashstorage.database.return_value = m
m.cursor.return_value.fetchall.side_effect = fetch_all_func
crashstorage.save_processed(a_processed_crash)
eq_(m.cursor.call_count, 6)
eq_(m.cursor.call_count, 7)
eq_(m.cursor().fetchall.call_count, 3)
eq_(m.cursor().execute.call_count, 6)
eq_(m.cursor().execute.call_count, 7)

expected_execute_args = (
(('WITH update_report AS (UPDATE reports_20120402 SET addons_checked=%s, address=%s, app_notes=%s, build=%s, client_crash_date=%s, completed_datetime=%s, cpu_info=%s, cpu_name=%s, date_processed=%s, distributor=%s, distributor_version=%s, email=%s, exploitability=%s, flash_version=%s, hangid=%s, install_age=%s, last_crash=%s, os_name=%s, os_version=%s, processor_notes=%s, process_type=%s, product=%s, productid=%s, reason=%s, release_channel=%s, signature=%s, started_datetime=%s, success=%s, topmost_filenames=%s, truncated=%s, uptime=%s, user_comments=%s, user_id=%s, url=%s, uuid=%s, version=%s WHERE uuid=%s RETURNING id), insert_report AS (INSERT INTO reports_20120402(addons_checked, address, app_notes, build, client_crash_date, completed_datetime, cpu_info, cpu_name, date_processed, distributor, distributor_version, email, exploitability, flash_version, hangid, install_age, last_crash, os_name, os_version, processor_notes, process_type, product, productid, reason, release_channel, signature, started_datetime, success, topmost_filenames, truncated, uptime, user_comments, user_id, url, uuid, version)(SELECT%s as addons_checked, %s as address, %s as app_notes, %s as build, %s as client_crash_date, %s as completed_datetime, %s as cpu_info, %s as cpu_name, %s as date_processed, %s as distributor, %s as distributor_version, %s as email, %s as exploitability, %s as flash_version, %s as hangid, %s as install_age, %s as last_crash, %s as os_name, %s as os_version, %s as processor_notes, %s as process_type, %s as product, %s as productid, %s as reason, %s as release_channel, %s as signature, %s as started_datetime, %s as success, %s as topmost_filenames, %s as truncated, %s as uptime, %s as user_comments, %s as user_id, %s as url, %s as uuid, %s as version WHERE NOT EXISTS(SELECT uuid from reports_20120402 WHERE uuid=%s LIMIT 1)) RETURNING id) SELECT * from update_report UNION ALL SELECT * from insert_report',
Expand All @@ -467,6 +469,8 @@ def fetch_all_func(*args):
('dwight.txt', 'wilma')),),
(('insert into plugins_reports_20120402 (report_id, plugin_id, date_processed, version) values (%s, %s, %s, %s)',
(666, 23, '2012-04-08 10:56:41.558922', '69')),),
(('delete from extensions_20120402 where report_id = %s',
(666, )),),
(('insert into extensions_20120402 (report_id, date_processed, extension_key, extension_id, extension_version)values (%s, %s, %s, %s, %s)',
(666, '2012-04-08 10:56:41.558922', 0, '{1a5dabbd-0e74-41da-b532-a364bb552cab}', '1.0.4.1')),),
(("""WITH update_processed_crash AS ( UPDATE processed_crashes_20120402 SET processed_crash = %(processed_json)s, date_processed = %(date_processed)s WHERE uuid = %(uuid)s RETURNING 1), insert_processed_crash AS ( INSERT INTO processed_crashes_20120402 (uuid, processed_crash, date_processed) ( SELECT %(uuid)s as uuid, %(processed_json)s as processed_crash, %(date_processed)s as date_processed WHERE NOT EXISTS ( SELECT uuid from processed_crashes_20120402 WHERE uuid = %(uuid)s LIMIT 1)) RETURNING 2) SELECT * from update_processed_crash UNION ALL SELECT * from insert_processed_crash """,
Expand Down Expand Up @@ -578,9 +582,9 @@ def broken_connection(*args):
database = crashstorage.database.return_value = m
m.cursor.side_effect = broken_connection
crashstorage.save_processed(a_processed_crash)
eq_(m.cursor.call_count, 8)
eq_(m.cursor.call_count, 9)
eq_(m.cursor().fetchall.call_count, 3)
eq_(m.cursor().execute.call_count, 6)
eq_(m.cursor().execute.call_count, 7)

expected_execute_args = (
(('WITH update_report AS (UPDATE reports_20120402 SET addons_checked=%s, address=%s, app_notes=%s, build=%s, client_crash_date=%s, completed_datetime=%s, cpu_info=%s, cpu_name=%s, date_processed=%s, distributor=%s, distributor_version=%s, email=%s, exploitability=%s, flash_version=%s, hangid=%s, install_age=%s, last_crash=%s, os_name=%s, os_version=%s, processor_notes=%s, process_type=%s, product=%s, productid=%s, reason=%s, release_channel=%s, signature=%s, started_datetime=%s, success=%s, topmost_filenames=%s, truncated=%s, uptime=%s, user_comments=%s, user_id=%s, url=%s, uuid=%s, version=%s WHERE uuid=%s RETURNING id), insert_report AS (INSERT INTO reports_20120402(addons_checked, address, app_notes, build, client_crash_date, completed_datetime, cpu_info, cpu_name, date_processed, distributor, distributor_version, email, exploitability, flash_version, hangid, install_age, last_crash, os_name, os_version, processor_notes, process_type, product, productid, reason, release_channel, signature, started_datetime, success, topmost_filenames, truncated, uptime, user_comments, user_id, url, uuid, version)(SELECT%s as addons_checked, %s as address, %s as app_notes, %s as build, %s as client_crash_date, %s as completed_datetime, %s as cpu_info, %s as cpu_name, %s as date_processed, %s as distributor, %s as distributor_version, %s as email, %s as exploitability, %s as flash_version, %s as hangid, %s as install_age, %s as last_crash, %s as os_name, %s as os_version, %s as processor_notes, %s as process_type, %s as product, %s as productid, %s as reason, %s as release_channel, %s as signature, %s as started_datetime, %s as success, %s as topmost_filenames, %s as truncated, %s as uptime, %s as user_comments, %s as user_id, %s as url, %s as uuid, %s as version WHERE NOT EXISTS(SELECT uuid from reports_20120402 WHERE uuid=%s LIMIT 1)) RETURNING id) SELECT * from update_report UNION ALL SELECT * from insert_report',
Expand All @@ -592,6 +596,8 @@ def broken_connection(*args):
('dwight.txt', 'wilma')),),
(('insert into plugins_reports_20120402 (report_id, plugin_id, date_processed, version) values (%s, %s, %s, %s)',
(666, 23, '2012-04-08 10:56:41.558922', '69')),),
(('delete from extensions_20120402 where report_id = %s',
(666, )),),
(('insert into extensions_20120402 (report_id, date_processed, extension_key, extension_id, extension_version)values (%s, %s, %s, %s, %s)',
(666, '2012-04-08 10:56:41.558922', 0, '{1a5dabbd-0e74-41da-b532-a364bb552cab}', '1.0.4.1')),),
(("""WITH update_processed_crash AS ( UPDATE processed_crashes_20120402 SET processed_crash = %(processed_json)s, date_processed = %(date_processed)s WHERE uuid = %(uuid)s RETURNING 1), insert_processed_crash AS ( INSERT INTO processed_crashes_20120402 (uuid, processed_crash, date_processed) ( SELECT %(uuid)s as uuid, %(processed_json)s as processed_crash, %(date_processed)s as date_processed WHERE NOT EXISTS ( SELECT uuid from processed_crashes_20120402 WHERE uuid = %(uuid)s LIMIT 1)) RETURNING 2) SELECT * from update_processed_crash UNION ALL SELECT * from insert_processed_crash """,
Expand Down

0 comments on commit 1b9caba

Please sign in to comment.