Commit 7559711

update fileio docs (#52)

svenkreiss committed Aug 6, 2016
1 parent 978451e commit 7559711
Showing 13 changed files with 178 additions and 72 deletions.
60 changes: 5 additions & 55 deletions docs/sphinx/api.rst
@@ -9,59 +9,9 @@ or ``Context.textFile("path/to/textfile.txt")``. These two methods return an
``RDD`` which can then be processed with the methods below.


RDD
---
.. toctree::
   :maxdepth: 2

.. autoclass:: pysparkling.RDD
   :members:

.. autoclass:: pysparkling.StatCounter
   :members:


Context
-------

A ``Context`` describes the setup. Instantiating a Context with the default
arguments using ``Context()`` is the most lightweight setup. All data is just
in the local thread and is never serialized or deserialized.

If you want to process the data in parallel, you can use the ``multiprocessing``
module. Because the default ``pickle`` serializer cannot serialize lambdas
and nested functions, you can serialize all methods with ``cloudpickle``
instead. For example,
a common instantiation with ``multiprocessing`` looks like this:

.. code-block:: python

    c = Context(
        multiprocessing.Pool(4),
        serializer=cloudpickle.dumps,
        deserializer=pickle.loads,
    )

This assumes that your data is serializable with ``pickle``, which is
generally faster than ``cloudpickle``. You can also specify a custom
serializer/deserializer for data.

.. autoclass:: pysparkling.Context
   :members:


fileio
------

The functionality provided by this module is used in ``Context.textFile()``
for reading and in ``RDD.saveAsTextFile()`` for writing. You can use this
submodule directly with ``File(filename).dump(some_data)``,
``File(filename).load()`` and ``File.exists(path)`` to write, read and check
for the existence of a file. All methods transparently handle various schemes
(for example ``http://``, ``s3://`` and ``file://``) and
compression/decompression of ``.gz`` and ``.bz2`` files (among others).

Use environment variables ``AWS_SECRET_ACCESS_KEY`` and ``AWS_ACCESS_KEY_ID``
for auth and use file paths of the form ``s3://bucket_name/filename.txt``.

.. autoclass:: pysparkling.fileio.File
   :members:

.. autoclass:: pysparkling.fileio.TextFile
   :members:
   api_rdd
   api_context
   api_fileio
29 changes: 29 additions & 0 deletions docs/sphinx/api_context.rst
@@ -0,0 +1,29 @@
.. _api_context:

.. currentmodule:: pysparkling

Context
-------

A :class:`Context` describes the setup. Instantiating a Context with the default
arguments using ``Context()`` is the most lightweight setup. All data is just
in the local thread and is never serialized or deserialized.

If you want to process the data in parallel, you can use the ``multiprocessing``
module. Because the default ``pickle`` serializer cannot serialize lambdas
and nested functions, you can serialize all methods with ``cloudpickle``
instead. For example,
a common instantiation with ``multiprocessing`` looks like this:

.. code-block:: python

    c = Context(
        multiprocessing.Pool(4),
        serializer=cloudpickle.dumps,
        deserializer=pickle.loads,
    )

This assumes that your data is serializable with ``pickle``, which is
generally faster than ``cloudpickle``. You can also specify a custom
serializer/deserializer for data.
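
For the data itself (as opposed to the methods), a separate serializer pair
can be given. A minimal sketch, assuming ``Context`` accepts
``data_serializer`` and ``data_deserializer`` keyword arguments:

.. code-block:: python

    import multiprocessing
    import pickle

    import cloudpickle

    from pysparkling import Context

    c = Context(
        multiprocessing.Pool(4),
        serializer=cloudpickle.dumps,
        deserializer=pickle.loads,
        data_serializer=pickle.dumps,
        data_deserializer=pickle.loads,
    )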

.. autoclass:: pysparkling.Context
   :members:
43 changes: 43 additions & 0 deletions docs/sphinx/api_fileio.rst
@@ -0,0 +1,43 @@
.. _api_fileio:


fileio
------

.. currentmodule:: pysparkling

The functionality provided by this module is used in :func:`Context.textFile`
for reading and in :func:`RDD.saveAsTextFile` for writing.

.. currentmodule:: pysparkling.fileio

You can use this submodule directly with :func:`File.dump`,
:func:`File.load` and :func:`File.exists` to write, read and check
for the existence of a file. All methods transparently handle various schemes
(for example ``http://``, ``s3://`` and ``file://``) and
compression/decompression of ``.gz`` and ``.bz2`` files (among others).
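
For example, a local round trip through :class:`File` might look like this
(a minimal sketch; the file name is a placeholder):

.. code-block:: python

    import io

    from pysparkling import fileio

    path = 'hello.txt.gz'  # compression is inferred from the extension

    fileio.File(path).dump(io.BytesIO(b'Hello World!'))
    if fileio.File.exists(path):
        print(fileio.File(path).load().read())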


.. autoclass:: pysparkling.fileio.File
   :members:

.. autoclass:: pysparkling.fileio.TextFile
   :members:

.. autoclass:: pysparkling.fileio.fs.FileSystem
   :members:

.. autoclass:: pysparkling.fileio.fs.S3
   :members:

.. autoclass:: pysparkling.fileio.fs.GS
   :members:

.. autoclass:: pysparkling.fileio.fs.Hdfs
   :members:

.. autoclass:: pysparkling.fileio.fs.Http
   :members:

.. autoclass:: pysparkling.fileio.fs.Local
   :members:
10 changes: 10 additions & 0 deletions docs/sphinx/api_rdd.rst
@@ -0,0 +1,10 @@
.. _api_rdd:

RDD
---

.. autoclass:: pysparkling.RDD
   :members:

.. autoclass:: pysparkling.StatCounter
   :members:
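
A quick usage sketch (the values are placeholders):

.. code-block:: python

    from pysparkling import Context

    rdd = Context().parallelize([1, 2, 3, 4])
    print(rdd.map(lambda x: x * x).collect())  # [1, 4, 9, 16]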
11 changes: 11 additions & 0 deletions pysparkling/fileio/codec/codec.py
@@ -4,11 +4,22 @@


class Codec(object):
    """Codec."""

    def __init__(self):
        pass

    def compress(self, stream):
        """Compress.

        :param io.BytesIO stream: Uncompressed input stream.
        :rtype: io.BytesIO
        """
        return stream

    def decompress(self, stream):
        """Decompress.

        :param io.BytesIO stream: Compressed input stream.
        :rtype: io.BytesIO
        """
        return stream
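
The base class is an identity pass-through; concrete codecs override both
methods. A minimal sketch of what a gzip-style subclass could look like
(``GzSketch`` is a hypothetical example, not the library's own ``Gz`` class):

import gzip
import io

class GzSketch(Codec):
    """Hypothetical codec: round-trips a stream through gzip."""

    def compress(self, stream):
        # gzip.compress/decompress assume Python 3.2+
        return io.BytesIO(gzip.compress(stream.read()))

    def decompress(self, stream):
        return io.BytesIO(gzip.decompress(stream.read()))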
2 changes: 1 addition & 1 deletion pysparkling/fileio/file.py
@@ -10,7 +10,7 @@


class File(object):
    """file object
    """File object.

    :param file_name: Any file name.
    """
35 changes: 35 additions & 0 deletions pysparkling/fileio/fs/file_system.py
@@ -4,22 +4,57 @@


class FileSystem(object):
    """Interface class for the file system.

    :param str file_name: File name.
    """

    def __init__(self, file_name):
        self.file_name = file_name

    @staticmethod
    def resolve_filenames(expr):
        """Resolve the given glob-like expression to filenames.

        :rtype: list
        """
        log.error('Cannot resolve: {0}'.format(expr))

    def exists(self):
        """Check whether the given file_name exists.

        :rtype: bool
        """
        log.warning('Could not determine whether {0} exists due to '
                    'unhandled scheme.'.format(self.file_name))

    def load(self):
        """Load a file to a stream.

        :rtype: io.BytesIO
        """
        log.error('Cannot load: {0}'.format(self.file_name))

    def load_text(self, encoding='utf8', encoding_errors='ignore'):
        """Load a file to a stream.

        :param str encoding: Text encoding.
        :param str encoding_errors: How to handle encoding errors.
        :rtype: io.StringIO
        """
        log.error('Cannot load: {0}'.format(self.file_name))

    def dump(self, stream):
        """Dump a stream to a file.

        :param io.BytesIO stream: Input stream.
        """
        log.error('Cannot dump: {0}'.format(self.file_name))

    def make_public(self, recursive=False):
        """Make the file public (only on some file systems).

        :param bool recursive: Recurse.
        :rtype: FileSystem
        """
        log.warning('Cannot make {0} public.'.format(self.file_name))
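
Every warning above marks a method that a concrete file system is expected
to override. A minimal sketch of a custom subclass (``InMemory`` is
hypothetical, for illustration only):

import io

class InMemory(FileSystem):
    """Hypothetical file system keeping contents in a class-level dict."""

    _store = {}

    def exists(self):
        return self.file_name in self._store

    def load(self):
        return io.BytesIO(self._store[self.file_name])

    def dump(self, stream):
        self._store[self.file_name] = stream.read()
        return self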
13 changes: 12 additions & 1 deletion pysparkling/fileio/fs/gs.py
@@ -17,7 +17,18 @@


class GS(FileSystem):
    """:class:`.FileSystem` implementation for Google Storage.

    Paths are of the form ``gs://bucket_name/file_path`` or
    ``gs://project_name:bucket_name/file_path``.
    """

    #: Set a default project name.
    project_name = None

    #: Default mime type.
    mime_type = 'text/plain'

    _clients = {}

    def __init__(self, file_name):
@@ -101,7 +112,7 @@ def load_text(self, encoding='utf8', encoding_errors='ignore'):

    def dump(self, stream):
        log.debug('Dumping to {0}.'.format(self.blob.name))
        self.blob.upload_from_string(stream.read())
        self.blob.upload_from_string(stream.read(), mime_type=self.mime_type)
        return self

    def make_public(self, recursive=False):
2 changes: 2 additions & 0 deletions pysparkling/fileio/fs/hdfs.py
@@ -17,6 +17,8 @@


class Hdfs(FileSystem):
    """:class:`.FileSystem` implementation for HDFS."""

    _conn = {}

    def __init__(self, file_name):
2 changes: 2 additions & 0 deletions pysparkling/fileio/fs/http.py
@@ -15,6 +15,8 @@


class Http(FileSystem):
    """:class:`.FileSystem` implementation for HTTP."""

    def __init__(self, file_name):
        if requests is None:
            raise FileSystemNotSupported(
2 changes: 2 additions & 0 deletions pysparkling/fileio/fs/local.py
@@ -12,6 +12,8 @@


class Local(FileSystem):
    """:class:`.FileSystem` implementation for the local file system."""

    def __init__(self, file_name):
        super(Local, self).__init__(file_name)
31 changes: 21 additions & 10 deletions pysparkling/fileio/fs/s3.py
@@ -17,6 +17,17 @@


class S3(FileSystem):
    """:class:`.FileSystem` implementation for S3.

    Use environment variables ``AWS_SECRET_ACCESS_KEY`` and
    ``AWS_ACCESS_KEY_ID`` for auth and use file paths of the form
    ``s3://bucket_name/filename.txt``.
    """

    #: Keyword arguments for new connections.
    #: Example: set to ``{'anon': True}`` for anonymous connections.
    connection_kwargs = {}

    _conn = None

    def __init__(self, file_name):
@@ -30,28 +41,28 @@ def __init__(self, file_name):
        t.next('://')  # skip scheme
        bucket_name = t.next('/')
        key_name = t.next()
        conn = S3._get_conn()
        conn = self._get_conn()
        bucket = conn.get_bucket(bucket_name, validate=False)
        self.key = bucket.get_key(key_name)
        if not self.key:
            self.key = bucket.new_key(key_name)

    @staticmethod
    def _get_conn():
        if not S3._conn:
            S3._conn = boto.connect_s3()
        return S3._conn
    @classmethod
    def _get_conn(cls):
        if not cls._conn:
            cls._conn = boto.connect_s3(**cls.connection_kwargs)
        return cls._conn

    @staticmethod
    def resolve_filenames(expr):
    @classmethod
    def resolve_filenames(cls, expr):
        files = []

        t = Tokenizer(expr)
        scheme = t.next('://')
        bucket_name = t.next('/')
        prefix = t.next(['*', '?'])

        bucket = S3._get_conn().get_bucket(
        bucket = cls._get_conn().get_bucket(
            bucket_name,
            validate=False
        )
@@ -70,7 +81,7 @@ def exists(self):
        t.next('//')  # skip scheme
        bucket_name = t.next('/')
        key_name = t.next()
        conn = S3._get_conn()
        conn = self._get_conn()
        bucket = conn.get_bucket(bucket_name, validate=False)
        return (bucket.get_key(key_name) or
                bucket.list(prefix='{}/'.format(key_name)))
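
Switching ``_get_conn`` from ``@staticmethod`` to ``@classmethod`` lets
``connection_kwargs`` take effect before the first connection is cached.
For example, anonymous access could be configured like this (a sketch,
assuming boto's ``connect_s3`` accepts the ``anon`` keyword):

from pysparkling.fileio.fs import S3

S3.connection_kwargs = {'anon': True}  # passed through to boto.connect_s3()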
10 changes: 5 additions & 5 deletions pysparkling/fileio/textfile.py
@@ -5,6 +5,7 @@

from . import codec
from .file import File
from .fs.file_system import FileSystem

log = logging.getLogger(__name__)

@@ -16,7 +17,7 @@


class TextFile(File):
    """Derived from :class:`pysparkling.fileio.File`.
    """Derived from :class:`File`.

    :param file_name: Any text file name.
    """
@@ -27,13 +28,12 @@ def __init__(self, file_name):
    def load(self, encoding='utf8', encoding_errors='ignore'):
        """Load the data from a file.

        :param encoding: (optional)
            The character encoding of the file.
        :param str encoding: The character encoding of the file.
        :param str encoding_errors: How to handle encoding errors.
        :rtype: io.StringIO
        """
        if type(self.codec) == codec.Codec and \
                hasattr(self.fs, 'load_text'):
                self.fs.load_text != FileSystem.load_text:
            stream = self.fs.load_text(encoding, encoding_errors)
        else:
            stream = self.fs.load()
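
With this change, ``load`` only takes the text fast path when the file
system actually overrides ``FileSystem.load_text``, instead of trusting
``hasattr``. A usage sketch (the file name is a placeholder, and ``dump``
is assumed to accept a text stream like ``File.dump`` accepts a byte
stream):

import io
from pysparkling.fileio import TextFile

TextFile('notes.txt').dump(io.StringIO(u'Hello World!'))
print(TextFile('notes.txt').load().read())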
