Actually use the ACLs with S3 + Add (half of) ITs for ACLs
rzvoncek committed Jun 17, 2024
1 parent e1664c1 commit 38d1307
Showing 4 changed files with 65 additions and 37 deletions.
27 changes: 17 additions & 10 deletions medusa/storage/s3_base_storage.py
@@ -116,6 +116,7 @@ def __init__(self, config):
 
     self.connection_extra_args = self._make_connection_arguments(config)
     self.transfer_config = self._make_transfer_config(config)
+    self.canned_acl = config.canned_acl
 
     self.executor = concurrent.futures.ThreadPoolExecutor(int(config.concurrent_transfers))
 
@@ -259,14 +260,18 @@ async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
 @retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
 async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dict[str, str]) -> AbstractBlob:
 
-    kms_args = {}
+    extra_args = {}
     if self.kms_id is not None:
-        kms_args['ServerSideEncryption'] = 'aws:kms'
-        kms_args['SSEKMSKeyId'] = self.kms_id
+        extra_args['ServerSideEncryption'] = 'aws:kms'
+        extra_args['SSEKMSKeyId'] = self.kms_id
 
     storage_class = self.get_storage_class()
     if storage_class is not None:
-        kms_args['StorageClass'] = storage_class
+        extra_args['StorageClass'] = storage_class
+
+    # doing this to a bucket w/o ACLs enabled causes AccessControlListNotSupported error
+    if self.canned_acl is not None:
+        extra_args['ACL'] = self.canned_acl
 
     logging.debug(
         '[S3 Storage] Uploading object from stream -> s3://{}/{}'.format(
@@ -281,7 +286,7 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dict[str, str]) -> AbstractBlob:
             Bucket=self.bucket_name,
             Key=object_key,
             Body=data,
-            **kms_args,
+            **extra_args,
         )
     except Exception as e:
         logging.error(e)
@@ -352,14 +357,16 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
     # check if object resides in a sub-folder (e.g. secondary index). if it does, use the sub-folder in object path
     object_key = AbstractStorage.path_maybe_with_parent(dest, src_path)
 
-    kms_args = {}
+    extra_args = {}
     if self.kms_id is not None:
-        kms_args['ServerSideEncryption'] = 'aws:kms'
-        kms_args['SSEKMSKeyId'] = self.kms_id
+        extra_args['ServerSideEncryption'] = 'aws:kms'
+        extra_args['SSEKMSKeyId'] = self.kms_id
 
     storage_class = self.get_storage_class()
     if storage_class is not None:
-        kms_args['StorageClass'] = storage_class
+        extra_args['StorageClass'] = storage_class
+    if self.canned_acl is not None:
+        extra_args['ACL'] = self.canned_acl
 
     file_size = os.stat(src).st_size
     logging.debug(
@@ -373,7 +380,7 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
         'Bucket': self.bucket_name,
         'Key': object_key,
         'Config': self.transfer_config,
-        'ExtraArgs': kms_args,
+        'ExtraArgs': extra_args,
     }
     # we are going to combine asyncio with boto's threading
    # we do this by submitting the upload into an executor
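For context on the change above: the extra_args dict maps directly onto keyword arguments that boto3's put_object accepts (ACL, ServerSideEncryption, SSEKMSKeyId, StorageClass). The sketch below is not part of the commit; it only illustrates, assuming a plain boto3 client and hypothetical bucket and key names, how a canned ACL is passed and how the AccessControlListNotSupported error mentioned in the new comment surfaces on a bucket whose ACLs are disabled.

import io
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')

extra_args = {
    'ACL': 'bucket-owner-read',      # canned ACL, the same value medusa would read from config
    'StorageClass': 'STANDARD_IA',   # optional storage class, as in the diff above
}

try:
    # same shape as the put_object call in _upload_object
    s3.put_object(
        Bucket='example-bucket',       # hypothetical bucket name
        Key='example/object/key',      # hypothetical object key
        Body=io.BytesIO(b'payload'),
        **extra_args,
    )
except ClientError as e:
    # buckets with Object Ownership set to "bucket owner enforced" have ACLs disabled
    if e.response['Error']['Code'] == 'AccessControlListNotSupported':
        print('bucket has ACLs disabled; omit the canned ACL setting')
    else:
        raise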
33 changes: 17 additions & 16 deletions tests/integration/features/integration_tests.feature
@@ -1151,36 +1151,37 @@ Feature: Integration tests
     Scenario Outline: Perform a differential backup with explicit storage class, then verify it
         Given I have a fresh ccm cluster "<client encryption>" running named "scenario32"
         Given I will use "<storage class>" as storage class in the storage
+        Given I will use "<canned ACL>" canned ACL when uploading objects
         Given I am using "<storage>" as storage provider in ccm cluster "<client encryption>" with gRPC server
         When I create the "test" table with secondary index in keyspace "medusa"
         When I load 100 rows in the "medusa.test" table
         When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
         When I perform a backup in "differential" mode of the node named "first_backup" with md5 checks "disabled"
         Then I can see the backup named "first_backup" when I list the backups
         Then I can verify the backup named "first_backup" with md5 checks "disabled" successfully
-        Then I can see 2 SSTables with "<storage class>" in the SSTable pool for the "test" table in keyspace "medusa"
+        Then I can see 2 SSTables with "<storage class>" and "<canned ACL>" in the SSTable pool for the "test" table in keyspace "medusa"
 
         @s3
         Examples: S3 storage
-        | storage | client encryption | storage class |
-        | s3_us_west_oregon | without_client_encryption | STANDARD |
-        | s3_us_west_oregon | without_client_encryption | REDUCED_REDUNDANCY |
-        | s3_us_west_oregon | without_client_encryption | STANDARD_IA |
-        | s3_us_west_oregon | without_client_encryption | ONEZONE_IA |
-        | s3_us_west_oregon | without_client_encryption | INTELLIGENT_TIERING |
+        | storage | client encryption | storage class | canned ACL |
+        | s3_us_west_oregon | without_client_encryption | STANDARD | bucket-owner-read |
+        | s3_us_west_oregon | without_client_encryption | REDUCED_REDUNDANCY | bucket-owner-read |
+        | s3_us_west_oregon | without_client_encryption | STANDARD_IA | bucket-owner-read |
+        | s3_us_west_oregon | without_client_encryption | ONEZONE_IA | bucket-owner-read |
+        | s3_us_west_oregon | without_client_encryption | INTELLIGENT_TIERING | bucket-owner-read |
 
         @gcs
         Examples: Google Cloud Storage
-        | storage | client encryption | storage class |
-        | google_storage | without_client_encryption | STANDARD |
+        | storage | client encryption | storage class | canned ACL |
+        | google_storage | without_client_encryption | STANDARD | None |
         # this is buggy for now, the library does not propagate the custom storage class headers
-        # | google_storage | without_client_encryption | NEARLINE |
-        # | google_storage | without_client_encryption | COLDLINE |
-        # | google_storage | without_client_encryption | ARCHIVE |
+        # | google_storage | without_client_encryption | NEARLINE | None |
+        # | google_storage | without_client_encryption | COLDLINE | None |
+        # | google_storage | without_client_encryption | ARCHIVE | None |
 
         @azure
         Examples: Azure Blob Storage
-        | storage | client encryption | storage class |
-        | azure_blobs | without_client_encryption | HOT |
-        | azure_blobs | without_client_encryption | COOL |
-        | azure_blobs | without_client_encryption | COLD |
+        | storage | client encryption | storage class | canned ACL |
+        | azure_blobs | without_client_encryption | HOT | None |
+        | azure_blobs | without_client_encryption | COOL | None |
+        | azure_blobs | without_client_encryption | COLD | None |
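A note on the examples tables above: providers without a canned-ACL concept get the literal placeholder "None", and behave hands that placeholder to the step functions as the string "None", not Python's None. A step that wants to treat it as "no ACL" therefore has to normalize it; a minimal, hypothetical helper (not part of this commit) could look like this.

def parse_canned_acl(value):
    """Translate the feature-table placeholder "None" into a real None."""
    return None if value in (None, '', 'None') else value

# hypothetical usage inside a step implementation:
# context.canned_acl = parse_canned_acl(canned_acl)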
18 changes: 15 additions & 3 deletions tests/integration/features/steps/integration_steps.py
@@ -395,6 +395,11 @@ def i_will_use_storage_class(context, storage_class):
     context.storage_class = storage_class
 
 
+@given(r'I will use "{canned_acl}" canned ACL when uploading objects')
+def i_will_use_canned_acl(context, canned_acl):
+    context.canned_acl = canned_acl
+
+
 @given(r'I am using "{storage_provider}" as storage provider in ccm cluster "{client_encryption}"')
 def i_am_using_storage_provider(context, storage_provider, client_encryption):
     context.storage_provider = storage_provider
@@ -538,6 +543,8 @@ def get_args(context, storage_provider, client_encryption, cassandra_url, use_mg
     storage_args = {"prefix": storage_prefix}
     if hasattr(context, "storage_class"):
         storage_args.update({"storage_class": context.storage_class})
+    if hasattr(context, "canned_acl"):
+        storage_args.update({"canned_acl": context.canned_acl})
 
     cassandra_args = {
         "is_ccm": str(is_ccm),
@@ -1280,11 +1287,10 @@ def _i_can_see_nb_sstables_in_the_sstable_pool(
     _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(context, nb_sstables, None, table_name, keyspace)
 
 
-# Then I can see 2 SSTables with "<storage class>" in the SSTable pool for the "test" table in keyspace "medusa"
-@then(r'I can see {nb_sstables} SSTables with "{storage_class}" in the SSTable pool '
+@then(r'I can see {nb_sstables} SSTables with "{storage_class}" and "{canned_acl}" in the SSTable pool '
       r'for the "{table_name}" table in keyspace "{keyspace}"')
 def _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
-    context, nb_sstables, storage_class, table_name, keyspace
+    context, nb_sstables, storage_class, canned_acl, table_name, keyspace
 ):
     with Storage(config=context.medusa_config.storage) as storage:
         path = os.path.join(
@@ -1302,6 +1308,12 @@ def _i_can_see_nb_sstables_with_storage_class_in_the_sstable_pool(
                 logging.info(f'{storage_class.upper()} vs {sstable.storage_class.upper()}')
                 assert storage_class.upper() == sstable.storage_class.upper()
 
+        # to make checking ACLs work, we'd need to make the driver call
+        # response = s3.get_object_acl(Bucket=bucket_name, Key=object_key)
+        # but that assumes we have a bucket with ACLs enabled
+        if canned_acl is not None:
+            pass
+
 
 @then(
     r'backup named "{backup_name}" has {nb_files} files '
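The commented-out check above hints at what the missing half of the ACL integration test could look like. Below is a sketch of that verification, not part of the commit; it assumes a plain boto3 client, a bucket with ACLs enabled, and hypothetical bucket_name and object_key variables, and the expected permission depends on which canned ACL was applied.

import boto3

def object_has_acl_grant(bucket_name: str, object_key: str, permission: str) -> bool:
    """Return True if any grant on the object carries the given permission (e.g. 'READ')."""
    s3 = boto3.client('s3')
    # fails with AccessControlListNotSupported if the bucket has ACLs disabled
    response = s3.get_object_acl(Bucket=bucket_name, Key=object_key)
    return any(grant['Permission'] == permission for grant in response['Grants'])

# hypothetical usage inside the step, once the bucket and key are known:
# assert object_has_acl_grant(bucket_name, object_key, 'READ')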
24 changes: 16 additions & 8 deletions tests/storage/s3_storage_test.py
@@ -197,7 +197,8 @@ def test_credentials_with_default_region_and_s3_compatible_storage(self):
         'region': 'default',
         'storage_provider': 's3_compatible',
         'key_file': credentials_file.name,
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
 
     credentials = S3BaseStorage._consolidate_credentials(config)
@@ -220,7 +221,8 @@ def test_make_s3_url(self):
         'ssl_verify': 'False',
         'host': None,
         'port': None,
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     # there are no extra connection args when connecting to regular S3
@@ -244,7 +246,8 @@ def test_make_s3_url_without_secure(self):
         'ssl_verify': 'False',
         'host': None,
         'port': None,
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     # again, no extra connection args when connecting to regular S3
@@ -269,7 +272,8 @@ def test_make_s3_compatible_url(self):
         'ssl_verify': 'False',
         'host': 's3.example.com',
         'port': '443',
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     self.assertEqual(
@@ -292,7 +296,8 @@ def test_make_s3_compatible_url_without_secure(self):
         'ssl_verify': 'False',
         'host': 's3.example.com',
         'port': '8080',
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     self.assertEqual(
@@ -314,7 +319,8 @@ def test_make_connection_arguments_without_ssl_verify(self):
         'ssl_verify': 'False',
         'host': 's3.example.com',
         'port': '8080',
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     connection_args = s3_storage._make_connection_arguments(config)
@@ -334,7 +340,8 @@ def test_make_connection_arguments_with_ssl_verify(self):
         'ssl_verify': 'True',
         'host': 's3.example.com',
         'port': '8080',
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
     s3_storage = S3BaseStorage(config)
     connection_args = s3_storage._make_connection_arguments(config)
@@ -375,7 +382,8 @@ def test_assume_role_authentication(self):
         'ssl_verify': 'False',
         'host': None,
         'port': None,
-        'concurrent_transfers': '1'
+        'concurrent_transfers': '1',
+        'canned_acl': 'public-read',
     })
 
     # Replace the open function with the mock
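The unit-test changes above only thread canned_acl through the config fixtures. A standalone sketch of the kind of assertion that could cover the new behaviour is shown below; it tests the extra-args pattern in isolation rather than S3BaseStorage itself, and the build_extra_args helper is hypothetical, not part of medusa.

import unittest

def build_extra_args(kms_id=None, storage_class=None, canned_acl=None) -> dict:
    # mirrors the logic added to _upload_object / _upload_blob in this commit
    extra_args = {}
    if kms_id is not None:
        extra_args['ServerSideEncryption'] = 'aws:kms'
        extra_args['SSEKMSKeyId'] = kms_id
    if storage_class is not None:
        extra_args['StorageClass'] = storage_class
    if canned_acl is not None:
        extra_args['ACL'] = canned_acl
    return extra_args

class CannedAclTest(unittest.TestCase):
    def test_acl_included_when_configured(self):
        self.assertEqual({'ACL': 'public-read'}, build_extra_args(canned_acl='public-read'))

    def test_acl_omitted_when_not_configured(self):
        self.assertNotIn('ACL', build_extra_args(storage_class='STANDARD_IA'))

if __name__ == '__main__':
    unittest.main()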
