Skip to content
This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

Commit

Permalink
Merge aa5c87d into 1f590a0
Browse files Browse the repository at this point in the history
  • Loading branch information
przemyslaw-jasinski committed Mar 27, 2019
2 parents 1f590a0 + aa5c87d commit 57ba255
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 52 deletions.
18 changes: 15 additions & 3 deletions src/commons/big_query/big_query_job_error.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging

from src.commons.big_query.copy_job_async.task_creator import TaskCreator
Expand All @@ -10,6 +11,7 @@ def __init__(self, bq_error, source_bq_table, target_bq_table):
self.target_bq_table = target_bq_table
self.full_reason = bq_error._get_reason()
self.short_reason = self.__get_short_reason()
self.location = self.__get_location()

def __str__(self):
return "{} while creating Copy Job from {} to {}" \
Expand All @@ -32,10 +34,13 @@ def should_be_retried(self):
else:
raise self.bq_error

def is_deadline_exceeded(self):
    """Return True when BigQuery answered HTTP 500 and the error reason
    mentions 'Deadline exceeded' (i.e. the copy job may actually have
    been created server-side despite the failed response)."""
    if self.bq_error.resp.status != 500:
        return False
    return 'Deadline exceeded' in self.full_reason

def __is_access_denied(self):
    """Return True when BigQuery answered HTTP 403 and the error reason
    mentions 'Access Denied'."""
    if self.bq_error.resp.status != 403:
        return False
    return 'Access Denied' in self.full_reason

def __is_404(self):
    """Return True when BigQuery answered HTTP 404 (resource not found)."""
    status_code = self.bq_error.resp.status
    return status_code == 404
Expand All @@ -53,6 +58,13 @@ def __get_short_reason(self):
else:
return 'Unknown reason'

def __get_location(self):
    """Extract the job location from the raw BigQuery error payload.

    Parses ``self.bq_error.content`` as JSON and returns the ``location``
    field of the first entry in ``error.errors``; returns None when the
    payload is not valid JSON or does not have the expected shape.
    """
    try:
        data = json.loads(self.bq_error.content)
        return data['error']['errors'][0]['location']
    except (ValueError, KeyError, IndexError, TypeError):
        # ValueError: content is not valid JSON.
        # KeyError: a 'error'/'errors'/'location' key is missing.
        # IndexError: 'errors' is an empty list (original code missed this).
        # TypeError: unexpected nesting, e.g. a string where a dict/list
        # was expected (original code missed this too).
        return None

