Skip to content

Commit

Permalink
Better redis ques
Browse files Browse the repository at this point in the history
* #151 (comment)
* Also change que so messages added at end, taken of front - oldest processed first!
  • Loading branch information
odscjames committed May 29, 2019
1 parent 13d71be commit 0965a2c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 41 deletions.
42 changes: 30 additions & 12 deletions ocdskingfisherprocess/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,38 @@ def process_all_files(self):

self.logger.info('process_all_files called for collection ' + str(self.collection.database_id))

# Is deleted?
if self.collection.deleted_at:
return

for collection_file in self.database.get_all_files_in_collection(self.collection.database_id):

self.process_file_id(collection_file.database_id)

# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

def process_file_id(self, collection_file_id):
self.logger.info(
'process_file_id called for collection ' + str(self.collection.database_id) +
" with file_id " + str(collection_file_id)
)

# Is deleted?
if self.collection.deleted_at:
return

# Normal Checks
if self.collection.check_data:

self.process_all_files_releases()
self.process_all_releases_for_collection_file_id(collection_file_id)

# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

self.process_all_files_records()
self.process_all_records_for_collection_file_id(collection_file_id)

# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
Expand All @@ -43,16 +61,16 @@ def process_all_files(self):
# Checks with schema V1.1
if self.collection.check_older_data_with_schema_version_1_1:

self.process_all_files_releases_with_override_schema_version_1_1()
self.process_all_releases_with_override_schema_version_1_1_for_collection_file_id(collection_file_id)

# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

self.process_all_files_records_with_override_schema_version_1_1()
self.process_all_records_with_override_schema_version_1_1_for_collection_file_id(collection_file_id)

def process_all_files_releases(self):
results = self.database.get_releases_to_check(self.collection.database_id)
def process_all_releases_for_collection_file_id(self, collection_file_id):
results = self.database.get_releases_to_check_for_collection_file_id(collection_file_id)
for release_row in results:
# Do Normal Check?
if not self.database.is_release_check_done(release_row['id']):
Expand All @@ -61,8 +79,8 @@ def process_all_files_releases(self):
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

def process_all_files_records(self):
results = self.database.get_records_to_check(self.collection.database_id)
def process_all_records_for_collection_file_id(self, collection_file_id):
results = self.database.get_records_to_check_for_collection_file_id(collection_file_id)
for record_row in results:
# Do Normal Check?
if not self.database.is_record_check_done(record_row['id']):
Expand All @@ -71,8 +89,8 @@ def process_all_files_records(self):
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

def process_all_files_releases_with_override_schema_version_1_1(self):
results = self.database.get_releases_to_check(self.collection.database_id, override_schema_version="1.1")
def process_all_releases_with_override_schema_version_1_1_for_collection_file_id(self, collection_file_id):
results = self.database.get_releases_to_check_for_collection_file_id(collection_file_id, override_schema_version="1.1")
for release_row in results:
# Do 1.1 check?
if self.is_schema_version_less_than_1_1(release_row['package_data_id']) \
Expand All @@ -82,8 +100,8 @@ def process_all_files_releases_with_override_schema_version_1_1(self):
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

def process_all_files_records_with_override_schema_version_1_1(self):
results = self.database.get_records_to_check(self.collection.database_id, override_schema_version="1.1")
def process_all_records_with_override_schema_version_1_1_for_collection_file_id(self, collection_file_id):
results = self.database.get_records_to_check_for_collection_file_id(collection_file_id, override_schema_version="1.1")
for record_row in results:
# Do 1.1 check?
if self.is_schema_version_less_than_1_1(record_row['package_data_id']) \
Expand Down
37 changes: 22 additions & 15 deletions ocdskingfisherprocess/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,42 +630,44 @@ def delete_collection(self, collection_id):
with self.get_engine().begin() as connection:
connection.execute(sa.sql.expression.text(sql), data)

