Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for hadoop fs -appendToFile #910

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 19 additions & 2 deletions luigi/contrib/hdfs/__init__.py
Expand Up @@ -181,6 +181,12 @@ def count(self, path):
def copy(self, path, destination):
    """
    Copy ``path`` to ``destination`` by running ``hadoop fs -cp``.
    """
    cp_args = ['fs', '-cp', path, destination]
    call_check(load_hadoop_cmd() + cp_args)

def append(self, local_path, destination):
    """
    Append ``local_path`` to ``destination`` by running ``hadoop fs -appendToFile``.

    Requires Hadoop >= 2.3.0
    """
    append_args = ['fs', '-appendToFile', local_path, destination]
    call_check(load_hadoop_cmd() + append_args)

def put(self, local_path, destination):
    """
    Upload ``local_path`` to ``destination`` by running ``hadoop fs -put``.
    """
    put_args = ['fs', '-put', local_path, destination]
    call_check(load_hadoop_cmd() + put_args)

Expand Down Expand Up @@ -628,13 +634,19 @@ def close(self):
super(HdfsAtomicWritePipe, self).close()
rename(self.tmppath, self.path)

class HdfsAppendPipe(luigi.format.OutputPipeProcessWrapper):
    """
    Output pipe that feeds ``hadoop fs -appendToFile - <path>``.

    The ``-`` source argument is passed so the hadoop command takes its
    input from the pipe rather than from a local file.
    ``-appendToFile`` requires Hadoop >= 2.3.0.
    """

    def __init__(self, path):
        """
        :param path: destination path handed to ``-appendToFile``; its parent
                     directory is created first if it does not exist.
        """
        self.path = path
        parent_dir = os.path.dirname(self.path)
        mkdir(parent_dir, parents=True, raise_if_exists=False)
        # super() must name *this* class: an HdfsAppendPipe is not an
        # HdfsAtomicWritePipe, so naming that class here raises TypeError.
        super(HdfsAppendPipe, self).__init__(load_hadoop_cmd() + ['fs', '-appendToFile', '-', self.path])

class HdfsAtomicWriteDirPipe(luigi.format.OutputPipeProcessWrapper):
"""
Writes a data<data_extension> file to a directory at <path>.
"""

def __init__(self, path, data_extension=""):
def __init__(self, path, data_extension="", append=False):
self.path = path
self.tmppath = tmppath(self.path)
self.datapath = self.tmppath + ("/data%s" % data_extension)
Expand Down Expand Up @@ -706,6 +718,9 @@ def __init__(self, writer, reader, input=None):
def pipe_writer(self, output):
    """
    Pass ``output`` to this format's ``writer`` and return the result.
    """
    make_writer = self.writer
    return make_writer(output)

def pipe_appender(self, output):
    """
    Build and return an ``HdfsAppendPipe`` for ``output``.
    """
    appender = HdfsAppendPipe(output)
    return appender

def pipe_reader(self, input):
    """
    Pass ``input`` to this format's ``reader`` and return the result.
    """
    make_reader = self.reader
    return make_reader(input)

Expand Down Expand Up @@ -794,11 +809,13 @@ def glob_exists(self, expected_files):
return False

def open(self, mode='r'):
    """
    Open ``self.path`` through the pipes provided by ``self.format``.

    :param mode: ``'r'`` to read, ``'w'`` to write, ``'a'`` to append.
    :return: the object returned by the format's ``pipe_reader``,
             ``pipe_writer`` or ``pipe_appender`` for ``self.path``.
    :raises ValueError: if ``mode`` is not ``'r'``, ``'w'`` or ``'a'``.
    """
    if mode not in ('r', 'w', 'a'):
        # Wrap in a 1-tuple so a tuple-valued ``mode`` is still reported as
        # a ValueError instead of breaking the %-formatting itself.
        raise ValueError("Unsupported open mode '%s'" % (mode,))

    if mode == 'r':
        return self.format.pipe_reader(self.path)
    elif mode == 'a':
        return self.format.pipe_appender(self.path)
    else:
        return self.format.pipe_writer(self.path)

Expand Down
11 changes: 8 additions & 3 deletions luigi/file.py
Expand Up @@ -30,9 +30,11 @@
import luigi.util
from luigi.format import FileWrapper, get_default_format, MixedUnicodeBytes
from luigi.target import FileSystem, FileSystemTarget, AtomicLocalFile
from luigi.target import AtomicLocalFileAppend


class atomic_file(AtomicLocalFile):

"""Simple class that writes to a temp file and moves it on close()
Also cleans up the temp file if close is not invoked
"""
Expand All @@ -45,6 +47,7 @@ def generate_tmp_path(self, path):


class LocalFileSystem(FileSystem):

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these really pep8 fixes??? I mean, you didn't change this code and the pep8 check (tox pep8) ran fine before. Hmm...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pylint claims they are.

From pep8: https://www.python.org/dev/peps/pep-0008/#blank-lines
"Method definitions inside a class are surrounded by a single blank line."

I guess it's open to interpretation whether the constructor should be considered a class method or not. Pylint seems to believe that it should.

Thanks for the code review.

"""
Wrapper for access to file system operations.

Expand Down Expand Up @@ -95,17 +98,19 @@ def makedirs(self):
if parentfolder and not os.path.exists(parentfolder):
os.makedirs(parentfolder)

def open(self, mode='r', timeout=10):
    """
    Open the local file at ``self.path`` through ``self.format``.

    :param mode: ``'w'`` writes via ``atomic_file``, ``'a'`` writes via
                 ``AtomicLocalFileAppend``, ``'r'`` reads the file.
    :param timeout: forwarded to ``AtomicLocalFileAppend``; only used when
                    ``mode`` is ``'a'``.
    :return: the object returned by the format's ``pipe_writer`` (modes
             ``'w'`` and ``'a'``) or ``pipe_reader`` (mode ``'r'``).
    :raises Exception: if ``mode`` is not ``'r'``, ``'w'`` or ``'a'``.
    """
    if mode == 'w':
        # Parent folders are created before writing.
        self.makedirs()
        return self.format.pipe_writer(atomic_file(self.path))
    elif mode == 'a':
        self.makedirs()
        return self.format.pipe_writer(AtomicLocalFileAppend(self.path, timeout=timeout))
    elif mode == 'r':
        fileobj = FileWrapper(io.BufferedReader(io.FileIO(self.path, 'r')))
        return self.format.pipe_reader(fileobj)
    else:
        raise Exception('mode must be r, w or a')

def move(self, new_path, raise_if_exists=False):
if raise_if_exists and os.path.exists(new_path):
Expand Down