Skip to content

Commit

Permalink
Merge pull request #161 from open-contracting/quicker-compiled-release
Browse files Browse the repository at this point in the history
Only run one query per collection to get ocids not transformed.
  • Loading branch information
odscjames committed May 29, 2019
2 parents e543f52 + 575fec1 commit 13d71be
Showing 1 changed file with 10 additions and 20 deletions.
30 changes: 10 additions & 20 deletions ocdskingfisherprocess/transform/compile_releases.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ def process(self):

# Do the work ...
for ocid in self.get_ocids():
if not self.has_ocid_been_transformed(ocid):
self.process_ocid(ocid)
self.process_ocid(ocid)
# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return
Expand All @@ -31,34 +30,22 @@ def process(self):
self.database.mark_collection_store_done(self.destination_collection.database_id)

def get_ocids(self):
''' Gets the ocids for this collection that have not been transformed'''
ocids = []

with self.database.get_engine().begin() as engine:
query = engine.execute(sa.text(
" SELECT release.ocid FROM release " +
" JOIN collection_file_item ON collection_file_item.id = release.collection_file_item_id " +
" JOIN collection_file ON collection_file.id = collection_file_item.collection_file_id " +
" WHERE collection_file.collection_id = :collection_id " +
" GROUP BY release.ocid "
), collection_id=self.source_collection.database_id)
" SELECT r.ocid FROM release_with_collection AS r" +
" LEFT JOIN compiled_release_with_collection AS cr on cr.ocid = r.ocid and cr.collection_id = :destination_collection_id" +
" WHERE r.collection_id = :collection_id and cr.ocid is NULL" +
" GROUP BY r.ocid "
), collection_id=self.source_collection.database_id, destination_collection_id=self.destination_collection.database_id)

for row in query:
ocids.append(row['ocid'])

return ocids

def has_ocid_been_transformed(self, ocid):
    """Return whether a compiled release for ``ocid`` exists in the destination collection.

    Queries ``compiled_release`` joined through ``collection_file_item`` and
    ``collection_file``, filtered by ``self.destination_collection.database_id``
    and the given ocid.

    :param ocid: the ocid to look up
    :returns: True if at least one matching compiled release row exists, otherwise False
    """
    with self.database.get_engine().begin() as engine:
        query = engine.execute(sa.text(
            " SELECT compiled_release.ocid FROM compiled_release " +
            " JOIN collection_file_item ON collection_file_item.id = compiled_release.collection_file_item_id " +
            " JOIN collection_file ON collection_file.id = collection_file_item.collection_file_id " +
            " WHERE collection_file.collection_id = :collection_id AND compiled_release.ocid = :ocid "
        ), collection_id=self.destination_collection.database_id, ocid=ocid)

        # Fetch a row rather than trusting rowcount: rowcount is only dependable for
        # UPDATE/DELETE, and an "== 1" comparison would also report an ocid that has
        # more than one matching row as not transformed.
        return query.first() is not None

def process_ocid(self, ocid):

releases = []
Expand All @@ -76,4 +63,7 @@ def process_ocid(self, ocid):

out = ocdsmerge.merge(releases)

# In the event of a race condition where two concurrent transforms have run the same ocid,
# we rely on the fact that collection_id and filename are unique in the file_item table. Therefore this will
# fail with a unique key constraint violation and will not create duplicate entries.
self.store.store_file_item(ocid+'.json', None, 'compiled_release', out, 1)

0 comments on commit 13d71be

Please sign in to comment.