Fix race condition in handling of reserved resources #1228
Branch updated from 268672b to 9e3b9ff.
Attached issue: https://pulp.plan.io/issues/8352
Branch updated from 643bb52 to d9b481f.
The code looks good to me. I am going to try to test out/debug the code some tomorrow with some sleep statements so I can understand it a bit better.
pulpcore/tasking/tasks.py (outdated)
        # no worker is ready so we need to wait
        time.sleep(0.25)
        continue
    with transaction.atomic():
I'm about 80% certain we still need the select_for_update() here also to make it "complete" (these queries don't touch the actual worker so there's nothing for the lock to block otherwise)... however, apparently it doesn't get along with the distinct() on line 46 (they are apparently incompatible constructs?). Any ideas? What is that distinct() actually doing and do we need it?
The "simple" option might be to just ignore locking during _acquire_worker and just drop a worker = Worker.objects.select_for_update().get(pk=worker.pk) on line 127 below so that just the resource reservation is covered by the lock. I don't like the double query but it would work. And it might be "good enough" if we want a fast patch with minimal risk.
Would still like to do better if possible though.
I implemented ^
@dralley select_for_update is 100% needed here to prevent the related table (resource reservations) from being modified while another transaction holds the lock. An alternative to the double query would be to rewrite the query on line 48 to eliminate DISTINCT. However, as a workaround and immediate solution it is acceptable in my opinion.
pulpcore/tasking/tasks.py (outdated)
    except Worker.DoesNotExist:
        # from _acquire_worker
        # no worker is ready so we need to wait
        time.sleep(0.25)
        continue
    except IntegrityError:
        # we have a worker but we can't create the reservations so wait
        time.sleep(0.25)
        continue
Suggested change:
-   except Worker.DoesNotExist:
-       # from _acquire_worker
-       # no worker is ready so we need to wait
-       time.sleep(0.25)
-       continue
-   except IntegrityError:
-       # we have a worker but we can't create the reservations so wait
-       time.sleep(0.25)
-       continue
+   except (Worker.DoesNotExist, IntegrityError):
+       time.sleep(0.25)
+       continue
pulpcore/tasking/tasks.py (outdated)
    try:
        worker = _acquire_worker(resources)

        worker = Worker.objects.select_for_update().get(pk=worker.pk)
I wonder if this is still susceptible to a race condition?
- Worker is returned from _acquire_worker()
- Worker is cleaned up
- Worker is fetched here with get() and assigned reservations
Good catch. Yes, it will. Worker status is not checked here.
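The interleaving listed above can be reproduced with a small in-memory stand-in. This is only a sketch: WorkerRegistry, WorkerGone, and the method names are hypothetical and are not Pulp's models or Django's API, and a plain threading.Lock plays the role that the row lock from select_for_update() would play. What it tries to show is that the liveness check and the reservation have to happen under the same lock.

```python
import threading


class WorkerGone(Exception):
    """Hypothetical error: the chosen worker disappeared before it was reserved."""


class WorkerRegistry:
    """In-memory stand-in for the worker and reservation tables (illustrative only)."""

    def __init__(self, workers):
        self._lock = threading.Lock()  # stands in for the database row lock
        self.online = set(workers)
        self.reservations = {}

    def acquire_worker(self):
        # Step 1: pick a worker. Nothing stays locked once this returns.
        with self._lock:
            return next(iter(sorted(self.online)), None)

    def cleanup_worker(self, name):
        # Step 2: another process removes the worker and its reservations.
        with self._lock:
            self.online.discard(name)
            self.reservations.pop(name, None)

    def reserve_unchecked(self, name, resource):
        # Step 3, racy: the worker's status is never re-checked.
        with self._lock:
            self.reservations.setdefault(name, set()).add(resource)

    def reserve_checked(self, name, resource):
        # Step 3, safer: re-check the worker and reserve under the same lock.
        with self._lock:
            if name not in self.online:
                raise WorkerGone(name)
            self.reservations.setdefault(name, set()).add(resource)


registry = WorkerRegistry(["worker-1"])
picked = registry.acquire_worker()
registry.cleanup_worker(picked)

# The unchecked path happily attaches a reservation to a worker that is gone.
registry.reserve_unchecked(picked, "repo-a")
stale = dict(registry.reservations)

# The checked path refuses, so the caller can go back and pick another worker.
registry.reservations.clear()
try:
    registry.reserve_checked(picked, "repo-a")
    rejected = False
except WorkerGone:
    rejected = True
```

In the unchecked path the reservation ends up attached to a worker that no longer exists; in the checked path the caller gets an error it can treat like "no worker available" and retry.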
Alternatively, I believe this should work:
Worker.objects.select_for_update().filter(pk__in=Worker.objects.filter(reservations__resource__in=resources))
It performs a single query by using a subselect.
Apart from this question, the rest looks good to me.
@daviddavis Is this meant to go on line 132 or on line 48? It has different semantics from line 48.
I think this could potentially work for line 48:
return Worker.objects.select_for_update().filter(pk__in=Worker.objects.filter(reservations__resource__in=resources)).get()
Branch updated from 4971456 to eb445bb.
LGTM
@daviddavis This change appears as though it might be causing deadlocks? If I run the tests locally it hangs on the very first one. And the CI has been running for nearly an hour.
[noissue]
Branch updated from 746e14e to 777c476.
pulpcore/tasking/tasks.py (outdated)
    try:
        # A randomly-selected Worker instance that has zero ReservedResource entries
        # associated with it.
-       return workers_qs_with_counts.filter(reservations__count=0).order_by("?")[0]
+       return Worker.objects.select_for_update().filter(pk__in=workers_qs_no_res).first()
I guess the selected worker isn't necessarily random anymore.
Let's fix that. What if we did this:
    workers_qs_no_res = (
        workers_qs.annotate(models.Count("reservations"))
        .filter(reservations__count=0)
        .order_by("?")[0:1]
    )
Actually I think this is better:
-   return Worker.objects.select_for_update().filter(pk__in=workers_qs_no_res).first()
+   return Worker.objects.select_for_update().filter(pk__in=workers_qs_no_res[0:1]).first()
pulpcore/tasking/tasks.py (outdated)
            .order_by("?")
            .first()
        )
    except IndexError:
If the query fetches no rows, .first() will not throw IndexError; it returns None.
limit(1).get()?
There is no limit() method on a queryset. You have to use [0:1].get().
Updated
My experience says that get raises the MultipleObjectsReturned exception.
Why do you want to get an exception here instead of simply checking the first() return value?
Also .get() doesn't raise IndexError.
@mdellweg yes, my bad. .get() raises MultipleObjectsReturned indeed.
Early morning drowsiness :) That's probably the simplest solution, I can do that.
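The three lookups discussed in this thread fail differently on an empty result, which is where the stray except IndexError came from. The sketch below uses a hypothetical FakeQuerySet over a plain list, not Django itself. It is written to mirror the documented QuerySet behaviour: indexing with [0] raises IndexError, first() returns None, and get() raises DoesNotExist for no rows and MultipleObjectsReturned for more than one.

```python
class DoesNotExist(Exception):
    """Stand-in for Model.DoesNotExist."""


class MultipleObjectsReturned(Exception):
    """Stand-in for Model.MultipleObjectsReturned."""


class FakeQuerySet:
    """Hypothetical list-backed imitation of the QuerySet calls discussed here."""

    def __init__(self, rows):
        self._rows = list(rows)

    def __getitem__(self, key):
        if isinstance(key, slice):
            # A slice yields another queryset-like object.
            return FakeQuerySet(self._rows[key])
        # A plain index raises IndexError when it is out of range.
        return self._rows[key]

    def first(self):
        return self._rows[0] if self._rows else None

    def get(self):
        if not self._rows:
            raise DoesNotExist()
        if len(self._rows) > 1:
            raise MultipleObjectsReturned()
        return self._rows[0]


def outcome(lookup):
    """Return the lookup's value, or the name of the exception it raised."""
    try:
        return lookup()
    except Exception as exc:
        return type(exc).__name__


empty = FakeQuerySet([])
two = FakeQuerySet(["worker-1", "worker-2"])

results = {
    "empty[0]": outcome(lambda: empty[0]),
    "empty.first()": outcome(lambda: empty.first()),
    "empty[0:1].get()": outcome(lambda: empty[0:1].get()),
    "two.get()": outcome(lambda: two.get()),
    "two[0:1].get()": outcome(lambda: two[0:1].get()),
}
```

Under these semantics, [0:1].get() can only signal an empty result through DoesNotExist, so checking the return value of first() for None avoids the exception handling altogether.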
Branch updated from c4b3b0c to 33c1694.
pulpcore/tasking/tasks.py (outdated)
    try:
        # A randomly-selected Worker instance that has zero ReservedResource entries
        # associated with it.
        return (
            Worker.objects.select_for_update()
            .filter(pk__in=workers_qs_no_res)
            .order_by("?")[0:1]
            .get()
        )
    except IndexError:
        # If all Workers have at least one ReservedResource entry.
        raise Worker.model.DoesNotExist()
Suggested change:
-   try:
-       # A randomly-selected Worker instance that has zero ReservedResource entries
-       # associated with it.
-       return (
-           Worker.objects.select_for_update()
-           .filter(pk__in=workers_qs_no_res)
-           .order_by("?")[0:1]
-           .get()
-       )
-   except IndexError:
-       # If all Workers have at least one ReservedResource entry.
-       raise Worker.model.DoesNotExist()
+   # A randomly-selected Worker instance that has zero ReservedResource entries
+   # associated with it.
+   return (
+       Worker.objects.select_for_update()
+       .filter(pk__in=workers_qs_no_res)
+       .order_by("?")[0:1]
+       .get()
+   )
While this approach (queryset[0:1].get()) is offered by the official Django docs, it looks odd.
@cutwater I went with your recommendation to just check the value of what is returned by first()
Branch updated from 1285b09 to 32a2742.
pulpcore/tasking/tasks.py (outdated)

    if not worker:
        # If all Workers have at least one ReservedResource entry.
        raise Worker.model.DoesNotExist()
Should this be Worker.DoesNotExist()?
I think there's an alias - this is copied over so if it didn't work it would have broken before now. But I'll change it anyway
pulpcore/tasking/tasks.py (outdated)
        Worker.objects.select_for_update().filter(pk__in=workers_without_res).order_by("?").first()
    )

    if not worker:
Minor comment: I'd rather use an explicit check here: if worker is not None
Updated to if worker is None
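The difference between the two checks only shows up for objects that are falsy without being None. A small sketch with a hypothetical EmptyBatch class (not part of Pulp or Django) makes the distinction visible:

```python
class EmptyBatch:
    """Hypothetical object that exists but is falsy because its length is zero."""

    def __len__(self):
        return 0


def check(value):
    """Report how each style of check classifies the value."""
    return {"not value": not value, "value is None": value is None}


missing = check(None)
present_but_empty = check(EmptyBatch())
```

The explicit comparison states the intent (no row came back) and does not depend on how the returned object defines truthiness.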
Branch updated from 61b4564 to fe73534.
            worker=worker, resource=resource
        )
        TaskReservedResource.objects.create(resource=reservation, task=task_status)
    except (Worker.DoesNotExist, IntegrityError):
Having a break statement at the end of the loop looks odd.
IMO this is more readable:
    except (Worker.DoesNotExist, IntegrityError):
        # if no worker is ready, or we have a worker but we can't create the reservations, wait
        time.sleep(0.25)
    else:
        break
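For reference, here is the suggested loop shape as a self-contained sketch. WorkerDoesNotExist and IntegrityError below are local stand-in exception classes (not Worker.DoesNotExist or django.db.IntegrityError), and reserve_with_retry and make_flaky are hypothetical helpers. The else clause runs only when the try body raised nothing, so the success path is the one place that leaves the loop and the except branch needs no continue.

```python
import time


class WorkerDoesNotExist(Exception):
    """Stand-in for the 'no worker is ready' case."""


class IntegrityError(Exception):
    """Stand-in for the 'reservation could not be created' case."""


def reserve_with_retry(attempt, delay=0.25):
    """Call attempt() until it succeeds; return its result and the number of tries."""
    tries = 0
    while True:
        tries += 1
        try:
            result = attempt()
        except (WorkerDoesNotExist, IntegrityError):
            # Either failure is handled the same way: wait, then loop again.
            time.sleep(delay)
        else:
            # Reached only when attempt() raised nothing.
            break
    return result, tries


def make_flaky(failures):
    """Build an attempt() that raises each queued exception once, then succeeds."""
    pending = list(failures)

    def attempt():
        if pending:
            raise pending.pop(0)
        return "reserved"

    return attempt


outcome = reserve_with_retry(
    make_flaky([WorkerDoesNotExist(), IntegrityError()]), delay=0
)
```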
pulpcore/tasking/tasks.py (outdated)
        )
        TaskReservedResource.objects.create(resource=reservation, task=task_status)
    except (Worker.DoesNotExist, IntegrityError):
        # if no worker is ready, or we have a worker but we can't create the reservations, wait
        time.sleep(0.25)
        continue
I meant that continue is not needed here anymore.
LGTM
@dralley Thank you for addressing my comments.
CI failure looks unrelated.
@cutwater Merged. Do you need a new build with this patch or do you plan to upgrade to 3.12?
closes: #8352
https://pulp.plan.io/issues/8352