Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: Log which outputs are missing when task is unexpectedly incomplete #3258

Merged
merged 2 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions luigi/contrib/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@
self.client = DropboxClient(token, user_agent)
self.format = format or luigi.format.get_default_format()

def __str__(self):
return self.path

Check warning on line 302 in luigi/contrib/dropbox.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/dropbox.py#L301-L302

Added lines #L301 - L302 were not covered by tests

@property
def fs(self):
return self.client
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@
self.client = client or get_default_client()
self.fail_missing_table = fail_missing_table

def __str__(self):
return self.path

Check warning on line 489 in luigi/contrib/hive.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/hive.py#L489

Added line #L489 was not covered by tests

def exists(self):
"""
returns `True` if the partition/table exists
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
self._index = index
self._collection = collection

def __str__(self):
return f'{self._index}/{self._collection}'

Check warning on line 39 in luigi/contrib/mongodb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mongodb.py#L39

Added line #L39 was not covered by tests

def get_collection(self):
"""
Return targeted mongo collection to query on
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mssqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

Check warning on line 72 in luigi/contrib/mssqldb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mssqldb.py#L71-L72

Added lines #L71 - L72 were not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mysqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
self.update_id = update_id
self.cnx_kwargs = cnx_kwargs

def __str__(self):
return self.table

Check warning on line 72 in luigi/contrib/mysqldb.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/mysqldb.py#L72

Added line #L72 was not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

Check warning on line 204 in luigi/contrib/postgres.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/postgres.py#L204

Added line #L204 was not covered by tests

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@
self._client = client
self._count = None

def __str__(self):
return self.table

Check warning on line 135 in luigi/contrib/presto.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/presto.py#L135

Added line #L135 was not covered by tests

@property
def _count_query(self):
partition = OrderedDict(self.partition or {1: 1})
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
socket_timeout=self.socket_timeout,
)

def __str__(self):
return self.marker_key()

Check warning on line 77 in luigi/contrib/redis_store.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/redis_store.py#L76-L77

Added lines #L76 - L77 were not covered by tests

def marker_key(self):
"""
Generate a key for the indicator hash.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(self, task_obj):
shutil.rmtree(path)
logger.debug('Deleted temporary directory %s', path)

def __str__(self):
return self.task_id

def get_path(self):
"""
Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/sqla.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@
self.connect_args = connect_args
self.marker_table_bound = None

def __str__(self):
return self.target_table

Check warning on line 193 in luigi/contrib/sqla.py

View check run for this annotation

Codecov / codecov/patch

luigi/contrib/sqla.py#L193

Added line #L193 was not covered by tests

@property
def engine(self):
"""
Expand Down
3 changes: 3 additions & 0 deletions luigi/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@
# cast to str to allow path to be objects like pathlib.PosixPath and py._path.local.LocalPath
self.path = str(path)

def __str__(self):
return self.path

Check warning on line 219 in luigi/target.py

View check run for this annotation

Codecov / codecov/patch

luigi/target.py#L219

Added line #L219 was not covered by tests

@property
@abc.abstractmethod
def fs(self):
Expand Down
9 changes: 8 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@
# checking completeness of self.task so outputs of dependencies are
# irrelevant.
if self.check_unfulfilled_deps and not _is_external(self.task):
missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)]
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')

Check warning on line 190 in luigi/worker.py

View check run for this annotation

Codecov / codecov/patch

luigi/worker.py#L190

Added line #L190 was not covered by tests
else:
missing.append(dep.task_id)
if missing:
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
Expand Down