
Merge dc5d3ee into 5984da2
radkomateusz committed Nov 5, 2018
2 parents 5984da2 + dc5d3ee commit 7bbf706
Showing 13 changed files with 212 additions and 154 deletions.
32 changes: 20 additions & 12 deletions src/backup/copy_job_async/copy_job/copy_job_service.py
@@ -2,10 +2,14 @@
import uuid

from apiclient.errors import HttpError
from src.commons.decorators.retry import retry

from src.backup.copy_job_async.result_check.result_check_request import \
ResultCheckRequest
from src.backup.copy_job_async.task_creator import TaskCreator
from src.backup.copy_job_async.result_check.result_check_request import ResultCheckRequest
from src.commons.big_query.big_query import BigQuery
from src.commons.big_query.big_query_job_reference import BigQueryJobReference
from src.commons.big_query.big_query_table_metadata import BigQueryTableMetadata
from src.commons.decorators.retry import retry


class CopyJobService(object):
@@ -16,16 +20,17 @@ def run_copy_job_request(copy_job_request):
target_big_query_table = copy_job_request.target_big_query_table
retry_count = copy_job_request.retry_count

job_id = CopyJobService.__schedule(source_big_query_table,
target_big_query_table,
job_id=CopyJobService._create_random_job_id())
if job_id:
job_reference = CopyJobService.__schedule(
source_big_query_table=source_big_query_table,
target_big_query_table=target_big_query_table,
job_id=CopyJobService._create_random_job_id())

if job_reference:
TaskCreator.create_copy_job_result_check(
ResultCheckRequest(
task_name_suffix=copy_job_request.task_name_suffix,
copy_job_type_id=copy_job_request.copy_job_type_id,
project_id=target_big_query_table.get_project_id(),
job_id=job_id,
job_reference=job_reference,
retry_count=retry_count,
post_copy_action_request=copy_job_request.post_copy_action_request
)
@@ -66,17 +71,20 @@ def __schedule(source_big_query_table, target_big_query_table, job_id):
}
}
try:
response_job_id = BigQuery().insert_job(
job_reference = BigQuery().insert_job(
target_big_query_table.get_project_id(), job_data)
logging.info("Response job ID: " + response_job_id)
return response_job_id
logging.info("%s", job_reference)
return job_reference
except HttpError as bq_error:
if bq_error.resp.status == 404:
logging.exception('404 while creating Copy Job from %s to %s' % (source_big_query_table, target_big_query_table))
return None
elif bq_error.resp.status == 409:
logging.warning('409 while creating Copy Job from %s to %s' % (source_big_query_table, target_big_query_table))
return job_id
return BigQueryJobReference(
project_id=target_big_query_table.get_project_id(),
job_id=job_id,
location=BigQueryTableMetadata.get_table_by_big_query_table(source_big_query_table).get_location())
else:
raise
except Exception as error:
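
The subtle part of this change is the 409 branch: copy-job IDs are generated client-side, so a 409 means the job already exists under the ID we picked, and the service can rebuild the full reference locally; only the location still needs a metadata lookup on the source table. A minimal sketch of that branch in isolation (project and location values are hypothetical):

import uuid

from src.commons.big_query.big_query_job_reference import BigQueryJobReference


def reference_for_existing_job(project_id, job_id, source_table_location):
    # On 409 the copy job already exists under the client-generated ID,
    # so no second insert is needed; the reference is rebuilt locally.
    return BigQueryJobReference(
        project_id=project_id,
        job_id=job_id,
        location=source_table_location)


job_id = str(uuid.uuid4())  # hypothetical; stands in for _create_random_job_id()
reference = reference_for_existing_job('target-project', job_id, 'EU')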
13 changes: 6 additions & 7 deletions src/backup/copy_job_async/result_check/result_check.py
@@ -16,24 +16,23 @@ def __init__(self):
def check(self, result_check_request):
self.__copy_job_type_id = result_check_request.copy_job_type_id
self.__post_copy_action_request = result_check_request.post_copy_action_request
job_json = self.BQ.get_job(
project_id=result_check_request.project_id,
job_id=result_check_request.job_id
)
job_json = self.BQ.get_job(result_check_request.job_reference)

logging.info('Checking result (retryCount=%s) of job: %s',
result_check_request.retry_count, json.dumps(job_json))

copy_job_result = CopyJobResult(job_json)

if copy_job_result.is_done():
logging.info('Copy job %s complete', result_check_request.job_id)
logging.info('Copy job %s complete',
result_check_request.job_reference)
self.__process_copy_job_result(copy_job_result,
result_check_request.retry_count)
else:
logging.info(
"Copy job '%s' not completed yet. Another result check "
"is put on the queue.",
result_check_request.job_id)
result_check_request.job_reference)
TaskCreator.create_copy_job_result_check(result_check_request)

def __process_copy_job_result(self, job_result, retry_count):
@@ -42,7 +41,7 @@ def __process_copy_job_result(self, job_result, retry_count):
logging.info("retry_count: %s", retry_count)
logging.error(job_result.error_message)
if self.__should_retry(job_result.error_result) \
and retry_count < self.__MAX_RETRY_COUNT:
and retry_count < self.__MAX_RETRY_COUNT:
logging.error('We may need to re-trigger this task.')
retry_count += 1
TaskCreator.create_copy_job(
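
For context, the check path feeds the raw jobs.get response straight into CopyJobResult. A sketch of the minimal JSON shape the happy path consumes, assuming CopyJobResult reads the standard BigQuery job-status fields (all values hypothetical):

job_json = {
    'jobReference': {
        'projectId': 'target-project',
        'jobId': 'job-123',
        'location': 'EU',
    },
    # 'DONE' is the terminal state; errors, if any, appear under
    # status.errorResult in the same response.
    'status': {'state': 'DONE'},
}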
38 changes: 16 additions & 22 deletions src/backup/copy_job_async/result_check/result_check_request.py
@@ -1,10 +1,9 @@
class ResultCheckRequest(object):
def __init__(self, task_name_suffix, copy_job_type_id, project_id, job_id,
def __init__(self, task_name_suffix, copy_job_type_id, job_reference,
retry_count=0, post_copy_action_request=None):
self.__task_name_suffix = task_name_suffix
self.__copy_job_type_id = copy_job_type_id
self.__project_id = project_id
self.__job_id = job_id
self.__job_reference = job_reference
self.__retry_count = retry_count
self.__post_copy_action_request = post_copy_action_request

@@ -17,12 +16,8 @@ def copy_job_type_id(self):
return self.__copy_job_type_id

@property
def project_id(self):
return self.__project_id

@property
def job_id(self):
return self.__job_id
def job_reference(self):
return self.__job_reference

@property
def retry_count(self):
@@ -33,23 +28,22 @@ def post_copy_action_request(self):
return self.__post_copy_action_request

def __str__(self):
return 'task_name_suffix: {}, copy_job_type_id: {}, projectId: {}, jobId: {}, retryCount: {} ' \
', postCopyActionRequest: {}'.format(
self.__task_name_suffix, self.__copy_job_type_id,
self.__project_id, self.__job_id,
self.__retry_count, self.__post_copy_action_request)
return 'task_name_suffix: {}, copy_job_type_id: {}, jobReference: {},' \
' retryCount: {}, postCopyActionRequest: {}'.format(
self.__task_name_suffix, self.__copy_job_type_id,
self.__job_reference, self.__retry_count,
self.__post_copy_action_request)

def __repr__(self):
return self.__str__()

def __eq__(self, o):
return type(o) is ResultCheckRequest \
and self.__task_name_suffix == o.__task_name_suffix \
and self.__copy_job_type_id == o.__copy_job_type_id \
and self.__project_id == o.__project_id \
and self.__job_id == o.__job_id \
and self.__retry_count == o.__retry_count \
and self.__post_copy_action_request == o.__post_copy_action_request
def __eq__(self, other):
return type(other) is ResultCheckRequest \
and self.__task_name_suffix == other.__task_name_suffix \
and self.__copy_job_type_id == other.__copy_job_type_id \
and self.__job_reference == other.__job_reference \
and self.__retry_count == other.__retry_count \
and self.__post_copy_action_request == other.__post_copy_action_request

def __ne__(self, other):
return not (self == other)
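
A quick construction example for the new signature; the keyword names come from this diff, all values are hypothetical:

from src.backup.copy_job_async.result_check.result_check_request import \
    ResultCheckRequest
from src.commons.big_query.big_query_job_reference import BigQueryJobReference

request = ResultCheckRequest(
    task_name_suffix='backup_2018_11_05',   # hypothetical task suffix
    copy_job_type_id='backups',             # hypothetical queue prefix
    job_reference=BigQueryJobReference(
        project_id='target-project', job_id='job-123', location='EU'),
    retry_count=0)

assert request.job_reference.location == 'EU'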
7 changes: 2 additions & 5 deletions src/backup/copy_job_async/task_creator.py
@@ -33,11 +33,8 @@ def create_copy_job_result_check(cls, result_check_request):
assert result_check_request.retry_count >= 0
queue_name = result_check_request.copy_job_type_id + '-result-check'
logging.info(
"Schedule copy_job_result_check task for %s (project: '%s') "
"jobId: '%s' (retry count:'%s') in queue '%s'",
result_check_request, result_check_request.project_id,
result_check_request.job_id, result_check_request.retry_count,
queue_name)
"Schedule copy_job_result_check task for %s in queue '%s'",
result_check_request, queue_name)

task = Task(
method='POST',
30 changes: 18 additions & 12 deletions src/commons/big_query/big_query.py
@@ -6,13 +6,14 @@
from apiclient.errors import HttpError, Error
from oauth2client.client import GoogleCredentials

from src.commons.big_query.big_query_job_reference import BigQueryJobReference
from src.commons.big_query.big_query_table import BigQueryTable
from src.commons.config.configuration import configuration
from src.commons.decorators.cached import cached
from src.commons.decorators.google_http_error_retry import \
google_http_error_retry
from src.commons.decorators.log_time import log_time, measure_time_and_log
from src.commons.decorators.retry import retry
from src.commons.big_query.big_query_table import BigQueryTable
from src.commons.config.configuration import configuration
from src.commons.table_reference import TableReference


@@ -24,7 +25,7 @@ class DatasetNotFoundException(Exception):
pass


class BigQuery(object): # pylint: disable=R0904
class BigQuery(object):
def __init__(self):
self.http = self._create_http()
self.service = googleapiclient.discovery.build(
@@ -99,7 +100,7 @@ def list_table_ids(self, project_id, dataset_id):

@retry(Error, tries=3, delay=2, backoff=2)
def list_tables(self, project_id, dataset_id, page_token=None,
max_results=1000):
max_results=1000):
return self.service.tables().list(
projectId=project_id, datasetId=dataset_id,
maxResults=max_results, pageToken=page_token
@@ -150,7 +151,7 @@ def fetch_random_table(self):
use_legacy_sql=True)

if query_results and 'totalRows' in query_results \
and int(query_results['totalRows']) > 0:
and int(query_results['totalRows']) > 0:
results = []
results.extend(query_results.get('rows', []))
first_row = results[0]
@@ -165,7 +166,7 @@

@staticmethod
def random_table_from_project_query():
return "SELECT tableId, datasetId, projectId FROM [{}]"\
return "SELECT tableId, datasetId, projectId FROM [{}]" \
.format(configuration.restoration_daily_test_random_table_view)

def __sync_query(self, query, timeout=30000, use_legacy_sql=False):
Expand Down Expand Up @@ -233,11 +234,16 @@ def insert_job(self, project_id, body):
projectId=project_id, body=body
).execute()
logging.info('Insert job response: ' + json.dumps(response))
return response['jobReference']['jobId']
return BigQueryJobReference(
project_id=response['jobReference']['projectId'],
job_id=response['jobReference']['jobId'],
location=response['jobReference']['location'])

def get_job(self, project_id, job_id):
def get_job(self, job_reference):
return self.service.jobs().get(
projectId=project_id, jobId=job_id
projectId=job_reference.project_id,
jobId=job_reference.job_id,
location=job_reference.location
).execute(num_retries=3)

@retry(Error, tries=3, delay=2, backoff=2)
@@ -262,7 +268,7 @@ def create_table(self, projectId, datasetId, body):
# @refactor - boolean argument
@retry(Error, tries=6, delay=2, backoff=2)
def create_dataset(
self, project_id, dataset_id, location, table_expiration_in_ms=False
self, project_id, dataset_id, location, table_expiration_in_ms=False
):
logging.info(
"Creating dataset %s / %s (location: %s)",
@@ -277,8 +283,8 @@ def create_dataset(
}

if table_expiration_in_ms and \
isinstance(table_expiration_in_ms,
(int, long)): # pylint: disable=E0602
isinstance(table_expiration_in_ms,
(int, long)): # pylint: disable=E0602
body['defaultTableExpirationMs'] = table_expiration_in_ms

try:
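
The net effect on callers: insert_job now returns a BigQueryJobReference instead of a bare job ID string, and get_job takes that reference so the job's location is forwarded to jobs().get, which BigQuery requires for jobs outside the US and EU multi-regions. A sketch of the round trip, assuming valid credentials and a hypothetical copy-job body mirroring the one built in copy_job_service.py:

from src.commons.big_query.big_query import BigQuery

job_data = {
    'configuration': {
        'copy': {
            'sourceTable': {
                'projectId': 'source-project',
                'datasetId': 'source_dataset',
                'tableId': 'source_table',
            },
            'destinationTable': {
                'projectId': 'target-project',
                'datasetId': 'target_dataset',
                'tableId': 'target_table',
            },
        }
    }
}

bq = BigQuery()
job_reference = bq.insert_job('target-project', job_data)  # BigQueryJobReference
job_json = bq.get_job(job_reference)  # projectId, jobId and location all forwarded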
21 changes: 21 additions & 0 deletions src/commons/big_query/big_query_job_reference.py
@@ -0,0 +1,21 @@
class BigQueryJobReference(object):
def __init__(self, project_id, job_id, location):
self.project_id = project_id
self.job_id = job_id
self.location = location

def __str__(self):
return "BigQueryJobReference(projectId:{}, job_id:{}, location: {})" \
.format(self.project_id, self.job_id, self.location)

def __repr__(self):
return self.__str__()

def __eq__(self, other):
return type(other) is BigQueryJobReference \
and self.project_id == other.project_id \
and self.job_id == other.job_id \
and self.location == other.location

def __ne__(self, other):
return not (self == other)
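
BigQueryJobReference is a plain value object with structural equality, which keeps assertions on requests straightforward. Note that __hash__ is not overridden, so two equal references do not hash equally; they are meant to be compared directly, not deduplicated through a set. For example:

from src.commons.big_query.big_query_job_reference import BigQueryJobReference

a = BigQueryJobReference('project-x', 'job-1', 'EU')
b = BigQueryJobReference('project-x', 'job-1', 'EU')

assert a == b          # structural equality via __eq__
assert not (a != b)    # __ne__ delegates to __eq__
assert a != BigQueryJobReference('project-x', 'job-1', 'US')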
17 changes: 13 additions & 4 deletions src/commons/big_query/big_query_table_metadata.py
@@ -32,10 +32,19 @@ def get_table_id_with_partition_id(table_id, partition_id):

@staticmethod
def get_table_by_reference(reference):
return BigQueryTableMetadata.__get_table_or_partition(project_id=reference.project_id,
dataset_id=reference.dataset_id,
table_id=reference.table_id,
partition_id=reference.partition_id)
return BigQueryTableMetadata.__get_table_or_partition(
project_id=reference.project_id,
dataset_id=reference.dataset_id,
table_id=reference.table_id,
partition_id=reference.partition_id)

@staticmethod
def get_table_by_big_query_table(big_query_table):
return BigQueryTableMetadata.__get_table_or_partition(
project_id=big_query_table.project_id,
dataset_id=big_query_table.dataset_id,
table_id=big_query_table.table_id,
partition_id=None)

@staticmethod
@cached(time=300)
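
The new get_table_by_big_query_table exists so the 409 branch in copy_job_service.py can resolve a job's location from the source table; unlike get_table_by_reference it takes a BigQueryTable and never targets a partition, since location is a table-level property. A sketch, assuming BigQueryTable takes (project_id, dataset_id, table_id) positionally and that the returned metadata exposes get_location(), as used earlier in this diff:

from src.commons.big_query.big_query_table import BigQueryTable
from src.commons.big_query.big_query_table_metadata import BigQueryTableMetadata

# Hypothetical source table; the constructor signature is an assumption.
source_table = BigQueryTable('source-project', 'source_dataset', 'source_table')

metadata = BigQueryTableMetadata.get_table_by_big_query_table(source_table)
location = metadata.get_location()  # e.g. 'EU'; feeds the 409 branch above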