Skip to content

Commit

Permalink
refactored processing steps
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubkrafka committed Aug 12, 2021
1 parent ae307ba commit d7a984b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
6 changes: 4 additions & 2 deletions process/management/commands/compiler.py
Expand Up @@ -113,9 +113,10 @@ def _publish_releases(self, connection, channel, collection):
message = {
"ocid": item["ocid"],
"collection_id": collection.id,
"compiled_collection_id": compiled_collection.id,
}

self._createStep(ProcessingStep.Types.COMPILE, collection_id=collection.id, ocid=item["ocid"])
self._createStep(ProcessingStep.Types.COMPILE, collection_id=compiled_collection.id, ocid=item["ocid"])
self._publish_async(connection, channel, json.dumps(message), "compiler_release")
except Collection.DoesNotExist:
self._warning(
Expand Down Expand Up @@ -148,9 +149,10 @@ def _publish_records(self, connection, channel, collection_file):
message = {
"ocid": item["ocid"],
"collection_id": collection_file.collection.id,
"compiled_collection_id": compiled_collection.id,
}

self._createStep(ProcessingStep.Types.COMPILE,
collection_id=collection_file.collection.id,
collection_id=compiled_collection.id,
ocid=item["ocid"])
self._publish_async(connection, channel, json.dumps(message), "compiler_record")
2 changes: 1 addition & 1 deletion process/management/commands/finisher.py
Expand Up @@ -43,7 +43,7 @@ def process(self, connection, channel, delivery_tag, body):
collection.save()
self._debug("Processing of collection_id: {} finished. Set as completed.".format(collection_id))
else:
self._debug("Processing of collection_id: {} not finished yet".format(collection_id))
self._debug("Processing of collection_id: {} not completable".format(collection_id))

except Exception:
self._exception("Something went wrong when processing {}".format(body))
Expand Down
7 changes: 3 additions & 4 deletions process/management/commands/record_compiler.py
Expand Up @@ -34,14 +34,13 @@ def process(self, connection, channel, delivery_tag, body):

ocid = input_message["ocid"]
collection_id = input_message["collection_id"]
compiled_collection_id = input_message["compiled_collection_id"]

with transaction.atomic():
self._info("Compiling record collection_id: {} ocid: {}".format(collection_id, ocid))
release = compile_record(collection_id, ocid)

self._deleteStep(ProcessingStep.Types.COMPILE, collection_id=collection_id, ocid=ocid)

compiled_collection = Collection.objects.get(id=input_message["collection_id"]).get_compiled_collection()
self._deleteStep(ProcessingStep.Types.COMPILE, collection_id=compiled_collection_id, ocid=ocid)

release_id = None

Expand All @@ -52,7 +51,7 @@ def process(self, connection, channel, delivery_tag, body):
message = {
"ocid": ocid,
"compiled_release_id": release_id,
"collection_id": compiled_collection.id,
"collection_id": compiled_collection_id,
}

self._publish_async(connection, channel, json.dumps(message))
Expand Down
5 changes: 3 additions & 2 deletions process/management/commands/release_compiler.py
Expand Up @@ -33,18 +33,19 @@ def process(self, connection, channel, delivery_tag, body):

ocid = input_message["ocid"]
collection_id = input_message["collection_id"]
compiled_collection_id = input_message["compiled_collection_id"]

with transaction.atomic():
self._info("Compiling release collection_id: {} ocid: {}".format(collection_id, ocid))
release = compile_release(collection_id, ocid)

self._deleteStep(ProcessingStep.Types.COMPILE, collection_id=collection_id, ocid=ocid)
self._deleteStep(ProcessingStep.Types.COMPILE, collection_id=compiled_collection_id, ocid=ocid)

# publish message about processed item
message = {
"ocid": ocid,
"compiled_release_id": release.pk,
"collection_id": release.collection.id,
"collection_id": compiled_collection_id,
}

self._publish_async(connection, channel, json.dumps(message))
Expand Down

0 comments on commit d7a984b

Please sign in to comment.