Skip to content

Commit

Permalink
Use SQLAlchemy's transaction support in gc3libs.persistence.sql.
Browse files Browse the repository at this point in the history
Unlike the "basic" connection support we were using earlier,
SQLAlchemy should allow nesting transactions, which should solve the
issue with connections being closed before we are done using them.
  • Loading branch information
riccardomurri committed Apr 10, 2018
1 parent b62912a commit c8488e5
Showing 1 changed file with 19 additions and 20 deletions.
39 changes: 19 additions & 20 deletions gc3libs/persistence/sql.py
Expand Up @@ -209,9 +209,8 @@ def _delayed_init(self):
Perform initialization tasks that can interfere with
forking/multiprocess setup.
See `GC3Pie issue #550
<https://github.com/uzh/gc3pie/issues/550>`_ for more details
and motivation.
See `GC3Pie issue #550 <https://github.com/uzh/gc3pie/issues/550>`_
for more details and motivation.
"""
self._real_engine = sqla.create_engine(
self._to_sqlalchemy_url(self.url))
Expand Down Expand Up @@ -277,7 +276,7 @@ def extra_fields(self):
@same_docstring_as(Store.list)
def list(self):
q = sql.select([self._tables.c.id])
with closing(self._engine.connect()) as conn:
with self._engine.begin() as conn:
rows = conn.execute(q)
ids = [i[0] for i in rows.fetchall()]
return ids
Expand Down Expand Up @@ -319,41 +318,41 @@ def _save_or_replace(self, id_, obj):
"Error saving DB column '%s' of object '%s': %s: %s",
column, obj, ex.__class__.__name__, str(ex))

with closing(self._engine.connect()) as conn:
q = sql.select([self._tables.c.id]).where(self._tables.c.id == id_)
q = sql.select([self._tables.c.id]).where(self._tables.c.id == id_)
with self._engine.begin() as conn:
r = conn.execute(q)
if not r.fetchone():
# It's an insert
q = self._tables.insert().values(**fields)
conn.execute(q)
else:
# it's an update
q = self._tables.update().where(
self._tables.c.id == id_).values(**fields)
conn.execute(q)
obj.persistent_id = id_
if hasattr(obj, 'changed'):
obj.changed = False
self._tables.c.id == id_).values(**fields)
conn.execute(q)
obj.persistent_id = id_
if hasattr(obj, 'changed'):
obj.changed = False

# return id
return obj.persistent_id

@same_docstring_as(Store.load)
def load(self, id_):
with closing(self._engine.connect()) as conn:
q = sql.select([self._tables.c.data]).where(self._tables.c.id == id_)
q = sql.select([self._tables.c.data]).where(self._tables.c.id == id_)
with self._engine.begin() as conn:
rawdata = conn.execute(q).fetchone()
if not rawdata:
raise gc3libs.exceptions.LoadError(
"Unable to find any object with ID '%s'" % id_)
obj = make_unpickler(self, StringIO(rawdata[0])).load()
if not rawdata:
raise gc3libs.exceptions.LoadError(
"Unable to find any object with ID '%s'" % id_)
obj = make_unpickler(self, StringIO(rawdata[0])).load()
super(SqlStore, self)._update_to_latest_schema()
return obj

@same_docstring_as(Store.remove)
def remove(self, id_):
with closing(self._engine.connect()) as conn:
conn.execute(self._tables.delete().where(self._tables.c.id == id_))
with self._engine.begin() as conn:
conn.execute(
self._tables.delete().where(self._tables.c.id == id_))


# register all URLs that SQLAlchemy can handle
Expand Down

0 comments on commit c8488e5

Please sign in to comment.