
Add support for hadoop fs -appendToFile #910

Closed
wants to merge 3 commits into from

Conversation

@hellais hellais commented Apr 26, 2015

In this pull request I add support for the Hadoop appendToFile command.

@erikbern
Contributor

lgtm

@hellais
Author

hellais commented Apr 27, 2015

Actually, after proposing this and digging a bit deeper into the matter, I realise it's worth discussing whether it makes sense to add append capabilities to luigi at all.

Tomorrow I will write down some thoughts, but I think it would also be useful for file.LocalTarget to allow opening files with mode="a". To do this, I think changes to luigi.format are needed as well: it looks possible to modify the Format and BaseWrapper classes to support append too.
In that case, I think append should work via locking plus concatenation of the previous file into a temporary location (which would also allow support for gzip compression).
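A minimal stdlib-only sketch of the copy-then-rename idea (the helper name `append_atomically` is hypothetical, not luigi API):

```python
import os
import shutil
import tempfile

def append_atomically(path, data):
    # Hypothetical helper (not luigi API) sketching the idea above:
    # copy the previous file to a temporary location, append the new
    # data there, then atomically rename it over the original. The same
    # scheme would work for gzip by recompressing into the temp file.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    os.close(fd)
    if os.path.exists(path):
        shutil.copyfile(path, tmp_path)  # concatenate the previous contents
    with open(tmp_path, "ab") as tmp:
        tmp.write(data)
    os.replace(tmp_path, path)  # atomic rename; readers never see a partial file
```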

@Tarrasch
Contributor

Yea, let us know when you've done some more research. Let's not merge this for now. Looking forward to your findings! :)

@hellais
Author

hellais commented Apr 27, 2015

@Tarrasch so I have started adding append support to LocalTarget. I haven't fully tested this yet, but let me first explain the rationale behind it.

My use case is that I have a bunch of smallish files (think of them as log files, though they are a bit more complex) that I would like to aggregate into a daily view (Hadoop does not perform well when processing many small files; it works best with files larger than its block size, which I believe is around 64 MB).

I don't think I have fully grasped luigi's execution model yet, but I am assuming that work is parallelised across the tasks returned by requires().
In my case, requires() returns the list of log files, which have no ordering. Two reports from the same day may be processed in parallel, which would lead to concurrent append operations (append consistency is only guaranteed when the chunk being appended is smaller than some small buffer size, and even then I don't think it behaves uniformly across operating systems).
For this reason I take a mutex lock on the file I wish to append to: the lock is acquired before the file is opened and released when it is closed.

This allows me to consolidate my logs into daily views and ensure consistency on the output file.
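The locking scheme described above could be sketched like this (a hypothetical helper, not the actual commit; POSIX-only, since it relies on fcntl):

```python
import fcntl

def locked_append(path, data):
    # Hypothetical helper: take an exclusive advisory lock before
    # appending, release it before the file is closed. Concurrent
    # callers block on LOCK_EX, so appends never interleave.
    with open(path, "ab") as f:
        fcntl.flock(f, fcntl.LOCK_EX)   # blocks until we hold the lock
        try:
            f.write(data)
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```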

Commit hellais@3e7816b shows this; I will now proceed to test it against my data.

Let me know if you have any questions or feedback on this PR.

@erikbern
Contributor

Looks great, but you should also add support for append mode in HdfsTarget, or else this PR is a bit inconsistent.

@@ -45,6 +47,7 @@ def generate_tmp_path(self, path):


class LocalFileSystem(FileSystem):

Contributor

Are these really pep8 fixes??? I mean, you didn't change this code and the pep8 check (tox pep8) ran fine before. Hmm...

Author

Pylint claims they are.

From pep8: https://www.python.org/dev/peps/pep-0008/#blank-lines
"Method definitions inside a class are surrounded by a single blank line."

I guess it's open to interpretation whether the constructor counts as a method like any other. Pylint seems to believe it does.
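For reference, the blank-line convention under discussion looks like this (a minimal, hypothetical example, not luigi's actual class; pylint treats __init__ like any other method):

```python
class LocalFileSystem:
    # PEP 8: method definitions inside a class are surrounded by a
    # single blank line, and that includes the constructor below.
    def __init__(self, root="/"):
        self.root = root

    def exists(self, path):
        # Toy check, purely for illustration.
        return path.startswith(self.root)
```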

Thanks for the code review.

@hellais
Author

hellais commented Apr 27, 2015

I am still a bit uncertain whether this is in line with the overall design philosophy of the framework. While testing this feature against my use case I realised that I have to take another approach.

The difficulty I am running into is that it's not possible for me to dynamically generate the output() target from the input(). @erikbern @Tarrasch is that something that is possible? Is there some pattern for doing this?

@Tarrasch
Contributor

> dynamically generate the output() target from the input()

I'm not sure what this means. When you define a task, you have the same context available for defining both input() and output().

@erikbern
Contributor

Yeah not sure what this means, sorry

@hellais
Author

hellais commented Apr 30, 2015

@Tarrasch @erikbern what I mean is that I don't see how it's possible to accomplish something like this:

def list_resources():
    # Returns some tasks or an iterable of them
    ...

def get_output_from_input(input):
    # Returns the output Target for a given input
    ...

def process(data):
    # Transforms the input data
    ...

class ReportStreams(luigi.Task):
    def requires(self):
        return list(list_resources())

    # Note: luigi's Task.output() does not actually take arguments;
    # this signature is what I would *like* to be able to write.
    def output(self, input=None):
        return get_output_from_input(input)

    def run(self):
        for input in self.input():
            with input.open("r") as in_file:
                with self.output(input).open("w") as out_file:
                    out_file.write(process(in_file.read()))
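
For what it's worth, the way this is usually expressed in luigi is to give each input its own task, with the input path as a parameter, so that output() can be derived from the task's own parameters instead of from input(). A stdlib-only sketch of that idea (no luigi import; all names and the path mapping are hypothetical):

```python
from pathlib import Path

def get_output_from_input(input_path):
    # Hypothetical mapping: logs/2015-04-27/a.log -> daily/2015-04-27.agg
    p = Path(input_path)
    return Path("daily") / (p.parent.name + ".agg")

class ProcessOne:
    # Stands in for a luigi.Task whose only parameter is the input path.
    # Because the path is a task parameter, output() needs no argument:
    # it is derived from the task's own state, which is the luigi idiom.
    def __init__(self, input_path):
        self.input_path = input_path

    def output(self):
        return get_output_from_input(self.input_path)
```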

@hellais
Author

hellais commented May 25, 2015

I created a new pull request for this where I integrate your feedback: #973

@hellais hellais closed this May 25, 2015