
Unfulfilled dependencies at run time #1552

Closed
joeshaw opened this issue Feb 18, 2016 · 10 comments

@joeshaw
Contributor

joeshaw commented Feb 18, 2016

I have been getting what seem to be spurious "Unfulfilled dependencies at run time" errors with Luigi 2.0.1. They are fairly sporadic, happening every few hours, and subsequent runs tend to work. Here's part of a redacted log:

DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 143
INFO: [pid 8] Worker Worker(salt=579567130, workers=1, host=1a575d8355b3, username=luigi, pid=8) running   FirehoseDateHourTask(task=FirehoseEventToRedshift, s3_path=s3://redacted, date_hour=2016-02-18T21)
ERROR: [pid 8] Worker Worker(salt=579567130, workers=1, host=1a575d8355b3, username=luigi, pid=8) failed    FirehoseDateHourTask(task=FirehoseEventToRedshift, s3_path=s3://redacted, date_hour=2016-02-18T21)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 151, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependencies at run time: FirehoseEventToRedshift(host=redacted, database=redacted, s3_path=s3://redacted/2016/02/18/21/redacted-1-2016-02-18-21-46-17-18c3966f-3993-441f-96c9-29629657c50d.gz, output_path=s3://redacted), FirehoseEventToRedshift(host=redacted:5439, database=redacted, s3_path=s3://redacted/2016/02/18/21/redacted-1-2016-02-18-21-51-39-3c5811ea-d333-4e94-b9f3-eb13a5d7f8ab.gz, output_path=s3://redacted)
INFO: Sending warning email to ['redacted']
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   FirehoseDateHourTask(task=FirehoseEventToRedshift, s3_path=s3://redacted, date_hour=2016-02-18T21)   has status   FAILED
DEBUG: Checking if FirehoseDateHourTask(task=FirehoseEventToRedshift, s3_path=s3://redacted, date_hour=2016-02-18T21) is complete
DEBUG: Checking if FirehoseEventToRedshift(host=redacted:5439, database=redacted, s3_path=s3://redacted/2016/02/18/21/redacted-1-2016-02-18-21-05-11-57347ce0-fb1a-4390-9eb6-89600825fd72.gz, output_path=s3://redacted) is complete
DEBUG: Checking if FirehoseEventToRedshift(host=redacted:5439, database=redacted, s3_path=s3://redacted/2016/02/18/21/redacted-1-2016-02-18-21-10-24-9ef6d999-f3e6-4de2-b166-ad13c98819cc.gz, output_path=s3://redacted) is complete
INFO: Informed scheduler that task   FirehoseDateHourTask(task=FirehoseEventToRedshift, s3_path=s3://redacted, date_hour=2016-02-18T21)   has status   PENDING

For context, FirehoseDateHourTask is a meta-task that, given a TaskParameter and a DateHourParameter, walks a list of files in S3 with a date-based prefix and yields tasks. It looks like this:

class FirehoseDateHourTask(luigi.WrapperTask):
    task = luigi.TaskParameter()
    s3_path = luigi.Parameter(default='s3://redacted')
    date_hour = luigi.DateHourParameter()

    def requires(self):
        for file in firehose_files_for_date_hour(self.s3_path, self.date_hour):
            yield self.task(s3_path=file)

Two things stick out to me in the log: first, the worker marks the task as failed with unfulfilled dependencies without even checking them first; second, it then immediately checks its dependencies and goes into a pending state.

When I look at the same task run for other date-hours, it doesn't have this problem. It checks first if tasks are finished, then goes into a PENDING state.

It seems like there's a race or some other ordering problem here, but I'm not quite sure how to debug it.

@deuxpi

deuxpi commented Feb 18, 2016

I usually have this kind of problem when the dependency task (in your case, that would be FirehoseEventToRedshift) was run successfully, but its complete() method still returns False afterwards. On my side it's always been because of a bug in the task or a problem with custom target code. Perhaps the eventual consistency of S3 is causing trouble?
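A minimal, hypothetical sketch of that failure mode: run() succeeds, but the default complete(), which checks whether the task's output() exists, keeps returning False because the path run() writes and the path the target checks disagree. Both paths here are made up for illustration.

import luigi

class BrokenTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        # The target checks one path...
        return luigi.LocalTarget(self.date.strftime('/data/%Y-%m-%d.csv'))

    def run(self):
        # ...but run() writes a differently formatted one, so complete()
        # stays False and anything depending on this task later fails with
        # "Unfulfilled dependencies at run time".
        with open(self.date.strftime('/data/%Y%m%d.csv'), 'w') as f:
            f.write('...')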

@joeshaw
Contributor Author

joeshaw commented Feb 19, 2016

@deuxpi Thanks for the info. This looks pretty similar to #506. I'm using pretty standard tasks (FirehoseEventToRedshift is a subclass of S3CopyJSONToTable and the S3 task is built on top of S3PathTask and S3Target) so I'll need to debug those further.

@joeshaw
Contributor Author

joeshaw commented Feb 19, 2016

OK, I figured it out. In my FirehoseDateHourTask, the firehose_files_for_date_hour() function returns the files currently in S3 that match some criteria. However, new files can be added while the Luigi pipeline is running. Between the first time FirehoseDateHourTask.requires() runs (when determining which tasks to run) and the second time (when determining whether all the tasks have finished), the list of files may have grown, so there really are unfulfilled new tasks.

I solved this by caching the list of files the first time requires() is run and referring only to the cache.
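A minimal sketch of that fix, using the firehose_files_for_date_hour() helper from the snippet above (the _cached_files attribute name is mine):

import luigi

class FirehoseDateHourTask(luigi.WrapperTask):
    task = luigi.TaskParameter()
    s3_path = luigi.Parameter(default='s3://redacted')
    date_hour = luigi.DateHourParameter()

    _cached_files = None  # filled in on the first requires() call

    def requires(self):
        if self._cached_files is None:
            # Take the S3 listing exactly once; later calls reuse the same
            # list even if new objects land under the prefix meanwhile.
            self._cached_files = list(
                firehose_files_for_date_hour(self.s3_path, self.date_hour))
        for path in self._cached_files:
            yield self.task(s3_path=path)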

Sorry about the noise.

@joeshaw joeshaw closed this as completed Feb 19, 2016
@joeshaw
Contributor Author

joeshaw commented Feb 19, 2016

I was under the assumption that Luigi would call requires() once on my class and cache the resulting list internally. (And why not, if requires() is often built dynamically?) I hadn't considered that it might be called more than once. If nothing else, that could be documented more clearly.

@joeshaw
Contributor Author

joeshaw commented Feb 19, 2016

Ah, I spoke too soon. I hit the issue with code that doesn't build requires() dynamically at all. I think this is an issue with S3's eventual consistency for listing files. All regions support read-after-write consistency for new objects, so maybe relying on that is a better way to go in the S3 module?

@erikbern
Contributor

requires() should be a deterministic function of the Task in most cases, with the possible exception of "sink" tasks. It sounds like FirehoseDateHourTask isn't deterministic, which is probably a bad thing and might lead to problems downstream. At Spotify we had all kinds of tricks to determine when a directory was fully copied and when the nightly tasks could begin to run. Occasionally that functionality broke, and it led to all kinds of nasty bugs.
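One hedged way to make it deterministic, sketched with hypothetical task and path names (and assuming FirehoseEventToRedshift's other parameters have defaults): snapshot the listing into a manifest exactly once, then drive the loads from the manifest via Luigi's dynamic dependencies, so every evaluation sees the same frozen file list.

import luigi

class WriteManifest(luigi.Task):
    date_hour = luigi.DateHourParameter()

    def output(self):
        return luigi.LocalTarget(
            self.date_hour.strftime('manifests/%Y-%m-%dT%H.txt'))

    def run(self):
        # Freeze the S3 listing; reruns read this file instead of re-listing.
        files = firehose_files_for_date_hour('s3://redacted', self.date_hour)
        with self.output().open('w') as f:
            f.write('\n'.join(files))

class LoadDateHour(luigi.Task):
    date_hour = luigi.DateHourParameter()

    def output(self):
        # Marker recording that this hour was fully loaded.
        return luigi.LocalTarget(
            self.date_hour.strftime('markers/%Y-%m-%dT%H.done'))

    def run(self):
        # Dynamic dependency: yielding a task suspends run() and resumes
        # it with that task's output once the task is complete.
        manifest = yield WriteManifest(date_hour=self.date_hour)
        with manifest.open() as f:
            yield [FirehoseEventToRedshift(s3_path=line.strip())
                   for line in f if line.strip()]
        with self.output().open('w') as f:
            f.write('done')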

@kwilcox
Contributor

kwilcox commented May 9, 2017

I was hitting this occasionally with WrapperTasks that depend on the state of external (not under my control) files. The call to requires() in complete() was returning a different task list than the original call to requires(). Commenting here for future Googlers.

My solution:

class CustomWrapperTask(luigi.WrapperTask):
    CACHED_REQUIRES = []

    def cached_requires(self):
        # Only report on the tasks that were originally available on the first call to `requires()`
        # A `requires()` method will need to append required tasks to self.CACHED_REQUIRES
        # before yielding or returning them. This is backwards compatible for WrapperTasks that
        # have not implemented this yet (the `or` below).
        #
        # https://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.WrapperTask.complete
        return self.CACHED_REQUIRES or self.requires()

    def complete(self):
        return all(r.complete() for r in self.cached_requires())

class MyWrapperTask(CustomWrapperTask):
    ...
    def requires(self):
        for x in range(10):
            req = MyOtherTask(i=x)
            self.CACHED_REQUIRES.append(req)
            yield req

or

class MyWrapperTask(CustomWrapperTask):
    ...
    def requires(self):
        for x in range(10):
            req = MyOtherTask(i=x)
            self.CACHED_REQUIRES.append(req)
        return self.CACHED_REQUIRES

@dbolkunov

@kwilcox, thank you for the answer. It seems like CustomWrapperTask should be extended a bit to completely avoid problems with "Unfulfilled dependencies": a deps() method should also be added, so CustomWrapperTask will look like the following:

class CustomWrapperTask(luigi.WrapperTask):
    CACHED_REQUIRES = []

    def cached_requires(self):
        # Only report on the tasks that were originally available on the first call to `requires()`
        # A `requires()` method will need to append required tasks to self.CACHED_REQUIRES
        # before yielding or returning them. This is backwards compatible for WrapperTasks that
        # have not implemented this yet (the `or` below).
        #
        # https://luigi.readthedocs.io/en/stable/api/luigi.task.html#luigi.task.WrapperTask.complete
        return self.CACHED_REQUIRES or self.requires()

    def complete(self):
        return all(r.complete() for r in self.cached_requires())

    def deps(self):
        return self.cached_requires()

It is needed because of this line in Luigi's TaskProcess.
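For reference, deps() is essentially a flattened requires(), and the check that raises the error looks roughly like this in luigi/worker.py (paraphrased from the traceback at the top of this thread, not an exact copy of the source):

# Paraphrased sketch of the run-time check in Luigi's worker: it calls
# task.deps(), which re-evaluates requires() unless overridden, so a
# non-deterministic requires() can surface brand-new, incomplete tasks here.
missing = [dep.task_id for dep in task.deps() if not dep.complete()]
if missing:
    deps = 'dependency' if len(missing) == 1 else 'dependencies'
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))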

@stynejohn

stynejohn commented Nov 30, 2018

I am also getting the same error while running shell scripts in parallel. I tried the solutions mentioned above, but they didn't solve the issue.

import os

from luigi.contrib.external_program import ExternalProgramTask


class Task2(ExternalProgramTask):
    def requires(self):
        return [TaskParallel1(), TaskParallel2()]

    def program_args(self):
        return ["/simpleScripts/shell6.bash"]

    def program_environment(self):
        env = os.environ.copy()
        return env


class TaskParallel1(ExternalProgramTask):
    def program_args(self):
        return ["/simpleScripts/shell1.bash"]

    def program_environment(self):
        env = os.environ.copy()
        return env


class TaskParallel2(ExternalProgramTask):
    def program_args(self):
        return ["/simpleScripts/shell2.bash"]

    def program_environment(self):
        env = os.environ.copy()
        return env

@kwilcox
Contributor

kwilcox commented Nov 30, 2018

@stynejohn Your issue is unrelated. Your tasks don't have any output() methods, so when Luigi tries to resolve the completeness of Task2 it raises this error, as it should, since Task2 has dependencies but Luigi can't tell whether they are done. You should add output() methods to the parallel tasks if they produce anything. If they don't produce output, you can create a file in the run() method that uniquely describes the task run; in your case that might be some combination of env variables? Please have a look at the documentation and the Luigi code regarding Targets if you have more questions. Hope this helps.

import os
import luigi
from luigi.contrib.external_program import ExternalProgramTask


class Task2(ExternalProgramTask):
    def requires(self):
        return [TaskParallel1(), TaskParallel2()]

    def program_args(self):
        return ["echo", "******hi********"]

    def run(self):
        super().run()
        with open(self.__class__.__name__, 'w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget(self.__class__.__name__)


class TaskParallel1(ExternalProgramTask):
    def program_args(self):
        return ["echo", "one"]

    def run(self):
        super().run()
        with open(self.__class__.__name__, 'w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget(self.__class__.__name__)


class TaskParallel2(ExternalProgramTask):
    def program_args(self):
        return ["echo", "two"]

    def run(self):
        super().run()
        with open(self.__class__.__name__, 'w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget(self.__class__.__name__)
