Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed concurrency bug in StartupContext #39419

Merged

Conversation

Arcane561
Copy link
Contributor

Fixed the issue of parallel addition and reading after. ArrayList is not a concurrency safe collection and subsequent threads may not see the changes. The add streams and read streams are different. And shutdown events can also be added during the application (pg vertex as an example) and this does not happen in the main thread. Please consider the corrections

@geoand
Copy link
Contributor

geoand commented Mar 14, 2024

Thanks for the contribution, but I'd like to know whether you are actually encountered a problem that this change fixes.

My point is that theoretically, sure the addShutdownTask and addLastShutdownTask could be called on any thread, but I am pretty certain that is not the case currently - all tasks are added and run on the main thread.

@Arcane561
Copy link
Contributor Author

Arcane561 commented Mar 14, 2024

Thanks for the contribution, but I'd like to know whether you are actually encountered a problem that this change fixes.

My point is that theoretically, sure the addShutdownTask and addLastShutdownTask could be called on any thread, but I am pretty certain that is not the case currently - all tasks are added and run on the main thread.

Have a nice day, thank you for reviewing it so quickly. To be honest, I didn't have any problems with this specifically (although I didn't test it in detail). I was developing an extension that would allow you to accurately register the shutdownhandler after all the infrastructure addons have already registered their shutdown handlers (since, for example, I need to leave an entry in the database if the program terminates, but now it turns out that pgpoll closes before the code in the @shutdown methods is called) and I noticed, that changing the collection and reading when the application is closed occurs in different thread (screenshots are attached below). I have prepared a test project for demonstration.

Repository https://github.com/Arcane561/quarkus-concurrency-example

To run it, you need a postgres database, and you will also need changes to the properties.
https://github.com/Arcane561/quarkus-concurrency-example/blob/master/src/main/resources/application.properties

To quickly launch postgres, I provide the following commands

docker run -d -p 127.0.0.1:5432:5432 \      
  --name some-timescaledb-example \
  -e POSTGRES_PASSWORD=example \
  -e POSTGRES_USER=example \
  -e POSTGRES_DB=example \
  timescale/timescaledb:latest-pg14

and then the property needs to be changed to

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=example
quarkus.datasource.password=example
quarkus.datasource.reactive.url=postgresql://localhost:5432/example

The debager stop points should be placed here:


Screenshot_20240315_004153
Screenshot_20240315_003708

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Thanks for the feedback, but unless I am missing something, I don't see what you describe in the code you uploaded.

@Arcane561
Copy link
Contributor Author

Arcane561 commented Mar 15, 2024

Thanks for the feedback, but unless I am missing something, I don't see what you describe in the code you uploaded.

Good day to you, to eliminate misunderstandings, we are now talking about the code in https://github.com/Arcane561/quarkus-concurrency-example ? if so, then I can clearly see on 2 independent machines how PgPollRecorder adds a Shutdown handler not in the main thread, but in the thread of my ScheduledExecutorService. And also the reading does not take place in the main thread in the shutdown thread. I can describe in more detail the points of how and what to run and to reproduce this behavior

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Ah okay, I thought you had added code that adds a shutdown handler.

I'll look into it more soon.

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

if so, then I can clearly see on 2 independent machines how PgPollRecorder adds a Shutdown handler not in the main thread

I am not seeing this...

@Arcane561
Copy link
Contributor Author

I am not seeing this...

Just in case, I notify you that it is being PgPollRecorder constructed (and therefore will add a shutdown event) only 2 seconds after the application is launched. This is regulated by this line
https://github.com/Arcane561/quarkus-concurrency-example/blob/e28ab236f3362a97137120c9306e42ea902047a5/src/main/java/org/quarkus/ExampleService.java#L24

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Never mind my comment above, I see this as well.

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Okay, I see why this happens.

It comes down to the fact that your Database bean is "initialized" on your custom executor. If you change your Database bean to @Singleton, that results in the bean actually being initialized eagerly on the main thread.

Given how specific this is and how easily it is to work around it, I am not inclined to introduce the thread safe variants in StartupContext

@Arcane561
Copy link
Contributor Author

Okay, I see why this happens.

It comes down to the fact that your Database bean is "initialized" on your custom executor. If you change your Database bean to @Singleton, that results in the bean actually being initialized eagerly on the main thread.

Given how specific this is and how easily it is to work around it, I am not inclined to introduce the thread safe variants in StartupContext

Screenshot_20240315_152704

Unfortunately, this will not help

@Arcane561
Copy link
Contributor Author

And I would also like to draw attention to the fact that shutdownthread is not the main thread either.

Screenshot_20240315_004153

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Unfortunately, this will not help

I tried it and it does :)

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

It seems that it's indeed not determenistic with @Singleton.

Even so, I want to try everything to avoid changing StartupContext

@Arcane561
Copy link
Contributor Author

It seems that it's indeed not determenistic with @Singleton.

Even so, I want to try everything to avoid changing StartupContext

I understand you, you can close my pr if you think you can do something better and more convenient for you. It's just that there's a race there right now and it scares me a little bit.

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

I need to think about it more

@Arcane561
Copy link
Contributor Author

Arcane561 commented Mar 15, 2024

I need to think about it more

And I have a big request for you, if you decide not to add concurrent collections, then please think about making sure that the @shutdown methods are always executed first, without exceptions, because now I have to wait until PgPollRecorder is constructed and add my shutdown handler with the completion event

@geoand
Copy link
Contributor

geoand commented Mar 15, 2024

Right, I need to see why that happens

