Skip to content

Commit

Permalink
Fixes #84
Browse files (browse the repository at this point in the history)
  • Loading branch information
isichei committed Apr 19, 2019
1 parent f4ab440 commit c32eb92
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions etl_manager/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tempfile
import time
import zipfile
from botocore.exceptions import ClientError

from etl_manager.utils import (
read_json,
Expand Down Expand Up @@ -45,6 +46,8 @@ class JobTimedOut(Exception):
class JobStopped(Exception):
    """Error type signalling that a job run was stopped.

    NOTE(review): the raising site is not visible in this excerpt;
    presumably raised while waiting on a job whose run state is
    "STOPPED" -- confirm against wait_for_completion.
    """

class JobThrottlingExceeded(Exception):
    """Raised when job-status polling is throttled too many times.

    wait_for_completion raises this once a ThrottlingException from the
    client has been seen after the allowed number of back-off retries
    has been used up; the message carries the retry limit and the
    text of the underlying client error.
    """

class GlueJob:
"""
Expand Down Expand Up @@ -449,7 +452,7 @@ def job_run_state(self):
def is_running(self):
    """Tell whether the job run is currently in the "RUNNING" state.

    Returns:
        bool: True only when ``self.job_run_state`` equals the exact
        string "RUNNING"; any other state yields False.
    """
    current_state = self.job_run_state
    return current_state == "RUNNING"

def wait_for_completion(self, verbose=False, wait_seconds=10):
def wait_for_completion(self, verbose=False, wait_seconds=10, back_off_retries=5):
"""
Wait for the job to complete.
Expand All @@ -460,13 +463,28 @@ def wait_for_completion(self, verbose=False, wait_seconds=10):
JobTimedOut: When the job timed out
"""

back_off_counter = 0
while True:
time.sleep(wait_seconds)

status = self.job_status
status_code = status["JobRun"]["JobRunState"]
status_error = status["JobRun"].get("ErrorMessage", "n/a")
exec_time = status["JobRun"].get("ExecutionTime", "n/a")
try:
status = self.job_status
except ClientError as e:
if "ThrottlingException" in str(e) and back_off_counter < back_off_retries:
back_off_counter += 1
back_off_wait_time = wait_seconds * (2 ** (back_off_counter))
status_code = f"BOTO_CLIENT_RATE_EXCEEDED (waiting {back_off_wait_time}s)"
time.sleep(back_off_wait_time)
else:
if "ThrottlingException" in str(e):
err_str = f"Total number of retries ({back_off_retries}) exceeded - {str(e)}"
raise JobThrottlingExceeded(err_str)
else:
raise e
else:
status_code = status["JobRun"]["JobRunState"]
status_error = status["JobRun"].get("ErrorMessage", "n/a")
exec_time = status["JobRun"].get("ExecutionTime", "n/a")

if verbose:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Expand Down

0 comments on commit c32eb92

Please sign in to comment.