Skip to content

Commit

Permalink
Merge pull request #263 from vingar/patch-262-postgresql_fix
Browse files: browse the repository at this point in the history
Rewrite query for PostgreSQL. Fix #262
  • Loading branch information
bari12 committed Dec 4, 2017
2 parents e578691 + 83f3851 commit 94fc77c
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 64 deletions.
92 changes: 64 additions & 28 deletions lib/rucio/core/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# Authors:
# - Martin Barisits, <martin.barisits@cern.ch>, 2013-2017
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2014
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2014
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2014-2017
# - Cedric Serfon, <cedric.serfon@cern.ch>, 2014-2017
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2014

Expand Down Expand Up @@ -194,39 +194,75 @@ def get_files_and_replica_locks_of_dataset(scope, name, nowait=False, restrict_r
and as value: [LockObject]
:raises: NoResultFound
"""
# with_hint(models.ReplicaLock, "INDEX(LOCKS LOCKS_PK)", 'oracle').\
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle').\
outerjoin(models.ReplicaLock,
and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name))\
.filter(models.DataIdentifierAssociation.scope == scope, models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.ReplicaLock.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle').\
outerjoin(models.ReplicaLock,
and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)
locks = {}
if session.bind.dialect.name == 'postgresql':
content_query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name).\
with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle').\
filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

for child_scope, child_name in content_query.yield_per(1000):
locks[(child_scope, child_name)] = []

query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle').\
filter(and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.ReplicaLock.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle').\
filter(and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)
else:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle').\
outerjoin(models.ReplicaLock,
and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name))\
.filter(models.DataIdentifierAssociation.scope == scope, models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.ReplicaLock.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.ReplicaLock).\
with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle').\
outerjoin(models.ReplicaLock,
and_(models.DataIdentifierAssociation.child_scope == models.ReplicaLock.scope,
models.DataIdentifierAssociation.child_name == models.ReplicaLock.name,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

if only_stuck:
query = query.filter(models.ReplicaLock.state == LockState.STUCK)

query = query.with_for_update(nowait=nowait, of=models.ReplicaLock.state)

locks = {}

for child_scope, child_name, lock in query:
if (child_scope, child_name) not in locks:
if lock is None:
Expand Down
135 changes: 99 additions & 36 deletions lib/rucio/core/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,8 @@ def get_source_replicas(scope, name, source_rses=None, session=None):


@transactional_session
def get_and_lock_file_replicas_for_dataset(scope, name, nowait=False, restrict_rses=None, session=None):
def get_and_lock_file_replicas_for_dataset(scope, name, nowait=False, restrict_rses=None,
session=None):
"""
Get file replicas for all files of a dataset.
Expand All @@ -1569,47 +1570,109 @@ def get_and_lock_file_replicas_for_dataset(scope, name, nowait=False, restrict_r
:param session: The db session in use.
:returns: (files in dataset, replicas in dataset)
"""

query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle')\
.outerjoin(models.RSEFileAssociation,
and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED)).\
filter(models.DataIdentifierAssociation.scope == scope, models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
if len(restrict_rses) < 10:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.RSEFileAssociation.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
files, replicas = {}, {}
if session.bind.dialect.name == 'postgresql':
# Get content
content_query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation, "INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)", 'oracle')\
.outerjoin(models.RSEFileAssociation,
and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)
models.DataIdentifierAssociation.adler32).\
with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle').\
filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

query = query.with_for_update(nowait=nowait, of=models.RSEFileAssociation.lock_cnt)
for child_scope, child_name, bytes, md5, adler32 in content_query.yield_per(1000):
files[(child_scope, child_name)] = {'scope': child_scope,
'name': child_name,
'bytes': bytes,
'md5': md5,
'adler32': adler32}
replicas[(child_scope, child_name)] = []

files = {}
replicas = {}
# Get replicas and lock them
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle')\
.filter(and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED)).\
filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
if len(restrict_rses) < 10:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.RSEFileAssociation.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle')\
.filter(and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

else:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle')\
.outerjoin(models.RSEFileAssociation,
and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED)).\
filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

if restrict_rses is not None:
if len(restrict_rses) < 10:
rse_clause = []
for rse_id in restrict_rses:
rse_clause.append(models.RSEFileAssociation.rse_id == rse_id)
if rse_clause:
query = session.query(models.DataIdentifierAssociation.child_scope,
models.DataIdentifierAssociation.child_name,
models.DataIdentifierAssociation.bytes,
models.DataIdentifierAssociation.md5,
models.DataIdentifierAssociation.adler32,
models.RSEFileAssociation)\
.with_hint(models.DataIdentifierAssociation,
"INDEX_RS_ASC(CONTENTS CONTENTS_PK) NO_INDEX_FFS(CONTENTS CONTENTS_PK)",
'oracle')\
.outerjoin(models.RSEFileAssociation,
and_(models.DataIdentifierAssociation.child_scope == models.RSEFileAssociation.scope,
models.DataIdentifierAssociation.child_name == models.RSEFileAssociation.name,
models.RSEFileAssociation.state != ReplicaState.BEING_DELETED,
or_(*rse_clause)))\
.filter(models.DataIdentifierAssociation.scope == scope,
models.DataIdentifierAssociation.name == name)

query = query.with_for_update(nowait=nowait, of=models.RSEFileAssociation.lock_cnt)

for child_scope, child_name, bytes, md5, adler32, replica in query:
for child_scope, child_name, bytes, md5, adler32, replica in query.yield_per(1000):
if (child_scope, child_name) not in files:
files[(child_scope, child_name)] = {'scope': child_scope,
'name': child_name,
Expand Down

0 comments on commit 94fc77c

Please sign in to comment.