Skip to content

Commit

Permalink
bug-1889156: apply review feedback, except for the crash prefix splitting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
biancadanforth committed May 21, 2024
1 parent cbe3687 commit c8132d0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 73 deletions.
65 changes: 40 additions & 25 deletions bin/load_processed_crashes_into_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

# Loads a processed crash by either crash ID or date from either
# GCS or S3 into Elasticsearch, depending on `settings.CRASH_SOURCE`,
# skipping crashes already in Elasticsearch.
# optionally skipping crashes already in Elasticsearch.

# Uses a variation of `check_crash_ids_for_date`
# from the `verifyprocessed` command in Crash Stats to get crash IDs from S3/GCS:
Expand Down Expand Up @@ -77,7 +77,7 @@ def check_elasticsearch(supersearch, crash_ids):
return set(crash_ids) - set(crash_ids_in_es)


def check_crashids_for_date(firstchars_chunk, date):
def check_crashids_for_date(firstchars_chunk, date, only_missing_in_es):
"""Check crash ids for a given subset of all crash id prefixes and date"""
crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
Expand Down Expand Up @@ -109,12 +109,28 @@ def check_crashids_for_date(firstchars_chunk, date):
f"Could not find processed crash for raw crash {crash_id}."
)

# Check Elasticsearch in batches
for crash_ids_batch in chunked(in_crash_source, 100):
missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
missing_in_es.extend(missing_in_es_batch)
if only_missing_in_es:
# Check Elasticsearch in batches
for crash_ids_batch in chunked(in_crash_source, 100):
missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
missing_in_es.extend(missing_in_es_batch)

return list(set(missing_in_es))
return list(set(missing_in_es))

return in_crash_source


def save_crash_to_es(crash_id):
    """Copy one processed crash from the crashstorage source into Elasticsearch.

    Returns a human-readable status string (success or failure) rather than
    raising, so results can be collected from worker processes.
    """
    source = build_instance_from_settings(settings.CRASH_SOURCE)
    destination = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])
    source_name = type(source).__name__
    try:
        destination.save_processed_crash(None, source.get_processed_crash(crash_id))
    except Exception as exc:
        # Best-effort: report the error as a string instead of raising so one
        # bad crash does not abort the rest of the batch.
        return f"Unable to load crash with ID {crash_id!r} from {source_name!r}; error: {exc}."
    return f"Crash with ID {crash_id!r} loaded from {source_name!r}."


@click.command()
Expand All @@ -136,8 +152,14 @@ def check_crashids_for_date(firstchars_chunk, date):
type=int,
help="The number of workers to use to check for crash IDs in the crashstorage source. Defaults to 4.",
)
@click.option(
"--only-missing-in-es",
default=False,
type=bool,
help="Whether to load only those processed crashes that are present in the crashstorage source but missing in Elasticsearch. Defaults to False.",
)
@click.pass_context
def load_crashes(ctx, date, crash_id, num_workers):
def load_crashes(ctx, date, crash_id, num_workers, only_missing_in_es):
"""
Loads processed crashes into Elasticsearch by crash source (S3 or GCS)
and either crash ID or date.
Expand All @@ -159,7 +181,11 @@ def load_crashes(ctx, date, crash_id, num_workers):
f"Checking for missing processed crashes for: {check_date_formatted}"
)

check_crashids = partial(check_crashids_for_date, date=check_date_formatted)
check_crashids = partial(
check_crashids_for_date,
date=check_date_formatted,
only_missing_in_es=only_missing_in_es,
)

firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)

Expand All @@ -181,22 +207,11 @@ def load_crashes(ctx, date, crash_id, num_workers):
param_hint=["date", "crash_id"],
)

crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])

for crash_id in crash_ids:
try:
processed_crash = crash_source.get_processed_crash(crash_id)
crash_dest.save_processed_crash(None, processed_crash)

click.echo(
f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
)
except Exception as exc:
click.echo(
f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
)
continue
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(
executor.map(save_crash_to_es, crash_ids, timeout=WORKER_TIMEOUT)
)
click.echo(results)


if __name__ == "__main__":
Expand Down
52 changes: 4 additions & 48 deletions socorro/tests/test_load_processed_crashes_into_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from datetime import date, datetime, time, timezone
import json
import pytest

from click.testing import CliRunner

Expand Down Expand Up @@ -62,14 +61,10 @@ def test_it_runs():
assert result.exit_code == 0


@pytest.mark.skipif(
settings.CLOUD_PROVIDER != "AWS",
reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
)
def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
def test_it_loads_processed_crashes_by_date(storage_helper, es_helper):
    """Test whether the module loads processed crashes by date from the crashstorage source."""
date_str = "2024-05-01"
expected_crashes = load_crashes_into_crashstorage_source(s3_helper, date_str)
expected_crashes = load_crashes_into_crashstorage_source(storage_helper, date_str)
runner = CliRunner()
result = runner.invoke(load_crashes, ["--date", date_str])
assert result.exit_code == 0
Expand All @@ -80,48 +75,9 @@ def test_it_loads_processed_crashes_by_date_s3(s3_helper, es_helper):
assert actual_crash == expected_crash


@pytest.mark.skipif(
settings.CLOUD_PROVIDER != "AWS",
reason="Skipping test because CLOUD_PROVIDER is not set to 'AWS'",
)
def test_it_loads_processed_crashes_by_crashid_s3(s3_helper, es_helper):
def test_it_loads_processed_crashes_by_crashid(storage_helper, es_helper):
    """Test whether the module loads processed crashes by crash id from the crashstorage source."""
expected_crashes = load_crashes_into_crashstorage_source(s3_helper)
runner = CliRunner()
expected_crash = expected_crashes[0]
crash_id = expected_crash["uuid"]
result = runner.invoke(load_crashes, ["--crash-id", crash_id])
assert result.exit_code == 0
es_helper.refresh()
actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
assert actual_crash == expected_crash


@pytest.mark.skipif(
settings.CLOUD_PROVIDER != "GCP",
reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
)
def test_it_loads_processed_crashes_by_date_gcs(gcs_helper, es_helper):
"""Test whether the module loads processed crashes by date from GCS."""
date_str = "2024-05-01"
expected_crashes = load_crashes_into_crashstorage_source(gcs_helper, date_str)
runner = CliRunner()
result = runner.invoke(load_crashes, ["--date", date_str])
assert result.exit_code == 0
es_helper.refresh()
for expected_crash in expected_crashes:
crash_id = expected_crash["uuid"]
actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
assert actual_crash == expected_crash


@pytest.mark.skipif(
settings.CLOUD_PROVIDER != "GCP",
reason="Skipping test because CLOUD_PROVIDER is not set to 'GCP'",
)
def test_it_loads_processed_crashes_by_crashid_gcs(gcs_helper, es_helper):
"""Test whether the module loads processed crashes by crash id from GCS."""
expected_crashes = load_crashes_into_crashstorage_source(gcs_helper)
expected_crashes = load_crashes_into_crashstorage_source(storage_helper)
runner = CliRunner()
expected_crash = expected_crashes[0]
crash_id = expected_crash["uuid"]
Expand Down

0 comments on commit c8132d0

Please sign in to comment.