Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datastore] Extend Azure blob to support other auth methods #1140

Merged
merged 24 commits into from Jul 26, 2021
Merged
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a786ff5
Leverage adlfs to enable ABFS operations when authenticating with cre…
hayesgb Jul 23, 2021
b55b9cb
First commit
hayesgb Jul 23, 2021
93e71d3
Set self.bsc in azure_blob.py
hayesgb Jul 23, 2021
d4f680e
Merge branch 'enable_abfs' into upload_ddf_to_datastore
hayesgb Jul 23, 2021
0dba58c
Updated line 196 to ensure storage_options is passed to read_parquet …
hayesgb Jul 23, 2021
b29b8b9
Removed inplace keyword from drop since it is not supported by Dask
hayesgb Jul 23, 2021
065bd89
Removed pdb import
hayesgb Jul 23, 2021
df1578a
The default config for Dask workers is 4G memory, 1 CPU. Writing la…
hayesgb Jul 23, 2021
99a5111
Passing test of base.py
hayesgb Jul 24, 2021
398e8e7
Clean up
hayesgb Jul 24, 2021
31a85cc
Linting
hayesgb Jul 25, 2021
873df5a
Updated listdir to return all files in a dir
hayesgb Jul 25, 2021
fa1fe60
Update to stat method in azure_blob
hayesgb Jul 25, 2021
6419e50
Setting tests across all authentication methods
hayesgb Jul 25, 2021
1591c3e
Linting and pass offset to start in azure_blob.py get method
hayesgb Jul 25, 2021
1263934
Resolving commit errors
hayesgb Jul 25, 2021
52045a4
Fixed merge conflict in azure_blob.py
hayesgb Jul 25, 2021
75bc5d0
Linting
hayesgb Jul 25, 2021
efadbef
Update mlrun/datastore/azure_blob.py
hayesgb Jul 26, 2021
1514ae0
Update mlrun/datastore/azure_blob.py
hayesgb Jul 26, 2021
ac92fda
Update tests/integration/azure_blob/test_azure_blob.py
hayesgb Jul 26, 2021
c1c6fb7
Update tests/integration/azure_blob/test_azure_blob.py
hayesgb Jul 26, 2021
7bde182
Updated handling of env_vars in test_azure_blob.py
hayesgb Jul 26, 2021
2dc8a1b
Fixes for executing tests against azure_blob.py
hayesgb Jul 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 51 additions & 19 deletions mlrun/datastore/azure_blob.py
Expand Up @@ -30,6 +30,9 @@ def __init__(self, parent, schema, name, endpoint=""):
con_string = self._get_secret_or_env("AZURE_STORAGE_CONNECTION_STRING")
if con_string:
self.bsc = BlobServiceClient.from_connection_string(con_string)
else:
self.bsc = None
self.get_filesystem()

def get_filesystem(self, silent=True):
"""return fsspec file system object, if supported"""
Expand All @@ -53,36 +56,65 @@ def get_storage_options(self):
connection_string=self._get_secret_or_env(
"AZURE_STORAGE_CONNECTION_STRING"
),
tenant_id=self._get_secret_or_env("AZURE_STORAGE_TENANT_ID"),
client_id=self._get_secret_or_env("AZURE_STORAGE_CLIENT_ID"),
client_secret=self._get_secret_or_env("AZURE_STORAGE_CLIENT_SECRET"),
sas_token=self._get_secret_or_env("AZURE_STORAGE_SAS_TOKEN"),
)

def upload(self, key, src_path):
# Need to strip leading / from key
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
with open(src_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)
if self.bsc:
# Need to strip leading / from key
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
with open(src_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)
else:
remote_path = f"{self.endpoint}{key}"
self._filesystem.put_file(src_path, remote_path)

def get(self, key, size=None, offset=0):
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
size = size if size else None
return blob_client.download_blob(offset, size).readall()
if self.bsc:
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
size = size if size else None
return blob_client.download_blob(offset, size).readall()
else:
blob = self._filesystem.cat_file(key, start=0)
hayesgb marked this conversation as resolved.
Show resolved Hide resolved
return blob

def put(self, key, data, append=False):
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
# Note that append=True is not supported. If the blob already exists, this call will fail
blob_client.upload_blob(data, overwrite=True)
if self.bsc:
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
# Note that append=True is not supported. If the blob already exists, this call will fail
theSaarco marked this conversation as resolved.
Show resolved Hide resolved
blob_client.upload_blob(data, overwrite=True)
else:
path = f"{self.endpoint}{key}"
with self._filesystem.open(path, "wb") as f:
f.write(data)

def stat(self, key):
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
props = blob_client.get_blob_properties()
size = props.size
modified = props.last_modified
if self.bsc:
blob_client = self.bsc.get_blob_client(container=self.endpoint, blob=key[1:])
props = blob_client.get_blob_properties()
size = props.size
modified = props.last_modified
else:
path = f"{self.endpoint}{key}"
files = self._filesystem.ls(path, detail=True)
hayesgb marked this conversation as resolved.
Show resolved Hide resolved
size = files[0]['size']
modified = files[0]['last_modified']
return FileStats(size, time.mktime(modified.timetuple()))


def listdir(self, key):
if key and not key.endswith("/"):
key = key[1:] + "/"

key_length = len(key)
container_client = self.bsc.get_container_client(self.endpoint)
blob_list = container_client.list_blobs(name_starts_with=key)
return [blob.name[key_length:] for blob in blob_list]
if self.bsc:
key_length = len(key)
container_client = self.bsc.get_container_client(self.endpoint)
blob_list = container_client.list_blobs(name_starts_with=key)
return [blob.name[key_length:] for blob in blob_list]
else:
path = f"{self.endpoint}{key}"
files = self._filesystem.ls(path, detail=True)
return [f for f in files if f['type'] == 'directory']
hayesgb marked this conversation as resolved.
Show resolved Hide resolved