Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added lost ExTENCI code

  • Loading branch information...
commit 47ba3f9138f567df2c6d0f2f36ba139b1245e8e0 1 parent 9114297
Ole Weidner authored
View
86 bigjob_osg/README.md
@@ -0,0 +1,86 @@
+BigJob implementation for OSG
+=============================
+
+This directory contains an implementation of the BigJob and Pilot APIs for use with the Open Science Grid (OSG) Condor pool. bigjob_osg is API compatible with the regular BigJob implementation, so it can be used with existing applications and experiments. However, its implementation and internal behavior are different.
+
+Implementation
+--------------
+
+The <b>regular BigJob</b> encapsulates a resource reservation into a BigJob object which starts a pilot-job agent on a (remote) HPC cluster:
+
+```
+bj = bigjob(COORDINATION_URL)
+bj.start_pilot_job(LRMS_URL)
+```
+
+The state of the reservation can be queried via ```bj.get_state()```. It is initially ```Unknown``` and only switches to ```Running``` once the pilot-job agents are scheduled by the HPC queuing system.
+
+<b>bigjob_osg</b> uses the same API calls to instantiate a bigjob pilot-job object, but instead of scheduling a pilot-job agent on an HPC system, it just connects to the local OSG Condor pool. OSG uses <i>glide-in WMS</i> to dynamically create a Condor glide-in (pilot-job) pool for HTC workloads, so it wouldn't be effective to try to overlay this pool with bigjob's own pilot-job agent mechanism. Instead, <b>bigjob_osg</b> internally handles the OSG Condor pool as a bigjob object with an <i>unlimited</i> amount of resources which is available (in ```Running``` state) as soon as it is instantiated.
+
+Consequently, in the <b>bigjob_osg</b> implementation, subjobs that are added to a bigjob instance are passed on directly to the local Condor pool.
+
+Example
+-------
+
+While <b>bigjob_osg</b> is 100% API compatible with the existing, regular BigJob implementation, it doesn't require a lot of parameters, since it doesn't make use of pilot-job agents or a centralized communication and coordination mechanism. Non-relevant parameters will be silently ignored.
+
+This is an example of how to submit a single <i>BFAST</i> task to the OSG Condor pool via BigJob:
+
+```
+import time
+from bigjob_osg import bigjob, subjob, description, state
+
+def bigjob_osg_example_simple():
+ lrms_url = "condor://localhost"
+
+ print "Start Pilot Job/BigJob at: " + lrms_url
+ bj = bigjob(None)
+
+ ##############################
+ # Start the pilot job
+ bj.start_pilot_job(lrms_url)
+ print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state())
+
+ ##############################
+ # Define a workload
+ jd = description()
+ jd.set_attribute ("Executable", "/home/oweidner/software/bfast/bin/bfast")
+ jd.set_vector_attribute ("Arguments", ["match", "-r", "reads.10.fastq", "-A", "1", "-f", "hg_2122.fa"])
+ jd.set_attribute ("Output", "my_out.$(Cluster).$(Process)")
+ jd.set_attribute ("Error", "my_err.$(Cluster).$(Process)")
+
+ input_files = ["/home/oweidner/software/bfast/data/small/reference/hg_2122.fa > hg_2122.fa",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.10.1.bif > hg_2122.fa.cs.10.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.9.1.bif > hg_2122.fa.cs.9.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.8.1.bif > hg_2122.fa.cs.8.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.7.1.bif > hg_2122.fa.cs.7.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.6.1.bif > hg_2122.fa.cs.6.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.5.1.bif > hg_2122.fa.cs.5.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.4.1.bif > hg_2122.fa.cs.4.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.3.1.bif > hg_2122.fa.cs.3.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.2.1.bif > hg_2122.fa.cs.2.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.1.1.bif > hg_2122.fa.cs.1.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.brg > hg_2122.fa.cs.brg",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.nt.brg > hg_2122.fa.nt.brg",
+ "/home/oweidner/software/bfast/data/small/reads_5K/reads.10.fastq > reads.10.fastq"]
+ jd.set_vector_attribute("FileTransfer", input_files)
+
+ ##############################
+ # Submit the workload to the pilot job
+ sj = subjob()
+ sj.submit_job(bj, jd)
+ while 1:
+ state = str(sj.get_state())
+ print "Subjob state: " + state
+ if(state=="Failed" or state=="Done"):
+ break
+ time.sleep(5)
+
+ ##############################
+ # Stop the pilot job
+ bj.cancel()
+ print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state())
+
+if __name__ == "__main__":
+ bigjob_osg_example_simple()
+```
View
11 bigjob_osg/__init__.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+from description import description
+from bigjob_impl import osg_bigjob as bigjob
+from subjob_impl import osg_subjob as subjob
View
58 bigjob_osg/bigjob_impl.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+import uuid
+import saga
+import state
+
+class osg_bigjob(object):
+
+ __slots__ = ['lrms_url', 'pilot_url', 'state', '_condor_pool']
+
+ def __init__(self, database_host):
+ self.lrms_url = None
+ self.pilot_url = None
+ self.state = state.Unknown
+ self._condor_pool = None
+
+ def start_pilot_job(self,
+ lrms_url,
+ bigjob_agent_executable=None,
+ number_nodes=None,
+ queue=None,
+ project=None,
+ working_directory=None,
+ userproxy=None,
+ walltime=None,
+ processes_per_node=None):
+
+ if lrms_url.startswith("condor://localhost") is not True:
+ raise Exception("bigjob_osg only supports 'condor://localhost' as lrms_url.")
+ else:
+ self.lrms_url = lrms_url
+ self.pilot_url = "%s/%s" % (lrms_url, uuid.uuid4())
+
+ try:
+ self._condor_pool = saga.job.service("condor://localhost")
+ except saga.exception, e:
+ print "Oh noes! A SAGA error: "
+ for err in e.get_all_messages():
+ print err
+ raise Exception("A SAGA error occured.")
+
+ self.state = state.Running
+
+ def get_state(self):
+ return self.state
+
+ def get_state_detail(self):
+ pass
+
+ def cancel(self):
+ self.state = state.Done
+
View
13 bigjob_osg/description.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+import saga
+
+class description(saga.job.description):
+ pass
+
View
64 bigjob_osg/example/single.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+import time
+from bigjob_osg import bigjob, subjob, description, state
+
+def bigjob_osg_example_simple():
+ lrms_url = "condor://localhost"
+
+ print "Start Pilot Job/BigJob at: " + lrms_url
+ bj = bigjob(None)
+
+ ##############################
+ # Start the pilot job
+ bj.start_pilot_job(lrms_url)
+ print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state())
+
+ ##############################
+ # Define a workload
+ jd = description()
+ jd.set_attribute ("Executable", "/home/oweidner/software/bfast/bin/bfast")
+ jd.set_vector_attribute ("Arguments", ["match", "-r", "reads.10.fastq", "-A", "1", "-f", "hg_2122.fa"])
+ jd.set_attribute ("Output", "my_out.$(Cluster).$(Process)")
+ jd.set_attribute ("Error", "my_err.$(Cluster).$(Process)")
+
+ input_files = ["/home/oweidner/software/bfast/data/small/reference/hg_2122.fa > hg_2122.fa",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.10.1.bif > hg_2122.fa.cs.10.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.9.1.bif > hg_2122.fa.cs.9.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.8.1.bif > hg_2122.fa.cs.8.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.7.1.bif > hg_2122.fa.cs.7.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.6.1.bif > hg_2122.fa.cs.6.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.5.1.bif > hg_2122.fa.cs.5.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.4.1.bif > hg_2122.fa.cs.4.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.3.1.bif > hg_2122.fa.cs.3.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.2.1.bif > hg_2122.fa.cs.2.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.1.1.bif > hg_2122.fa.cs.1.1.bif",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.cs.brg > hg_2122.fa.cs.brg",
+ "/home/oweidner/software/bfast/data/small/reference/hg_2122.fa.nt.brg > hg_2122.fa.nt.brg",
+ "/home/oweidner/software/bfast/data/small/reads_5K/reads.10.fastq > reads.10.fastq"]
+ jd.set_vector_attribute("FileTransfer", input_files)
+
+ ##############################
+ # Submit the workload to the pilot job
+ sj = subjob()
+ sj.submit_job(bj, jd)
+ while 1:
+ state = str(sj.get_state())
+ print "Subjob state: " + state
+ if(state=="Failed" or state=="Done"):
+ break
+ time.sleep(5)
+
+ ##############################
+ # Stop the pilot job
+ bj.cancel()
+ print "Pilot Job/BigJob URL: " + bj.pilot_url + " State: " + str(bj.get_state())
+
+if __name__ == "__main__":
+ bigjob_osg_example_simple()
View
27 bigjob_osg/state.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+import saga
+
+Running = "Running"
+New = "New"
+Failed = "Failed"
+Done = "Done"
+Unknown = "Unknown"
+
+def saga_to_subjob_state(saga_state):
+ if saga_state == saga.job.Running:
+ return Running
+ elif saga_state == saga.job.Failed:
+ return Failed
+ elif saga_state == saga.job.New:
+ return New
+ elif saga_state == saga.job.Done:
+ return Done
+ else:
+ return Unknown
View
46 bigjob_osg/subjob_impl.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+
+# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
+
+__author__ = "Ole Christian Weidner"
+__copyright__ = "Copyright 2012, Ole Christian Weidner"
+__license__ = "MIT"
+
+import saga
+import state
+
+class osg_subjob(object):
+
+ __slots__ = ['state', 'description', '_condor_job']
+
+ def __init__(self):
+ self.state = state.Unknown
+ self._condor_job = None
+
+ def submit_job(self, bigjob, jd):
+ self.description = jd
+ try:
+ self._condor_job = bigjob._condor_pool.create_job(self.description)
+ self._condor_job.run()
+ except saga.exception, e:
+ print "Oh noes! A SAGA error: "
+ for err in e.get_all_messages():
+ print err
+ raise Exception("A SAGA error occured.")
+
+
+ def get_state(self):
+ try:
+ cstate = self._condor_job.get_state()
+ print cstate
+ self.state = state.saga_to_subjob_state(cstate)
+ return self.state
+ except saga.exception, e:
+ print "Oh noes! A SAGA error: "
+ for err in e.get_all_messages():
+ print err
+ raise Exception("A SAGA error occured.")
+
+ def cancel(self):
+ self._condor_job.cancel()
+
Please sign in to comment.
Something went wrong with that request. Please try again.