Skip to content

Commit

Permalink
fix(backfills): switch to concurrent.futures to improve debuggability
Browse files: browse the repository at this point in the history
  • Loading branch information
ANich committed May 23, 2024
1 parent e843469 commit d7b5bad
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions bigquery_etl/cli/query.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -11,6 +11,7 @@
import subprocess
import sys
import tempfile
from concurrent import futures
from datetime import date, timedelta
from functools import partial
from glob import glob
Expand Down Expand Up @@ -743,10 +744,19 @@ def backfill(

if not depends_on_past and parallelism > 0:
# run backfill for dates in parallel if depends_on_past is false
with Pool(parallelism) as p:
result = p.map(backfill_query, date_range, chunksize=1)
if not all(result):
sys.exit(1)
with futures.ProcessPoolExecutor(max_workers=parallelism) as executor:
future_to_date = {
executor.submit(backfill_query, backfill_date): backfill_date
for backfill_date in date_range
}
for future in futures.as_completed(future_to_date):
backfill_date = future_to_date[future]
try:
future.result()
except Exception as e: # TODO: More specific exception(s)
print(f"Encountered exception {e}: {backfill_date}.")
else:
print(f"Completed processing: {backfill_date}.")
else:
# if data depends on previous runs, then execute backfill sequentially
for backfill_date in date_range:
Expand Down

0 comments on commit d7b5bad

Please sign in to comment.