-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Put backfill in separate thread #88
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You call HistoricalEvents too often for every header received. Why dont you call HistoricalEvents for a longer range to reduce the number of RPCs?
chain/ethereum/subscription.go
Outdated
for bfLog := range backfillLogs { | ||
s.inLogs <- bfLog | ||
go func() { | ||
if backfillLogs, err := HistoricalEvents(ctx, s.client, s.addresses, s.latestBlockNumber.Uint64(), header.Number.Uint64()); err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conflicting due to latestBlockNumber is read and written from different goroutines at the same time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is written in the parent thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is written in the parent goroutine and is read in child goroutines. That will cause conflict. You should copy latestBlockNumber to another variable then pass the copy to the child goroutine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is passed immediately to the backfill function as value. it is not shared resource. can you tell me a scenario where they conflict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it is not passed by value. You just access it by reference inside the child goroutine: s.latestBlockNumber.Uint64()
line 221
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new goroutine is created and it immediately calls HistoricalEvents
which takes in uint64. So you are saying between creating the goroutine and calling HistoricalEvents
, this property will be modified, while the only place modifies this property is in line 235?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, multi-threading is unpredictable, anything can happen. Whenever an object is accessed by multiple goroutines concurrently, it should be protected by a mutex.
It is not every header. It is only triggered after reconnection and when there is a gap. |
I am not sure what headers channel returns but if you want to use mutil-threading, you should control the number of goroutines that will be spawned to avoid having too many goroutines. |
chain/ethereum/subscription.go
Outdated
for bfLog := range backfillLogs { | ||
s.inLogs <- bfLog | ||
go func() { | ||
if backfillLogs, err := HistoricalEvents(ctx, s.client, s.addresses, s.latestBlockNumber.Uint64(), header.Number.Uint64()); err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dont you handle the error returned by HistoricalEvents?
as the name implies, headers channel returns headers. you can read rest of the code. |
Will headers channel return a list of consecutive headers? If they are consecutive, you should call HistoricalEvents on a range from the first header to the last header to reduce RPC usage. |
please read the existing code and the diff |
What I am suggesting may be out of this PR's scope and can be improved later but can you fix the conflicting with latestBlockNumber? |
if backfillLogs, err := HistoricalEvents(ctx, s.client, s.addresses, s.latestBlockNumber.Uint64(), header.Number.Uint64()); err == nil { | ||
for bfLog := range backfillLogs { | ||
s.inLogs <- bfLog | ||
go func(c context.Context, cl *ethclient.Client, adr []common.Address, fromBl, toBl uint64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved but this function can be re-written to improve readability. Only latestBlockNumber will be changed so you don't need to pass other variables to the child goroutine by value as well.
There is another way to pass an object to an anonymous function by value you might not know. It can be written like this:
var v int // A variable that might be changed by other goroutine
func someFunction() {
copy := v // A copy of v
go func () {
// Now value of v can be safely used inside the goroutine without worrying about v being modified
doSomethingWithValueOfV(copy)
}()
}
No description provided.