@Arcane561
Copy link
Contributor Author

Arcane561 commented Mar 21, 2024

@geoand Good day to you. Is there any news? If I can help you in any way, please let me know.

@geoand
Copy link
Contributor

geoand commented Mar 21, 2024

👋

I haven't had any time for it unfortunately

@Arcane561
Copy link
Contributor Author

@geoand Have a nice day. Have you not been able to watch it?

@geoand
Copy link
Contributor

geoand commented Mar 28, 2024

Hi,

I have not, no time.

Since I doubt I will have to look at this any time soon, maybe @mkouba can have a look and decide if we want to go down the route of your PR (which admitedly can't cause any harm other a potentially tiny slowdown)

@Arcane561
Copy link
Contributor Author

Hi,

I have not, no time.

Since I doubt I will have to look at this any time soon, maybe @mkouba can have a look and decide if we want to go down the route of your PR (which admitedly can't cause any harm other a potentially tiny slowdown)

Thank you very much for the answer, sorry to interrupt (

@geoand
Copy link
Contributor

geoand commented Mar 28, 2024

Not a problem :)

@mkouba
Copy link
Contributor

mkouba commented Apr 2, 2024

Hi,

I have not, no time.

Since I doubt I will have to look at this any time soon, maybe @mkouba can have a look and decide if we want to go down the route of your PR (which admitedly can't cause any harm other a potentially tiny slowdown)

Well, given the fact that the list of shutdown tasks can be modified in a recorder method and there's no guarantee that it will be modified on the main thread I think that we should use a thread-safe structure here. I'm not so sure about ConcurrentLinkedDeque though. Collections.synchronizedList(List<T>) might be a more "straightforward" replacement?

@Arcane561
Copy link
Contributor Author

Good day to you mkouba. I used ConcurrentLinkedDeque for several reasons. The first is non-blocking addition and the second is that you don't have to reverse the list when reading handlers. I decided as an experiment to call the service in rest controller . And I found out that the handler is also added not in the main thread, but in threads serving the server (in vertx it is vertx poll threads, and in blocking it is executor).

non blocking rest point:

1

blocking rest point:
2

@Arcane561
Copy link
Contributor Author

@mkouba Have a nice day. Could you tell me who I should work with on this pr now?

@Arcane561
Copy link
Contributor Author

Good day to you @geoand , sorry if I am distracting could you please give feedback on what we will do about the problem...

@geoand
Copy link
Contributor

geoand commented Apr 10, 2024

Let's wait for @mkouba's feedback

@Arcane561
Copy link
Contributor Author

@mkouba Corrected, please check back when you have time.

Copy link
Contributor

@mkouba mkouba left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good now. I've added two minor comments.

@Arcane561
Copy link
Contributor Author

@mkouba Corrected, please check back when you have time.

mkouba
mkouba previously approved these changes May 9, 2024
@mkouba mkouba added the triage/waiting-for-ci Ready to merge when CI successfully finishes label May 9, 2024
@quarkus-bot

This comment has been minimized.

@mkouba
Copy link
Contributor

mkouba commented May 9, 2024

@Arcane561 there are some formatting issues. Could you pls build the core module with formatting on, e.g. with mvn clean install and squash the commits?

@mkouba mkouba dismissed their stale review May 9, 2024 11:42

Pls squash the commits

@gastaldi gastaldi removed the triage/waiting-for-ci Ready to merge when CI successfully finishes label May 9, 2024
@Arcane561 Arcane561 force-pushed the fix-concurrent-adding-shutdown-events branch from fe2b6fc to 4e1e84b Compare May 9, 2024 23:29
@Arcane561
Copy link
Contributor Author

@mkouba Good day to you. It's done!

@mkouba mkouba added the triage/waiting-for-ci Ready to merge when CI successfully finishes label May 10, 2024
@quarkus-bot
Copy link

quarkus-bot bot commented May 10, 2024

Status for workflow Quarkus CI

This is the status report for running Quarkus CI on commit 4e1e84b.

✅ The latest workflow run for the pull request has completed successfully.

It should be safe to merge provided you have a look at the other checks in the summary.

You can consult the Develocity build scans.


Flaky tests - Develocity

⚙️ JVM Tests - JDK 21

📦 extensions/smallrye-reactive-messaging-kafka/deployment

io.quarkus.smallrye.reactivemessaging.kafka.deployment.dev.KafkaDevServicesDevModeTestCase.sseStream - History

  • Assertion condition defined as a Lambda expression in io.quarkus.smallrye.reactivemessaging.kafka.deployment.dev.KafkaDevServicesDevModeTestCase Expecting size of: [] to be greater than or equal to 2 but was 0 within 10 seconds. - org.awaitility.core.ConditionTimeoutException
org.awaitility.core.ConditionTimeoutException: 
Assertion condition defined as a Lambda expression in io.quarkus.smallrye.reactivemessaging.kafka.deployment.dev.KafkaDevServicesDevModeTestCase 
Expecting size of:
  []
to be greater than or equal to 2 but was 0 within 10 seconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)

@mkouba mkouba merged commit 152a01c into quarkusio:main May 10, 2024
51 checks passed
@quarkus-bot quarkus-bot bot removed the triage/waiting-for-ci Ready to merge when CI successfully finishes label May 10, 2024
@quarkus-bot quarkus-bot bot added this to the 3.11 - main milestone May 10, 2024
@Arcane561
Copy link
Contributor Author

@gsmet Good day to you. Please take a look, I think the fix can be painlessly backported to other releases as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants