Skip to content

Commit

Permalink
Add encoding argument to Client.read and Client.write.
Browse files Browse the repository at this point in the history
This makes it much easier in python 3 to work with JSON for example.
  • Loading branch information
mtth committed Aug 25, 2015
1 parent 3774184 commit 4720160
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 17 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ API and command line interface for HDFS.
In [1]: CLIENT.list('models/')
Out[1]: ['1.json', '2.json']
In [2]: with CLIENT.read('models/2.json', encoding='utf-8') as reader:
...: from json import load
...: model = load(reader)
...: model['normalize'] = False
...:
In [3]: with CLIENT.write('models/2.json', encoding='utf-8', overwrite=True) as writer:
...: from json import dump
...: dump(model, writer)
...:
Expand Down
5 changes: 2 additions & 3 deletions examples/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@

# Serialize to JSON and upload to HDFS. With `encoding` set, the client
# encodes the unicode JSON string to bytes before sending it.
data = dumps(weights)
client.write(path, data=data, encoding='utf-8', overwrite=True)

# The file's HDFS status, we can use it to verify that all the data is there.
# NOTE(review): `len(data)` counts characters, not bytes; this assumes the
# serialized JSON is ASCII-only — confirm if weights can contain non-ASCII keys.
status = client.status(path)
assert status['length'] == len(data)

# Download the file back and check that the deserialized contents match.
# Bug fix: the updated example referenced `contents` without ever assigning
# it (the `reader.read()` line was dropped when the `.decode()` call was
# removed); restore the read, now returning unicode via `encoding`.
with client.read(path, encoding='utf-8') as reader:
    contents = reader.read()
assert loads(contents) == weights
2 changes: 1 addition & 1 deletion hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging as lg


# Bumped for the new `encoding` arguments on Client.read/Client.write.
__version__ = '2.0.1'
__license__ = 'MIT'


Expand Down
24 changes: 16 additions & 8 deletions hdfs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from six import add_metaclass
from six.moves.urllib.parse import quote
from threading import Lock
import codecs
import logging as lg
import os
import os.path as osp
Expand Down Expand Up @@ -325,7 +326,8 @@ def parts(self, hdfs_path, parts=None, status=False):
return infos if status else [name for name, _ in infos]

