New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple threads concurrently execute SubscribeAsync function #144

Closed
guessmyname opened this Issue Nov 11, 2016 · 10 comments

Comments

Projects
None yet
3 participants
@guessmyname

guessmyname commented Nov 11, 2016

When I publish multiple messages to a queue I notice that the function passed to SubscribeAsync is being called by multiple threads, sometimes concurrently. This behavior is different than when using the RabbitMQ client. Is this by design?

@pardahlman

This comment has been minimized.

Show comment
Hide comment
@pardahlman

pardahlman Nov 11, 2016

Owner

@guessmyname - yes. RawRabbit is designed to handle incoming messages as fast as possible, which is why new tasks are created for each received message. In general, the entire RawRabbit API is async, which can cause execution to occur on multiple threads throughout the application.

Owner

pardahlman commented Nov 11, 2016

@guessmyname - yes. RawRabbit is designed to handle incoming messages as fast as possible, which is why new tasks are created for each received message. In general, the entire RawRabbit API is async, which can cause execution to occur on multiple threads throughout the application.

@guessmyname

This comment has been minimized.

Show comment
Hide comment
@guessmyname

guessmyname Nov 11, 2016

Would it be possible to make that configurable? I have a use case where I
need to process serially.

On Nov 11, 2016 4:39 PM, "pardahlman" notifications@github.com wrote:

@guessmyname https://github.com/guessmyname - yes. RawRabbit is
designed to handle incoming messages as fast as possible, which is why new
tasks are created for each received message. In general, the entire
RawRabbit API is async, which can cause execution to occur on multiple
threads throughout the application.


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#144 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AFieLyuhs8dBP9BtHmwxqhl0BrIFnkeYks5q9OB6gaJpZM4KwI6P
.

guessmyname commented Nov 11, 2016

Would it be possible to make that configurable? I have a use case where I
need to process serially.

On Nov 11, 2016 4:39 PM, "pardahlman" notifications@github.com wrote:

@guessmyname https://github.com/guessmyname - yes. RawRabbit is
designed to handle incoming messages as fast as possible, which is why new
tasks are created for each received message. In general, the entire
RawRabbit API is async, which can cause execution to occur on multiple
threads throughout the application.


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#144 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AFieLyuhs8dBP9BtHmwxqhl0BrIFnkeYks5q9OB6gaJpZM4KwI6P
.

@pardahlman

This comment has been minimized.

Show comment
Hide comment
@pardahlman

pardahlman Nov 12, 2016

Owner

It is not something that I plan to add as a configuration option. What is your scenario? If you want to act on multiple messages in sequence, perhaps the BulkGet extension is something for you?

If this does not suit you, I would suggest that you create a custom implementation of IConsumerFactory that does not start new tasks for each received message and then register it in the IoC when creating your bus client instance.

Hope this helps!

Owner

pardahlman commented Nov 12, 2016

It is not something that I plan to add as a configuration option. What is your scenario? If you want to act on multiple messages in sequence, perhaps the BulkGet extension is something for you?

If this does not suit you, I would suggest that you create a custom implementation of IConsumerFactory that does not start new tasks for each received message and then register it in the IoC when creating your bus client instance.

Hope this helps!

@guessmyname

This comment has been minimized.

Show comment
Hide comment
@guessmyname

guessmyname Nov 12, 2016

Yes, that helps. Thanks for the suggestion.

On Nov 12, 2016 8:28 AM, "pardahlman" notifications@github.com wrote:

It is not something that I plan to add as a configuration option. What is
your scenario? If you want to act on multiple messages in sequence, perhaps
the BulkGet extension
http://rawrabbit.readthedocs.io/en/master/Bulk-fetching-messages.html
is something for you?

If this does not suit you, I would suggest that you create a custom
implementation of IConsumerFactory that does not start new tasks for each
received message
https://github.com/pardahlman/RawRabbit/blob/stable/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs#L30
and then register it in the IoC when creating your bus client instance
https://github.com/pardahlman/RawRabbit#dependecy-injection.

Hope this helps!


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#144 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AFieL_F9OlgKX4_RFpz1ttFe3uy-kS3Yks5q9b8FgaJpZM4KwI6P
.

guessmyname commented Nov 12, 2016

Yes, that helps. Thanks for the suggestion.

