Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-29 - goroutine in some cases never process channel messages #30

Merged
merged 2 commits into from
Nov 14, 2020

Conversation

pricelessrabbit
Copy link
Contributor

@pricelessrabbit pricelessrabbit commented Nov 10, 2020

Fixed #29 :
reference type cause the route struct channel to be overwritten by the goroutine that subscribe the routes.
The for := range channel reference is changed if not already used and consumer got stuck

Changed the Consume func receiver to value receiver. In this way each worker has owns a copy of route data

Signed-off-by: PricelessRabbit PricelessRabbit@gmail.com

…er goroutine

Signed-off-by: PricelessRabbit <PricelessRabbit@gmail.com>
@manuio manuio changed the title 29 - goroutine in some cases never process channel messages MF-29 - goroutine in some cases never process channel messages Nov 10, 2020
@mteodor
Copy link
Contributor

mteodor commented Nov 10, 2020

@pricelessrabbit can you point me where exactly you put the sleep() and it worked
you say that route.Messages gets overwritten?

@pricelessrabbit
Copy link
Contributor Author

@pricelessrabbit can you point me where exactly you put the sleep() and it worked
you say that route.Messages gets overwritten?

@mteodor
In this point there is a for loop that use local var r
https://github.com/mainflux/export/blob/cdbc6f22b1890ae32da86eeccbe962b17ac741de/pkg/export/service.go#L116-L126

but the r.Consume create a pointer reference to r.
https://github.com/mainflux/export/blob/cdbc6f22b1890ae32da86eeccbe962b17ac741de/pkg/export/route.go#L86-L88

so then when the loop in the main goroutine continues, it updates r fields with the next route data. But the worker has a pointer to r and not a copy, so all the r fields are updated also in the worker goroutine (also the channel r.Messages).

However, seems (but i'm not shure of that) that if the channel is already filled when the for msg := range line is executed (so there is no "wait" in the first iteration, that the range "keeps" an internal reference of the channel, also if then the channel reference changes, so in some cases (tried with a sleep) the channel is filled before the initialization of the worker and things work as expected also in the bugged implementation

@mteodor
Copy link
Contributor

mteodor commented Nov 10, 2020

i think that change should be like
https://github.com/mainflux/export/blob/master/pkg/export/service.go#L42
here map[string]*Route
and here

func NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route {
	w := rc.Workers
	if w == 0 {
		w = workers
	}
	r := Route{
		NatsTopic: rc.NatsTopic + "." + NatsAll,
		MqttTopic: rc.MqttTopic,
		Subtopic:  rc.SubTopic,
		Type:      rc.Type,
		Workers:   w,
		Messages:  make(chan *nats.Msg, w),
		logger:    log,
		pub:       pub,
	}
	return &r
}

@pricelessrabbit
Copy link
Contributor Author

i think that change should be like
https://github.com/mainflux/export/blob/master/pkg/export/service.go#L42
here map[string]*Route
and here

func NewRoute(rc config.Route, log logger.Logger, pub messages.Publisher) *Route {
	w := rc.Workers
	if w == 0 {
		w = workers
	}
	r := Route{
		NatsTopic: rc.NatsTopic + "." + NatsAll,
		MqttTopic: rc.MqttTopic,
		Subtopic:  rc.SubTopic,
		Type:      rc.Type,
		Workers:   w,
		Messages:  make(chan *nats.Msg, w),
		logger:    log,
		pub:       pub,
	}
	return &r
}

this morning i evaluate and try also in that way. It works, but multiple workers goroutines gets the same route reference, and share the state. This is not a good thing imho because if the route struct change in some manner, all the workers will be affected and this can lead to unexpected behaviours.

@mteodor
Copy link
Contributor

mteodor commented Nov 10, 2020

that is the idea, there should be only one instance of each route, and it should not change during the runtime, workers should only process messages and read the route info to know where to send

@pricelessrabbit
Copy link
Contributor Author

yep it is for that very reason that i think that provide every worker with an immutable value-copy of the route is the most solid solution to avoid possible accidentally modification of data, but if you are ok with the shared state i refactor the fix in that way and update the PR

@mteodor
Copy link
Contributor

mteodor commented Nov 10, 2020

@pricelessrabbit yes, please do so, and I'll rethink this what you pointed out
and I want to thank you so much

Signed-off-by: PricelessRabbit <PricelessRabbit@gmail.com>
Copy link

@manuio manuio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@drasko drasko merged commit c462df2 into absmach:master Nov 14, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Consume goroutine in some cases never process channel messages
4 participants