def write(self, hdfs_path, data=None, overwrite=False, permission=None,
blocksize=None, replication=None, buffersize=None, append=False):
blocksize=None, replication=None, buffersize=None, append=False,
encoding=None):
"""Create a file on HDFS.
:param hdfs_path: Path where to create file. The necessary directories will
Expand All @@ -342,6 +344,7 @@ def write(self, hdfs_path, data=None, overwrite=False, permission=None,
:param replication: Number of replications of the file.
:param buffersize: Size of upload buffer.
:param append: Append to a file rather than create a new one.
:param encoding: Encoding used to serialize data written.
Sample usages:
Expand Down Expand Up @@ -389,7 +392,7 @@ def consumer(_data):
self._request(
method='POST' if append else 'PUT',
url=res.headers['location'],
data=_data,
data=(c.encode(encoding) for c in _data) if encoding else _data,
auth=False,
)

Expand Down Expand Up @@ -541,7 +544,7 @@ def wrap(_reader, _chunk_size, _progress):

@contextmanager
def read(self, hdfs_path, offset=0, length=None, buffer_size=None,
chunk_size=None, progress=None):
chunk_size=None, progress=None, encoding=None):
"""Read a file from HDFS.
:param hdfs_path: HDFS path.
Expand All @@ -558,6 +561,9 @@ def read(self, hdfs_path, offset=0, length=None, buffer_size=None,
will be passed two arguments, the path to the file being uploaded and the
number of bytes transferred so far. On completion, it will be called once
with `-1` as second argument.
:param encoding: Encoding used to decode the request. By default the raw
data is returned. This is mostly helpful in python 3, e.g. since the JSON
module will only deserialize unicode.
This method must be called using a `with` block:
Expand All @@ -580,22 +586,24 @@ def read(self, hdfs_path, offset=0, length=None, buffer_size=None,
)
try:
if chunk_size:
# Patch in encoding so that `iter_content` can pick it up.
res.encoding = encoding
if progress:

def reader(_hdfs_path, _res, _chunk_size, _progress):
def reader(_hdfs_path, _progress):
"""Generator that tracks progress."""
nbytes = 0
for chunk in _res.iter_content(_chunk_size):
for chunk in res.iter_content(chunk_size, decode_unicode=True):
nbytes += len(chunk)
_progress(_hdfs_path, nbytes)
yield chunk
_progress(_hdfs_path, -1)

yield reader(hdfs_path, res, chunk_size, progress)
yield reader(hdfs_path, progress)
else:
yield res.iter_content(chunk_size)
yield res.iter_content(chunk_size, decode_unicode=True)
else:
yield res.raw
yield codecs.getreader(encoding)(res.raw) if encoding else res.raw
finally:
res.close()
_logger.debug('Closed response for reading file %r.', hdfs_path)
Expand Down
43 changes: 42 additions & 1 deletion test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,23 @@ def test_create_file_with_percent(self):
class TestWrite(_IntegrationTest):

def test_create_from_string(self):
    """Writing a text literal stores its byte representation on HDFS."""
    # The diff rendering kept both the pre- and post-change write calls;
    # only the post-change (unicode literal) line belongs in the test.
    self.client.write('up', u'hello, world!')
    eq_(self._read('up'), b'hello, world!')

def test_create_from_string_with_encoding(self):
    """Writing a unicode string with an explicit encoding stores its bytes.

    Bug fix: the test's name promises an encoding, but the call omitted
    `encoding='utf-8'`, making it a duplicate of `test_create_from_string`
    and leaving the new keyword argument untested on this path.
    """
    self.client.write('up', u'hello, world!', encoding='utf-8')
    eq_(self._read('up'), b'hello, world!')

def test_create_from_generator(self):
    """Byte chunks yielded by an iterator are concatenated on upload."""
    chunks = iter([b'hello, ', b'world!'])
    self.client.write('up', chunks)
    eq_(self._read('up'), b'hello, world!')

def test_create_from_generator_with_encoding(self):
    """Unicode chunks from an iterator are encoded before upload."""
    chunks = iter([u'hello, ', u'world!'])
    self.client.write('up', chunks, encoding='utf-8')
    eq_(self._read('up'), b'hello, world!')

def test_create_from_file_object(self):
with temppath() as tpath:
with open(tpath, 'w') as writer:
Expand Down Expand Up @@ -237,6 +246,19 @@ def test_as_context_manager(self):
writer.write(b'world!')
eq_(self._read('up'), b'hello, world!')

def test_as_context_manager_with_encoding(self):
    """The writer context manager accepts unicode when an encoding is set."""
    with self.client.write('up', encoding='utf-8') as writer:
        for part in (u'hello, ', u'world!'):
            writer.write(part)
    eq_(self._read('up'), b'hello, world!')

def test_dump_json(self):
    """`json.dump` can target an encoded writer directly and round-trip."""
    from json import dump, loads
    payload = {'one': 1, 'two': 2}
    with self.client.write('up', encoding='utf-8') as writer:
        dump(payload, writer)
    roundtripped = loads(self._read('up', encoding='utf-8'))
    eq_(roundtripped, payload)

@raises(HdfsError)
def test_create_and_overwrite_directory(self):
# can't overwrite a directory with a file
Expand Down Expand Up @@ -506,6 +528,25 @@ def cb(path, nbytes, chunk_lengths=[]):
eq_(reader.read(), b'hello, world!')
eq_(cb('', 0), [5, 10, 13, -1, 0])

def test_read_with_encoding(self):
    """Reading with an encoding returns the decoded string."""
    message = u'hello, world!'
    self.client.write('foo', message)
    with self.client.read('foo', encoding='utf-8') as reader:
        eq_(reader.read(), message)

def test_read_with_chunk_size_and_encoding(self):
    """Chunked reads combined with an encoding yield decoded chunks."""
    message = u'hello, world!'
    self.client.write('foo', message)
    with self.client.read('foo', chunk_size=5, encoding='utf-8') as reader:
        chunks = list(reader)
    eq_(chunks, [u'hello', u', wor', u'ld!'])

def test_read_json(self):
    """JSON written with an encoding can be `json.load`ed from the reader."""
    from json import dumps, load
    payload = {'one': 1, 'two': 2}
    self.client.write('foo', data=dumps(payload), encoding='utf-8')
    with self.client.read('foo', encoding='utf-8') as reader:
        eq_(load(reader), payload)


class TestRename(_IntegrationTest):

Expand Down
4 changes: 2 additions & 2 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def setup(self):

# Helpers.

def _read(self, hdfs_path, encoding=None):
    """Test helper: return the full contents of an HDFS file.

    :param hdfs_path: Remote path to read.
    :param encoding: Optional encoding forwarded to `client.read`; by
      default raw bytes are returned.
    """
    # The diff rendering kept both the old and new signatures; only the
    # new `encoding`-aware definition belongs here.
    with self.client.read(hdfs_path, encoding=encoding) as reader:
        return reader.read()

def _exists(self, hdfs_path):
Expand Down

0 comments on commit 4720160

Please sign in to comment.