From cbe3687fb15257d469d1b8922d05a6fb3cd218c9 Mon Sep 17 00:00:00 2001
From: Bianca Danforth
Date: Thu, 2 May 2024 16:12:49 -0400
Subject: [PATCH] bug-1889156: script to load processed crashes into Elasticsearch

---
 bin/load_processed_crashes_into_es.py              | 203 ++++++++++++++++++
 .../test_load_processed_crashes_into_es.py         | 132 ++++++++++++
 2 files changed, 335 insertions(+)
 create mode 100755 bin/load_processed_crashes_into_es.py
 create mode 100644 socorro/tests/test_load_processed_crashes_into_es.py

diff --git a/bin/load_processed_crashes_into_es.py b/bin/load_processed_crashes_into_es.py
new file mode 100755
index 0000000000..ad0fc44118
--- /dev/null
+++ b/bin/load_processed_crashes_into_es.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+# Loads processed crashes by either crash ID or date from either
+# GCS or S3 into Elasticsearch, depending on `settings.CRASH_SOURCE`,
+# skipping crashes already in Elasticsearch.
+
+# Uses a variation of `check_crash_ids_for_date` from the `verifyprocessed`
+# command in Crash Stats to get crash IDs from S3/GCS:
+# https://github.com/mozilla-services/socorro/blob/3f39c6aaa7f294884f3261fd268e8084d5eec93a/webapp/crashstats/crashstats/management/commands/verifyprocessed.py#L77-L115
+
+# Usage: ./bin/load_processed_crashes_into_es.py [OPTIONS] (--crash-id CRASH_ID | --date DATE)
+
+import concurrent.futures
+import datetime
+from functools import partial
+
+import click
+from isodate import parse_date
+from more_itertools import chunked
+
+from socorro import settings
+from socorro.external.es.super_search_fields import FIELDS
+from socorro.external.es.supersearch import SuperSearch
+from socorro.lib.libooid import date_from_ooid
+from socorro.libclass import build_instance_from_settings
+
+NUM_CRASHIDS_TO_FETCH = "all"
+# Number of prefix variations to pass to a check_crashids subprocess
+CHUNK_SIZE = 4
+# Number of seconds until we decide a worker has stalled
+WORKER_TIMEOUT = 15 * 60
+
+
+def is_in_storage(crash_storage, crash_id):
+    """Is the processed crash in storage."""
+    return crash_storage.exists_object(f"v1/processed_crash/{crash_id}")
+
+
+def get_threechars():
+    """Generate all combinations of 3 hex digits."""
+    chars = "0123456789abcdef"
+    for x in chars:
+        for y in chars:
+            for z in chars:
+                yield x + y + z
+
+
+def check_elasticsearch(supersearch, crash_ids):
+    """Check Elasticsearch and return the set of missing crash ids.
+
+    Crash ids should all be on the same day.
+
+    """
+    crash_ids = [crash_ids] if isinstance(crash_ids, str) else crash_ids
+    crash_date = date_from_ooid(crash_ids[0])
+
+    # The datestamp in the crash id doesn't always match the processed date,
+    # especially when the crash came in at the end of the day.
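+    # (Illustrative example, not in the original patch: for crash ID
+    # 64f9a777-771d-4db3-82fa-b9c190240430, date_from_ooid returns 2024-04-30,
+    # so the +/- 5 day window below spans 2024-04-25 through 2024-05-05.)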
+    start_date = (crash_date - datetime.timedelta(days=5)).strftime("%Y-%m-%d")
+    end_date = (crash_date + datetime.timedelta(days=5)).strftime("%Y-%m-%d")
+
+    params = {
+        "uuid": crash_ids,
+        "date": [">=%s" % start_date, "<=%s" % end_date],
+        "_columns": ["uuid"],
+        "_facets": [],
+        "_facets_size": 0,
+        "_fields": FIELDS,
+    }
+    search_results = supersearch.get(**params)
+
+    crash_ids_in_es = [hit["uuid"] for hit in search_results["hits"]]
+    return set(crash_ids) - set(crash_ids_in_es)
+
+
+def check_crashids_for_date(firstchars_chunk, date):
+    """Check crash ids for a given subset of all crash id prefixes and date."""
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+
+    supersearch = SuperSearch(crash_dest)
+
+    in_crash_source = []
+    missing_in_es = []
+
+    for firstchars in firstchars_chunk:
+        # Grab all the crash ids at the given date directory
+        page_iterator = crash_source.list_objects_paginator(
+            prefix=f"v1/raw_crash/{date}/{firstchars}",
+        )
+
+        for page in page_iterator:
+            # NOTE(bdanforth): Keys here look like /v1/raw_crash/DATE/CRASHID
+            crash_ids = [item.split("/")[-1] for item in page]
+
+            if not crash_ids:
+                continue
+
+            # Check the crashstorage source first
+            for crash_id in crash_ids:
+                if is_in_storage(crash_source, crash_id):
+                    in_crash_source.append(crash_id)
+                else:
+                    click.echo(
+                        f"Could not find processed crash for raw crash {crash_id}."
+                    )
+
+    # Check Elasticsearch in batches
+    for crash_ids_batch in chunked(in_crash_source, 100):
+        missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
+        missing_in_es.extend(missing_in_es_batch)
+
+    return list(set(missing_in_es))
+
+
+@click.command()
+@click.option(
+    "--date",
+    default=None,
+    type=str,
+    help="Date to load processed crashes from as YYYY-MM-DD. Defaults to None.",
+)
+@click.option(
+    "--crash-id",
+    default=None,
+    type=str,
+    help=(
+        "A single crash ID to load into ES from the source, e.g. "
+        "64f9a777-771d-4db3-82fa-b9c190240430. Defaults to None."
+    ),
+)
+@click.option(
+    "--num-workers",
+    default=4,
+    type=int,
+    help=(
+        "The number of workers to use to check for crash IDs in the "
+        "crashstorage source. Defaults to 4."
+    ),
+)
+@click.pass_context
+def load_crashes(ctx, date, crash_id, num_workers):
+    """
+    Loads processed crashes into Elasticsearch by crash source (S3 or GCS)
+    and either crash ID or date.
+
+    Must specify either --crash-id or --date.
+
+    """
+    crash_ids = []
+
+    if crash_id:
+        crash_ids.append(crash_id)
+    elif date:
+        # isodate.parse_date raises on malformed input rather than
+        # returning None, so catch the error to report it cleanly.
+        try:
+            check_date = parse_date(date)
+        except ValueError as exc:
+            raise click.ClickException(f"Unrecognized date format: {date}") from exc
+
+        check_date_formatted = check_date.strftime("%Y%m%d")
+        click.echo(
+            f"Checking for missing processed crashes for: {check_date_formatted}"
+        )
+
+        check_crashids = partial(check_crashids_for_date, date=check_date_formatted)
+
+        firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)
+
+        if num_workers == 1:
+            for result in map(check_crashids, firstchars_chunked):
+                crash_ids.extend(result)
+        else:
+            with concurrent.futures.ProcessPoolExecutor(
+                max_workers=num_workers
+            ) as executor:
+                for result in executor.map(
+                    check_crashids, firstchars_chunked, timeout=WORKER_TIMEOUT
+                ):
+                    crash_ids.extend(result)
+    else:
+        raise click.BadParameter(
+            "Neither date nor crash_id were provided. At least one must be provided.",
+            ctx=ctx,
+            param_hint=["date", "crash_id"],
+        )
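+
+    # With the crash IDs to load resolved, copy each processed crash from the
+    # crashstorage source into the Elasticsearch destination below.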
+    crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
+    crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
+
+    for crash_id in crash_ids:
+        try:
+            processed_crash = crash_source.get_processed_crash(crash_id)
+            crash_dest.save_processed_crash(None, processed_crash)
+
+            click.echo(
+                f"Crash with ID {crash_id!r} loaded from "
+                f"{type(crash_source).__name__!r}."
+            )
+        except Exception as exc:
+            click.echo(
+                f"Unable to load crash with ID {crash_id!r} from "
+                f"{type(crash_source).__name__!r}; error: {exc}."
+            )
+            continue
+
+
+if __name__ == "__main__":
+    load_crashes()
diff --git a/socorro/tests/test_load_processed_crashes_into_es.py b/socorro/tests/test_load_processed_crashes_into_es.py
new file mode 100644
index 0000000000..c311ab09c5
--- /dev/null
+++ b/socorro/tests/test_load_processed_crashes_into_es.py
@@ -0,0 +1,132 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+from datetime import date, datetime, time, timezone
+import json
+
+import pytest
+from click.testing import CliRunner
+
+from bin.load_processed_crashes_into_es import load_crashes
+from socorro import settings
+from socorro.lib.libooid import create_new_ooid
+
+
+# Setup helper so we have some processed crashes to load into ES
+def load_crashes_into_crashstorage_source(helper, date_str="2024-05-01", num_crashes=2):
+    bucket = settings.STORAGE["options"]["bucket"]
+    helper.create_bucket(bucket)
+
+    expected_processed_crashes = []
+    date_args = [int(date_part) for date_part in date_str.split("-")]
+    date_datetime = date(*date_args)
+    expected_processed_crash_base = {
+        "date_processed": datetime.combine(
+            date_datetime, time.min, timezone.utc
+        ).isoformat(),
+    }
+    raw_crash_base = {"submitted_timestamp": date_str}
+
+    for _ in range(num_crashes):
+        # Encode the date in the crash ID, so we can determine the correct
+        # Elasticsearch index by crash ID downstream.
+        crash_id = create_new_ooid(timestamp=date_datetime)
+        processed_crash = {**expected_processed_crash_base, "uuid": crash_id}
+        expected_processed_crashes.append(processed_crash)
+
+        # Upload a raw crash for this crash ID, since only the raw crash
+        # path contains the date for lookups.
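+        # Keys take the form v1/raw_crash/YYYYMMDD/<crash_id>, the same
+        # prefix that check_crashids_for_date scans in the script above.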
+        raw_crash = {**raw_crash_base, "uuid": crash_id}
+        date_str_fmt = raw_crash["submitted_timestamp"].replace("-", "")
+        helper.upload(
+            bucket_name=bucket,
+            key=f"v1/raw_crash/{date_str_fmt}/{crash_id}",
+            data=json.dumps(raw_crash).encode("utf-8"),
+        )
+
+        # Upload the processed crash for the crash ID
+        helper.upload(
+            bucket_name=bucket,
+            key=f"v1/processed_crash/{crash_id}",
+            data=json.dumps(processed_crash).encode("utf-8"),
+        )
+
+    return expected_processed_crashes
+
+
+def test_it_runs():
+    """Test whether the module loads and spits out help."""
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--help"])
+    assert result.exit_code == 0
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "AWS",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
+)
+def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
+    """Test whether the module loads processed crashes by date from S3."""
+    date_str = "2024-05-01"
+    expected_crashes = load_crashes_into_crashstorage_source(s3_helper, date_str)
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--date", date_str])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    for expected_crash in expected_crashes:
+        crash_id = expected_crash["uuid"]
+        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+        assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "AWS",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
+)
+def test_it_loads_processed_crashes_by_crashid_s3(s3_helper, es_helper):
+    """Test whether the module loads processed crashes by crash id from S3."""
+    expected_crashes = load_crashes_into_crashstorage_source(s3_helper)
+    runner = CliRunner()
+    expected_crash = expected_crashes[0]
+    crash_id = expected_crash["uuid"]
+    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+    assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "GCP",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
+)
+def test_it_loads_processed_crashes_by_date_gcs(gcs_helper, es_helper):
+    """Test whether the module loads processed crashes by date from GCS."""
+    date_str = "2024-05-01"
+    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper, date_str)
+    runner = CliRunner()
+    result = runner.invoke(load_crashes, ["--date", date_str])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    for expected_crash in expected_crashes:
+        crash_id = expected_crash["uuid"]
+        actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+        assert actual_crash == expected_crash
+
+
+@pytest.mark.skipif(
+    settings.CLOUD_PROVIDER != "GCP",
+    reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
+)
+def test_it_loads_processed_crashes_by_crashid_gcs(gcs_helper, es_helper):
+    """Test whether the module loads processed crashes by crash id from GCS."""
+    expected_crashes = load_crashes_into_crashstorage_source(gcs_helper)
+    runner = CliRunner()
+    expected_crash = expected_crashes[0]
+    crash_id = expected_crash["uuid"]
+    result = runner.invoke(load_crashes, ["--crash-id", crash_id])
+    assert result.exit_code == 0
+    es_helper.refresh()
+    actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
+    assert actual_crash == expected_crash
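+
+
+# (Note, not part of the original patch: the AWS and GCP test pairs above are
+# mutually exclusive via their skipif markers, so exercising all of them takes
+# one test run with settings.CLOUD_PROVIDER == "AWS" and one with "GCP".)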