Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Azure DataStore] Handle storage options as secrets #1206

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 15 additions & 9 deletions mlrun/datastore/azure_blob.py
Expand Up @@ -54,15 +54,21 @@ def get_filesystem(self, silent=True):

def get_storage_options(self):
    """Return adlfs/fsspec-compatible storage options for this Azure datastore.

    Each option is resolved via ``self._get_secret_or_env``, first trying the
    canonical ``AZURE_STORAGE_*`` name and then falling back to the bare
    fsspec/adlfs keyword (e.g. ``account_name``), so credentials may be
    supplied either as environment variables or as fsspec-style secrets.
    Options that resolve to nothing are returned as ``None``.
    """

    def _resolve(env_key, option_key):
        # Prefer the AZURE_STORAGE_* name; fall back to the fsspec keyword.
        return self._get_secret_or_env(env_key) or self._get_secret_or_env(
            option_key
        )

    return dict(
        account_name=_resolve("AZURE_STORAGE_ACCOUNT_NAME", "account_name"),
        account_key=_resolve("AZURE_STORAGE_KEY", "account_key"),
        connection_string=_resolve(
            "AZURE_STORAGE_CONNECTION_STRING", "connection_string"
        ),
        tenant_id=_resolve("AZURE_STORAGE_TENANT_ID", "tenant_id"),
        client_id=_resolve("AZURE_STORAGE_CLIENT_ID", "client_id"),
        client_secret=_resolve("AZURE_STORAGE_CLIENT_SECRET", "client_secret"),
        sas_token=_resolve("AZURE_STORAGE_SAS_TOKEN", "sas_token"),
        # "credential" has no AZURE_STORAGE_* counterpart; look it up directly.
        credential=self._get_secret_or_env("credential"),
    )

def _convert_key_to_remote_path(self, key):
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/azure_blob/test-azure-blob.yml
Expand Up @@ -17,6 +17,16 @@ env:
# Storage container within the given storage account where objects will be created.
AZURE_CONTAINER:

# Support storage_options keywords
connection_string:
account_name:
account_key:
sas_token:
client_id:
client_secret:
tenant_id:
credential:

# Provide a mlrun function with kind="dask" (i.e. db://<PROJECT>/<FUNCTION>)
# or a Dask Scheduler address in the form ("tcp://"). If none, this will
# create a local distributed client, which causes DDF writes to run slowly
Expand Down
66 changes: 47 additions & 19 deletions tests/integration/azure_blob/test_azure_blob.py
Expand Up @@ -22,19 +22,31 @@
blob_file = f"file_{random.randint(0, 1000)}.txt"

# Maps each parametrized authentication method to the configuration keys it
# requires. "env_*" methods read canonical AZURE_STORAGE_* environment
# variables; "fsspec_*" methods pass bare adlfs/fsspec keywords as
# storage options.
AUTH_METHODS_AND_REQUIRED_PARAMS = dict(
    env_conn_str=["AZURE_STORAGE_CONNECTION_STRING"],
    env_sas_token=["AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_SAS_TOKEN"],
    env_account_key=["AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY"],
    env_spn=[
        "AZURE_STORAGE_ACCOUNT_NAME",
        "AZURE_STORAGE_CLIENT_ID",
        "AZURE_STORAGE_CLIENT_SECRET",
        "AZURE_STORAGE_TENANT_ID",
    ],
    fsspec_conn_str=["connection_string"],
    fsspec_sas_token=["account_name", "sas_token"],
    fsspec_account_key=["account_name", "account_key"],
    fsspec_spn=["account_name", "client_id", "client_secret", "tenant_id"],
    fsspec_credential=["credential"],
)


def verify_auth_parameters_and_configure_env(auth_method):
# This sets up the authentication method against Azure
# if testing the use of Azure credentials stored as
# environmental variable, it creates the environmental
# variables and returns storage_options = None. Otherwise
# it returns adlfs-recognized parameters compliant with the
# fsspec api. These get saved as secrets by mlrun.get_dataitem()
# for authentication.
if not config["env"].get("AZURE_CONTAINER"):
return False

Expand All @@ -46,14 +58,28 @@ def verify_auth_parameters_and_configure_env(auth_method):
if not test_params:
return False

for env_var in test_params:
env_value = config["env"].get(env_var)
if not env_value:
return False
os.environ[env_var] = env_value
if auth_method.startswith("env"):
for env_var in test_params:
env_value = config["env"].get(env_var)
if not env_value:
return False
os.environ[env_var] = env_value

logger.info(f"Testing auth method {auth_method}")
return True
logger.info(f"Testing auth method {auth_method}")
return None

elif auth_method.startswith("fsspec"):
storage_options = {}
for var in test_params:
value = config["env"].get(var)
if not value:
return False
storage_options[var] = value
logger.info(f"Testing auth method {auth_method}")
return storage_options

else:
raise ValueError("auth_method not known")


# Apply parametrization to all tests in this file. Skip test if auth method is not configured.
Expand All @@ -73,13 +99,13 @@ def verify_auth_parameters_and_configure_env(auth_method):


def test_azure_blob(auth_method):
verify_auth_parameters_and_configure_env(auth_method)
storage_options = verify_auth_parameters_and_configure_env(auth_method)
blob_path = "az://" + config["env"].get("AZURE_CONTAINER")
blob_url = blob_path + "/" + blob_dir + "/" + blob_file

print(f"\nBlob URL: {blob_url}")

data_item = mlrun.run.get_dataitem(blob_url)
data_item = mlrun.run.get_dataitem(blob_url, secrets=storage_options)
data_item.put(test_string)

# Validate append is properly blocked (currently not supported for Azure blobs)
Expand All @@ -97,29 +123,31 @@ def test_azure_blob(auth_method):


def test_list_dir(auth_method):
    """Verify a written blob appears in directory listings, both at the
    container root and inside its folder.

    Credentials come either from environment variables (in which case
    ``storage_options`` is None) or from fsspec-style storage options passed
    as secrets, depending on the parametrized auth method.
    """
    storage_options = verify_auth_parameters_and_configure_env(auth_method)
    blob_container_path = "az://" + config["env"].get("AZURE_CONTAINER")
    blob_url = blob_container_path + "/" + blob_dir + "/" + blob_file
    print(f"\nBlob URL: {blob_url}")

    # Pass storage options via the `secrets` keyword for consistency with
    # test_azure_blob (and to avoid relying on get_dataitem's positional
    # parameter order).
    mlrun.run.get_dataitem(blob_url, secrets=storage_options).put(test_string)

    # Check dir list for container
    dir_list = mlrun.run.get_dataitem(
        blob_container_path, secrets=storage_options
    ).listdir()
    assert blob_dir + "/" + blob_file in dir_list, "File not in container dir-list"

    # Check dir list for folder in container
    dir_list = mlrun.run.get_dataitem(
        blob_container_path + "/" + blob_dir, secrets=storage_options
    ).listdir()
    assert blob_file in dir_list, "File not in folder dir-list"


def test_blob_upload(auth_method):
verify_auth_parameters_and_configure_env(auth_method)
storage_options = verify_auth_parameters_and_configure_env(auth_method)
blob_path = "az://" + config["env"].get("AZURE_CONTAINER")
blob_url = blob_path + "/" + blob_dir + "/" + blob_file
print(f"\nBlob URL: {blob_url}")

upload_data_item = mlrun.run.get_dataitem(blob_url)
upload_data_item = mlrun.run.get_dataitem(blob_url, storage_options)
upload_data_item.upload(test_filename)

response = upload_data_item.get()
Expand Down