def __create_post_copy_job_json(self):
return {
"status": {
Expand Down
31 changes: 25 additions & 6 deletions src/commons/big_query/copy_job_async/copy_job/copy_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ def run_copy_job_request(copy_job_request):
def __schedule(source_big_query_table, target_big_query_table, job_id,
create_disposition, write_disposition):
logging.info("Scheduling job ID: " + job_id)
target_project_id = target_big_query_table.get_project_id()
job_data = {
"jobReference": {
"jobId": job_id,
"projectId": target_big_query_table.get_project_id()
"projectId": target_project_id
},
"configuration": {
"copy": {
Expand All @@ -44,7 +45,7 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
"tableId": source_big_query_table.get_table_id(),
},
"destinationTable": {
"projectId": target_big_query_table.get_project_id(),
"projectId": target_project_id,
"datasetId": target_big_query_table.get_dataset_id(),
"tableId": target_big_query_table.get_table_id(),
},
Expand All @@ -54,18 +55,21 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
}
}
try:
job_reference = BigQuery().insert_job(
target_big_query_table.get_project_id(), job_data)
job_reference = BigQuery().insert_job(target_project_id, job_data)
logging.info("Successfully insert: %s", job_reference)
return job_reference
except HttpError as bq_error:
copy_job_error = BigQueryJobError(bq_error,
source_big_query_table,
target_big_query_table)
if copy_job_error.should_be_retried():
if copy_job_error.is_deadline_exceeded():
job_json = CopyJobService.__get_job(job_id, target_project_id,
copy_job_error.location)
return CopyJobService.__to_bq_job_reference(job_json)
elif copy_job_error.should_be_retried():
logging.warning(copy_job_error)
return BigQueryJobReference(
project_id=target_big_query_table.get_project_id(),
project_id=target_project_id,
job_id=job_id,
location=BigQueryTableMetadata.get_table_by_big_query_table(
source_big_query_table).get_location())
Expand All @@ -77,6 +81,21 @@ def __schedule(source_big_query_table, target_big_query_table, job_id,
type(error), error)
raise error

@staticmethod
@retry(HttpError, tries=6, delay=2, backoff=2)
def __get_job(job_id, project_id, location):
    """Fetch the job identified by (project_id, job_id, location) from
    BigQuery, retrying on HttpError with exponential backoff (6 tries,
    2s initial delay, doubling each time)."""
    reference = BigQueryJobReference(project_id=project_id,
                                     job_id=job_id,
                                     location=location)
    client = BigQuery()
    return client.get_job(reference)

@staticmethod
def __to_bq_job_reference(job_json):
    """Build a BigQueryJobReference from the 'jobReference' section of a
    BigQuery job resource (JSON dict as returned by the jobs API)."""
    reference = job_json["jobReference"]
    project_id = reference["projectId"]
    job_id = reference["jobId"]
    location = reference["location"]
    return BigQueryJobReference(project_id, job_id, location)

@staticmethod
def _create_random_job_id():
    """Generate a unique copy-job identifier with the 'bbq_copy_job_'
    prefix (uniqueness via a random UUID4)."""
    return "bbq_copy_job_{}".format(uuid.uuid4())
162 changes: 119 additions & 43 deletions tests/commons/big_query/copy_job_async/copy_job/test_copy_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def tearDown(self):
location='EU'))
@patch.object(TaskCreator, 'create_copy_job_result_check')
def test_that_post_copy_action_request_is_passed(
self, create_copy_job_result_check, _):
self, create_copy_job_result_check, _):
# given
post_copy_action_request = \
PostCopyActionRequest(url='/my/url', data={'key1': 'value1'})
Expand Down Expand Up @@ -79,51 +79,51 @@ def test_that_post_copy_action_request_is_passed(
post_copy_action_request=post_copy_action_request
)
)
@patch.object(BigQuery, 'insert_job',
return_value=BigQueryJobReference(
project_id='test_project',
job_id='job123',
location='EU'))
@patch.object(TaskCreator, 'create_copy_job_result_check')
def test_that_create_and_write_disposition_are_passed_to_result_check(

@patch.object(BigQuery, 'insert_job',
return_value=BigQueryJobReference(
project_id='test_project',
job_id='job123',
location='EU'))
@patch.object(TaskCreator, 'create_copy_job_result_check')
def test_that_create_and_write_disposition_are_passed_to_result_check(
self, create_copy_job_result_check, _):
# given
create_disposition = "SOME_CREATE_DISPOSITION"
write_disposition = "SOME_WRITE_DISPOSITION"

# when
CopyJobService().run_copy_job_request(
CopyJobRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
source_big_query_table=self.example_source_bq_table,
target_big_query_table=self.example_target_bq_table,
create_disposition=create_disposition,
write_disposition=write_disposition,
retry_count=0,
post_copy_action_request=None
)
)
# given
create_disposition = "SOME_CREATE_DISPOSITION"
write_disposition = "SOME_WRITE_DISPOSITION"

# then
create_copy_job_result_check.assert_called_once_with(
ResultCheckRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
job_reference=BigQueryJobReference(
project_id='test_project',
job_id='job123',
location='EU'),
retry_count=0,
post_copy_action_request=None
)
# when
CopyJobService().run_copy_job_request(
CopyJobRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
source_big_query_table=self.example_source_bq_table,
target_big_query_table=self.example_target_bq_table,
create_disposition=create_disposition,
write_disposition=write_disposition,
retry_count=0,
post_copy_action_request=None
)
)

# then
create_copy_job_result_check.assert_called_once_with(
ResultCheckRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
job_reference=BigQueryJobReference(
project_id='test_project',
job_id='job123',
location='EU'),
retry_count=0,
post_copy_action_request=None
)
)

@patch.object(BigQuery, 'insert_job')
@patch('time.sleep', side_effect=lambda _: None)
def test_that_copy_table_should_throw_error_after_exception_not_being_http_error_thrown_on_copy_job_creation(
self, _, insert_job):
self, _, insert_job):
# given
error_message = 'test exception'
insert_job.side_effect = Exception(error_message)
Expand All @@ -145,10 +145,11 @@ def test_that_copy_table_should_throw_error_after_exception_not_being_http_error

@patch.object(BigQuery, 'insert_job')
@patch('time.sleep', side_effect=lambda _: None)
def test_that_copy_table_should_throw_error_after_http_error_different_than_404_thrown_on_copy_job_creation(
self, _, insert_job):
def test_that_copy_table_should_throw_unhandled_errors(self, _, insert_job):
# given
exception = HttpError(Mock(status=500), 'internal error')
exception._get_reason = Mock(return_value='internal error')

