Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PubSub and include topic filter #7496

Merged
merged 43 commits into from
Oct 16, 2020
Merged

Conversation

prestonvanloon
Copy link
Member

@prestonvanloon prestonvanloon commented Oct 11, 2020

What type of PR is this?

Other

What does this PR do? Why is it needed?

This PR updates pubsub to the latest release and introduces a pubsub topic filter to improve pubsub peer management.

Which issues(s) does this PR fix?

Other notes for review

The basic idea of the pubsub topic filter is that our peers will announce what topics they are interested in or current subscribed to and we keep this information in memory in the libp2p pubsub implementation. A key benefit to the topic filter is that Prysm doesn't need to keep track of topics it will never subscribe since this in-memory storage of peer's topics is to facilitate topic subscriptions when Prysm does want to subscribe to some topic.

@prestonvanloon prestonvanloon added this to the v1.0.0-beta milestone Oct 12, 2020
Copy link
Member

@nisdas nisdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main concerns are that this PR introduces another state initialization watcher, and it is unclear why it is actually needed.

Ex: What we use now.

func (s *Service) awaitStateInitialized() {

beacon-chain/p2p/pubsub_filter_test.go Outdated Show resolved Hide resolved
beacon-chain/p2p/pubsub_filter.go Show resolved Hide resolved
}

// Monitor the state feed notifier for the state initialization event.
func (sf *subscriptionFilter) monitorStateInitialized() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the filter need its own state initialization watcher ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need genesis time and genesis validator root to determine the fork digest. I'll add a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in favor of using the existing state init watcher

Resolved issues in the following files via DeepSource Autofix:
1. beacon-chain/p2p/pubsub_filter.go
2. beacon-chain/p2p/pubsub_filter_test.go
@CLAassistant
Copy link

CLAassistant commented Oct 13, 2020

CLA assistant check
All committers have signed the CLA.

@prestonvanloon prestonvanloon marked this pull request as ready for review October 13, 2020 15:04
@prestonvanloon prestonvanloon requested a review from a team as a code owner October 13, 2020 15:04
@prestonvanloon prestonvanloon added the Ready For Review A pull request ready for code review label Oct 13, 2020
@prestonvanloon
Copy link
Member Author

Ready for review, but please do not merge until I can run this for at least an hour at runtime.

Comment on lines +102 to +103
msn.feedLock.Lock()
defer msn.feedLock.Unlock()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to prevent potential race condition on accessing the state feed mock in test.

Copy link
Member

@nisdas nisdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont subscribe new topics until state is initialized in sync service. So all the extra checks for state initialization are not required.

@@ -69,6 +69,8 @@ func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte,

// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
s.awaitStateInitialized() // Genesis time and genesis validator root are required to subscribe.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the correct place, this will end up blocking subscription of new topics.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/prysmaticlabs/prysm/blob/master/beacon-chain/sync/service.go#L267

This will only call it after we receive the state initialized and synced events.

https://github.com/prysmaticlabs/prysm/blob/master/beacon-chain/p2p/service.go#L377

This will end up being blocked here as no more state initialized events are emmited now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solves the race condition and yes it should block subscription of new topics until the state is initialized.

Note: If awaitStateInitialized() is called and the state is already initialized then it returns immediately.

beacon-chain/p2p/pubsub_filter.go Outdated Show resolved Hide resolved
beacon-chain/p2p/pubsub_filter.go Outdated Show resolved Hide resolved
beacon-chain/p2p/service.go Show resolved Hide resolved
@prestonvanloon
Copy link
Member Author

Thanks @nisdas, I will refactor the pubsub filter to be a pointer receiver of the p2p service such that it can share the existing state initialization subscriber.

Discussed offline that we should still keep this subscription check to awaitStateInitialization since subscribe will indiscriminately fail if the p2p service isn't aware of the genesis time or genesis validator root for fork digest.

Link to discussion on discord: https://discordapp.com/channels/476244492043812875/483017808658169866/765763343029895178

}
if parts[2] != fmt.Sprintf("%x", fd) {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a small comment on why we skip parts[3]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

nisdas
nisdas previously approved these changes Oct 15, 2020
Copy link
Member

@nisdas nisdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a fix for the test , reapproving again

@prylabs-bulldozer prylabs-bulldozer bot merged commit 07e7e03 into master Oct 16, 2020
@delete-merged-branch delete-merged-branch bot deleted the update-pubsub branch October 16, 2020 07:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Ready For Review A pull request ready for code review
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants