Skip to content

Commit

Permalink
[AIRFLOW-3531] Add gcs to gcs transfer operator. (apache#4331)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcarp authored and wayne.morris committed Jul 29, 2019
1 parent f8d4ad0 commit c4f6965
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 7 deletions.
6 changes: 3 additions & 3 deletions airflow/contrib/hooks/gcp_transfer_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Time to sleep between active checks of the operation results
TIME_TO_SLEEP_IN_SECONDS = 1
TIME_TO_SLEEP_IN_SECONDS = 10


# noinspection PyAbstractClass
Expand Down Expand Up @@ -56,10 +56,10 @@ def get_conn(self):
http=http_authorized, cache_discovery=False)
return self._conn

def create_transfer_job(self, project_id, description, schedule, transfer_spec):
def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
transfer_job = {
'status': 'ENABLED',
'projectId': project_id,
'projectId': project_id or self.project_id,
'description': description,
'transferSpec': transfer_spec,
'schedule': schedule or self._schedule_once_now(),
Expand Down
127 changes: 127 additions & 0 deletions airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models import BaseOperator
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
    """
    Copy objects from one Google Cloud Storage bucket to another using the
    GCP Storage Transfer Service.

    :param source_bucket: The source Google cloud storage bucket where the
        object is. (templated)
    :type source_bucket: str
    :param destination_bucket: The destination Google cloud storage bucket
        where the object should be. (templated)
    :type destination_bucket: str
    :param project_id: The ID of the Google Cloud Platform Console project that
        owns the job. If unset, the hook falls back to its default project.
    :type project_id: str
    :param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud
        Storage.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param description: Optional transfer service job description
    :type description: str
    :param schedule: Optional transfer service schedule; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
        If not set, run transfer job once as soon as the operator runs
    :type schedule: dict
    :param object_conditions: Optional transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
    :type object_conditions: dict
    :param transfer_options: Optional transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
    :type transfer_options: dict
    :param wait: Wait for transfer to finish; defaults to `True`
    :type wait: bool

    **Example**:

    .. code-block:: python

       gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id='gcs_to_gcs_transfer_example',
            source_bucket='my-source-bucket',
            destination_bucket='my-destination-bucket',
            project_id='my-gcp-project',
            dag=my_dag)
    """

    template_fields = (
        'source_bucket',
        'destination_bucket',
        'description',
        'object_conditions',
    )
    ui_color = '#e09411'

    @apply_defaults
    def __init__(self,
                 source_bucket,
                 destination_bucket,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 description=None,
                 schedule=None,
                 object_conditions=None,
                 transfer_options=None,
                 wait=True,
                 *args,
                 **kwargs):
        super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(
            *args, **kwargs)
        self.source_bucket = source_bucket
        self.destination_bucket = destination_bucket
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.description = description
        self.schedule = schedule
        # Normalize to empty dicts so the transfer spec below is always valid JSON.
        self.object_conditions = object_conditions or {}
        self.transfer_options = transfer_options or {}
        self.wait = wait

    def execute(self, context):
        """Create the transfer job and, if requested, block until it finishes."""
        hook = GCPTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to)

        # Assemble the transfer spec expected by the Storage Transfer API.
        transfer_spec = {
            'gcsDataSource': {
                'bucketName': self.source_bucket,
            },
            'gcsDataSink': {
                'bucketName': self.destination_bucket,
            },
            'objectConditions': self.object_conditions,
            'transferOptions': self.transfer_options,
        }

        job = hook.create_transfer_job(
            project_id=self.project_id,
            description=self.description,
            schedule=self.schedule,
            transfer_spec=transfer_spec,
        )

        if self.wait:
            hook.wait_for_transfer_job(job)
