# NOTE: GitHub page-navigation text captured along with this file was removed.
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
# The full license is in the LICENSE file, distributed with this software.
from . import base, import_name
class TextFilesSource(base.DataSource):
    """Read textfiles as sequence of lines

    Prototype of sources reading sequential data.

    Takes a set of files, and returns an iterator over the text in each of them.
    The files can be local or remote. Extra parameters for encoding, etc.,
    go into ``storage_options``.
    """
    # Driver identification and capability flags read by the plugin machinery.
    name = 'textfiles'
    version = '0.0.1'
    container = 'python'
    partition_access = True

    def __init__(self, urlpath, text_mode=True, text_encoding='utf8',
                 compression=None, decoder=None, read=True, metadata=None,
                 storage_options=None):
        """
        Parameters
        ----------
        urlpath : str or list(str)
            Target files. Can be a glob-path (with "*") and include protocol
            specified (e.g., "s3://"). Can also be a list of absolute paths.
        text_mode : bool
            Whether to open the file in text mode, recoding binary
            characters on the fly
        text_encoding : str
            If text_mode is True, apply this encoding. UTF* is by far the most
            common
        compression : str or None
            If given, decompress the file with the given codec on load. Can
            be something like "gz", "bz2", or to try to guess from the filename,
            'infer'
        decoder : function, str or None
            Use this to decode the contents of files. If None, you will get
            a list of lines of text/bytes. If a function, it must operate on
            an open file-like object or a bytes/str instance, and return a
            list. If a str, it is resolved to an object with ``import_name``.
        read : bool
            If decoder is not None, this flag controls whether bytes/str get
            passed to the function indicated (True) or the open file-like
            object (False)
        metadata : dict or None
            Passed through unchanged to the base-class constructor.
        storage_options: dict
            Options to pass to the file reader backend, including text-specific
            encoding arguments, and parameters specific to the remote
            file-system driver, if using.
        """
        self._urlpath = urlpath
        self._storage_options = storage_options or {}
        self._dataframe = None
        # Populated lazily by _get_schema(); None means "not opened yet".
        self._files = None
        if isinstance(decoder, str):
            # A dotted name was given; resolve it to the actual callable.
            decoder = import_name(decoder)
        self.decoder = decoder
        self.compression = compression
        self.mode = 'rt' if text_mode else 'rb'
        self.encoding = text_encoding
        self._read = read

        super(TextFilesSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        """Open the target files (first call only) and describe the source.

        The OpenFile list is created once and kept on ``self._files``;
        later calls reuse it. One partition corresponds to one file.
        """
        from fsspec import open_files

        if self._files is None:
            # _get_cache is provided by the base class; its first element is
            # used as the path(s) to open.
            urlpath = self._get_cache(self._urlpath)[0]

            self._files = open_files(
                urlpath, mode=self.mode, encoding=self.encoding,
                compression=self.compression,
                **self._storage_options)
            self.npartitions = len(self._files)
        # NOTE(review): the keyword arguments other than ``datashape`` and
        # ``shape`` were lost from the source text; ``dtype``, ``npartitions``
        # and ``extra_metadata`` follow the usual intake driver convention --
        # confirm against base.Schema / base.DataSource.
        return base.Schema(datashape=None,
                           dtype=None,
                           shape=(None, ),
                           npartitions=self.npartitions,
                           extra_metadata=self.metadata)

    def _get_partition(self, i):
        """Return the decoded contents of the ``i``-th file."""
        # Make sure the file list exists even if the schema was never loaded.
        self._get_schema()
        return get_file(self._files[i], self.decoder, self._read)

    def read(self):
        """Compute the dask collection from :meth:`to_dask` and return it."""
        return self.to_dask().compute()

    def to_spark(self):
        """Set up a Spark ``textFile`` read of ``urlpath`` via intake_spark.

        Requires the optional ``intake_spark`` package; returns whatever
        ``SparkHolder.setup()`` returns.
        """
        from intake_spark.base import SparkHolder
        h = SparkHolder(False, [
            ('textFile', (self._urlpath, ))
        ], {})
        return h.setup()

    def to_dask(self):
        """Return a dask bag with one delayed ``get_file`` call per file."""
        import dask.bag as db
        from dask import delayed

        # self._files stays None until the schema has been loaded; without
        # this call the comprehension below would iterate over None.
        self._get_schema()
        dfile = delayed(get_file)
        return db.from_delayed([dfile(f, self.decoder, self._read)
                                for f in self._files])
def get_file(f, decoder, read):
    """Serializable function to take an OpenFile object and read lines

    Parameters
    ----------
    f : context manager yielding a file-like object
        Entered with ``with``; the file-like object it yields is what gets
        iterated, read, or handed to ``decoder``.
    decoder : callable or None
        If None, the lines of the file are returned as a list. Otherwise it
        is called with either the file contents or the open file object,
        depending on ``read``.
    read : bool
        If True, ``decoder`` receives the result of ``.read()``; if False it
        receives the open file-like object itself.

    Returns
    -------
    list or tuple
        The list of lines, or the decoder output. A decoder result that is
        not already a list or tuple is wrapped in a one-element list.
    """
    with f as fh:
        if decoder is None:
            return list(fh)
        # The decoder runs inside the ``with`` so that, when read is False,
        # the file object it receives is still open.
        payload = fh.read() if read else fh
        out = decoder(payload)
        if isinstance(out, (tuple, list)):
            return out
        return [out]
# NOTE: trailing GitHub page text captured along with this file was removed.