Skip to content

Commit

Permalink
Allow using a DB URL as session name.
Browse files Browse the repository at this point in the history
In fact, any URL pointing to a GC3Pie `Store` will do.
Since, in this case, the session metadata is missing
(as it's normally only stored in the session directory)
then we reconstruct it or just guess.

This allows one to use GC3Utils for inspecting DB-based task
stores; e.g.::

    gstat --brief --session postgresql://user:passwd@db.example.org/
  • Loading branch information
riccardomurri committed Jan 5, 2018
1 parent 3e647b9 commit 0288ffa
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 56 deletions.
10 changes: 9 additions & 1 deletion docs/glossary.rst
Expand Up @@ -88,8 +88,16 @@ Glossary
to the command to tag a message as "standard output" or "standard error".

Session

A :term:`persistent` collection of GC3Pie tasks and jobs. Sessions
are used by `GC3Apps`:ref: to store job status across program runs.
are used by `GC3Apps`:ref: to store job status across program
runs. A session is specified by giving the filesystem path to a
*session directory*: the directory contains some files with
meta-data about the tasks that comprise the session. It is also
possible to *simulate* a session by specifying a *task store URL*
(path to a filesystem directory where the jobs are stored, or
connection URL to a database); in this case the session meta-data
will be reconstructed from the set of tasks in the store.

Walltime
Short for *wall-clock time*: indicates the total running time of a
Expand Down
131 changes: 105 additions & 26 deletions gc3libs/session.py
Expand Up @@ -21,11 +21,13 @@


# stdlib imports
import atexit
import csv
import itertools
import os
import sys
import shutil
import tempfile

# GC3Pie imports
import gc3libs
Expand All @@ -37,7 +39,6 @@


class Session(list):

"""
A 'session' is a persistent collection of tasks.
Expand Down Expand Up @@ -131,7 +132,8 @@ class Session(list):

DEFAULT_JOBS_DIR = 'jobs'

def __init__(self, path, create=True, store_or_url=None, **extra_args):
def __init__(self, path, create=True, store_or_url=None,
load=True, **extra_args):
"""
First argument `path` is the path to the session directory.
Expand All @@ -158,6 +160,12 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args):
By default `gc3libs.persistence.filesystem.FileSystemStore`:class:
(which see) is used for providing a new session with a store.
Finally, if optional argument `load` is ``False`` then an
already-existing session at `path` will be discarded and a new
one will be created in its place. By default, `load` is
``True``, meaning that data from existing sessions is loaded
into memory.
"""
self.path = os.path.abspath(path)
self.name = os.path.basename(self.path)
Expand All @@ -167,7 +175,7 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args):
self.finished = -1

# load or make session
if os.path.isdir(self.path):
if os.path.isdir(self.path) and load:
# Session already exists?
try:
self._load_session(**extra_args)
Expand All @@ -190,7 +198,7 @@ def __init__(self, path, create=True, store_or_url=None, **extra_args):
self._create_session(store_or_url, **extra_args)
else:
raise gc3libs.exceptions.InvalidArgument(
"Session '%s' not found" % self.path)
"Session directory '%s' not found" % self.path)

def _create_session(self, store_or_url, **extra_args):
if isinstance(store_or_url, gc3libs.persistence.store.Store):
Expand Down Expand Up @@ -256,28 +264,7 @@ def _load_session(self, store=None, **extra_args):
"Unable to recover starting time from existing session:"
" file `%s` is missing." % (start_file))

for task_id in ids:
try:
self.tasks[task_id] = self.store.load(task_id)
except Exception as err:
if gc3libs.error_ignored(
# context:
# - module
'session',
# - class
'Session',
# - method
'load',
# - actual error class
err.__class__.__name__,
# - additional keywords
'persistence',
):
gc3libs.log.warning(
"Ignoring error from loading '%s': %s", task_id, err)
else:
# propagate exception back to caller
raise
self.tasks = self.load_many(ids)

def destroy(self):
"""
Expand Down Expand Up @@ -430,6 +417,38 @@ def load(self, obj_id):
"""
return self.store.load(obj_id)

def load_many(self, obj_ids):
"""
Load objects given their IDs from persistent storage.
Return a dictionary mapping task ID to the actual
retrieved `Task`:class: object.
"""
tasks = {}
for task_id in obj_ids:
try:
tasks[task_id] = self.store.load(task_id)
except Exception as err:
if gc3libs.error_ignored(
# context:
# - module
'session',
# - class
'Session',
# - method
'load',
# - actual error class
err.__class__.__name__,
# - additional keywords
'persistence',
):
gc3libs.log.warning(
"Ignoring error from loading '%s': %s", task_id, err)
else:
# propagate exception back to caller
raise
return tasks

def save(self, obj):
"""
Save an object to the persistent storage and return
Expand Down Expand Up @@ -531,6 +550,66 @@ def set_end_timestamp(self, time=None):
self.finished = self._touch_file(self.TIMESTAMP_FILES['end'], time)