def _get_check_query(self, obj_type, collection_id, override_schema_version):
data = {'collection_id': collection_id}
def _get_check_query_for_collection_file_id(self, obj_type, collection_file_id, override_schema_version):
data = {'collection_file_id': collection_file_id}
sql = """ SELECT
release_with_collection.id,
release_with_collection.data_id,
release_with_collection.package_data_id
FROM release_with_collection"""
release.id,
release.data_id,
release.package_data_id
FROM release
JOIN collection_file_item ON collection_file_item.id = release.collection_file_item_id
"""
if override_schema_version:
sql += """ LEFT JOIN release_check ON release_check.release_id = release_with_collection.id
sql += """ LEFT JOIN release_check ON release_check.release_id = release.id
AND release_check.override_schema_version = :override_schema_version
LEFT JOIN release_check_error ON release_check_error.release_id = release_check_error.id
AND release_check_error.override_schema_version = :override_schema_version
LEFT JOIN package_data on package_data.id = package_data_id
WHERE release_with_collection.collection_id = :collection_id
WHERE collection_file_item.collection_file_id = :collection_file_id
AND release_check.id IS NULL AND release_check_error.id IS NULL
AND coalesce(data ->> 'version', '1.0') <> :override_schema_version"""
data['override_schema_version'] = override_schema_version
else:
sql += """ LEFT JOIN release_check ON release_check.release_id = release_with_collection.id
sql += """ LEFT JOIN release_check ON release_check.release_id = release.id
AND release_check.override_schema_version IS NULL
LEFT JOIN release_check_error ON release_check_error.release_id = release_check_error.id
AND release_check_error.override_schema_version IS NULL
WHERE release_with_collection.collection_id = :collection_id
WHERE collection_file_item.collection_file_id = :collection_file_id
AND release_check.id IS NULL AND release_check_error.id IS NULL """

return sql.replace('release', obj_type), data

def get_releases_to_check(self, collection_id, override_schema_version=None):
sql, data = self._get_check_query('release', collection_id, override_schema_version)
def get_releases_to_check_for_collection_file_id(self, collection_file_id, override_schema_version=None):
sql, data = self._get_check_query_for_collection_file_id('release', collection_file_id, override_schema_version)

with self.get_engine().begin() as connection:
query = sa.sql.expression.text(sql)
return connection.execute(query, data)

def get_records_to_check(self, collection_id, override_schema_version=None):
sql, data = self._get_check_query('record', collection_id, override_schema_version)
def get_records_to_check_for_collection_file_id(self, collection_file_id, override_schema_version=None):
sql, data = self._get_check_query_for_collection_file_id('record', collection_file_id, override_schema_version)

with self.get_engine().begin() as connection:
query = sa.sql.expression.text(sql)
Expand Down Expand Up @@ -794,7 +796,12 @@ def __exit__(self, type, value, traceback):

self.connection.close()

KINGFISHER_SIGNALS.signal('collection-data-store-finished').send('anonymous', collection_id=self.collection_id)
KINGFISHER_SIGNALS\
.signal('collection-data-store-finished')\
.send('anonymous',
collection_id=self.collection_id,
collection_file_id=self.collection_file_id,
)

def insert_record(self, row, package_data):
ocid = row.get('ocid')
Expand Down
6 changes: 5 additions & 1 deletion ocdskingfisherprocess/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ def process(self, message_as_string, run_until_timestamp=None):
collection = self.database.get_collection(message_as_data['collection_id'])
if collection:
checks = Checks(self.database, collection, run_until_timestamp=run_until_timestamp)
checks.process_all_files()
# Older messages might not have 'collection_file_id' in, so we need to check for this.
if 'collection_file_id' in message_as_data and message_as_data['collection_file_id']:
checks.process_file_id(message_as_data['collection_file_id'])
else:
checks.process_all_files()
16 changes: 7 additions & 9 deletions ocdskingfisherprocess/signals/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,11 @@ def run_standard_pipeline_on_new_collection_created(sender, collection_id=None,
our_database.mark_collection_check_older_data_with_schema_version_1_1(collection_id, True)


def collection_data_store_finished_to_redis(sender, collection_id=None, **kwargs):
def collection_data_store_finished_to_redis(sender, collection_id=None, collection_file_id=None, **kwargs):
redis_conn = redis.Redis(host=our_config.redis_host, port=our_config.redis_port, db=our_config.redis_database)
message = json.dumps({'type': 'collection-data-store-finished', 'collection_id': collection_id})
# We only want one message of a type to be in the que at a time.
# So remove any existing messages, and add the message we want again (This is how you do it in Redis).
pipe = redis_conn.pipeline()
pipe.lrem('kingfisher_work', 0, message)
pipe.lpush('kingfisher_work', message)
pipe.execute()
# There are better solutions possible - discussed in https://github.com/open-contracting/kingfisher-process/issues/151
message = json.dumps({
'type': 'collection-data-store-finished',
'collection_id': collection_id,
'collection_file_id': collection_file_id,
})
redis_conn.rpush('kingfisher_work', message)
30 changes: 26 additions & 4 deletions tests/test_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,19 @@ def test_records(self):

store.store_file_from_local("test.json", "http://example.com", "record", "utf-8", json_filename)

assert len([res for res in self.database.get_records_to_check(collection_id, override_schema_version='1.1')]) == 0
assert len([res for res in self.database.get_records_to_check(collection_id, override_schema_version='1.2')]) == 1
collection_files = self.database.get_all_files_in_collection(collection_id)
collection_file_id = collection_files[0].database_id

assert len(
[res for res in
self.database.get_records_to_check_for_collection_file_id(collection_file_id,
override_schema_version='1.1')]
) == 0
assert len(
[res for res in
self.database.get_records_to_check_for_collection_file_id(collection_file_id,
override_schema_version='1.2')]
) == 1

def test_releases(self):

Expand All @@ -660,6 +671,17 @@ def test_releases(self):

store.store_file_from_local("test.json", "http://example.com", "release_package", "utf-8", json_filename)

assert len([res for res in self.database.get_releases_to_check(collection_id, override_schema_version='1.1')]) == 0
collection_files = self.database.get_all_files_in_collection(collection_id)
collection_file_id = collection_files[0].database_id

assert len(
[res for res in
self.database.get_releases_to_check_for_collection_file_id(collection_file_id,
override_schema_version='1.1')]
) == 0
# the file has 6 releases
assert len([res for res in self.database.get_releases_to_check(collection_id, override_schema_version='1.2')]) == 6
assert len(
[res for res in
self.database.get_releases_to_check_for_collection_file_id(collection_file_id,
override_schema_version='1.2')]
) == 6

0 comments on commit 0965a2c

Please sign in to comment.