Skip to content

Commit

Permalink
support container client and blob client for azure blob storage (#652)
Browse files Browse the repository at this point in the history
* support container client and blob client for azure blob storage

* add type alias to resolve flake8 E501 line-too-long

* remove type alias due to importlib thinking that 'azure' is not defined, add noqa to appease flake8

* Apply suggestions from code review

pytest asserts

* Update CHANGELOG.md

* avoid use of format()

Co-authored-by: Christopher Bare <christopher.bare@volparahealth.com>
Co-authored-by: Michael Penkov <misha.penkov@gmail.com>
Co-authored-by: Michael Penkov <m@penkov.dev>
  • Loading branch information
4 people committed Oct 10, 2021
1 parent 99cd400 commit d9c864e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Unreleased

- Use pytest instead of parameterizedtestcase (PR [#657](https://github.com/RaRe-Technologies/smart_open/pull/657), [@mpenkov](https://github.com/mpenkov))
- Support container client and blob client for azure blob storage (PR [#652](https://github.com/RaRe-Technologies/smart_open/pull/652), [@cbare](https://github.com/cbare))

# 5.2.1, 28 August 2021

Expand Down
59 changes: 42 additions & 17 deletions smart_open/azure.py
Expand Up @@ -11,6 +11,7 @@
import base64
import io
import logging
from typing import Union

import smart_open.bytebuffer
import smart_open.constants
Expand Down Expand Up @@ -68,7 +69,7 @@ def open(
container_id,
blob_id,
mode,
client=None, # type: azure.storage.blob.BlobServiceClient
client=None, # type: Union[azure.storage.blob.BlobServiceClient, azure.storage.blob.ContainerClient, azure.storage.blob.BlobClient] # noqa
buffer_size=DEFAULT_BUFFER_SIZE,
min_part_size=_DEFAULT_MIN_PART_SIZE,
max_concurrency=DEFAULT_MAX_CONCURRENCY,
Expand All @@ -83,7 +84,7 @@ def open(
The name of the blob within the container.
mode: str
The mode for opening the object. Must be either "rb" or "wb".
client: azure.storage.blob.BlobServiceClient
client: azure.storage.blob.BlobServiceClient, ContainerClient, or BlobClient
The Azure Blob Storage client to use when working with azure-storage-blob.
buffer_size: int, optional
The buffer size to use when performing I/O. For reading only.
Expand Down Expand Up @@ -116,6 +117,27 @@ def open(
raise NotImplementedError('Azure Blob Storage support for mode %r not implemented' % mode)


def _get_blob_client(client, container, blob):
    # type: (Union[azure.storage.blob.BlobServiceClient, azure.storage.blob.ContainerClient, azure.storage.blob.BlobClient], str, str) -> azure.storage.blob.BlobClient  # noqa
    """
    Return an Azure BlobClient starting with any of BlobServiceClient,
    ContainerClient, or BlobClient plus container name and blob name.

    The kind of client is detected by duck typing (``hasattr``) rather than
    ``isinstance``, so any object exposing the same attributes is accepted.

    Parameters
    ----------
    client:
        The client to start from.  An object with ``get_container_client``
        is narrowed to a container-level client first; an object with
        ``get_blob_client`` is then narrowed to a blob-level client; anything
        else is returned as-is.
    container: str
        The name of the container the resulting client must refer to.
    blob: str
        The name of the blob the resulting client must refer to.

    Raises
    ------
    ValueError
        If the client exposes a ``container_name`` that differs from
        ``container``, or if it is already a blob-level client whose
        ``blob_name`` differs from ``blob``.
    """
    # Service-level client: narrow down to the requested container.
    if hasattr(client, "get_container_client"):
        client = client.get_container_client(container)

    # Container- and blob-level clients are already bound to a container;
    # refuse to continue if it is not the one the caller asked for.
    if hasattr(client, "container_name") and client.container_name != container:
        raise ValueError(
            "Client for %r doesn't match "
            "container %r" % (client.container_name, container)
        )

    # Container-level client: narrow down to the requested blob.
    if hasattr(client, "get_blob_client"):
        return client.get_blob_client(blob)

    # Already a blob-level client.  It is bound to one specific blob, so make
    # sure that is the blob the caller asked for; otherwise reads and writes
    # would silently go to a different blob than the one named.
    if hasattr(client, "blob_name") and client.blob_name != blob:
        raise ValueError(
            "Client for %r doesn't match "
            "blob %r" % (client.blob_name, blob)
        )

    return client


class _RawReader(object):
"""Read an Azure Blob Storage file."""

Expand Down Expand Up @@ -175,15 +197,16 @@ def __init__(
self,
container,
blob,
client, # type: azure.storage.blob.BlobServiceClient
client, # type: Union[azure.storage.blob.BlobServiceClient, azure.storage.blob.ContainerClient, azure.storage.blob.BlobClient] # noqa
buffer_size=DEFAULT_BUFFER_SIZE,
line_terminator=smart_open.constants.BINARY_NEWLINE,
max_concurrency=DEFAULT_MAX_CONCURRENCY,
):
self._container_client = client.get_container_client(container)
# type: azure.storage.blob.ContainerClient
self._container_name = container

self._blob = _get_blob_client(client, container, blob)
# type: azure.storage.blob.BlobClient

self._blob = self._container_client.get_blob_client(blob)
if self._blob is None:
raise azure.core.exceptions.ResourceNotFoundError(
'blob %s not found in %s' % (blob, container)
Expand Down Expand Up @@ -345,12 +368,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def __str__(self):
return "(%s, %r, %r)" % (self.__class__.__name__,
self._container.container_name,
self._container_name,
self._blob.blob_name)

def __repr__(self):
return "%s(container=%r, blob=%r)" % (
self.__class__.__name__, self._container_client.container_name, self._blob.blob_name,
self.__class__.__name__, self._container_name, self._blob.blob_name,
)


Expand All @@ -363,13 +386,15 @@ def __init__(
self,
container,
blob,
client, # type: azure.storage.blob.BlobServiceClient
client, # type: Union[azure.storage.blob.BlobServiceClient, azure.storage.blob.ContainerClient, azure.storage.blob.BlobClient] # noqa
min_part_size=_DEFAULT_MIN_PART_SIZE,
):
self._client = client
self._container_client = self._client.get_container_client(container)
# type: azure.storage.blob.ContainerClient
self._blob = self._container_client.get_blob_client(blob) # type: azure.storage.blob.BlobClient
self._is_closed = False
self._container_name = container

self._blob = _get_blob_client(client, container, blob)
# type: azure.storage.blob.BlobClient

self._min_part_size = min_part_size

self._total_size = 0
Expand All @@ -396,12 +421,12 @@ def close(self):
self._upload_part()
self._blob.commit_block_list(self._block_list)
self._block_list = []
self._client = None
self._is_closed = True
logger.debug("successfully closed")

@property
def closed(self):
return self._client is None
return self._is_closed

def writable(self):
"""Return True if the stream supports writing."""
Expand Down Expand Up @@ -483,14 +508,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def __str__(self):
return "(%s, %r, %r)" % (
self.__class__.__name__,
self._container_client.container_name,
self._container_name,
self._blob.blob_name
)

def __repr__(self):
return "%s(container=%r, blob=%r, min_part_size=%r)" % (
self.__class__.__name__,
self._container_client.container_name,
self._container_name,
self._blob.blob_name,
self._min_part_size
)
42 changes: 42 additions & 0 deletions smart_open/tests/test_azure.py
Expand Up @@ -526,6 +526,31 @@ def test_read_past_end(self):

self.assertEqual(data, content)

def test_read_container_client(self):
    """Can Reader read a blob when handed a container-level client?"""
    expected = "spirits in the material world".encode("utf-8")
    name = "test_read_container_client_%s" % BLOB_NAME
    put_to_container(name, contents=expected)

    # Narrow the module-level client down to the test container and pass
    # that to Reader as its client argument.
    client = CLIENT.get_container_client(CONTAINER_NAME)
    reader = smart_open.azure.Reader(CONTAINER_NAME, name, client)

    with reader as fin:
        actual = fin.read(100)

    assert actual == expected

def test_read_blob_client(self):
    """Can Reader read a blob when handed a blob-level client?"""
    expected = "walking on the moon".encode("utf-8")
    name = "test_read_blob_client_%s" % BLOB_NAME
    put_to_container(name, contents=expected)

    # Narrow the module-level client down to the container, then to the blob
    # itself, and pass that to Reader as its client argument.
    client = CLIENT.get_container_client(CONTAINER_NAME).get_blob_client(name)
    reader = smart_open.azure.Reader(CONTAINER_NAME, name, client)

    with reader as fin:
        actual = fin.read(100)

    assert actual == expected


class WriterTest(unittest.TestCase):
"""Test writing into Azure Blob files."""
Expand All @@ -548,6 +573,23 @@ def test_write_01(self):
))
self.assertEqual(output, [test_string])

def test_write_container_client(self):
    """Can Writer write a blob when handed a container-level client?"""
    payload = u"Hiszékeny Öngyilkos Vasárnap".encode('utf8')
    name = "test_write_container_client_%s" % BLOB_NAME
    client = CLIENT.get_container_client(CONTAINER_NAME)

    writer = smart_open.azure.Writer(CONTAINER_NAME, name, client)
    with writer as fout:
        fout.write(payload)

    # Read the blob back through the top-level API, reusing the same
    # container-level client, and compare with what was written.
    uri = "azure://%s/%s" % (CONTAINER_NAME, name)
    fin = smart_open.open(uri, "rb", transport_params={"client": client})
    lines = list(fin)

    assert lines == [payload]

def test_incorrect_input(self):
"""Does azure write fail on incorrect input?"""
blob_name = "test_incorrect_input_%s" % BLOB_NAME
Expand Down

0 comments on commit d9c864e

Please sign in to comment.