NON-JIRA: Copy job service now handles deadline exceeded error
przemyslaw-jasinski committed Mar 21, 2019
1 parent 1f590a0 commit 178d3c9
Showing 2 changed files with 21 additions and 8 deletions.
src/commons/big_query/big_query_job_error.py: 9 changes (6 additions, 3 deletions)
@@ -32,10 +32,13 @@ def should_be_retried(self):
         else:
             raise self.bq_error
 
+    def is_deadline_exceeded(self):
+        return self.bq_error.resp.status == 500 and \
+            self.full_reason.find('Deadline exceeded') != -1
+
     def __is_access_denied(self):
-        result = self.bq_error.resp.status == 403 and \
-            self.full_reason.find('Access Denied') != -1
-        return result
+        return self.bq_error.resp.status == 403 and \
+            self.full_reason.find('Access Denied') != -1
 
     def __is_404(self):
         return self.bq_error.resp.status == 404
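
The new check classifies an error as deadline-exceeded when the wrapped HttpError carries an HTTP 500 status and the error's full_reason text contains 'Deadline exceeded'. Below is a minimal sketch of that rule using hypothetical stand-in objects, not the repository's BigQueryJobError (whose constructor and full_reason parsing are outside this diff):

# Illustration only: _FakeResp, _FakeHttpError and _FakeJobError mirror just the
# attributes the check reads (bq_error.resp.status and full_reason).
class _FakeResp(object):
    def __init__(self, status):
        self.status = status


class _FakeHttpError(object):
    def __init__(self, status):
        self.resp = _FakeResp(status)


class _FakeJobError(object):
    def __init__(self, status, full_reason):
        self.bq_error = _FakeHttpError(status)
        self.full_reason = full_reason

    def is_deadline_exceeded(self):
        # Same rule as the diff: HTTP 500 whose reason mentions 'Deadline exceeded'.
        return self.bq_error.resp.status == 500 and \
            self.full_reason.find('Deadline exceeded') != -1


assert _FakeJobError(500, 'Deadline exceeded during copy').is_deadline_exceeded()
assert not _FakeJobError(403, 'Access Denied: project').is_deadline_exceeded()
assert not _FakeJobError(500, 'Internal error').is_deadline_exceeded()
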
src/commons/big_query/copy_job_async/copy_job/copy_job_service.py: 20 changes (15 additions, 5 deletions)
@@ -31,10 +31,11 @@ def run_copy_job_request(copy_job_request):
     def __schedule(source_big_query_table, target_big_query_table, job_id,
                    create_disposition, write_disposition):
         logging.info("Scheduling job ID: " + job_id)
+        target_project_id = target_big_query_table.get_project_id()
         job_data = {
             "jobReference": {
                 "jobId": job_id,
-                "projectId": target_big_query_table.get_project_id()
+                "projectId": target_project_id
             },
             "configuration": {
                 "copy": {
@@ -44,7 +45,7 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
                         "tableId": source_big_query_table.get_table_id(),
                     },
                     "destinationTable": {
-                        "projectId": target_big_query_table.get_project_id(),
+                        "projectId": target_project_id,
                         "datasetId": target_big_query_table.get_dataset_id(),
                         "tableId": target_big_query_table.get_table_id(),
                     },
@@ -54,8 +55,7 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
             }
         }
         try:
-            job_reference = BigQuery().insert_job(
-                target_big_query_table.get_project_id(), job_data)
+            job_reference = BigQuery().insert_job(target_project_id, job_data)
             logging.info("Successfully insert: %s", job_reference)
             return job_reference
         except HttpError as bq_error:
@@ -65,10 +65,12 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
             if copy_job_error.should_be_retried():
                 logging.warning(copy_job_error)
                 return BigQueryJobReference(
-                    project_id=target_big_query_table.get_project_id(),
+                    project_id=target_project_id,
                     job_id=job_id,
                     location=BigQueryTableMetadata.get_table_by_big_query_table(
                         source_big_query_table).get_location())
+            elif copy_job_error.is_deadline_exceeded():
+                return CopyJobService.__get_job(job_id, target_project_id)
             else:
                 logging.exception(copy_job_error)
                 return copy_job_error
@@ -77,6 +79,14 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
                 type(error), error)
             raise error
 
+    @staticmethod
+    @retry(HttpError, tries=6, delay=2, backoff=2)
+    def __get_job(job_id, project_id):
+        job_reference = BigQueryJobReference(project_id=project_id,
+                                             job_id=job_id,
+                                             location=None)
+        return BigQuery().get_job(job_reference)
+
     @staticmethod
     def _create_random_job_id():
         return "bbq_copy_job_" + str(uuid.uuid4())
