[Rebase] Add s3 canned ACL support #779

Open · wants to merge 2 commits into master
1 change: 1 addition & 0 deletions docs/aws_s3_setup.md
@@ -51,6 +51,7 @@ Create an IAM Policy called `MedusaStorageStrategy`, with the following definition
"s3:GetReplicationConfiguration",
"s3:ListMultipartUploadParts",
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:GetObjectTorrent",
"s3:PutObjectRetention",
3 changes: 3 additions & 0 deletions medusa-example.ini
@@ -133,6 +133,9 @@ use_sudo_for_restore = True
; Read timeout in seconds for the storage provider.
;read_timeout = 60

; Canned ACL for objects uploaded to S3. Defaults to private.
canned_acl = private

[monitoring]
;monitoring_provider = <Provider used for sending metrics. Currently either of "ffwd" or "local">

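The new `canned_acl` option lives in the `[storage]` section and falls back to `private` when omitted. As a rough illustration of that behaviour (this is not Medusa's parsing code, and the file path is only an example), reading the option with `configparser` could look like this:

```python
# Illustrative sketch only: Medusa's real defaulting is done in
# medusa/config.py, which seeds 'canned_acl': 'private' into the
# [storage] defaults before the ini file is parsed.
import configparser

parser = configparser.ConfigParser()
parser.read("/etc/medusa/medusa.ini")  # example path, adjust to your install
canned_acl = parser.get("storage", "canned_acl", fallback="private")
print(canned_acl)  # "private" unless explicitly overridden
```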
6 changes: 4 additions & 2 deletions medusa/config.py
@@ -31,7 +31,8 @@
['bucket_name', 'key_file', 'prefix', 'fqdn', 'host_file_separator', 'storage_provider', 'storage_class',
'base_path', 'max_backup_age', 'max_backup_count', 'api_profile', 'transfer_max_bandwidth',
'concurrent_transfers', 'multi_part_upload_threshold', 'host', 'region', 'port', 'secure', 'ssl_verify',
'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode', 'read_timeout']
'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode', 'read_timeout',
'canned_acl']
)

CassandraConfig = collections.namedtuple(
@@ -117,7 +118,8 @@ def _build_default_config():
'region': 'default',
'backup_grace_period_in_days': 10,
'use_sudo_for_restore': 'True',
'read_timeout': 60
'read_timeout': 60,
'canned_acl': 'private',
}

config['logging'] = {
27 changes: 17 additions & 10 deletions medusa/storage/s3_base_storage.py
@@ -116,6 +116,7 @@ def __init__(self, config):

self.connection_extra_args = self._make_connection_arguments(config)
self.transfer_config = self._make_transfer_config(config)
self.canned_acl = config.canned_acl

self.executor = concurrent.futures.ThreadPoolExecutor(int(config.concurrent_transfers))

@@ -259,14 +260,18 @@ async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dict[str, str]) -> AbstractBlob:

kms_args = {}
extra_args = {}
if self.kms_id is not None:
kms_args['ServerSideEncryption'] = 'aws:kms'
kms_args['SSEKMSKeyId'] = self.kms_id
extra_args['ServerSideEncryption'] = 'aws:kms'
extra_args['SSEKMSKeyId'] = self.kms_id

storage_class = self.get_storage_class()
if storage_class is not None:
kms_args['StorageClass'] = storage_class
extra_args['StorageClass'] = storage_class

# setting an ACL on a bucket without ACLs enabled causes an AccessControlListNotSupported error
if self.canned_acl is not None:
extra_args['ACL'] = self.canned_acl

logging.debug(
'[S3 Storage] Uploading object from stream -> s3://{}/{}'.format(
Expand All @@ -281,7 +286,7 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
Bucket=self.bucket_name,
Key=object_key,
Body=data,
**kms_args,
**extra_args,
)
except Exception as e:
logging.error(e)
@@ -352,14 +357,16 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
# check if the object resides in a sub-folder (e.g. secondary index); if it does, use the sub-folder in the object path
object_key = AbstractStorage.path_maybe_with_parent(dest, src_path)

kms_args = {}
extra_args = {}
if self.kms_id is not None:
kms_args['ServerSideEncryption'] = 'aws:kms'
kms_args['SSEKMSKeyId'] = self.kms_id
extra_args['ServerSideEncryption'] = 'aws:kms'
extra_args['SSEKMSKeyId'] = self.kms_id

storage_class = self.get_storage_class()
if storage_class is not None:
kms_args['StorageClass'] = storage_class
extra_args['StorageClass'] = storage_class
if self.canned_acl is not None:
extra_args['ACL'] = self.canned_acl

file_size = os.stat(src).st_size
logging.debug(
Expand All @@ -373,7 +380,7 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
'Bucket': self.bucket_name,
'Key': object_key,
'Config': self.transfer_config,
'ExtraArgs': kms_args,
'ExtraArgs': extra_args,
}
# we are going to combine asyncio with boto's threading
# we do this by submitting the upload into an executor
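For readers unfamiliar with boto3, the renamed `extra_args` dict is consumed in two shapes: `_upload_object` spreads its keys directly into `put_object`, while `_upload_blob` hands it to the managed transfer as `ExtraArgs`. A minimal, standalone sketch of both paths follows; bucket, key, and file names are made up, not taken from Medusa:

```python
# Standalone boto3 sketch of the two upload paths; values are illustrative.
import io

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client("s3")
extra_args = {"ACL": "bucket-owner-read", "StorageClass": "STANDARD_IA"}

# Small payloads: the keys become plain put_object keyword arguments.
s3.put_object(
    Bucket="example-bucket",
    Key="backups/small-object",
    Body=io.BytesIO(b"payload"),
    **extra_args,
)

# Large files: the managed transfer accepts the same keys via ExtraArgs.
s3.upload_file(
    Filename="/tmp/example-sstable-Data.db",
    Bucket="example-bucket",
    Key="backups/large-object",
    ExtraArgs=extra_args,
    Config=TransferConfig(multipart_threshold=100 * 1024 * 1024),
)
```

The `if self.canned_acl is not None` guard exists because, as the in-diff comment notes, sending an ACL to a bucket that has ACLs disabled is rejected.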
33 changes: 17 additions & 16 deletions tests/integration/features/integration_tests.feature
@@ -1151,36 +1151,37 @@ Feature: Integration tests
Scenario Outline: Perform a differential backup with explicit storage class, then verify it
Given I have a fresh ccm cluster "<client encryption>" running named "scenario32"
Given I will use "<storage class>" as storage class in the storage
Given I will use "<canned ACL>" canned ACL when uploading objects
Given I am using "<storage>" as storage provider in ccm cluster "<client encryption>" with gRPC server
When I create the "test" table with secondary index in keyspace "medusa"
When I load 100 rows in the "medusa.test" table
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform a backup in "differential" mode of the node named "first_backup" with md5 checks "disabled"
Then I can see the backup named "first_backup" when I list the backups
Then I can verify the backup named "first_backup" with md5 checks "disabled" successfully
Then I can see 2 SSTables with "<storage class>" in the SSTable pool for the "test" table in keyspace "medusa"
Then I can see 2 SSTables with "<storage class>" and "<canned ACL>" in the SSTable pool for the "test" table in keyspace "medusa"

@s3
Examples: S3 storage
| storage | client encryption | storage class |
| s3_us_west_oregon | without_client_encryption | STANDARD |
| s3_us_west_oregon | without_client_encryption | REDUCED_REDUNDANCY |
| s3_us_west_oregon | without_client_encryption | STANDARD_IA |
| s3_us_west_oregon | without_client_encryption | ONEZONE_IA |
| s3_us_west_oregon | without_client_encryption | INTELLIGENT_TIERING |
| storage | client encryption | storage class | canned ACL |
| s3_us_west_oregon | without_client_encryption | STANDARD | bucket-owner-read |
| s3_us_west_oregon | without_client_encryption | REDUCED_REDUNDANCY | bucket-owner-read |
| s3_us_west_oregon | without_client_encryption | STANDARD_IA | bucket-owner-read |
| s3_us_west_oregon | without_client_encryption | ONEZONE_IA | bucket-owner-read |
| s3_us_west_oregon | without_client_encryption | INTELLIGENT_TIERING | bucket-owner-read |

@gcs
Examples: Google Cloud Storage
| storage | client encryption | storage class |
| google_storage | without_client_encryption | STANDARD |
| storage | client encryption | storage class | canned ACL |
| google_storage | without_client_encryption | STANDARD | None |
# this is buggy for now, the library does not propagate the custom storage class headers
# | google_storage | without_client_encryption | NEARLINE |
# | google_storage | without_client_encryption | COLDLINE |
# | google_storage | without_client_encryption | ARCHIVE |
# | google_storage | without_client_encryption | NEARLINE | None |
# | google_storage | without_client_encryption | COLDLINE | None |
# | google_storage | without_client_encryption | ARCHIVE | None |

@azure
Examples: Azure Blob Storage
| storage | client encryption | storage class |
| azure_blobs | without_client_encryption | HOT |
| azure_blobs | without_client_encryption | COOL |
| azure_blobs | without_client_encryption | COLD |
| storage | client encryption | storage class | canned ACL |
| azure_blobs | without_client_encryption | HOT | None |
| azure_blobs | without_client_encryption | COOL | None |
| azure_blobs | without_client_encryption | COLD | None |
22 changes: 18 additions & 4 deletions tests/integration/features/steps/integration_steps.py
@@ -395,6 +395,11 @@ def i_will_use_storage_class(context, storage_class):
context.storage_class = storage_class


@given(r'I will use "{canned_acl}" canned ACL when uploading objects')
def i_will_use_canned_acl(context, canned_acl):
context.canned_acl = canned_acl


@given(r'I am using "{storage_provider}" as storage provider in ccm cluster "{client_encryption}"')
def i_am_using_storage_provider(context, storage_provider, client_encryption):
context.storage_provider = storage_provider
@@ -538,6 +543,8 @@ def get_args(context, storage_provider, client_encryption, cassandra_url, use_mg
storage_args = {"prefix": storage_prefix}
if hasattr(context, "storage_class"):
storage_args.update({"storage_class": context.storage_class})
if hasattr(context, "canned_acl"):
storage_args.update({"canned_acl": context.canned_acl})

cassandra_args = {
"is_ccm": str(is_ccm),
@@ -1277,14 +1284,15 @@ def _the_backup_index_exists(context):
def _i_can_see_nb_sstables_in_the_sstable_pool(
context, nb_sstables, table_name, keyspace
):
_i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(context, nb_sstables, None, table_name, keyspace)
_i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
context, nb_sstables, None, None, table_name, keyspace
)


# Then I can see 2 SSTables with "<storage class>" and "<canned ACL>" in the SSTable pool for the "test" table in keyspace "medusa"
@then(r'I can see {nb_sstables} SSTables with "{storage_class}" in the SSTable pool '
@then(r'I can see {nb_sstables} SSTables with "{storage_class}" and "{canned_acl}" in the SSTable pool '
r'for the "{table_name}" table in keyspace "{keyspace}"')
def _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
context, nb_sstables, storage_class, table_name, keyspace
context, nb_sstables, storage_class, canned_acl, table_name, keyspace
):
with Storage(config=context.medusa_config.storage) as storage:
path = os.path.join(
Expand All @@ -1302,6 +1310,12 @@ def _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
logging.info(f'{storage_class.upper()} vs {sstable.storage_class.upper()}')
assert storage_class.upper() == sstable.storage_class.upper()

# to make checking ACLs work, we'd need to make the driver call
# response = s3.get_object_acl(Bucket=bucket_name, Key=object_key)
# but that assumes we have a bucket with ACLs enabled
if canned_acl is not None:
pass


@then(
r'backup named "{backup_name}" has {nb_files} files '
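The `_i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool` step accepts the canned ACL but does not verify it yet, for the reason given in its comment. If the test bucket had ACLs enabled, the `get_object_acl` call it mentions could back a small helper along these lines; bucket, key, and permission values are purely illustrative:

```python
# Hypothetical ACL check, not wired into the test suite; it assumes the
# bucket has ACLs enabled, otherwise the grants from a canned ACL will
# not be reported.
import boto3


def object_has_grant(bucket_name: str, object_key: str, permission: str) -> bool:
    """Return True if the object's ACL contains a grant with the given permission."""
    s3 = boto3.client("s3")
    acl = s3.get_object_acl(Bucket=bucket_name, Key=object_key)
    return any(grant["Permission"] == permission for grant in acl["Grants"])


# e.g. a bucket-owner-read canned ACL should surface a READ grant:
# object_has_grant("example-bucket", "backups/some-sstable", "READ")
```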
24 changes: 16 additions & 8 deletions tests/storage/s3_storage_test.py
@@ -197,7 +197,8 @@ def test_credentials_with_default_region_and_s3_compatible_storage(self):
'region': 'default',
'storage_provider': 's3_compatible',
'key_file': credentials_file.name,
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})

credentials = S3BaseStorage._consolidate_credentials(config)
@@ -220,7 +221,8 @@ def test_make_s3_url(self):
'ssl_verify': 'False',
'host': None,
'port': None,
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
# there are no extra connection args when connecting to regular S3
@@ -244,7 +246,8 @@ def test_make_s3_url_without_secure(self):
'ssl_verify': 'False',
'host': None,
'port': None,
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
# again, no extra connection args when connecting to regular S3
@@ -269,7 +272,8 @@ def test_make_s3_compatible_url(self):
'ssl_verify': 'False',
'host': 's3.example.com',
'port': '443',
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
self.assertEqual(
@@ -292,7 +296,8 @@ def test_make_s3_compatible_url_without_secure(self):
'ssl_verify': 'False',
'host': 's3.example.com',
'port': '8080',
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
self.assertEqual(
@@ -314,7 +319,8 @@ def test_make_connection_arguments_without_ssl_verify(self):
'ssl_verify': 'False',
'host': 's3.example.com',
'port': '8080',
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
connection_args = s3_storage._make_connection_arguments(config)
@@ -334,7 +340,8 @@ def test_make_connection_arguments_with_ssl_verify(self):
'ssl_verify': 'True',
'host': 's3.example.com',
'port': '8080',
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})
s3_storage = S3BaseStorage(config)
connection_args = s3_storage._make_connection_arguments(config)
@@ -375,7 +382,8 @@ def test_assume_role_authentication(self):
'ssl_verify': 'False',
'host': None,
'port': None,
'concurrent_transfers': '1'
'concurrent_transfers': '1',
'canned_acl': 'public-read',
})

# Replace the open function with the mock