class TemporarySession(Session):
"""
Create a session from a store URL.
In contrast with the regular `Session`:class: object, a
`TemporarySession`:class: does not persist any metadata about the
task collection.
In particular:
- The session index (list of task IDs belonging to the session) is
initialized from the entire list of jobs present in the given
`Store`:class: (unless a list is explicitly passed in the
`task_ids` argument to the constructor). This means that, unlike
plain `Session`:class: objects, two `TemporarySession`:class:
objects cannot share the same store.
- The session directory (``path`` in the `Session`:class:
constructor) is created on a temporary location on the
filesystem and deleted when the :class:`TemporarySession` is
destroyed.
- Timestamps will be set to the time the `TemporarySession`:class:
Python object is created; two `TemporarySession`:class:
instances with the same backing store can have different
creation timestamps, depending on when exactly they were
instanciated.
The `TemporarySession`:class: is only provided as a convenience to
use code that was built on top of a `Session`:class: with a
"naked" `Store`:class:.
"""

def __init__(self, store_or_url, task_ids=None, delete=True, **extra_args):
# make temporary session dir
path = tempfile.mkdtemp(
prefix='gc3pie.TemporarySession.',
suffix='.d')
# ensure temp directory is deleted
if delete:
def cleanup():
if os.path.exists(path):
shutil.rmtree(path)
atexit.register(cleanup)
# init `Session` class
super(TemporarySession, self).__init__(
path, True, store_or_url, False, **extra_args)
# populate index
if task_ids is None:
try:
task_ids = self.store.list()
except NotImplementedError:
raise RuntimeError(
"Cannot create temporary session:"
" Task store `{0}` does not support listing all task IDs.")
self.tasks = self.load_many(task_ids)
self._save_index_file()
# use url as the session name
self.name = str(self.store.url)


# main: run tests

if "__main__" == __name__:
Expand Down
62 changes: 33 additions & 29 deletions gc3utils/commands.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
#
"""
Implementation of the `core` command-line front-ends.
Implementation of the command-line front-ends.
"""
# Copyright (C) 2009-2018 University of Zurich. All rights reserved.
#
Expand Down Expand Up @@ -45,10 +45,11 @@
# local modules
from gc3libs import __version__, Run
from gc3libs.quantity import Duration, Memory
from gc3libs.session import Session
from gc3libs.session import Session, TemporarySession
import gc3libs.cmdline
import gc3libs.exceptions
import gc3libs.persistence
from gc3libs.url import Url
import gc3libs.utils as utils


Expand Down Expand Up @@ -171,18 +172,31 @@ def _get_tasks(self, task_ids, ignore_failures=True):
else:
raise