insert_job.side_effect = exception
request = CopyJobRequest(
task_name_suffix=None,
Expand All @@ -169,9 +170,12 @@ def test_that_copy_table_should_throw_error_after_http_error_different_than_404_
@patch.object(BigQuery, 'insert_job')
@patch.object(TaskCreator, 'create_post_copy_action')
def test_that_copy_table_should_create_correct_post_copy_action_if_404_http_error_thrown_on_copy_job_creation(
self, create_post_copy_action, insert_job):
self, create_post_copy_action, insert_job):
# given
insert_job.side_effect = HttpError(Mock(status=404), 'not found')
error = HttpError(Mock(status=404), 'not found')
error._get_reason = Mock(return_value='not found')

insert_job.side_effect = error
post_copy_action_request = PostCopyActionRequest(url='/my/url', data={'key1': 'value1'})
request = CopyJobRequest(
task_name_suffix='task_name_suffix',
Expand Down Expand Up @@ -228,7 +232,8 @@ def test_that_copy_table_should_create_correct_post_copy_action_if_access_denied
http_error_content = "{\"error\": " \
" {\"errors\": [" \
" {\"reason\": \"Access Denied\"," \
" \"message\": \"Access Denied\"" \
" \"message\": \"Access Denied\"," \
" \"location\": \"US\"" \
" }]," \
" \"code\": 403,\"" \
" message\": \"Access Denied\"}}"
Expand Down Expand Up @@ -282,6 +287,77 @@ def test_that_copy_table_should_create_correct_post_copy_action_if_access_denied
}
)

@patch.object(BigQuery, 'get_job')
@patch.object(BigQuery, 'insert_job')
@patch.object(TaskCreator, 'create_copy_job_result_check')
def test_that_copy_table_will_try_to_wait_if_deadline_exceeded(
self, create_copy_job_result_check, insert_job, get_job):
    """When insert_job fails with HTTP 500 'Deadline exceeded', the service
    should look the job up via get_job (it may have been created server-side
    despite the failed response) and schedule the result check with the
    jobReference found there, instead of raising or re-inserting.

    NOTE(review): @patch decorators apply bottom-up, hence the mock
    parameters arrive in reverse order of the decorators.
    """
# given
# Raw BigQuery error payload; the error 'location' ('US') lives in
# error.errors[0] and is what BigQueryJobError.__get_location parses out.
http_error_content = "{\"error\": " \
" {\"errors\": [" \
" {\"reason\": \"Deadline exceeded\"," \
" \"message\": \"Deadline exceeded\"," \
" \"location\": \"US\"" \
" }]," \
" \"code\": 500,\"" \
" message\": \"Deadline exceeded\"}}"
# Job resource returned by the get_job lookup: a DONE copy job whose
# jobReference (projectId/location/jobId) the result check should receive.
successful_job_json = {
'status': {
'state': 'DONE'
},
'jobReference': {
'projectId': self.example_target_bq_table.get_project_id(),
'location': 'EU',
'jobId': 'job123',
},
'configuration': {
'copy': {
'sourceTable': {
'projectId': self.example_source_bq_table.get_project_id(),
'tableId': self.example_source_bq_table.get_table_id(),
'datasetId': self.example_source_bq_table.get_dataset_id()
},
'destinationTable': {
'projectId': self.example_target_bq_table.get_project_id(),
'tableId': self.example_target_bq_table.get_table_id(),
'datasetId': self.example_target_bq_table.get_dataset_id()
}
}
}
}

insert_job.side_effect = HttpError(Mock(status=500), http_error_content)
get_job.return_value = successful_job_json

request = CopyJobRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
source_big_query_table=self.example_source_bq_table,
target_big_query_table=self.example_target_bq_table,
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_EMPTY",
retry_count=0,
post_copy_action_request=None
)

# when
CopyJobService().run_copy_job_request(request)

# then
# The reference must come from the get_job lookup (location 'EU' out of
# the job resource), not from the failed insert attempt (error said 'US').
create_copy_job_result_check.assert_called_once_with(
ResultCheckRequest(
task_name_suffix='task_name_suffix',
copy_job_type_id='test-process',
job_reference=BigQueryJobReference(
project_id=self.example_target_bq_table.get_project_id(),
job_id='job123',
location='EU'
),
retry_count=0,
post_copy_action_request=None
)
)

@patch('src.commons.big_query.big_query_table_metadata.BigQueryTableMetadata')
@patch.object(TaskCreator, 'create_copy_job_result_check')
@patch.object(CopyJobService, '_create_random_job_id',
Expand Down

0 comments on commit 57ba255

Please sign in to comment.