community[minor]: Add CloudBlobLoader that supports loading data from cloud buckets (#21957)

Thank you for contributing to LangChain!

- [ ] **PR title**: "Add CloudBlobLoader"
  - community: Add CloudBlobLoader

- [ ] **PR message**: Add cloud blob loader
    - **Description:** 
LangChain provides several approaches for reading different file formats:
specific loaders such as `CSVLoader`, or blob-compatible loaders such as
`FileSystemBlobLoader`. Today, the only `BlobLoader` implementation available
is `FileSystemBlobLoader`.
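
For comparison, the existing loader is used like this (a minimal sketch; the directory path is hypothetical):

```Python
from langchain_community.document_loaders.blob_loaders import FileSystemBlobLoader

loader = FileSystemBlobLoader("/data/reports", glob="**/*.txt")
for blob in loader.yield_blobs():
    print(blob.path, blob.mimetype)
```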
      
Many projects retrieve files from cloud storage. We propose a new
implementation of `BlobLoader` that reads files from the three major cloud
storage systems (AWS S3, Azure Blob Storage, and Google Cloud Storage). The
interface is strictly identical to `FileSystemBlobLoader`; the only
difference is the constructor, which takes a cloud URL such as
`s3://my-bucket`, `az://my-bucket`, or `gs://my-bucket`.
      
This implementation eliminates the need to pre-download files from cloud
storage into local temporary files (which are seldom cleaned up).
      
The code relies on the
[CloudPathLib](https://cloudpathlib.drivendata.org/stable/) library to
interpret cloud URLs; it has been added as an optional dependency.
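
CloudPathLib's `AnyPath` factory dispatches on the URL scheme and returns a path object with the familiar `pathlib` interface (a minimal sketch; the bucket and key are hypothetical):

```Python
from cloudpathlib import AnyPath

path = AnyPath("s3://my-bucket/data/report.txt")  # an S3Path; local paths yield pathlib.Path
print(path.suffix)       # ".txt"
print(path.read_text())  # reads the object's content
```

The loader itself is used exactly like `FileSystemBlobLoader`: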

```Python
loader = CloudBlobLoader("s3://mybucket/id")
for blob in loader.yield_blobs():
    print(blob)
```
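
The yielded blobs are lazy: `from_path` stores only the path (`data=None`), so the object is fetched from the bucket only when its content is accessed:

```Python
blob = next(iter(loader.yield_blobs()))
text = blob.as_string()  # the bytes are read from the bucket only at this point
```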

- [X] **Dependencies:** CloudPathLib
- [X] **Twitter handle:** pprados


- [X] **Add tests and docs**: Unit tests are added; they are easy to convert
into integration tests against real files in cloud storage (see
`test_cloud_blob_loader.py`; a sketch of the idea follows this checklist).

- [X] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified.
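
For illustration, such a test can run entirely against the local file system through a `file:` URL, with no cloud credentials (a sketch under that assumption; this is not the actual content of `test_cloud_blob_loader.py`):

```Python
from pathlib import Path

from langchain_community.document_loaders.blob_loaders import CloudBlobLoader


def test_yield_blobs_from_file_url(tmp_path: Path) -> None:
    # Arrange: create a small tree of files.
    (tmp_path / "a.txt").write_text("hello")
    (tmp_path / "b.md").write_text("world")

    # Act: file: URLs exercise the same code path as s3://, az:// and gs://.
    loader = CloudBlobLoader(f"file://{tmp_path}", glob="**/*.txt")
    blobs = list(loader.yield_blobs())

    # Assert: only the .txt file matches the glob.
    assert len(blobs) == 1
    assert blobs[0].as_string() == "hello"
```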

Hello from Paris @hwchase17. Can you review this PR?

---------

Co-authored-by: Eugene Yurtsev <eugene@langchain.dev>
pprados and eyurtsev committed May 23, 2024
1 parent 74947ec commit 6dd621d
Showing 7 changed files with 550 additions and 124 deletions.
langchain_community/document_loaders/blob_loaders/__init__.py
@@ -4,6 +4,9 @@
from langchain_core.document_loaders import Blob, BlobLoader

if TYPE_CHECKING:
    from langchain_community.document_loaders.blob_loaders.cloud_blob_loader import (
        CloudBlobLoader,
    )
    from langchain_community.document_loaders.blob_loaders.file_system import (
        FileSystemBlobLoader,
    )
@@ -13,6 +16,9 @@


_module_lookup = {
"CloudBlobLoader": (
"langchain_community.document_loaders.blob_loaders.cloud_blob_loader"
),
"FileSystemBlobLoader": (
"langchain_community.document_loaders.blob_loaders.file_system"
),
@@ -32,6 +38,7 @@ def __getattr__(name: str) -> Any:
__all__ = [
"BlobLoader",
"Blob",
"CloudBlobLoader",
"FileSystemBlobLoader",
"YoutubeAudioLoader",
]
langchain_community/document_loaders/blob_loaders/cloud_blob_loader.py
@@ -0,0 +1,289 @@
"""Use to load blobs from the local file system."""
import contextlib
import mimetypes
import tempfile
from io import BufferedReader, BytesIO
from pathlib import Path
from typing import (
TYPE_CHECKING,
Callable,
Generator,
Iterable,
Iterator,
Optional,
Sequence,
TypeVar,
Union,
)
from urllib.parse import urlparse

if TYPE_CHECKING:
from cloudpathlib import AnyPath

from langchain_community.document_loaders.blob_loaders.schema import (
Blob,
BlobLoader,
)

T = TypeVar("T")


class _CloudBlob(Blob):
    def as_string(self) -> str:
        """Read data as a string."""
        from cloudpathlib import AnyPath

        if self.data is None and self.path:
            return AnyPath(self.path).read_text(encoding=self.encoding)  # type: ignore
        elif isinstance(self.data, bytes):
            return self.data.decode(self.encoding)
        elif isinstance(self.data, str):
            return self.data
        else:
            raise ValueError(f"Unable to get string for blob {self}")

    def as_bytes(self) -> bytes:
        """Read data as bytes."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            return self.data
        elif isinstance(self.data, str):
            return self.data.encode(self.encoding)
        elif self.data is None and self.path:
            return AnyPath(self.path).read_bytes()  # type: ignore
        else:
            raise ValueError(f"Unable to get bytes for blob {self}")

    @contextlib.contextmanager
    def as_bytes_io(self) -> Generator[Union[BytesIO, BufferedReader], None, None]:
        """Read data as a byte stream."""
        from cloudpathlib import AnyPath

        if isinstance(self.data, bytes):
            yield BytesIO(self.data)
        elif self.data is None and self.path:
            # A bare `return` here would make the context manager fail to
            # yield; read the remote bytes and expose them as a stream instead.
            yield BytesIO(AnyPath(self.path).read_bytes())  # type: ignore
        else:
            raise NotImplementedError(f"Unable to convert blob {self}")
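
# Example (hypothetical bucket/key): a _CloudBlob built from "s3://my-bucket/a.txt"
# holds only the path; as_string() resolves it through AnyPath and fetches the
# text on first access.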


def _url_to_filename(url: str) -> str:
    """
    Convert a file:, s3:, az: or gs: URL to a local file path.

    If the file is not local, download it to a temporary file.
    """
    from cloudpathlib import AnyPath

    url_parsed = urlparse(url)
    suffix = Path(url_parsed.path).suffix
    if url_parsed.scheme in ["s3", "az", "gs"]:
        with AnyPath(url).open("rb") as f:  # type: ignore
            temp_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
            while True:
                buf = f.read(1024 * 1024)  # copy in 1 MiB chunks
                if not buf:
                    break
                temp_file.write(buf)
            temp_file.close()
            file_path = temp_file.name
    elif url_parsed.scheme in ["file", ""]:
        file_path = url_parsed.path
    else:
        raise ValueError(f"Scheme {url_parsed.scheme} not supported")
    return file_path
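
# Example (hypothetical URLs): _url_to_filename("s3://my-bucket/doc.pdf") downloads
# the object into a NamedTemporaryFile and returns its local path, while
# _url_to_filename("file:///tmp/doc.pdf") simply returns "/tmp/doc.pdf".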


def _make_iterator(
    length_func: Callable[[], int], show_progress: bool = False
) -> Callable[[Iterable[T]], Iterator[T]]:
    """Create a function that optionally wraps an iterable in tqdm."""
    if show_progress:
        try:
            from tqdm.auto import tqdm
        except ImportError:
            raise ImportError(
                "You must install tqdm to use show_progress=True. "
                "You can install tqdm with `pip install tqdm`."
            )

        # Make sure to provide `total` here so that tqdm can show
        # a progress bar that takes into account the total number of files.
        def _with_tqdm(iterable: Iterable[T]) -> Iterator[T]:
            """Wrap an iterable in a tqdm progress bar."""
            return tqdm(iterable, total=length_func())

        iterator = _with_tqdm
    else:
        iterator = iter  # type: ignore

    return iterator
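
# Example: _make_iterator(length_func=lambda: 10, show_progress=True) returns a
# wrapper that displays a tqdm progress bar with total=10; with
# show_progress=False it returns the plain built-in iter.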


# PUBLIC API


class CloudBlobLoader(BlobLoader):
    """Load blobs from a cloud URL or a file: URL.

    Example:

    .. code-block:: python

        loader = CloudBlobLoader("s3://mybucket/id")
        for blob in loader.yield_blobs():
            print(blob)
    """  # noqa: E501

    def __init__(
        self,
        url: Union[str, "AnyPath"],
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
    ) -> None:
        """Initialize with a URL and how to glob over it.

        Uses [CloudPathLib](https://cloudpathlib.drivendata.org/) to resolve the URL.

        Args:
            url: Cloud URL to load from.
                If a path to a file is provided, glob/exclude/suffixes are ignored.
            glob: Glob pattern relative to the specified path;
                by default, set to pick up all non-hidden files.
            exclude: Patterns to exclude from results; uses glob syntax.
            suffixes: Provide to keep only files with these suffixes.
                Useful when wanting to keep files with different suffixes.
                Suffixes must include the dot, e.g. ".txt".
            show_progress: If true, will show a progress bar as the files are loaded.
                This forces an iteration through all matching files
                to count them prior to loading them.

        Examples:

        .. code-block:: python

            from langchain_community.document_loaders.blob_loaders import CloudBlobLoader

            # Load a single file.
            loader = CloudBlobLoader("s3://mybucket/id")  # az://

            # Recursively load all text files in a directory.
            loader = CloudBlobLoader("az://mybucket/id", glob="**/*.txt")

            # Recursively load all non-hidden files in a directory.
            loader = CloudBlobLoader("gs://mybucket/id", glob="**/[!.]*")

            # Load all files in a directory without recursion.
            loader = CloudBlobLoader("s3://mybucket/id", glob="*")

            # Recursively load all files in a directory, except for py or pyc files.
            loader = CloudBlobLoader(
                "s3://mybucket/id",
                glob="**/*.txt",
                exclude=["**/*.py", "**/*.pyc"]
            )
        """  # noqa: E501
        from cloudpathlib import AnyPath

        url_parsed = urlparse(str(url))

        if url_parsed.scheme == "file":
            url = url_parsed.path

        if isinstance(url, str):
            self.path = AnyPath(url)
        else:
            self.path = url

        self.glob = glob
        self.suffixes = set(suffixes or [])
        self.show_progress = show_progress
        self.exclude = exclude
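
    # Example (hypothetical URLs): CloudBlobLoader("file:///var/data") strips the
    # scheme and stores a plain pathlib.Path, while CloudBlobLoader("s3://bucket/prefix")
    # stores an S3Path resolved through AnyPath.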

    def yield_blobs(
        self,
    ) -> Iterable[Blob]:
        """Yield blobs that match the requested pattern."""
        iterator = _make_iterator(
            length_func=self.count_matching_files, show_progress=self.show_progress
        )

        for path in iterator(self._yield_paths()):
            yield self.from_path(path)

    def _yield_paths(self) -> Iterable["AnyPath"]:
        """Yield paths that match the requested pattern."""
        if self.path.is_file():  # type: ignore
            yield self.path
            return

        paths = self.path.glob(self.glob)
        for path in paths:
            if self.exclude:
                if any(path.match(glob) for glob in self.exclude):
                    continue
            if path.is_file():
                if self.suffixes and path.suffix not in self.suffixes:
                    continue  # FIXME
                yield path

    def count_matching_files(self) -> int:
        """Count files that match the pattern without loading them."""
        # Carry out a full iteration to count the files without
        # materializing anything expensive in memory.
        num = 0
        for _ in self._yield_paths():
            num += 1
        return num
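
    # Example: for a loader over "s3://my-bucket/docs" with suffixes=[".txt"],
    # count_matching_files() walks the glob results once, so show_progress can
    # display an accurate total before any blob is loaded.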

    @classmethod
    def from_path(
        cls,
        path: "AnyPath",
        *,
        encoding: str = "utf-8",
        mime_type: Optional[str] = None,
        guess_type: bool = True,
        metadata: Optional[dict] = None,
    ) -> Blob:
        """Load the blob from a path-like object.

        Args:
            path: Path-like object to the file to be read.
            encoding: Encoding to use if decoding the bytes into a string.
            mime_type: If provided, will be set as the mime-type of the data.
            guess_type: If True and no mime-type was provided, guess the
                mime-type from the file extension.
            metadata: Metadata to associate with the blob.

        Returns:
            Blob instance
        """
        if mime_type is None and guess_type:
            _mimetype = mimetypes.guess_type(str(path))[0]
        else:
            _mimetype = mime_type

        url_parsed = urlparse(str(path))
        if url_parsed.scheme in ["file", ""]:
            if url_parsed.scheme == "file":
                local_path = url_parsed.path
            else:
                local_path = str(path)
            # Local files use the standard Blob, which reads from disk lazily.
            return Blob(
                data=None,
                mimetype=_mimetype,
                encoding=encoding,
                path=local_path,
                metadata=metadata if metadata is not None else {},
            )

        # Remote files use _CloudBlob, which reads through CloudPathLib lazily.
        return _CloudBlob(
            data=None,
            mimetype=_mimetype,
            encoding=encoding,
            path=str(path),
            metadata=metadata if metadata is not None else {},
        )