
Additions to provide support for the Load Sharing Facility (LSF) job scheduler #2373

Merged (42 commits) on Jul 12, 2018
Conversation

@markmcdowall (Contributor) commented Mar 13, 2018

Description

Provides support for using the Load Sharing Facility (LSF) job scheduler for the distributed computation of tasks within a high-performance cluster environment. The work is based on the SGE wrapper and on work that had been done by @alexbw.

Motivation and Context

Needed this so that we could use the Luigi infrastructure on our local LSF cluster.

#1936

Have you tested this? If so, how?

I have run this code as part of a local pipeline and everything ran smoothly. There is also a small unit-test script.

Notes

- Updates were based on the work by Alex, with changes so that jobs could run within the latest version of Luigi.
- Pulled in the latest changes from master.
- Added new parameters for naming jobs, listing queues, and memory sizes.
- Added a printout giving a rough timing of how long a bsub job ran for; this is dependent on the polling time.
- The LSF wrapper has been moved into the contrib folder.
- Some code tidying based on pylint output.
- Added doc sections and copied over the Apache 2.0 license from Spotify. You'll need to add your institute to the list; I have left a gap for this already.

@dlstadther (Collaborator)

@markmcdowall Please address Travis errors and add tests where applicable, if possible.

@markmcdowall (Author)

Hi @dlstadther,

I am still getting a failure with the docs not building correctly. Would you be able to advise on why it might be failing?

Cheers,

Mark

@dlstadther (Collaborator)

/home/travis/build/spotify/luigi/luigi/contrib/lsf.py:docstring of luigi.contrib.lsf.LSFJobTask.run:5: ERROR: Unexpected indentation.

You also have some other Py3 errors.

@Tarrasch (Contributor)

@markmcdowall, can you please update the PR title and description to be more useful for casual readers? What's LSF? Who should care? Etc. Thanks. :)

@markmcdowall markmcdowall changed the title Lsf Additions to provide support for the Load Sharing Facility (LSF) job scheduler Mar 14, 2018
@markmcdowall (Author)

Hi @dlstadther,

I have addressed, or commented on, each of the issues that you raised, and Travis is giving the green light.

Cheers,

Mark

@dlstadther (Collaborator) left a comment

Mind getting one other reviewer? Thanks!

@markmcdowall (Author) commented Apr 9, 2018

Hi @dlstadther ,

Thanks for approving the changes. As for the second reviewer, would this need to be someone who has already contributed to the Luigi repo? If so, is there a list of people who are happy to be approached?

@markmcdowall (Author)

Hi @dlstadther,

Would it be possible to add @lairdm as a reviewer?

Cheers,

Mark

@dlstadther (Collaborator)

@lairdm, could you oblige @markmcdowall's review request? Thanks!

- "EXIT"
based on the LSF documentation
"""
cmd = "bjobs -noheader -o stat %d" % job_id

Should you use new style .format()? Old style hasn't been deprecated yet, but the threat is still hanging out there.
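For illustration, the suggested new-style call would look like this (a minimal sketch with a made-up job id; an f-string would also work on Python 3.6+):

```python
job_id = 12345  # hypothetical job id

# Old style, as in the PR:
cmd_old = "bjobs -noheader -o stat %d" % job_id

# New-style str.format(), as suggested:
cmd_new = "bjobs -noheader -o stat {}".format(job_id)

assert cmd_old == cmd_new == "bjobs -noheader -o stat 12345"
```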

based on the LSF documentation
"""
cmd = "bjobs -noheader -o stat %d" % job_id
track_job_proc = subprocess.Popen(

Do you need to put this in a try block, or is it alright if things die loudly if the sub-process can't be made?

@markmcdowall (Author):

Not really, as a blank string will be returned and handled later in the script.
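A sketch of the behaviour being described, using a hypothetical helper and stand-in commands (`echo`/`true`), since `bjobs` only exists on an LSF cluster: if the command can't start or prints nothing, an empty string comes back and is treated as an unknown status later.

```python
import subprocess

def run_status_command(cmd_args):
    """Return the command's stripped stdout, or "" if it cannot be run."""
    try:
        proc = subprocess.Popen(cmd_args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, _ = proc.communicate()
        return out.decode().strip()
    except OSError:  # executable missing or not startable
        return ""

# On a cluster this would be
# run_status_command(["bjobs", "-noheader", "-o", "stat", str(job_id)]);
# an empty result falls through to an "unknown" status downstream.
status = run_status_command(["true"]) or "unknown"
```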

"""
Kill a running LSF job
"""
subprocess.call(['bkill', job_id])

Does Luigi support returning if a kill was successful or not?

@markmcdowall (Author):

The only response would be LSF saying that it is terminating the job with the given id; there is nothing from LSF to say whether it was successful or not.
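If a success signal were wanted anyway, one option is that `subprocess.call` returns the command's exit status, so a non-zero code at least indicates the kill request itself failed (e.g. an unknown job id). A sketch with a hypothetical helper, demonstrated with stand-in commands:

```python
import subprocess

def request_kill(cmd):
    """Return True if the kill command exited 0 (request accepted).

    Even a zero exit only means the request was accepted by the
    scheduler, not that the job has actually terminated.
    """
    try:
        return subprocess.call(cmd) == 0
    except OSError:  # command not installed
        return False

# On a cluster this would be: request_kill(["bkill", str(job_id)])
```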

Read in the output file
"""
# Read in the output file
with open(os.path.join(self.tmp_dir, "job.out"), "r") as f_out:

Nitpicky, but for the errors you check whether the file exists; is there a reason the same logic isn't mirrored here?

self.tmp_dir = os.path.join(base_tmp_dir, task_name)
# Max filename length
max_filename_length = os.fstatvfs(0).f_namemax
self.tmp_dir = self.tmp_dir[:max_filename_length]

I'm not sure I follow what you're trying to do. I'm unsure whether f_namemax is the max length of a filename component or of the entire path. Either way, you're truncating the base_tmp_dir + task_name concatenated path to f_namemax, then adding packages.tar a bit later. If f_namemax is the max total path length, wouldn't making it longer be an issue? If it's the max length of a component of the path, shouldn't the component be truncated before prepending the base path?
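For reference, `f_namemax` in the POSIX statvfs structure is the maximum length of a single filename component, not of the whole path, so one way to address this concern is to truncate only the task-name component before joining. A sketch with a hypothetical helper; `name_max` would normally come from `os.fstatvfs(0).f_namemax`:

```python
import os

def build_tmp_dir(base_tmp_dir, task_name, name_max):
    # f_namemax limits one path component, so truncate only the
    # task-name component, never the already-valid base path.
    return os.path.join(base_tmp_dir, task_name[:name_max])

# An over-long task name gets clipped; the base path is untouched.
tmp_dir = build_tmp_dir("/tmp/luigi", "task_" + "x" * 500, 255)
```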

args += ["-W", str(self.runtime_flag)]
if self.job_name_flag:
args += ["-J", str(self.job_name_flag)]
args += ["-o", "/".join(log_output[0:-1]) + "/job.out"]

If you want to drop the last component of the path, why this way rather than rsplit, as with the path earlier, and then use the OS-safe os.path module?
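For illustration, the OS-safe way to drop the last path component (with a made-up path):

```python
import os

log_path = "/cluster/tmp/my_task/job.run"  # hypothetical path
out_file = os.path.join(os.path.dirname(log_path), "job.out")
err_file = os.path.join(os.path.dirname(log_path), "job.err")
# On POSIX, out_file is "/cluster/tmp/my_task/job.out"
```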

args += self.extra_bsub_args.split()

# Find where the runner file is
runner_path = lsf_runner.__file__

Are you sure that's what you want? Depending on the configuration of sys.path, that could be a relative or an absolute path:
https://stackoverflow.com/a/7116925/3076561
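Normalising with `os.path.abspath` pins the location regardless of how the module was found on sys.path; a sketch using a stdlib module as a stand-in for `lsf_runner`:

```python
import os
import json  # stand-in for luigi.contrib.lsf_runner

# __file__ can be relative when the module was found via a relative
# sys.path entry; abspath resolves it against the current directory.
runner_path = os.path.abspath(json.__file__)
assert os.path.isabs(runner_path)
```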


def _track_job(self):
time0 = 0
while True:

Is it ok if we get stuck in this call forever if the job never releases or is suspended?

@markmcdowall (Author):

This should only happen if jobs are hanging in the queue waiting to get onto LSF. A job could be held back because the LSF manager is prioritising other jobs over yours, or because you have held your own jobs to let a backlog of other waiting jobs clear.
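If a cap on that wait were ever wanted, the loop could take an optional timeout. A hypothetical sketch (not the PR's code): the status names follow the bjobs states discussed above, and `timeout=None` keeps the wait-forever behaviour.

```python
import time

def poll_until_done(get_status, poll_interval=5, timeout=None):
    """Poll until the job reaches a terminal state, or until timeout."""
    waited = 0
    while True:
        status = get_status()            # e.g. the stripped bjobs output
        if status in ("DONE", "EXIT"):
            return status
        if timeout is not None and waited >= timeout:
            return "TIMEOUT"             # hypothetical escape hatch
        time.sleep(poll_interval)
        waited += poll_interval

# A job that pends, runs, then completes:
statuses = iter(["PEND", "RUN", "DONE"])
result = poll_until_done(lambda: next(statuses), poll_interval=0)
```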

@markmcdowall (Author)

Hi @dlstadther ,

Sorry for the delay. I have gone through the comments made by @lairdm about the code, made the relevant changes, and run a few pipelines to check that the changes did not break anything.

Are you happy with the changes that have been made to the code?

Cheers,

Mark

@@ -236,8 +235,8 @@ def _run_job(self):
args += ["-W", str(self.runtime_flag)]
if self.job_name_flag:
args += ["-J", str(self.job_name_flag)]
args += ["-o", os.path.join(log_output[0:-1], "job.out")]
args += ["-e", os.path.join(log_output[0:-1], "job.err")]
args += ["-o", os.path.join("/".join(log_output[0:-1]), "job.out")]

Sorry, why are you flipping back to "/".join()? That's not OS safe. Unless I'm missing something.

@markmcdowall (Author):

I've modified the code to use the os.path.split and os.path.join functions.

The reason for using "/".join() was that os.path.join only takes individual string arguments, not a list of path components. Using os.path.split should get around this issue.
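For illustration, the split/join approach (with a made-up path); note that `os.path.join(*parts)` can also consume a list of string components by unpacking it:

```python
import os

tmp_output = "/cluster/tmp/my_task/job.run"  # hypothetical path
head, _tail = os.path.split(tmp_output)      # drop the last component
out_file = os.path.join(head, "job.out")

# Unpacking also works when the components arrive as a list:
joined = os.path.join(*["cluster", "tmp", "job.out"])
```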

@lairdm commented Jul 5, 2018

Other than the "/".join() question, I'm happy. Thanks.

@dlstadther (Collaborator) left a comment

This doesn't impact core, so I'm good with this if @lairdm is (which appears to be the case)

@markmcdowall (Author)

Hi @dlstadther ,

Thank you for accepting the changes. Do I need to provide any further information or is it now over to you guys?

Cheers,

Mark

@dlstadther dlstadther merged commit 1192003 into spotify:master Jul 12, 2018
thisiscab pushed a commit to glossier/luigi that referenced this pull request Aug 3, 2018
…scheduler (spotify#2373)

* Beginning work on LSF batching

* Code is all written down right from my head. Now need to test and push and break in earnest

* Flags for output and error files

* More thoughts on a daemon process

* First (as far as I can tell) working LSF submission system

* Reporting unknown status. Also, everything's nice and tested now on chained jobs, and multiple workers

* Now saving directories as parameters doesn't act funny

* Added a local LSF task, for debugging.

* Can now pass extra args to the batch submission

* An inheritance decorator

* Method and practice for instantiating classes that are inherited from

* Removed log

* More tests, and ability to nullify parameters

* Added better naming to common_params, and an assertion and test to boot

* Moved the LSF modules to match the use of the contrib folder. Added some extra bsub params, timing of jobs and some clean up based on pylint

* Removed extra newline

* Improvements based on flake8 feedback

* Import issues with dill and improvements to style

* Fixed error with indentation of docs and error handling

* Unit test for the LSF code

* Spelling correction

* Fixes to get the tests to work

* Added to the unit tests

* Removed unused imports

* Removed unused imports

* Tidied up the track_job function to remove the requirement on awk

* Changes based on the feedback from @dlstadther

* Removed print statement

* Fix for the default value for the job_name_flag

* Tests failed using the OptionalParameter, reverted back to Parameter with a default

* Better handling of testing if jobs failed

* Fixes based on feedback from @lairdm

* Modified the shortening of the file names to affect just the name and not the whole path

* Testing the previous shorter naming

* Shortened the naming as was not the issue. Changed the creation of the job output file paths

* Fix to fully use the os.path manipulation for dir location
thisiscab pushed a commit to glossier/luigi that referenced this pull request Aug 8, 2018
…scheduler (spotify#2373)
dlstadther added a commit to dlstadther/luigi that referenced this pull request Aug 14, 2018
* upstream-master: (82 commits)
  S3 client refactor (spotify#2482)
  Rename to rpc_log_retries, and make it apply to all the logging involved
  Factor log_exceptions into a configuration parameter
  Fix attribute forwarding for tasks with dynamic dependencies (spotify#2478)
  Add a visiblity level for luigi.Parameters (spotify#2278)
  Add support for multiple requires and inherits arguments (spotify#2475)
  Add metadata columns to the RDBMS contrib (spotify#2440)
  Fix race condition in luigi.lock.acquire_for (spotify#2357) (spotify#2477)
  tests: Use RunOnceTask where possible (spotify#2476)
  Optional TOML configs support (spotify#2457)
  Added default port behaviour for Redshift (spotify#2474)
  Add codeowners file with default and specific example (spotify#2465)
  Add Data Revenue to the `blogged` list (spotify#2472)
  Fix Scheduler.add_task to overwrite accepts_messages attribute. (spotify#2469)
  Use task_id comparison in Task.__eq__. (spotify#2462)
  Add stale config
  Move github templates to .github dir
  Fix transfer config import (spotify#2458)
  Additions to provide support for the Load Sharing Facility (LSF) job scheduler (spotify#2373)
  Version 2.7.6
  ...
5 participants