From c8132d0115c7019b97de032929a89ae121b3e450 Mon Sep 17 00:00:00 2001
From: Bianca Danforth
Date: Tue, 21 May 2024 17:28:00 -0400
Subject: [PATCH] bug-1889156: review feedback except crash prefix splitting changes

---
 bin/load_processed_crashes_into_es.py      | 67 ++++++++++++--------
 .../test_load_processed_crashes_into_es.py | 56 ++----------------
 2 files changed, 47 insertions(+), 76 deletions(-)

diff --git a/bin/load_processed_crashes_into_es.py b/bin/load_processed_crashes_into_es.py
index ad0fc44118..13be9e8ccd 100755
--- a/bin/load_processed_crashes_into_es.py
+++ b/bin/load_processed_crashes_into_es.py
@@ -6,7 +6,7 @@

 # Loads a processed crash by either crash ID or date from either
 # GCS or S3 into Elasticsearch, depending on `settings.CRASH_SOURCE`,
-# skipping crashes already in Elasticsearch.
+# optionally skipping crashes already in Elasticsearch.

 # Uses a variation of `check_crash_ids_for_date`
 # from the `verifyprocessed` command in Crash Stats to get crash IDs from S3/GCS:
@@ -77,7 +77,7 @@ def check_elasticsearch(supersearch, crash_ids):
     return set(crash_ids) - set(crash_ids_in_es)


-def check_crashids_for_date(firstchars_chunk, date):
+def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
     """Check crash ids for a given subset of all crash id prefixes and date"""
     crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
     crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
@@ -109,12 +109,29 @@ def check_crashids_for_date(firstchars_chunk, date):
                 f"Could not find processed crash for raw crash {crash_id}."
             )

-    # Check Elasticsearch in batches
-    for crash_ids_batch in chunked(in_crash_source, 100):
-        missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
-        missing_in_es.extend(missing_in_es_batch)
+    if only_missing_in_es:
+        # Check Elasticsearch in batches
+        for crash_ids_batch in chunked(in_crash_source, 100):
+            missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
+            missing_in_es.extend(missing_in_es_batch)

-    return list(set(missing_in_es))
+        return list(set(missing_in_es))
+
+    return in_crash_source
+
+
+def save_crash_to_es(crash_id):
+    """Load a single processed crash from the crashstorage source into Elasticsearch."""
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+    try:
+        processed_crash = crash_source.get_processed_crash(crash_id)
+        crash_dest.save_processed_crash(None, processed_crash)
+        return (
+            f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
+        )
+    except Exception as exc:
+        return f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."


 @click.command()
@@ -136,8 +152,14 @@ def check_crashids_for_date(firstchars_chunk, date):
     type=int,
-    help="The number of workers to use to check for crash IDs in the crashstorage source. Defaults to 4.",
+    help="The number of worker processes used both to check for crash IDs in the crashstorage source and to load the crashes into Elasticsearch. Defaults to 4.",
 )
+@click.option(
+    "--only-missing-in-es",
+    default=False,
+    type=bool,
+    help="Whether to load only those processed crashes that are present in the crashstorage source but missing in Elasticsearch. Defaults to False.",
+)
 @click.pass_context
-def load_crashes(ctx, date, crash_id, num_workers):
+def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
     """
     Loads processed crashes into Elasticsearch by crash source (S3 or GCS)
     and either crash ID or date.
@@ -159,7 +181,11 @@ def load_crashes(ctx, date, crash_id, num_workers):
             f"Checking for missing processed crashes for: {check_date_formatted}"
         )

-        check_crashids = partial(check_crashids_for_date, date=check_date_formatted)
+        check_crashids = partial(
+            check_crashids_for_date,
+            date=check_date_formatted,
+            only_missing_in_es=only_missing_in_es,
+        )

         firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)

@@ -181,22 +207,10 @@ def load_crashes(ctx, date, crash_id, num_workers):
             param_hint=["date", "crash_id"],
         )

-    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
-    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
-
-    for crash_id in crash_ids:
-        try:
-            processed_crash = crash_source.get_processed_crash(crash_id)
-            crash_dest.save_processed_crash(None, processed_crash)
-
-            click.echo(
-                f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
-            )
-        except Exception as exc:
-            click.echo(
-                f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
-            )
-            continue
+    with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+        results = executor.map(save_crash_to_es, crash_ids, timeout=WORKER_TIMEOUT)
+        for result in results:
+            click.echo(result)


 if __name__ == "__main__":
diff --git a/socorro/tests/test_load_processed_crashes_into_es.py b/socorro/tests/test_load_processed_crashes_into_es.py
index c311ab09c5..938d68f34b 100644
--- a/socorro/tests/test_load_processed_crashes_into_es.py
+++ b/socorro/tests/test_load_processed_crashes_into_es.py
@@ -4,7 +4,6 @@

 from datetime import date, datetime, time, timezone
 import json
-import pytest

 from click.testing import CliRunner

@@ -62,14 +61,10 @@ def test_it_runs():
     assert result.exit_code == 0


-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "AWS",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
-)
-def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
-    """Test whether the module loads processed crashes by date from S3."""
+def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
+    """Test whether the module loads processed crashes by date."""
     date_str = "2024-05-01"
-    expected_crashes = load_crashes_into_crashstorage_source(s3_helper, date_str)
+    expected_crashes = load_crashes_into_crashstorage_source(storage_helper, date_str)
     runner = CliRunner()
     result = runner.invoke(load_crashes, ["--date", date_str])
     assert result.exit_code == 0
@@ -80,48 +75,9 @@ def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
     assert actual_crash == expected_crash


-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "AWS",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
-)
-def test_it_loads_processed_crashes_by_crashid_s3(s3_helper, es_helper):
-    """Test whether the module loads processed crashes by crash id from S3."""
+def test_it_loads_processed_crashes_by_crashid(storage_helper, es_helper):
+    """Test whether the module loads processed crashes by crash id."""
-    expected_crashes = load_crashes_into_crashstorage_source(s3_helper)
-    runner = CliRunner()
-    expected_crash = expected_crashes[0]
-    crash_id = expected_crash["uuid"]
-    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
-    assert result.exit_code == 0
-    es_helper.refresh()
-    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
-    assert actual_crash == expected_crash
-
-
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "GCP",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
-)
-def test_it_loads_processed_crashes_by_date_gcs(gcs_helper, es_helper):
-    """Test whether the module loads processed crashes by date from GCS."""
-    date_str = "2024-05-01"
-    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper, date_str)
-    runner = CliRunner()
-    result = runner.invoke(load_crashes, ["--date", date_str])
-    assert result.exit_code == 0
-    es_helper.refresh()
-    for expected_crash in expected_crashes:
-        crash_id = expected_crash["uuid"]
-        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
-        assert actual_crash == expected_crash
-
-
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "GCP",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
-)
-def test_it_loads_processed_crashes_by_crashid_gcs(gcs_helper, es_helper):
-    """Test whether the module loads processed crashes by crash id from GCS."""
-    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper)
+    expected_crashes = load_crashes_into_crashstorage_source(storage_helper)
     runner = CliRunner()
     expected_crash = expected_crashes[0]
     crash_id = expected_crash["uuid"]
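
Note on the ProcessPoolExecutor change in bin/load_processed_crashes_into_es.py:
save_crash_to_es returns its status string instead of calling click.echo
directly because each worker runs in a separate process, builds its own
crashstorage clients, and hands the result back to the parent for reporting.
Below is a minimal, self-contained sketch of that fan-out pattern; the worker
body, the crash ID, and the WORKER_TIMEOUT value are hypothetical stand-ins for
illustration, not the module's real storage calls or settings.

    import concurrent.futures

    # Assumed value for illustration; the real script defines its own
    # module-level WORKER_TIMEOUT constant.
    WORKER_TIMEOUT = 15 * 60  # seconds

    def save_crash_to_es(crash_id):
        # Stand-in worker: set up per-process resources here and return a
        # status string rather than printing, so the parent reports results.
        return f"Crash with ID {crash_id!r} loaded."

    def main():
        crash_ids = ["000a498a-6e17-4b5e-adf1-d1d1f0240501"]  # hypothetical ID
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            # executor.map() yields results in input order; iterating raises
            # TimeoutError if results are not available within `timeout`
            # seconds of the original map() call.
            results = executor.map(
                save_crash_to_es, crash_ids, timeout=WORKER_TIMEOUT
            )
            for result in results:
                print(result)

    if __name__ == "__main__":
        main()

Returning strings keeps all output in the parent process, so per-crash
successes and failures print one line each, in input order.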