def _get_session(self, name):
def _get_session(self, url):
"""
Return a `gc3libs.session.Session` object corresponding to the
session identified by `name`.
session identified by `url`.
:raise gc3libs.exceptions.InvalidArgument:
If the session cannot be loaded (e.g., does not exist).
"""
try:
return Session(name, create=False)
url = Url(url)
if url.scheme == 'file':
return Session(url.path, create=False)
else:
return TemporarySession(url)
except gc3libs.exceptions.InvalidArgument as err:
raise RuntimeError('Session {0} not found: {1}'.format(name, err))
raise RuntimeError(
"Cannot load session `{0}`: {1}".format(url, err))

def _list_all_tasks(self):
try:
return self.session.store.list()
except NotImplementedError:
raise NotImplementedError(
"Task storage module does not allow listing all tasks."
" Please specify the task IDs you wish to operate on.")


# ====== Main ========
Expand Down Expand Up @@ -222,7 +236,7 @@ def main(self):
"Option '-A' conflicts with list of job IDs to remove.")

if self.params.all:
args = [job.persistent_id for job in self.session.iter_workflow()]
self.params.args = self._list_all_tasks()
if len(args) == 0:
self.log.info("No jobs in session: nothing to do.")
else:
Expand Down Expand Up @@ -365,7 +379,6 @@ def main(self):
only_keys = self.params.keys.split(',')
else:
if self.params.verbose < 2:

def names_not_starting_with_underscore(name):
return not name.startswith('_')
only_keys = names_not_starting_with_underscore
Expand All @@ -381,13 +394,7 @@ def names_not_starting_with_underscore(name):

if len(self.params.args) == 0:
# if no arguments, operate on all known jobs
try:
self.params.args = [job.persistent_id
for job in self.session.iter_workflow()]
except NotImplementedError:
raise NotImplementedError(
"Job storage module does not allow listing all jobs."
" Please specify the job IDs you wish to operate on.")
self.params.args = self._list_all_tasks()

if posix.isatty(sys.stdout.fileno()):
# try to screen width
Expand Down Expand Up @@ -615,9 +622,7 @@ def main(self):

if len(self.params.args) == 0:
# if no arguments, operate on all known jobs
# self.params.args = self.session.list_ids()
self.params.args = [job.persistent_id
for job in self.session.iter_workflow()]
self.params.args = self._list_all_tasks()

if len(self.params.args) == 0:
print("No jobs submitted.")
Expand Down Expand Up @@ -668,10 +673,9 @@ def main(self):
app.update_state()
self.session.store.replace(jobid, app)
if states is None or app.execution.in_state(*states):
# XXX: use `... if ... else ...` in Py > 2.4
if hasattr(app, 'jobname'):
try:
jobname = app.jobname
else:
except AttributeError:
jobname = ''

key_values = []
Expand Down Expand Up @@ -816,7 +820,7 @@ def main(self):
" use either '-A' or explicitly list task IDs.")

if self.params.all:
args = [job.persistent_id for job in self.session.iter_workflow()]
args = self._list_all_tasks()
if len(args) == 0:
self.log.info("No jobs in session: nothing to do.")
else:
Expand Down Expand Up @@ -916,7 +920,7 @@ def main(self):
"Option '-A' conflicts with list of job IDs to remove.")

if self.params.all:
args = [job.persistent_id for job in self.session.iter_workflow()]
args = self._list_all_tasks()
if len(args) == 0:
self.log.info("No jobs in session: nothing to do.")
else:
Expand Down Expand Up @@ -1279,14 +1283,14 @@ def delete_session(self):

def list_jobs(self):
"""
Called with subcommand `list`.
Called with subcommand ``list``.
List the content of a session, like `gstat -n -v -s SESSION` does.
Unlike `gstat`, though, display stops at the top-level jobs
unless option `--recursive` is also given.
List the content of a session, like ``gstat -n -v -s SESSION``
does. Unlike ``gstat``, though, display stops at the top-level
jobs unless option `--recursive` is also given.
With option `--recursive`, indent job ids to show the tree-like
organization of jobs in the task collections.
With option ``--recursive``, indent job ids to show the
tree-like organization of jobs in the task collections.
"""
self.session = self._get_session(self.params.session)

Expand Down

0 comments on commit 0288ffa

Please sign in to comment.