8 changes: 4 additions & 4 deletions airflow/contrib/operators/s3_to_gcs_transfer_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
:param gcs_bucket: The destination Google Cloud Storage bucket
where you want to store the files. (templated)
:type gcs_bucket: str
:param project_id: The ID of the Google Cloud Platform Console project that
:param project_id: Optional ID of the Google Cloud Platform Console project that
owns the job
:type project_id: str
:param aws_conn_id: The source S3 connection
Expand All @@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
If not set, run transfer job once as soon as the operator runs
:type schedule: dict
:param object_conditions: Transfer service object conditions; see
:param object_conditions: Optional transfer service object conditions; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
:type object_conditions: dict
:param transfer_options: Transfer service transfer options; see
:param transfer_options: Optional transfer service transfer options; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
:type transfer_options: dict
:param wait: Wait for transfer to finish
Expand All @@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
def __init__(self,
s3_bucket,
gcs_bucket,
project_id,
project_id=None,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
delegate_to=None,
Expand Down
1 change: 1 addition & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Operators
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator
.. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator
Expand Down
8 changes: 8 additions & 0 deletions docs/integration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ Storage Operators
- :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
- :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested.
- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from a bucket to another using Google Transfer service.
- :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format.

.. _FileToGoogleCloudStorageOperator:
Expand Down Expand Up @@ -1341,6 +1342,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator

.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator

.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator:

GoogleCloudStorageToGoogleCloudStorageTransferOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.operators.gcs_to_gcs_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator

.. _MySqlToGoogleCloudStorageOperator:

MySqlToGoogleCloudStorageOperator
Expand Down
131 changes: 131 additions & 0 deletions tests/contrib/operators/test_gcs_to_gcs_transfer_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
GoogleCloudStorageToGoogleCloudStorageTransferOperator

# Prefer the stdlib mock (Python 3); fall back to the standalone `mock`
# package (Python 2). If neither is importable, leave `mock` as None so
# the module still imports (tests using it would then fail at class
# decoration time — NOTE(review): a skipIf guard may be warranted).
try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None


# Fixture values shared by all test cases below.
TASK_ID = 'test-gcs-gcs-transfer-operator'
SOURCE_BUCKET = 'test-source-bucket'
DESTINATION_BUCKET = 'test-destination-bucket'
PROJECT_ID = 'test-project'
DESCRIPTION = 'test-description'
# One-month transfer window in the Storage Transfer Service schedule format.
SCHEDULE = {
    'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
    'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
}


class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
    """Unit tests for GoogleCloudStorageToGoogleCloudStorageTransferOperator."""

    def test_constructor(self):
        """Test that constructor arguments are stored on the operator instance."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
        )

        self.assertEqual(operator.task_id, TASK_ID)
        self.assertEqual(operator.source_bucket, SOURCE_BUCKET)
        self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET)
        self.assertEqual(operator.project_id, PROJECT_ID)
        self.assertEqual(operator.description, DESCRIPTION)
        self.assertEqual(operator.schedule, SCHEDULE)

    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
    def test_execute(self, mock_transfer_hook):
        """Test that execute creates a transfer job and waits for it by default."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
        )

        operator.execute(None)

        # The transfer spec must name the source/sink buckets and default the
        # optional conditions/options to empty dicts.
        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=SCHEDULE,
            transfer_spec={
                'gcsDataSource': {
                    'bucketName': SOURCE_BUCKET,
                },
                'gcsDataSink': {
                    'bucketName': DESTINATION_BUCKET,
                },
                'objectConditions': {},
                'transferOptions': {}
            }
        )

        # With wait=True (the default), the operator blocks on the created job.
        mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
            mock_transfer_hook.return_value.create_transfer_job.return_value
        )

    @mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
    def test_execute_skip_wait(self, mock_transfer_hook):
        """Test that execute does not wait for the transfer job when wait=False."""

        operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id=TASK_ID,
            source_bucket=SOURCE_BUCKET,
            destination_bucket=DESTINATION_BUCKET,
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            wait=False,
        )

        operator.execute(None)

        # schedule was not passed, so the operator forwards None (the hook
        # is responsible for any run-once default).
        mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
            project_id=PROJECT_ID,
            description=DESCRIPTION,
            schedule=None,
            transfer_spec={
                'gcsDataSource': {
                    'bucketName': SOURCE_BUCKET,
                },
                'gcsDataSink': {
                    'bucketName': DESTINATION_BUCKET,
                },
                'objectConditions': {},
                'transferOptions': {}
            }
        )

        # wait=False must skip the blocking wait entirely.
        assert not mock_transfer_hook.return_value.wait_for_transfer_job.called
2 changes: 2 additions & 0 deletions tests/contrib/operators/test_s3_to_gcs_transfer_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ def test_constructor(self):
gcs_bucket=GCS_BUCKET,
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=SCHEDULE,
)

self.assertEqual(operator.task_id, TASK_ID)
self.assertEqual(operator.s3_bucket, S3_BUCKET)
self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
self.assertEqual(operator.project_id, PROJECT_ID)
self.assertEqual(operator.description, DESCRIPTION)
self.assertEqual(operator.schedule, SCHEDULE)

@mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
@mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')
Expand Down

0 comments on commit c4f6965

Please sign in to comment.