Skip to content

Commit 30abee4

Browse files
committed
refactors omegajobs, made jobs runnable on cluster via runtime
1 parent 6655f54 commit 30abee4

File tree

9 files changed

+263
-279
lines changed

9 files changed

+263
-279
lines changed

omegajobs/tasks.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
omega runtime job tasks
3+
"""
4+
from __future__ import absolute_import
5+
6+
from celery import Task
7+
from celery import shared_task
8+
from mongoengine.errors import DoesNotExist
9+
10+
from omegaml.documents import Metadata
11+
from omegaml.runtime.auth import get_omega_for_task
12+
from omegaml.tasks import OmegamlTask
13+
14+
15+
class NotebookTask(Task):
16+
abstract = True
17+
18+
def on_success(self, retval, task_id, *args, **kwargs):
19+
om = self.om
20+
args, kwargs = args[0:2]
21+
nbfile = args[0]
22+
meta = om.jobs.metadata(nbfile)
23+
attrs = meta.attributes
24+
attrs['state'] = 'SUCCESS'
25+
attrs['task_id'] = task_id
26+
meta.kind = Metadata.OMEGAML_JOBS
27+
28+
if not kwargs:
29+
pass
30+
else:
31+
attrs['last_run_time'] = kwargs.get('run_at')
32+
attrs['next_run_time'] = kwargs.get('next_run_time')
33+
34+
meta.attributes = attrs
35+
meta.save()
36+
37+
def on_failure(self, retval, task_id, *args, **kwargs):
38+
om = self.om
39+
args, kwargs = args[0:2]
40+
nbfile = args[0]
41+
meta = om.jobs.metadata(nbfile)
42+
attrs = meta.attributes
43+
attrs['state'] = 'FAILURE'
44+
attrs['task_id'] = task_id
45+
meta.kind = Metadata.OMEGAML_JOBS
46+
47+
if not kwargs:
48+
pass
49+
else:
50+
attrs['last_run_time'] = kwargs.get('run_at')
51+
attrs['next_run_time'] = kwargs.get('next_run_time')
52+
53+
meta.attributes = attrs
54+
meta.save()
55+
56+
57+
@shared_task(bind=True, base=NotebookTask)
58+
def run_omegaml_job(self, nb_file, **kwargs):
59+
"""
60+
runs omegaml job
61+
"""
62+
self.om = om = get_omega_for_task(auth=kwargs.pop('auth', None))
63+
result = om.jobs.run_notebook(nb_file)
64+
return result
65+
66+
67+
@shared_task(base=NotebookTask)
68+
def schedule_omegaml_job(nb_file, **kwargs):
69+
"""
70+
schedules the running of omegaml job
71+
"""
72+
om = get_omega_for_task(auth=kwargs.pop('auth', None))
73+
result = om.jobs.schedule(nb_file)
74+
return result
75+
76+
77+
@shared_task(base=OmegamlTask)
78+
def execute_scripts(**kwargs):
79+
"""
80+
81+
will retrieve all scripts from the mongodb
82+
(as per a respective OMEGAML_SCRIPTS_GRIDFS setting),
83+
provided they are marked for execution at the time of execution
84+
"""
85+
om = get_omega_for_task(auth=kwargs.pop('auth', None))
86+
# Search tasks from mongo
87+
job_list = om.jobs.list()
88+
for nb_file in job_list:
89+
try:
90+
metadata = Metadata.objects.get(
91+
name=nb_file, kind=Metadata.OMEGAML_RUNNING_JOBS)
92+
task_state = metadata.attributes.get('state')
93+
if task_state == "RECEIVED":
94+
pass
95+
else:
96+
om.jobs.schedule(nb_file)
97+
except DoesNotExist:
98+
om.jobs.schedule(nb_file)

omegaml/celeryapp.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515

1616
app = Celery('omegaml')
1717
app.config_from_object(defaults.OMEGA_CELERY_CONFIG)
18-
app.autodiscover_tasks(['omegaml.tasks'], related_name='tasks')
18+
app.autodiscover_tasks(['omegaml.tasks',
19+
'omegajobs.tasks'], related_name='tasks')

omegaml/jobs.py

Lines changed: 57 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,21 @@
22
from __future__ import absolute_import
33

44
import datetime
5-
import os
65
import re
76
from uuid import uuid4
87

98
from croniter import croniter
10-
from django.conf import settings
119
import gridfs
1210
from mongoengine.fields import GridFSProxy
11+
from nbconvert.preprocessors.execute import ExecutePreprocessor
1312
from nbformat import read as nbread, write as nbwrite
14-
from nbformat.v4.nbbase import nbformat
15-
from runipy.notebook_runner import NotebookRunner
1613
from six import StringIO, BytesIO
1714
import yaml
1815

16+
from omegajobs.tasks import run_omegaml_job
1917
from omegaml import signals
2018
from omegaml.documents import Metadata
2119
from omegaml.store import OmegaStore
22-
from omegaml.tasks import run_omegaml_job
2320
from omegaml.util import settings as omega_settings
2421

2522

@@ -47,29 +44,42 @@ def _fs(self):
4744
return self.store.fs
4845

4946
def collection(self, name):
47+
if not name.endswith('.ipynb'):
48+
name += '.ipynb'
5049
return self.store.collection(name)
5150

5251
def drop(self, name):
52+
if not name.endswith('.ipynb'):
53+
name += '.ipynb'
5354
return self.store.drop(name)
5455

5556
def metadata(self, name):
57+
if not name.endswith('.ipynb'):
58+
name += '.ipynb'
5659
return self.store.metadata(name)
5760

61+
def exists(self, name):
62+
if not name.endswith('.ipynb'):
63+
name += '.ipynb'
64+
return len(self.store.list(name)) > 0
65+
5866
def put(self, obj, name, attributes=None):
5967
"""
6068
Store a NotebookNode
6169
6270
:param obj: the NotebookNode to store
6371
:param name: the name of the notebook
6472
"""
73+
if not name.endswith('.ipynb'):
74+
name += '.ipynb'
6575
sbuf = StringIO()
6676
bbuf = BytesIO()
6777
# nbwrite expects string, fs.put expects bytes
6878
nbwrite(obj, sbuf, version=4)
6979
sbuf.seek(0)
7080
bbuf.write(sbuf.getvalue().encode('utf8'))
7181
bbuf.seek(0)
72-
# see if we have a file alredy, if so replace the gridfile
82+
# see if we have a file already, if so replace the gridfile
7383
meta = self.store.metadata(name)
7484
if not meta:
7585
filename = uuid4().hex
@@ -88,6 +98,8 @@ def get(self, name):
8898
"""
8999
Retrieve a notebook and return a NotebookNode
90100
"""
101+
if not name.endswith('.ipynb'):
102+
name += '.ipynb'
91103
meta = self.store.metadata(name)
92104
if meta:
93105
try:
@@ -127,48 +139,12 @@ def list(self, jobfilter='.*', raw=False):
127139
job_list = self.store.list(regexp=jobfilter, raw=raw)
128140
return job_list
129141

130-
def run(self, nb_file):
131-
"""
132-
run the notebook on the runtime cluster
133-
"""
134-
from omegaml.tasks import run_omegaml_job
135-
result = run_omegaml_job.delay(nb_file)
136-
signals.job_run.send(sender=None, name=nb_file)
137-
return result.get()
138-
139-
def open_notebook(self, nb_filename):
140-
"""
141-
Reads and returns a notebook
142-
"""
143-
try:
144-
# for version 3
145-
notebook = nbread(open(nb_filename), as_version=3)
146-
except Exception:
147-
# for version 4
148-
notebook = nbread(open(nb_filename), as_version=4)
149-
except Exception:
150-
raise ValueError(
151-
"Notebook {0} do not match any applicable versions!".format(
152-
nb_filename))
153-
return notebook
154-
155142
def get_notebook_config(self, nb_filename):
156143
"""
157144
returns the omegaml script config on
158145
the notebook's first cell
159146
"""
160-
gfs = self.get_fs()
161-
try:
162-
# nb_filename = 'job_'+nb_file+'.ipynb'
163-
outf = gfs.get_last_version(nb_filename)
164-
with open(nb_filename, 'wb') as nb_file:
165-
nb_file.write(outf.read())
166-
except gridfs.errors.NoFile:
167-
raise gridfs.errors.NoFile(
168-
"Notebook {0} does not exist in collection '{1}'".format(
169-
nb_filename, self.defaults.OMEGA_NOTEBOOK_COLLECTION))
170-
171-
notebook = self.open_notebook(nb_filename)
147+
notebook = self.get(nb_filename)
172148
config_cell = notebook.get('worksheets')[0].get('cells')[0]
173149
yaml_conf = '\n'.join(
174150
[re.sub('#', '', x, 1) for x in str(
@@ -188,78 +164,49 @@ def get_notebook_config(self, nb_filename):
188164

189165
return yaml_conf.get("omegaml.script")
190166

191-
def run_notebook(self, nb_filename):
167+
def run(self, name):
168+
"""
169+
Run a job immediately
170+
171+
The job is run and the results are stored in the given filename
172+
173+
:param name: the name of the jobfile
174+
:return: the metadata of the job
192175
"""
193-
run the job immediately.
176+
return self.run_notebook(name)
177+
178+
def run_notebook(self, name):
179+
"""
180+
run a given notebook immediately.
194181
the job parameter is the name of the job script as in ipynb.
195182
Inserts and returns the Metadata document for the job.
196183
"""
197-
from pycloudfs import S3Helper
198-
gfs = self.get_fs()
199-
# FIXME get the notebook from mongo store without storing locally
200-
config = self.get_notebook_config(nb_filename)
201-
# nb_filename = 'job_'+nb_file+'.ipynb'
202-
# FIXME this only works because get_notebook_config stored the file
203-
# locally
204-
notebook = self.open_notebook(nb_filename)
205-
r = NotebookRunner(notebook)
206-
r.run_notebook(skip_exceptions=True)
207-
filename, ext = os.path.splitext(nb_filename)
184+
notebook = self.get(name)
185+
meta_job = self.metadata(name)
208186
ts = datetime.datetime.now().strftime('%s')
209-
result_nb = 'result' + filename.lstrip('job') + '_{0}.ipynb'.format(ts)
210-
nbwrite(r.nb, open(result_nb, 'w',), version=3)
211-
# store results
212-
s3file = {}
213-
fileid = None
214-
if config.get('results-store') == 's3':
215-
AWS_ACCESS_KEY_ID = os.environ.get(
216-
'AWS_ACCESS_KEY_ID', getattr(
217-
settings, 'AWS_ACCESS_KEY_ID'))
218-
AWS_SECRET_ACCESS_KEY = os.environ.get(
219-
'AWS_SECRET_ACCESS_KEY', getattr(
220-
settings, 'AWS_SECRET_ACCESS_KEY'))
221-
bucket = os.environ.get('AWS_TEST_BUCKET', 'shrebo')
222-
path = 'ipynb_results'
223-
s3 = S3Helper(
224-
bucket=bucket,
225-
path=path,
226-
aws_access_key_id=AWS_ACCESS_KEY_ID,
227-
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
228-
s3file = dict(
229-
bucket=bucket,
230-
prefix=path,
231-
name=result_nb)
232-
s3.upload_file(result_nb)
233-
if config.get('results-store') == 'gridfs':
234-
with open(result_nb, 'rb') as fin:
235-
fileid = gfs.put(fin, filename=os.path.basename(result_nb))
236-
os.remove(result_nb) if os.path.isfile(result_nb) else None
237-
# shutdown the ipython kernel
238-
r.shutdown_kernel()
239-
# check if this job was scheduled earlier
187+
# execute
240188
try:
241-
metadata = Metadata.objects.get(
242-
name=nb_filename, kind=Metadata.OMEGAML_RUNNING_JOBS)
243-
metadata.gridfile = GridFSProxy(
244-
grid_id=fileid,
245-
collection_name=self.defaults.OMEGA_NOTEBOOK_COLLECTION)
246-
metadata.attributes['state'] = 'EXECUTED'
247-
metadata.s3file = s3file
248-
metadata.save()
249-
# FIXME return only at function end, same below
250-
return metadata
251-
except Metadata.DoesNotExist:
252-
attrs = {}
253-
attrs['config'] = config
254-
attrs['state'] = 'EXECUTED'
255-
return Metadata(
256-
name=nb_filename,
257-
kind=Metadata.OMEGAML_RUNNING_JOBS,
258-
s3file=s3file,
259-
gridfile=GridFSProxy(
260-
grid_id=fileid,
261-
collection_name=self.defaults.OMEGA_NOTEBOOK_COLLECTION),
262-
attributes=attrs).save()
189+
ep = ExecutePreprocessor()
190+
ep.preprocess(notebook, {'metadata': {'path': '/'}})
191+
except Exception as e:
192+
status = str(e)
193+
else:
194+
status = 'OK'
195+
# record results
196+
meta_results = self.put(
197+
notebook, 'results/{name}_{ts}'.format(**locals()))
198+
meta_results.attributes['source_job'] = name
199+
meta_results.save()
200+
job_results = meta_job.attributes.get('job_results', [])
201+
job_results.append(meta_results.name)
202+
meta_job.attributes['job_results'] = job_results
203+
# record final job status
204+
job_runs = meta_job.attributes.get('job_runs', {})
205+
job_runs[ts] = status
206+
meta_job.attributes['job_runs'] = job_runs
207+
meta_job.save()
208+
209+
return meta_job
263210

264211
def schedule(self, nb_file):
265212
"""
@@ -279,7 +226,7 @@ def schedule(self, nb_file):
279226
iter_next = croniter(interval, now)
280227
run_at = iter_next.get_next(datetime.datetime)
281228
next_run_time = iter_next.get_next(datetime.datetime)
282-
from omegaml.tasks import schedule_omegaml_job
229+
from omegajobs.tasks import schedule_omegaml_job
283230
kwargs = dict(
284231
config=config,
285232
run_at=run_at,
@@ -309,31 +256,3 @@ def get_status(self, job):
309256
"""
310257
returns list of Metadata objects for this job
311258
"""
312-
# FIXME this should use the store.metadata
313-
return Metadata.objects.filter(name=job, kind__in=Metadata.KINDS)
314-
315-
def get_result(self, job):
316-
"""
317-
returns the result gridfile object for the respective Metadata
318-
"""
319-
fs = self.get_fs(self.defaults.OMEGA_NOTEBOOK_COLLECTION)
320-
if isinstance(job, Metadata):
321-
return fs.get(job.gridfile.grid_id)
322-
323-
try:
324-
metadata = Metadata.objects.order_by(
325-
'-created').filter(name=job).first()
326-
if not metadata:
327-
raise Metadata.DoesNotExist
328-
return fs.get(metadata.gridfile.grid_id)
329-
except Metadata.DoesNotExist:
330-
try:
331-
collection = self.get_collection('metadata')
332-
doc = collection.find_one({'attributes.task_id': job})
333-
metadata = Metadata.objects.get(gridfile=doc.get('gridfile'))
334-
if not metadata:
335-
raise Exception
336-
return fs.get(metadata.gridfile.grid_id)
337-
except Exception:
338-
raise Metadata.DoesNotExist(
339-
'No job found related to the name or task id: {0}'.format(job))

omegaml/runtime/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from .runtime import OmegaRuntime
2-
from .proxy import OmegaModelProxy
2+
from .modelproxy import OmegaModelProxy
3+
from .jobproxy import OmegaJobProxy

0 commit comments

Comments
 (0)