Skip to content

Commit

Permalink
Fix occasional deadlocks when doing multiple similar syncs concurrently.
Browse files Browse the repository at this point in the history
Reproducing these deadlocks requires significant time and many running pulpcore-workers.
There is therefore no specific CI test for this, but there is a reproducer
script that will force deadlocks to happen (and show that they're fixed) here:

https://github.com/ggainey/pulp_startup/blob/main/8750_deadlocks/file_repro.sh

backports #8750
fixes #9379

(cherry picked from commit 6af3519)
  • Loading branch information
ggainey authored and dralley committed Sep 29, 2021
1 parent b89f579 commit 9bcae2e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGES/9379.bugfix
@@ -0,0 +1,3 @@
Ordered several ContentStages paths to fix deadlocks in high-concurrency scenarios.

(backported from #8750)
30 changes: 30 additions & 0 deletions pulpcore/app/models/content.py
Expand Up @@ -77,6 +77,7 @@ def bulk_get_or_create(self, objs, batch_size=None):
Returns:
List of instances that were inserted into the database.
"""

objs = list(objs)
try:
with transaction.atomic():
Expand Down Expand Up @@ -554,6 +555,35 @@ class ContentArtifact(BaseModel, QueryMixin):
class Meta:
unique_together = ("content", "relative_path")

@staticmethod
def sort_key(ca):
"""
Static method for defining a sort-key for a specified ContentArtifact.
Sorting lists of ContentArtifacts is critical for avoiding deadlocks in high-concurrency
environments, when multiple workers may be operating on similar sets of content at the
same time. Providing a stable sort-order becomes problematic when the CAs in question
haven't been persisted - in that case, pulp_id can't be relied on, as it will change
when the object is stored in the DB and its "real" key is generated.
This method produces a key based on the content/artifact represented by the CA.
Args:
ca (:class:`~pulpcore.plugin.models.ContentArtifact`): The CA we need a key for
Returns:
a tuple of (str(content-key), str(artifact-key)) that can be reliably sorted on
"""
c_key = ""
a_key = ""
# It's possible to only have one of content/artifact - handle that
if ca.content:
# Some key-fields aren't str, handle that
c_key = "".join(map(str, ca.content.natural_key()))
if ca.artifact:
a_key = str(ca.artifact.sha256)
return c_key, a_key


class RemoteArtifact(BaseModel, QueryMixin):
"""
Expand Down
9 changes: 7 additions & 2 deletions pulpcore/plugin/stages/artifact_stages.py
Expand Up @@ -384,10 +384,15 @@ def _handle_remote_artifacts(self, batch):
key = f"{content_artifact.pk}-{d_artifact.remote.pk}"
ras_to_create[key] = remote_artifact

# Make sure we create/update RemoteArtifacts in a stable order, to help
# prevent deadlocks in high-concurrency environments. We can rely on the
# Artifact sha256 for our ordering.
if ras_to_create:
RemoteArtifact.objects.bulk_create(list(ras_to_create.values()))
ras_to_create_ordered = sorted(list(ras_to_create.values()), key=lambda x: x.sha256)
RemoteArtifact.objects.bulk_create(ras_to_create_ordered)
if ras_to_update:
RemoteArtifact.objects.bulk_update(list(ras_to_update.values()), fields=["url"])
ras_to_update_ordered = sorted(list(ras_to_update.values()), key=lambda x: x.sha256)
RemoteArtifact.objects.bulk_update(ras_to_update_ordered, fields=["url"])

@staticmethod
def _create_remote_artifact(d_artifact, content_artifact):
Expand Down
9 changes: 9 additions & 0 deletions pulpcore/plugin/stages/content_stages.py
Expand Up @@ -104,6 +104,10 @@ async def run(self):
with transaction.atomic():
await self._pre_save(batch)

# Process the batch in dc.content.natural_keys order.
# This prevents deadlocks when we're processing the same/similar content
# in concurrent workers.
batch.sort(key=lambda x: "".join(map(str, x.content.natural_key())))
for d_content in batch:
# Are we saving to the database for the first time?
content_already_saved = not d_content.content._state.adding
Expand Down Expand Up @@ -148,6 +152,11 @@ async def run(self):
# Maybe remove dict elements after to reduce memory?
content_artifact.artifact = to_update_ca_artifact[key]
to_update_ca_bulk.append(content_artifact)
# Sort the lists we're about to do bulk updates/creates on.
# We know to_update_ca_bulk entries already are in the DB, so we can enforce
# order just using pulp_id.
to_update_ca_bulk.sort(key=lambda x: x.pulp_id)
content_artifact_bulk.sort(key=lambda x: ContentArtifact.sort_key(x))
ContentArtifact.objects.bulk_update(to_update_ca_bulk, ["artifact"])
ContentArtifact.objects.bulk_get_or_create(content_artifact_bulk)
await self._post_save(batch)
Expand Down

0 comments on commit 9bcae2e

Please sign in to comment.