From 3f4b397062159de3db1a603a4c8f5f1168a3196d Mon Sep 17 00:00:00 2001
From: Bianca Danforth
Date: Thu, 2 May 2024 16:12:49 -0400
Subject: [PATCH 1/3] bug-1889156: script to load processed crashes into
 Elasticsearch

---
 bin/load_processed_crashes_into_es.py              | 203 ++++++++++++++++++
 .../test_load_processed_crashes_into_es.py         | 132 ++++++++++++
 2 files changed, 335 insertions(+)
 create mode 100755 bin/load_processed_crashes_into_es.py
 create mode 100644 socorro/tests/test_load_processed_crashes_into_es.py

diff --git a/bin/load_processed_crashes_into_es.py b/bin/load_processed_crashes_into_es.py
new file mode 100755
index 0000000000..ad0fc44118
--- /dev/null
+++ b/bin/load_processed_crashes_into_es.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+# Loads a processed crash by either crash ID or date from either
+# GCS or S3 into Elasticsearch, depending on `settings.CRASH_SOURCE`,
+# skipping crashes already in Elasticsearch.
+
+# Uses a variation of `check_crash_ids_for_date`
+# from the `verifyprocessed` command in Crash Stats to get crash IDs from S3/GCS:
+# https://github.com/mozilla-services/socorro/blob/3f39c6aaa7f294884f3261fd268e8084d5eec93a/webapp/crashstats/crashstats/management/commands/verifyprocessed.py#L77-L115
+
+# Usage: ./bin/load_processed_crashes_into_es.py [OPTIONS] [CRASH_ID | DATE]
+
+import concurrent.futures
+import datetime
+from functools import partial
+from isodate import parse_date
+from more_itertools import chunked
+
+import click
+
+from socorro import settings
+from socorro.external.es.super_search_fields import FIELDS
+from socorro.external.es.supersearch import SuperSearch
+from socorro.lib.libooid import date_from_ooid
+from socorro.libclass import build_instance_from_settings
+
+NUM_CRASHIDS_TO_FETCH = "all"
+# Number of prefix variations to pass to a check_crashids subprocess
+CHUNK_SIZE = 4
+# Number of seconds until we decide a worker has stalled
+WORKER_TIMEOUT = 15 * 60
+
+
+def is_in_storage(crash_storage, crash_id):
+    """Is the processed crash in storage."""
+    return crash_storage.exists_object(f"v1/processed_crash/{crash_id}")
+
+
+def get_threechars():
+    """Generate all combinations of 3 hex digits."""
+    chars = "0123456789abcdef"
+    for x in chars:
+        for y in chars:
+            for z in chars:
+                yield x + y + z
+
+
+def check_elasticsearch(supersearch, crash_ids):
+    """Checks Elasticsearch and returns list of missing crash ids.
+
+    Crash ids should all be on the same day.
+
+    """
+    crash_ids = [crash_ids] if isinstance(crash_ids, str) else crash_ids
+    crash_date = date_from_ooid(crash_ids[0])
+
+    # The datestamp in the crash id doesn't always match the processed date,
+    # especially when the crash came in at the end of the day.
+    start_date = (crash_date - datetime.timedelta(days=5)).strftime("%Y-%m-%d")
+    end_date = (crash_date + datetime.timedelta(days=5)).strftime("%Y-%m-%d")
+
+    params = {
+        "uuid": crash_ids,
+        "date": [">=%s" % start_date, "<=%s" % end_date],
+        "_columns": ["uuid"],
+        "_facets": [],
+        "_facets_size": 0,
+        "_fields": FIELDS,
+    }
+    search_results = supersearch.get(**params)
+
+    crash_ids_in_es = [hit["uuid"] for hit in search_results["hits"]]
+    return set(crash_ids) - set(crash_ids_in_es)
+
+
+def check_crashids_for_date(firstchars_chunk, date):
+    """Check crash ids for a given subset of all crash id prefixes and date"""
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+
+    supersearch = SuperSearch(crash_dest)
+
+    in_crash_source = []
+    missing_in_es = []
+
+    for firstchars in firstchars_chunk:
+        # Grab all the crash ids at the given date directory
+        page_iterator = crash_source.list_objects_paginator(
+            prefix=f"v1/raw_crash/{date}/{firstchars}",
+        )
+
+        for page in page_iterator:
+            # NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
+            crash_ids = [item.split("/")[-1] for item in page]
+
+            if not crash_ids:
+                continue
+
+            # Check crashstorage source first
+            for crash_id in crash_ids:
+                if is_in_storage(crash_source, crash_id):
+                    in_crash_source.append(crash_id)
+                else:
+                    click.echo(
+                        f"Could not find processed crash for raw crash {crash_id}."
+                    )
+
+    # Check Elasticsearch in batches
+    for crash_ids_batch in chunked(in_crash_source, 100):
+        missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
+        missing_in_es.extend(missing_in_es_batch)
+
+    return list(set(missing_in_es))
+
+
+@click.command()
+@click.option(
+    "--date",
+    default=None,
+    type=str,
+    help=("Date to load processed crashes from as YYYY-MM-DD. Defaults to None."),
+)
+@click.option(
+    "--crash-id",
+    default=None,
+    type=str,
+    help="A single crash ID to load into ES from the source. E.g. 64f9a777-771d-4db3-82fa-b9c190240430. Defaults to None.",
+)
+@click.option(
+    "--num-workers",
+    default=4,
+    type=int,
+    help="The number of workers to use to check for crash IDs in the crashstorage source. Defaults to 4.",
+)
+@click.pass_context
+def load_crashes(ctx, date, crash_id, num_workers):
+    """
+    Loads processed crashes into Elasticsearch by crash source (S3 or GCS)
+    and either crash ID or date.
+
+    Must specify either CRASH_ID or DATE.
+
+    """
+    crash_ids = []
+
+    if crash_id:
+        crash_ids.append(crash_id)
+    elif date:
+        check_date = parse_date(date)
+        if not check_date:
+            raise click.ClickException(f"Unrecognized date format: {date}")
+
+        check_date_formatted = check_date.strftime("%Y%m%d")
+        click.echo(
+            f"Checking for missing processed crashes for: {check_date_formatted}"
+        )
+
+        check_crashids = partial(check_crashids_for_date, date=check_date_formatted)
+
+        firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)
+
+        if num_workers == 1:
+            for result in map(check_crashids, firstchars_chunked):
+                crash_ids.extend(result)
+        else:
+            with concurrent.futures.ProcessPoolExecutor(
+                max_workers=num_workers
+            ) as executor:
+                for result in executor.map(
+                    check_crashids, firstchars_chunked, timeout=WORKER_TIMEOUT
+                ):
+                    crash_ids.extend(result)
+    else:
+        raise click.BadParameter(
+            "Neither date nor crash_id were provided. At least one must be provided.",
+            ctx=ctx,
+            param_hint=["date", "crash_id"],
+        )
+
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+
+    for crash_id in crash_ids:
+        try:
+            processed_crash = crash_source.get_processed_crash(crash_id)
+            crash_dest.save_processed_crash(None, processed_crash)
+
+            click.echo(
+                f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
+            )
+        except Exception as exc:
+            click.echo(
+                f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
+            )
+            continue
+
+
+if __name__ == "__main__":
+    load_crashes()
diff --git a/socorro/tests/test_load_processed_crashes_into_es.py b/socorro/tests/test_load_processed_crashes_into_es.py
new file mode 100644
index 0000000000..c311ab09c5
--- /dev/null
+++ b/socorro/tests/test_load_processed_crashes_into_es.py
@@ -0,0 +1,132 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+from datetime import date, datetime, time, timezone
+import json
+import pytest
+
+from click.testing import CliRunner
+
+from bin.load_processed_crashes_into_es import load_crashes
+from socorro.lib.libooid import create_new_ooid
+from socorro import settings
+
+
+# Setup helper, so we have some processed crashes to load into ES
+def load_crashes_into_crashstorage_source(helper, date_str="2024-05-01", num_crashes=2):
+    bucket = settings.STORAGE["options"]["bucket"]
+    helper.create_bucket(bucket)
+
+    expected_processed_crashes = []
+    date_args = [int(date_part) for date_part in date_str.split("-")]
+    date_datetime = date(*date_args)
+    expected_processed_crash_base = {
+        "date_processed": datetime.combine(
+            date_datetime, time.min, timezone.utc
+        ).isoformat(),
+    }
+    raw_crash_base = {"submitted_timestamp": date_str}
+
+    for _ in range(num_crashes):
+        # Encode the date in the crash ID, so we can determine the correct
+        # Elasticsearch index by crash ID downstream.
+        crash_id = create_new_ooid(timestamp=date_datetime)
+        processed_crash = {**expected_processed_crash_base, "uuid": crash_id}
+        expected_processed_crashes.append(processed_crash)
+
+        # Upload raw crash for this crash ID, since only the raw crash
+        # path contains the date for lookups.
+        raw_crash = {**raw_crash_base, "uuid": crash_id}
+        date_str_fmt = raw_crash["submitted_timestamp"].replace("-", "")
+        helper.upload(
+            bucket_name=bucket,
+            key=f"v1/raw_crash/{date_str_fmt}/{crash_id}",
+            data=json.dumps(raw_crash).encode("utf-8"),
+        )
+
+        # Upload processed crash for the crash ID
+        helper.upload(
+            bucket_name=bucket,
+            key=f"v1/processed_crash/{crash_id}",
+            data=json.dumps(processed_crash).encode("utf-8"),
+        )
+
+    return expected_processed_crashes
+
+
+def test_it_runs():
+    """Test whether the module loads and spits out help."""
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--help"])
+    assert result.exit_code == 0
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "AWS",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
+)
+def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
+    """Test whether the module loads processed crashes by date from S3."""
+    date_str = "2024-05-01"
+    expected_crashes = load_crashes_into_crashstorage_source(s3_helper, date_str)
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--date", date_str])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    for expected_crash in expected_crashes:
+        crash_id = expected_crash["uuid"]
+        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+        assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "AWS",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
+)
+def test_it_loads_processed_crashes_by_crashid_s3(s3_helper, es_helper):
+    """Test whether the module loads processed crashes by crash id from S3."""
+    expected_crashes = load_crashes_into_crashstorage_source(s3_helper)
+    runner = CliRunner()
+    expected_crash = expected_crashes[0]
+    crash_id = expected_crash["uuid"]
+    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+    assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "GCP",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
+)
+def test_it_loads_processed_crashes_by_date_gcs(gcs_helper, es_helper):
+    """Test whether the module loads processed crashes by date from GCS."""
+    date_str = "2024-05-01"
+    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper, date_str)
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--date", date_str])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    for expected_crash in expected_crashes:
+        crash_id = expected_crash["uuid"]
+        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+        assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "GCP",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
)
+def test_it_loads_processed_crashes_by_crashid_gcs(gcs_helper, es_helper):
+    """Test whether the module loads processed crashes by crash id from GCS."""
+    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper)
+    runner = CliRunner()
+    expected_crash = expected_crashes[0]
+    crash_id = expected_crash["uuid"]
+    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+    assert actual_crash == expected_crash

From 9704c8f6bc37e9a0d5055cce7a742c59b74f64bb Mon Sep 17 00:00:00 2001
From: Bianca Danforth
Date: Tue, 21 May 2024 17:28:00 -0400
Subject: [PATCH 2/3] bug-1889156: review feedback except crash prefix
 splitting changes

---
 bin/load_processed_crashes_into_es.py              | 65 ++++++++++++-------
 .../test_load_processed_crashes_into_es.py         | 52 ++-------------
 2 files changed, 44 insertions(+), 73 deletions(-)

diff --git a/bin/load_processed_crashes_into_es.py b/bin/load_processed_crashes_into_es.py
index ad0fc44118..13be9e8ccd 100755
--- a/bin/load_processed_crashes_into_es.py
+++ b/bin/load_processed_crashes_into_es.py
@@ -6,7 +6,7 @@
 
 # Loads a processed crash by either crash ID or date from either
 # GCS or S3 into Elasticsearch, depending on `settings.CRASH_SOURCE`,
-# skipping crashes already in Elasticsearch.
+# optionally skipping crashes already in Elasticsearch.
 
 # Uses a variation of `check_crash_ids_for_date`
 # from the `verifyprocessed` command in Crash Stats to get crash IDs from S3/GCS:
@@ -77,7 +77,7 @@ def check_elasticsearch(supersearch, crash_ids):
     return set(crash_ids) - set(crash_ids_in_es)
 
 
-def check_crashids_for_date(firstchars_chunk, date):
+def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
     """Check crash ids for a given subset of all crash id prefixes and date"""
     crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
     crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
@@ -109,12 +109,28 @@ def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
                         f"Could not find processed crash for raw crash {crash_id}."
                     )
 
-    # Check Elasticsearch in batches
-    for crash_ids_batch in chunked(in_crash_source, 100):
-        missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
-        missing_in_es.extend(missing_in_es_batch)
+    if only_missing_in_es:
+        # Check Elasticsearch in batches
+        for crash_ids_batch in chunked(in_crash_source, 100):
+            missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
+            missing_in_es.extend(missing_in_es_batch)
 
-    return list(set(missing_in_es))
+        return list(set(missing_in_es))
+
+    return in_crash_source
+
+
+def save_crash_to_es(crash_id):
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+    try:
+        processed_crash = crash_source.get_processed_crash(crash_id)
+        crash_dest.save_processed_crash(None, processed_crash)
+        return (
+            f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
+        )
+    except Exception as exc:
+        return f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
 
 
 @click.command()
@@ -136,8 +152,14 @@ def check_crashids_for_date(firstchars_chunk, date):
     type=int,
     help="The number of workers to use to check for crash IDs in the crashstorage source. Defaults to 4.",
 )
+@click.option(
+    "--only-missing-in-es",
+    default=False,
+    type=bool,
+    help="Whether to load only those processed crashes that are present in the crashstorage source but missing in Elasticsearch. Defaults to False.",
+)
 @click.pass_context
-def load_crashes(ctx, date, crash_id, num_workers):
+def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
     """
     Loads processed crashes into Elasticsearch by crash source (S3 or GCS)
     and either crash ID or date.
@@ -159,7 +181,11 @@ def check_crashids_for_date(firstchars_chunk, date):
             f"Checking for missing processed crashes for: {check_date_formatted}"
         )
 
-        check_crashids = partial(check_crashids_for_date, date=check_date_formatted)
+        check_crashids = partial(
+            check_crashids_for_date,
+            date=check_date_formatted,
+            only_missing_in_es=only_missing_in_es,
+        )
 
         firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)
 
@@ -181,22 +207,11 @@ def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
             param_hint=["date", "crash_id"],
         )
 
-    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
-    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
-
-    for crash_id in crash_ids:
-        try:
-            processed_crash = crash_source.get_processed_crash(crash_id)
-            crash_dest.save_processed_crash(None, processed_crash)
-
-            click.echo(
-                f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
-            )
-        except Exception as exc:
-            click.echo(
-                f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
-            )
-            continue
+    with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+        results = list(
+            executor.map(save_crash_to_es, crash_ids, timeout=WORKER_TIMEOUT)
+        )
+        click.echo(results)
 
 
 if __name__ == "__main__":
diff --git a/socorro/tests/test_load_processed_crashes_into_es.py b/socorro/tests/test_load_processed_crashes_into_es.py
index c311ab09c5..938d68f34b 100644
--- a/socorro/tests/test_load_processed_crashes_into_es.py
+++ b/socorro/tests/test_load_processed_crashes_into_es.py
@@ -4,7 +4,6 @@
 
 from datetime import date, datetime, time, timezone
 import json
-import pytest
 
 from click.testing import CliRunner
 
@@ -62,14 +61,10 @@ def test_it_runs():
     assert result.exit_code == 0
 
 
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "AWS",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
-)
-def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
+def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
     """Test whether the module loads processed crashes by date from S3."""
     date_str = "2024-05-01"
-    expected_crashes = load_crashes_into_crashstorage_source(s3_helper, date_str)
+    expected_crashes = load_crashes_into_crashstorage_source(storage_helper, date_str)
     runner = CliRunner()
     result = runner.invoke(load_crashes, ["--date", date_str])
     assert result.exit_code == 0
@@ -80,48 +75,9 @@ def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
     assert actual_crash == expected_crash
 
 
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "AWS",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
-)
-def test_it_loads_processed_crashes_by_crashid_s3(s3_helper, es_helper):
+def test_it_loads_processed_crashes_by_crashid(storage_helper, es_helper):
     """Test whether the module loads processed crashes by crash id from S3."""
-    expected_crashes = load_crashes_into_crashstorage_source(s3_helper)
-    runner = CliRunner()
-    expected_crash = expected_crashes[0]
-    crash_id = expected_crash["uuid"]
-    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
-    assert result.exit_code == 0
-    es_helper.refresh()
-    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
-    assert actual_crash == expected_crash
-
-
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "GCP",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
-)
-def test_it_loads_processed_crashes_by_date_gcs(gcs_helper, es_helper):
-    """Test whether the module loads processed crashes by date from GCS."""
-    date_str = "2024-05-01"
-    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper, date_str)
-    runner = CliRunner()
-    result = runner.invoke(load_crashes, ["--date", date_str])
-    assert result.exit_code == 0
-    es_helper.refresh()
-    for expected_crash in expected_crashes:
-        crash_id = expected_crash["uuid"]
-        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
-        assert actual_crash == expected_crash
-
-
-@pytest.mark.skipif(
-    settings.CLOUD_PROVIDER != "GCP",
-    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
-)
-def test_it_loads_processed_crashes_by_crashid_gcs(gcs_helper, es_helper):
-    """Test whether the module loads processed crashes by crash id from GCS."""
-    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper)
+    expected_crashes = load_crashes_into_crashstorage_source(storage_helper)
     runner = CliRunner()
     expected_crash = expected_crashes[0]
     crash_id = expected_crash["uuid"]

From 85284aa493eb26b3fba2bb4ccb3da820aa84d78f Mon Sep 17 00:00:00 2001
From: Bianca Danforth
Date: Wed, 22 May 2024 11:26:43 -0400
Subject: [PATCH 3/3] bug-1889156: remove crash prefix splitting

See https://github.com/mozilla-services/socorro/pull/6609#discussion_r1607319339
---
 bin/load_processed_crashes_into_es.py              | 91 +++++++++----------
 .../test_load_processed_crashes_into_es.py         |  4 +-
 2 files changed, 45 insertions(+), 50 deletions(-)

diff --git a/bin/load_processed_crashes_into_es.py b/bin/load_processed_crashes_into_es.py
index 13be9e8ccd..379eeef778 100755
--- a/bin/load_processed_crashes_into_es.py
+++ b/bin/load_processed_crashes_into_es.py
@@ -29,8 +29,6 @@ from socorro.libclass import build_instance_from_settings
 
 NUM_CRASHIDS_TO_FETCH = "all"
-# Number of prefix variations to pass to a check_crashids subprocess
-CHUNK_SIZE = 4
 # Number of seconds until we decide a worker has stalled
 WORKER_TIMEOUT = 15 * 60
 
 
@@ -77,47 +75,58 @@ def check_elasticsearch(supersearch, crash_ids):
     return set(crash_ids) - set(crash_ids_in_es)
 
 
-def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
-    """Check crash ids for a given subset of all crash id prefixes and date"""
+def get_crashids_in_storage(page, only_missing_in_es):
     crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
     crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
-    supersearch = SuperSearch(crash_dest)
+    in_crash_source_page = []
+    missing_in_es_page = []
+    # NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
+    crash_ids = [item.split("/")[-1] for item in page]
 
-    in_crash_source = []
-    missing_in_es = []
+    if not crash_ids:
+        return []
 
-    for firstchars in firstchars_chunk:
-        # Grab all the crash ids at the given date directory
-        page_iterator = crash_source.list_objects_paginator(
-            prefix=f"v1/raw_crash/{date}/{firstchars}",
-        )
-
-        for page in page_iterator:
-            # NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
-            crash_ids = [item.split("/")[-1] for item in page]
-
-            if not crash_ids:
-                continue
-
-            # Check crashstorage source first
-            for crash_id in crash_ids:
-                if is_in_storage(crash_source, crash_id):
-                    in_crash_source.append(crash_id)
-                else:
-                    click.echo(
-                        f"Could not find processed crash for raw crash {crash_id}."
-                    )
+    # Check crashstorage source first
+    for crash_id in crash_ids:
+        if is_in_storage(crash_source, crash_id):
+            in_crash_source_page.append(crash_id)
+        else:
+            click.echo(f"Could not find processed crash for raw crash {crash_id}.")
 
     if only_missing_in_es:
+        supersearch = SuperSearch(crash_dest)
+
         # Check Elasticsearch in batches
-        for crash_ids_batch in chunked(in_crash_source, 100):
+        for crash_ids_batch in chunked(in_crash_source_page, 100):
             missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
-            missing_in_es.extend(missing_in_es_batch)
+            missing_in_es_page.extend(missing_in_es_batch)
+
+        return list(set(missing_in_es_page))
 
-        return list(set(missing_in_es))
+    return in_crash_source_page
 
-    return in_crash_source
+
+def check_crashids_for_date(date, only_missing_in_es, num_workers):
+    """Check crash ids for a given date"""
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+
+    crash_ids = []
+
+    # Grab all the crash ids at the given date directory
+    page_iterator = crash_source.list_objects_paginator(
+        prefix=f"v1/raw_crash/{date}",
+    )
+
+    get_crashids = partial(
+        get_crashids_in_storage, only_missing_in_es=only_missing_in_es
+    )
+
+    with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+        for result in executor.map(get_crashids, page_iterator, timeout=WORKER_TIMEOUT):
+            crash_ids.extend(result)
+
+    return crash_ids
 
 
 def save_crash_to_es(crash_id):
@@ -181,25 +190,11 @@ def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
             f"Checking for missing processed crashes for: {check_date_formatted}"
         )
 
-        check_crashids = partial(
-            check_crashids_for_date,
+        crash_ids = check_crashids_for_date(
             date=check_date_formatted,
             only_missing_in_es=only_missing_in_es,
+            num_workers=num_workers,
         )
-
-        firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)
-
-        if num_workers == 1:
-            for result in map(check_crashids, firstchars_chunked):
-                crash_ids.extend(result)
-        else:
-            with concurrent.futures.ProcessPoolExecutor(
-                max_workers=num_workers
-            ) as executor:
-                for result in executor.map(
-                    check_crashids, firstchars_chunked, timeout=WORKER_TIMEOUT
-                ):
-                    crash_ids.extend(result)
     else:
         raise click.BadParameter(
             "Neither date nor crash_id were provided. At least one must be provided.",
diff --git a/socorro/tests/test_load_processed_crashes_into_es.py b/socorro/tests/test_load_processed_crashes_into_es.py
index 938d68f34b..e1a3e0a51f 100644
--- a/socorro/tests/test_load_processed_crashes_into_es.py
+++ b/socorro/tests/test_load_processed_crashes_into_es.py
@@ -62,7 +62,7 @@ def test_it_runs():
 
 
 def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
-    """Test whether the module loads processed crashes by date from S3."""
+    """Test whether the module loads processed crashes by date."""
     date_str = "2024-05-01"
     expected_crashes = load_crashes_into_crashstorage_source(storage_helper, date_str)
     runner = CliRunner()
@@ -76,7 +76,7 @@ def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
 
 
 def test_it_loads_processed_crashes_by_crashid(storage_helper, es_helper):
-    """Test whether the module loads processed crashes by crash id from S3."""
+    """Test whether the module loads processed crashes by crash id."""
     expected_crashes = load_crashes_into_crashstorage_source(storage_helper)
     runner = CliRunner()
     expected_crash = expected_crashes[0]
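
Example invocations (editor's illustration, not part of the patches; assumes a
configured Socorro environment where `settings.CRASH_SOURCE` points at the S3
or GCS crashstorage source and `settings.CRASH_DESTINATIONS["es"]` at
Elasticsearch). The crash ID is the sample value from the `--crash-id` help
text, and the date matches the one used in the tests:

    # Load a single processed crash by crash ID
    ./bin/load_processed_crashes_into_es.py --crash-id 64f9a777-771d-4db3-82fa-b9c190240430

    # Load all processed crashes for a date, checking storage with 8 worker processes
    ./bin/load_processed_crashes_into_es.py --date 2024-05-01 --num-workers 8

    # Load only crashes present in storage but missing from Elasticsearch
    # (--only-missing-in-es is declared with type=bool rather than is_flag,
    # so it takes an explicit value)
    ./bin/load_processed_crashes_into_es.py --date 2024-05-01 --only-missing-in-es true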