-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
base.py
769 lines (660 loc) · 27.9 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
"""Simple reader that reads files of different formats from a directory."""
from abc import ABC, abstractmethod
import os
import logging
import mimetypes
import multiprocessing
import warnings
from datetime import datetime
from functools import reduce
import asyncio
from itertools import repeat
from pathlib import Path, PurePosixPath
import fsspec
from fsspec.implementations.local import LocalFileSystem
from typing import Any, Callable, Dict, Generator, List, Optional, Type
from llama_index.core.readers.base import BaseReader, ResourcesReaderMixin
from llama_index.core.async_utils import run_jobs, get_asyncio_module
from llama_index.core.schema import Document
from tqdm import tqdm
class FileSystemReaderMixin(ABC):
@abstractmethod
def read_file_content(self, input_file: Path, **kwargs) -> bytes:
"""
Read the bytes content of a file.
Args:
input_file (Path): Path to the file.
Returns:
bytes: File content.
"""
async def aread_file_content(self, input_file: Path, **kwargs) -> bytes:
"""
Read the bytes content of a file asynchronously.
Args:
input_file (Path): Path to the file.
Returns:
bytes: File content.
"""
return self.read_file_content(input_file, **kwargs)
def _try_loading_included_file_formats() -> Dict[str, Type[BaseReader]]:
try:
from llama_index.readers.file import (
DocxReader,
EpubReader,
HWPReader,
ImageReader,
IPYNBReader,
MarkdownReader,
MboxReader,
PandasCSVReader,
PandasExcelReader,
PDFReader,
PptxReader,
VideoAudioReader,
) # pants: no-infer-dep
except ImportError:
raise ImportError("`llama-index-readers-file` package not found")
default_file_reader_cls: Dict[str, Type[BaseReader]] = {
".hwp": HWPReader,
".pdf": PDFReader,
".docx": DocxReader,
".pptx": PptxReader,
".ppt": PptxReader,
".pptm": PptxReader,
".gif": ImageReader,
".jpg": ImageReader,
".png": ImageReader,
".jpeg": ImageReader,
".webp": ImageReader,
".mp3": VideoAudioReader,
".mp4": VideoAudioReader,
".csv": PandasCSVReader,
".epub": EpubReader,
".md": MarkdownReader,
".mbox": MboxReader,
".ipynb": IPYNBReader,
".xls": PandasExcelReader,
".xlsx": PandasExcelReader,
}
return default_file_reader_cls
def _format_file_timestamp(
timestamp: float, include_time: bool = False
) -> Optional[str]:
"""
Format file timestamp to a %Y-%m-%d string.
Args:
timestamp (float): timestamp in float
include_time (bool): whether to include time in the formatted string
Returns:
str: formatted timestamp
"""
try:
if include_time:
return datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
except Exception:
return None
def default_file_metadata_func(
file_path: str, fs: Optional[fsspec.AbstractFileSystem] = None
) -> Dict:
"""
Get some handy metadata from filesystem.
Args:
file_path: str: file path in str
"""
fs = fs or get_default_fs()
stat_result = fs.stat(file_path)
try:
file_name = os.path.basename(str(stat_result["name"]))
except Exception as e:
file_name = os.path.basename(file_path)
creation_date = _format_file_timestamp(stat_result.get("created"))
last_modified_date = _format_file_timestamp(stat_result.get("mtime"))
last_accessed_date = _format_file_timestamp(stat_result.get("atime"))
default_meta = {
"file_path": file_path,
"file_name": file_name,
"file_type": mimetypes.guess_type(file_path)[0],
"file_size": stat_result.get("size"),
"creation_date": creation_date,
"last_modified_date": last_modified_date,
"last_accessed_date": last_accessed_date,
}
# Return not null value
return {
meta_key: meta_value
for meta_key, meta_value in default_meta.items()
if meta_value is not None
}
class _DefaultFileMetadataFunc:
"""
Default file metadata function wrapper which stores the fs.
Allows for pickling of the function.
"""
def __init__(self, fs: Optional[fsspec.AbstractFileSystem] = None):
self.fs = fs or get_default_fs()
def __call__(self, file_path: str) -> Dict:
return default_file_metadata_func(file_path, self.fs)
def get_default_fs() -> fsspec.AbstractFileSystem:
return LocalFileSystem()
def is_default_fs(fs: fsspec.AbstractFileSystem) -> bool:
return isinstance(fs, LocalFileSystem) and not fs.auto_mkdir
logger = logging.getLogger(__name__)
class SimpleDirectoryReader(BaseReader, ResourcesReaderMixin, FileSystemReaderMixin):
"""
Simple directory reader.
Load files from file directory.
Automatically select the best file reader given file extensions.
Args:
input_dir (str): Path to the directory.
input_files (List): List of file paths to read
(Optional; overrides input_dir, exclude)
exclude (List): glob of python file paths to exclude (Optional)
exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
encoding (str): Encoding of the files.
Default is utf-8.
errors (str): how encoding and decoding errors are to be handled,
see https://docs.python.org/3/library/functions.html#open
recursive (bool): Whether to recursively search in subdirectories.
False by default.
filename_as_id (bool): Whether to use the filename as the document id.
False by default.
required_exts (Optional[List[str]]): List of required extensions.
Default is None.
file_extractor (Optional[Dict[str, BaseReader]]): A mapping of file
extension to a BaseReader class that specifies how to convert that file
to text. If not specified, use default from DEFAULT_FILE_READER_CLS.
num_files_limit (Optional[int]): Maximum number of files to read.
Default is None.
file_metadata (Optional[Callable[str, Dict]]): A function that takes
in a filename and returns a Dict of metadata for the Document.
Default is None.
raise_on_error (bool): Whether to raise an error if a file cannot be read.
fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
to using the local file system. Can be changed to use any remote file system
exposed via the fsspec interface.
"""
supported_suffix_fn: Callable = _try_loading_included_file_formats
def __init__(
self,
input_dir: Optional[str] = None,
input_files: Optional[List] = None,
exclude: Optional[List] = None,
exclude_hidden: bool = True,
errors: str = "ignore",
recursive: bool = False,
encoding: str = "utf-8",
filename_as_id: bool = False,
required_exts: Optional[List[str]] = None,
file_extractor: Optional[Dict[str, BaseReader]] = None,
num_files_limit: Optional[int] = None,
file_metadata: Optional[Callable[[str], Dict]] = None,
raise_on_error: bool = False,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> None:
"""Initialize with parameters."""
super().__init__()
if not input_dir and not input_files:
raise ValueError("Must provide either `input_dir` or `input_files`.")
self.fs = fs or get_default_fs()
self.errors = errors
self.encoding = encoding
self.exclude = exclude
self.recursive = recursive
self.exclude_hidden = exclude_hidden
self.required_exts = required_exts
self.num_files_limit = num_files_limit
self.raise_on_error = raise_on_error
_Path = Path if is_default_fs(self.fs) else PurePosixPath
if input_files:
self.input_files = []
for path in input_files:
if not self.fs.isfile(path):
raise ValueError(f"File {path} does not exist.")
input_file = _Path(path)
self.input_files.append(input_file)
elif input_dir:
if not self.fs.isdir(input_dir):
raise ValueError(f"Directory {input_dir} does not exist.")
self.input_dir = _Path(input_dir)
self.exclude = exclude
self.input_files = self._add_files(self.input_dir)
if file_extractor is not None:
self.file_extractor = file_extractor
else:
self.file_extractor = {}
self.file_metadata = file_metadata or _DefaultFileMetadataFunc(self.fs)
self.filename_as_id = filename_as_id
def is_hidden(self, path: Path) -> bool:
return any(
part.startswith(".") and part not in [".", ".."] for part in path.parts
)
def _add_files(self, input_dir: Path) -> List[Path]:
"""Add files."""
all_files = set()
rejected_files = set()
rejected_dirs = set()
# Default to POSIX paths for non-default file systems (e.g. S3)
_Path = Path if is_default_fs(self.fs) else PurePosixPath
if self.exclude is not None:
for excluded_pattern in self.exclude:
if self.recursive:
# Recursive glob
excluded_glob = _Path(input_dir) / _Path("**") / excluded_pattern
else:
# Non-recursive glob
excluded_glob = _Path(input_dir) / excluded_pattern
for file in self.fs.glob(str(excluded_glob)):
if self.fs.isdir(file):
rejected_dirs.add(_Path(file))
else:
rejected_files.add(_Path(file))
file_refs: List[str] = []
if self.recursive:
file_refs = self.fs.glob(str(input_dir) + "/**/*")
else:
file_refs = self.fs.glob(str(input_dir) + "/*")
for ref in file_refs:
# Manually check if file is hidden or directory instead of
# in glob for backwards compatibility.
ref = _Path(ref)
is_dir = self.fs.isdir(ref)
skip_because_hidden = self.exclude_hidden and self.is_hidden(ref)
skip_because_bad_ext = (
self.required_exts is not None and ref.suffix not in self.required_exts
)
skip_because_excluded = ref in rejected_files
if not skip_because_excluded:
if is_dir:
ref_parent_dir = ref
else:
ref_parent_dir = self.fs._parent(ref)
for rejected_dir in rejected_dirs:
if str(ref_parent_dir).startswith(str(rejected_dir)):
skip_because_excluded = True
logger.debug(
"Skipping %s because it in parent dir %s which is in %s",
ref,
ref_parent_dir,
rejected_dir,
)
break
if (
is_dir
or skip_because_hidden
or skip_because_bad_ext
or skip_because_excluded
):
continue
else:
all_files.add(ref)
new_input_files = sorted(all_files)
if len(new_input_files) == 0:
raise ValueError(f"No files found in {input_dir}.")
if self.num_files_limit is not None and self.num_files_limit > 0:
new_input_files = new_input_files[0 : self.num_files_limit]
# print total number of files added
logger.debug(
f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}"
)
return new_input_files
def _exclude_metadata(self, documents: List[Document]) -> List[Document]:
"""
Exclude metadata from documents.
Args:
documents (List[Document]): List of documents.
"""
for doc in documents:
# Keep only metadata['file_path'] in both embedding and llm content
# str, which contain extreme important context that about the chunks.
# Dates is provided for convenience of postprocessor such as
# TimeWeightedPostprocessor, but excluded for embedding and LLMprompts
doc.excluded_embed_metadata_keys.extend(
[
"file_name",
"file_type",
"file_size",
"creation_date",
"last_modified_date",
"last_accessed_date",
]
)
doc.excluded_llm_metadata_keys.extend(
[
"file_name",
"file_type",
"file_size",
"creation_date",
"last_modified_date",
"last_accessed_date",
]
)
return documents
def list_resources(self, *args: Any, **kwargs: Any) -> List[Path]:
"""List files in the given filesystem."""
return self.input_files
def get_resource_info(self, resource_id: str, *args: Any, **kwargs: Any) -> Dict:
info_result = self.fs.info(resource_id)
creation_date = _format_file_timestamp(
info_result.get("created"), include_time=True
)
last_modified_date = _format_file_timestamp(
info_result.get("mtime"), include_time=True
)
info_dict = {
"file_path": resource_id,
"file_size": info_result.get("size"),
"creation_date": creation_date,
"last_modified_date": last_modified_date,
}
# Ignore None values
return {
meta_key: meta_value
for meta_key, meta_value in info_dict.items()
if meta_value is not None
}
def load_resource(
self, resource_id: str, *args: Any, **kwargs: Any
) -> List[Document]:
file_metadata = kwargs.get("file_metadata", self.file_metadata)
file_extractor = kwargs.get("file_extractor", self.file_extractor)
filename_as_id = kwargs.get("filename_as_id", self.filename_as_id)
encoding = kwargs.get("encoding", self.encoding)
errors = kwargs.get("errors", self.errors)
raise_on_error = kwargs.get("raise_on_error", self.raise_on_error)
fs = kwargs.get("fs", self.fs)
return SimpleDirectoryReader.load_file(
input_file=Path(resource_id),
file_metadata=file_metadata,
file_extractor=file_extractor,
filename_as_id=filename_as_id,
encoding=encoding,
errors=errors,
raise_on_error=raise_on_error,
fs=fs,
**kwargs,
)
async def aload_resource(
self, resource_id: str, *args: Any, **kwargs: Any
) -> List[Document]:
file_metadata = kwargs.get("file_metadata", self.file_metadata)
file_extractor = kwargs.get("file_extractor", self.file_extractor)
filename_as_id = kwargs.get("filename_as_id", self.filename_as_id)
encoding = kwargs.get("encoding", self.encoding)
errors = kwargs.get("errors", self.errors)
raise_on_error = kwargs.get("raise_on_error", self.raise_on_error)
fs = kwargs.get("fs", self.fs)
return await SimpleDirectoryReader.aload_file(
input_file=Path(resource_id),
file_metadata=file_metadata,
file_extractor=file_extractor,
filename_as_id=filename_as_id,
encoding=encoding,
errors=errors,
raise_on_error=raise_on_error,
fs=fs,
**kwargs,
)
def read_file_content(self, input_file: Path, **kwargs) -> bytes:
"""Read file content."""
fs: fsspec.AbstractFileSystem = kwargs.get("fs", self.fs)
with fs.open(input_file, errors=self.errors, encoding=self.encoding) as f:
return f.read()
@staticmethod
def load_file(
input_file: Path,
file_metadata: Callable[[str], Dict],
file_extractor: Dict[str, BaseReader],
filename_as_id: bool = False,
encoding: str = "utf-8",
errors: str = "ignore",
raise_on_error: bool = False,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> List[Document]:
"""
Static method for loading file.
NOTE: necessarily as a static method for parallel processing.
Args:
input_file (Path): _description_
file_metadata (Callable[[str], Dict]): _description_
file_extractor (Dict[str, BaseReader]): _description_
filename_as_id (bool, optional): _description_. Defaults to False.
encoding (str, optional): _description_. Defaults to "utf-8".
errors (str, optional): _description_. Defaults to "ignore".
fs (Optional[fsspec.AbstractFileSystem], optional): _description_. Defaults to None.
input_file (Path): File path to read
file_metadata ([Callable[str, Dict]]): A function that takes
in a filename and returns a Dict of metadata for the Document.
file_extractor (Dict[str, BaseReader]): A mapping of file
extension to a BaseReader class that specifies how to convert that file
to text.
filename_as_id (bool): Whether to use the filename as the document id.
encoding (str): Encoding of the files.
Default is utf-8.
errors (str): how encoding and decoding errors are to be handled,
see https://docs.python.org/3/library/functions.html#open
raise_on_error (bool): Whether to raise an error if a file cannot be read.
fs (Optional[fsspec.AbstractFileSystem]): File system to use. Defaults
to using the local file system. Can be changed to use any remote file system
Returns:
List[Document]: loaded documents
"""
# TODO: make this less redundant
default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
default_file_reader_suffix = list(default_file_reader_cls.keys())
metadata: Optional[dict] = None
documents: List[Document] = []
if file_metadata is not None:
metadata = file_metadata(str(input_file))
file_suffix = input_file.suffix.lower()
if file_suffix in default_file_reader_suffix or file_suffix in file_extractor:
# use file readers
if file_suffix not in file_extractor:
# instantiate file reader if not already
reader_cls = default_file_reader_cls[file_suffix]
file_extractor[file_suffix] = reader_cls()
reader = file_extractor[file_suffix]
# load data -- catch all errors except for ImportError
try:
kwargs = {"extra_info": metadata}
if fs and not is_default_fs(fs):
kwargs["fs"] = fs
docs = reader.load_data(input_file, **kwargs)
except ImportError as e:
# ensure that ImportError is raised so user knows
# about missing dependencies
raise ImportError(str(e))
except Exception as e:
if raise_on_error:
raise Exception("Error loading file") from e
# otherwise, just skip the file and report the error
print(
f"Failed to load file {input_file} with error: {e}. Skipping...",
flush=True,
)
return []
# iterate over docs if needed
if filename_as_id:
for i, doc in enumerate(docs):
doc.id_ = f"{input_file!s}_part_{i}"
documents.extend(docs)
else:
# do standard read
fs = fs or get_default_fs()
with fs.open(input_file, errors=errors, encoding=encoding) as f:
data = f.read().decode(encoding, errors=errors)
doc = Document(text=data, metadata=metadata or {})
if filename_as_id:
doc.id_ = str(input_file)
documents.append(doc)
return documents
async def aload_file(self, input_file: Path) -> List[Document]:
"""Load file asynchronously."""
# TODO: make this less redundant
default_file_reader_cls = SimpleDirectoryReader.supported_suffix_fn()
default_file_reader_suffix = list(default_file_reader_cls.keys())
metadata: Optional[dict] = None
documents: List[Document] = []
if self.file_metadata is not None:
metadata = self.file_metadata(str(input_file))
file_suffix = input_file.suffix.lower()
if (
file_suffix in default_file_reader_suffix
or file_suffix in self.file_extractor
):
# use file readers
if file_suffix not in self.file_extractor:
# instantiate file reader if not already
reader_cls = default_file_reader_cls[file_suffix]
self.file_extractor[file_suffix] = reader_cls()
reader = self.file_extractor[file_suffix]
# load data -- catch all errors except for ImportError
try:
kwargs = {"extra_info": metadata}
if self.fs and not is_default_fs(self.fs):
kwargs["fs"] = self.fs
docs = await reader.aload_data(input_file, **kwargs)
except ImportError as e:
# ensure that ImportError is raised so user knows
# about missing dependencies
raise ImportError(str(e))
except Exception as e:
if self.raise_on_error:
raise
# otherwise, just skip the file and report the error
print(
f"Failed to load file {input_file} with error: {e}. Skipping...",
flush=True,
)
return []
# iterate over docs if needed
if self.filename_as_id:
for i, doc in enumerate(docs):
doc.id_ = f"{input_file!s}_part_{i}"
documents.extend(docs)
else:
# do standard read
fs = self.fs or get_default_fs()
with fs.open(input_file, errors=self.errors, encoding=self.encoding) as f:
data = f.read().decode(self.encoding, errors=self.errors)
doc = Document(text=data, metadata=metadata or {})
if self.filename_as_id:
doc.id_ = str(input_file)
documents.append(doc)
return documents
def load_data(
self,
show_progress: bool = False,
num_workers: Optional[int] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> List[Document]:
"""
Load data from the input directory.
Args:
show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
num_workers (Optional[int]): Number of workers to parallelize data-loading over.
fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
in the constructor, it will override the fs parameter here.
Returns:
List[Document]: A list of documents.
"""
documents = []
files_to_process = self.input_files
fs = fs or self.fs
if num_workers and num_workers > 1:
if num_workers > multiprocessing.cpu_count():
warnings.warn(
"Specified num_workers exceed number of CPUs in the system. "
"Setting `num_workers` down to the maximum CPU count."
)
with multiprocessing.get_context("spawn").Pool(num_workers) as p:
results = p.starmap(
SimpleDirectoryReader.load_file,
zip(
files_to_process,
repeat(self.file_metadata),
repeat(self.file_extractor),
repeat(self.filename_as_id),
repeat(self.encoding),
repeat(self.errors),
repeat(self.raise_on_error),
repeat(fs),
),
)
documents = reduce(lambda x, y: x + y, results)
else:
if show_progress:
files_to_process = tqdm(
self.input_files, desc="Loading files", unit="file"
)
for input_file in files_to_process:
documents.extend(
SimpleDirectoryReader.load_file(
input_file=input_file,
file_metadata=self.file_metadata,
file_extractor=self.file_extractor,
filename_as_id=self.filename_as_id,
encoding=self.encoding,
errors=self.errors,
raise_on_error=self.raise_on_error,
fs=fs,
)
)
return self._exclude_metadata(documents)
async def aload_data(
self,
show_progress: bool = False,
num_workers: Optional[int] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> List[Document]:
"""
Load data from the input directory.
Args:
show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
num_workers (Optional[int]): Number of workers to parallelize data-loading over.
fs (Optional[fsspec.AbstractFileSystem]): File system to use. If fs was specified
in the constructor, it will override the fs parameter here.
Returns:
List[Document]: A list of documents.
"""
files_to_process = self.input_files
fs = fs or self.fs
coroutines = [self.aload_file(input_file) for input_file in files_to_process]
if num_workers:
document_lists = await run_jobs(
coroutines, show_progress=show_progress, workers=num_workers
)
elif show_progress:
_asyncio = get_asyncio_module(show_progress=show_progress)
document_lists = await _asyncio.gather(*coroutines)
else:
document_lists = await asyncio.gather(*coroutines)
documents = [doc for doc_list in document_lists for doc in doc_list]
return self._exclude_metadata(documents)
def iter_data(
self, show_progress: bool = False
) -> Generator[List[Document], Any, Any]:
"""
Load data iteratively from the input directory.
Args:
show_progress (bool): Whether to show tqdm progress bars. Defaults to False.
Returns:
Generator[List[Document]]: A list of documents.
"""
files_to_process = self.input_files
if show_progress:
files_to_process = tqdm(self.input_files, desc="Loading files", unit="file")
for input_file in files_to_process:
documents = SimpleDirectoryReader.load_file(
input_file=input_file,
file_metadata=self.file_metadata,
file_extractor=self.file_extractor,
filename_as_id=self.filename_as_id,
encoding=self.encoding,
errors=self.errors,
raise_on_error=self.raise_on_error,
fs=self.fs,
)
documents = self._exclude_metadata(documents)
if len(documents) > 0:
yield documents