On Nov 12, 2016 8:28 AM, "pardahlman" notifications@github.com wrote:

It is not something that I plan to add as a configuration option. What is
your scenario? If you want to act on multiple messages in sequence, perhaps
the BulkGet extension
http://rawrabbit.readthedocs.io/en/master/Bulk-fetching-messages.html
is something for you?

If this does not suit you, I would suggest that you create a custom
implementation of IConsumerFactory that does not start new tasks for each
received message
https://github.com/pardahlman/RawRabbit/blob/stable/src/RawRabbit/Consumer/Eventing/EventingBasicConsumerFactory.cs#L30
and then register it in the IoC when creating your bus client instance
https://github.com/pardahlman/RawRabbit#dependecy-injection.

Hope this helps!


You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
#144 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AFieL_F9OlgKX4_RFpz1ttFe3uy-kS3Yks5q9b8FgaJpZM4KwI6P
.

@pardahlman

This comment has been minimized.

Show comment
Hide comment
@pardahlman

pardahlman Nov 12, 2016

Owner

No problems -- good luck!

Owner

pardahlman commented Nov 12, 2016

No problems -- good luck!

@LDonato

This comment has been minimized.

Show comment
Hide comment
@LDonato

LDonato Jan 22, 2018

I had a similar problem and I managed that by using a Semaphore in the SubscribeAsync ... is that bad?

 databus.SubscribeAsync<List<Data>> ( 
                            //this.cache is accessed and modified by the callbacks in different threads
                            //So I must ensure that data received are processed in series to avoid conflicts
                            async (message) => {
                                await semaphoreSlim.WaitAsync();
                                try
                                {
                                    await DataReceived(message);
                                }
                                finally
                                {
                                    semaphoreSlim.Release();
                                }
                            },
                            ctx => ctx.UseSubscribeConfiguration(
                                cfg => cfg.OnDeclaredExchange(exchange => exchange.WithName(this.Name))
                                        .FromDeclaredQueue(queue => queue.WithName(this.Name))
                            ) 
            );

LDonato commented Jan 22, 2018

I had a similar problem and I managed that by using a Semaphore in the SubscribeAsync ... is that bad?

 databus.SubscribeAsync<List<Data>> ( 
                            //this.cache is accessed and modified by the callbacks in different threads
                            //So I must ensure that data received are processed in series to avoid conflicts
                            async (message) => {
                                await semaphoreSlim.WaitAsync();
                                try
                                {
                                    await DataReceived(message);
                                }
                                finally
                                {
                                    semaphoreSlim.Release();
                                }
                            },
                            ctx => ctx.UseSubscribeConfiguration(
                                cfg => cfg.OnDeclaredExchange(exchange => exchange.WithName(this.Name))
                                        .FromDeclaredQueue(queue => queue.WithName(this.Name))
                            ) 
            );
@pardahlman

This comment has been minimized.

Show comment
Hide comment
@pardahlman

pardahlman Jan 23, 2018

Owner

Hello @LDonato - you should be fine with the semaphore (or perhaps even TPL), even though performance wise I think it would make more sense to register a custom IConsumerFactory (given that you are on 1.x).

Owner

pardahlman commented Jan 23, 2018

Hello @LDonato - you should be fine with the semaphore (or perhaps even TPL), even though performance wise I think it would make more sense to register a custom IConsumerFactory (given that you are on 1.x).

@LDonato

This comment has been minimized.

Show comment
Hide comment
@LDonato

LDonato Jan 23, 2018

Thanks for your reply!
Actually that code is from after I migrated 2.0-rc2... am I using it wrong?

LDonato commented Jan 23, 2018

Thanks for your reply!
Actually that code is from after I migrated 2.0-rc2... am I using it wrong?

@pardahlman

This comment has been minimized.

Show comment
Hide comment
@pardahlman

pardahlman Jan 23, 2018

Owner

If you're on 2.x, you could use the UseConsumerConcurrency extension of the subscribe context, see this test suite for more information on usage.

Owner

pardahlman commented Jan 23, 2018

If you're on 2.x, you could use the UseConsumerConcurrency extension of the subscribe context, see this test suite for more information on usage.

@LDonato

This comment has been minimized.

Show comment
Hide comment
@LDonato

LDonato Jan 23, 2018

Thanks for your kind help!

LDonato commented Jan 23, 2018

Thanks for your kind help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment