
Employ mastership info for session management #1189

Merged: 23 commits merged into onosproject:master from mastership_sb on Sep 11, 2020

Conversation

@adibrastegarnia (Contributor) commented Aug 31, 2020

This PR makes the required changes in the southbound (SB) of onos-config to employ mastership information for session management. It is a WIP; the code still needs more work, and I will update this PR with more details about the changes.

#960 #843 #962

P.S. There are some log messages left in for debugging, but I will remove them at the end.

@adibrastegarnia added the labels enhancement (New feature or request), WIP (Work in progress), and do not merge ⚠️ (Do not Merge, used for testing or not ready) on Aug 31, 2020
return err
}
case topodevice.ListResponse_UPDATED:
log.Info("Process device event updated")

Contributor Author:

@kuujo
In the current code we check the address and, if there is a change, we reconnect; but if any other fields change, we also need to update the state of the device. Should we create a new session anyway?

Contributor:

I think either way is fine.

* Check device is added as a synchronizer correctly, times out on no gRPC device
* and then un-does everything
*/
func TestSessionManager(t *testing.T) {

Contributor Author:

We need a new unit test here

@adibrastegarnia force-pushed the mastership_sb branch 2 times, most recently from 37887ee to e4d07ab, on September 1, 2020 at 23:46
@@ -65,6 +66,7 @@ func (s *TestSuite) SetupTestSuite() error {
err = helm.Chart("onos-config", onostest.OnosChartRepo).
Release("onos-config").
Set("image.tag", "latest").
Set("replicaCount", 2).

Contributor Author:

I will change it later.

@kuujo (Contributor) left a comment:

Looks really good. We just have to make sure the device state update will be retried until it's either successful or superseded by a later master. Optimistic locking requires retries to function correctly. Also, if we're going to retry device updates, we have to ensure multiple concurrent state changes to the same device occur in the correct order. If we just start a new goroutine for each update then they could be performed in any order. Perhaps, in addition to the term, we should store a local logical timestamp in the device attributes.
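
As a rough illustration of the logical-timestamp idea, the sketch below tags each write with a monotonic counter kept in the device attributes and skips writes that are not newer than what is already stored. The Device type, the attribute key, and stampUpdate are illustrative names, not the PR's actual code.

```go
package synchronizer

import "strconv"

// Device stands in for the topo device type; only the attribute map matters here.
type Device struct {
	Attributes map[string]string
}

// timestampAttr is an illustrative attribute key for the logical timestamp.
const timestampAttr = "onos-config.logical-timestamp"

// stampUpdate tags an outgoing update with the given logical timestamp and
// reports whether it should still be applied, i.e. whether it is strictly
// newer than the timestamp already recorded on the device.
func stampUpdate(device *Device, local uint64) bool {
	stored, _ := strconv.ParseUint(device.Attributes[timestampAttr], 10, 64)
	if local <= stored {
		return false // an equal or newer update has already been written
	}
	if device.Attributes == nil {
		device.Attributes = make(map[string]string)
	}
	device.Attributes[timestampAttr] = strconv.FormatUint(local, 10)
	return true
}
```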

pkg/southbound/synchronizer/deviceUpdate.go (review thread resolved, outdated)

@kuujo (Contributor) commented Sep 2, 2020:

Either that or we can serialize updates to each device state on a separate channel.

}

// NewSessionManager create a new session manager
func NewSessionManager(options ...func(*SessionManager)) (*SessionManager, error) {

Contributor:

I think I reviewed something like this last week (maybe this is the same PR). You should not let it go too long without merging, or you might have conflicts.

Contributor Author:

@SeanCondon
Thanks for the review. This is the PR that I opened two days ago; I was waiting to get feedback from you and @kuujo before finalizing it. Now that I have some feedback, I will address it, and after a bit of testing we can merge. I will add new unit tests later based on the new changes.

Contributor Author:

@SeanCondon
The PR that you reviewed was a prerequisite to this one and we merged that one.

operationalStateCacheLock: sm.operationalStateCacheLock,
deviceChangeStore: sm.deviceChangeStore,
device: device,
target: sm.newTargetFn(),

Contributor:

This calls the function rather than storing the function value. Remove the brackets.

Contributor Author:

I tried to avoid any code changes that are not related to the session management work, but these are minor things and I will improve them in this same PR.

session.device.Attributes = make(map[string]string)
}

err := session.open()

Contributor:

The mutex is locked at this stage. Is that necessary? It could block everything up.

modelRegistry *modelregistry.ModelRegistry
sessions map[topodevice.ID]*Session
operationalStateCache map[topodevice.ID]devicechange.TypedValueMap
newTargetFn func() southbound.TargetIf

Contributor:

I don't think you need func() here - just the function name should be fine
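
For reference, a tiny sketch of the Go point behind this comment and the earlier one about the brackets, using hypothetical Target, manager, and session types: without "()" the field holds the factory itself, and the call belongs where the session is built.

```go
package synchronizer

// Target is a stand-in for southbound.TargetIf.
type Target interface {
	Connect() error
}

type manager struct {
	// Without "()", the field holds the factory, so a fresh target can be
	// built for every session.
	newTarget func() Target
}

type session struct {
	target Target
}

// newSession is the one place where the brackets belong: the factory is
// called when the session is built.
func (m *manager) newSession() *session {
	return &session{target: m.newTarget()}
}
```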


session := sm.sessions[id]
if session != nil {
currentTerm, err := session.getCurrentTerm()

@kuujo (Contributor) commented Sep 3, 2020:

Even though we’re getting the device’s current term from the topology and checking it against the local mastership term, this particular implementation doesn’t guarantee the update cannot be overwritten by an older master. In order to prevent that, the read-modify-write sequence comparing the stored term with the local term and then updating the device must be atomic, and in order for it to be atomic it must operate on the same revision of the device. When a device is read from the topology service, the device is returned with metadata that includes a revision number. When a device is updated, the topology service compares the device’s revision number with the stored revision number and rejects the update if the revisions don’t match. If an update is rejected, the client attempts the atomic read-modify-write sequence again.

But the code here is actually reading the device twice: once to retrieve the term, and once to update the device, so the sequence is not atomic. The term can change between the first and second read, and that would allow an older master to overwrite an update from a newer master. For example:

  • Node A reads the device with revision 1 and verifies the stored term matches its local term
  • Node B is elected master for a later term and updates the device state, incrementing its revision number to 2
  • Node A proceeds to read the device again, this time with revision 2, and updates it without checking the term. The update succeeds since the device was re-read, so it overwrites the update from the new master, node B

In this scenario, if node A’s update is attempted using the same object it read in the first step (revision 1), it would be rejected. Upon retrying, node A would read revision 2 and recognize the later term indicating it’s no longer the master.

Both the condition checking the device’s stored term and the write updating the device state must occur on the same device object returned from the topology service.
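
A sketch of that atomic read-modify-write loop, under assumed store semantics (Get returns the device with its revision; Update rejects a write whose revision is stale). The point is that the term check and the write operate on the same object returned by the store, and a rejected write triggers a full re-read and re-check.

```go
package synchronizer

import "errors"

// errConflict is what the assumed store returns when the revision is stale.
var errConflict = errors.New("revision conflict")

// Device and DeviceStore are stand-ins for the topo device and its store.
type Device struct {
	Revision uint64
	Term     uint64
	State    string
}

type DeviceStore interface {
	Get(id string) (*Device, error)
	// Update fails with errConflict if the stored revision differs from d.Revision.
	Update(d *Device) error
}

// setDeviceState retries the read-check-write sequence until it succeeds or a
// later master's term is observed on the device itself.
func setDeviceState(store DeviceStore, id string, localTerm uint64, state string) error {
	for {
		device, err := store.Get(id)
		if err != nil {
			return err
		}
		if device.Term > localTerm {
			return nil // superseded by a later master; stop quietly
		}
		device.State = state
		device.Term = localTerm
		err = store.Update(device) // same object, same revision as the check above
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
		// Conflict: a newer revision was written concurrently; re-read and
		// re-check the term before trying again.
	}
}
```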


go sm.processDeviceEvents(sm.topoChannel)
go func() {
err := sm.updateDeviceState()

Contributor:

I think we should create a separate event channel and goroutine for each session to allow session state changes to be notified and the topology to be updated concurrently. A new goroutine to process session state changes could be created in the session manager or in the session itself. We only need to maintain the order of updates within each session. Sharing a channel across sessions can only block the processing of session updates unnecessarily. A per-session loop could also simplify the algorithm for updating the device state.
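
One possible shape of this, with assumed names (sessionEvent and the updateDeviceState callback are illustrative): each session gets its own channel and goroutine, so state changes for one device are applied in order without blocking other sessions.

```go
package synchronizer

import "log"

// sessionEvent is an illustrative connectivity-state change for one device.
type sessionEvent struct {
	deviceID  string
	connected bool
}

// watchSession drains one session's event channel in a dedicated goroutine.
// Ordering is preserved per session, and sessions never block one another.
func watchSession(events <-chan sessionEvent, updateDeviceState func(sessionEvent) error) {
	go func() {
		for event := range events {
			if err := updateDeviceState(event); err != nil {
				log.Printf("updating state of %s failed: %v", event.deviceID, err)
			}
		}
	}()
}
```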

protocolState.ChannelState = channel
protocolState.ServiceState = service
topoDevice.Protocols = append(topoDevice.Protocols, protocolState)
mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID)

Contributor:

To ensure the update is atomic we just need to compare the mastership state to the stored device term after this.
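
Roughly, the comparison being suggested could look like the sketch below; the "mastership-term" attribute key and the shouldApply helper are assumptions for illustration.

```go
package synchronizer

import "strconv"

// shouldApply reports whether an update decided under nodeTerm may still be
// written to a device whose attributes may already record a later term. The
// "mastership-term" attribute key is an assumption for illustration.
func shouldApply(attributes map[string]string, nodeTerm uint64) bool {
	storedTerm, _ := strconv.ParseUint(attributes["mastership-term"], 10, 64)
	return nodeTerm >= storedTerm
}
```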

@kuujo (Contributor) commented Sep 4, 2020:

The file naming convention in Go is to use lower_case_and_underscores.go. We're mixing this with lowerCamelCase.go here and in other areas of onos-config. We should avoid doing that here and fix the places where it's already done in another PR.

@adibrastegarnia marked this pull request as draft on September 4, 2020 at 15:33
@adibrastegarnia added this to the Aether 2020Q3 milestone on Sep 4, 2020
@adibrastegarnia force-pushed the mastership_sb branch 4 times, most recently from e2e6169 to c24f3f7, on September 5, 2020 at 06:09
@adibrastegarnia marked this pull request as ready for review on September 5, 2020 at 06:15

// Do not update the state of a device if the node encounters a mastership term greater than its own
if uint64(mastershipState.Term) < uint64(currentTerm) {
return errors.New("device mastership term is greater than node mastership term")

Contributor:

When a higher mastership term is discovered, this node needs to stop attempting this update. But since this function is being called by the backoff library, returning a plain error will just cause it to retry, and it will loop forever since the update will always fail once this condition is met. The error returned has to be wrapped as a permanent error (backoff.Permanent, or whatever mechanism the library provides for stopping the retries).

Contributor Author:

Oops. You are right. We should not return a plain error here.
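
A sketch of that fix, assuming the retries are driven by github.com/cenkalti/backoff/v4 (the updateDevice callback and the term arguments are placeholders): wrapping the error with backoff.Permanent makes backoff.Retry stop instead of looping on a condition that can never clear.

```go
package synchronizer

import (
	"errors"

	"github.com/cenkalti/backoff/v4"
)

// updateWithRetry retries transient failures, but gives up permanently once
// the device is seen to carry a later mastership term than this node's.
func updateWithRetry(nodeTerm, deviceTerm uint64, updateDevice func() error) error {
	operation := func() error {
		if nodeTerm < deviceTerm {
			// Superseded by a later master: a permanent error stops backoff.Retry.
			return backoff.Permanent(errors.New("device mastership term is greater than node mastership term"))
		}
		return updateDevice() // ordinary errors are retried with exponential backoff
	}
	return backoff.Retry(operation, backoff.NewExponentialBackOff())
}
```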

protocolState.ChannelState = channel
protocolState.ServiceState = service
topoDevice.Protocols = append(topoDevice.Protocols, protocolState)
mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID)

@kuujo (Contributor) commented Sep 8, 2020:

This mastership state doesn't necessarily represent the state on which this update should be based. Since we're reading from the mastership store on every update attempt, it's possible for this node to read a different (newer) mastership state (e.g. for a new master/term) than the one this update is based on. Who knows if this node is even the master any more. Rather than reading from the store here, this function should take as an argument the mastership state on which the update is based, so that every attempt to perform the update is based on that state. Reading stores multiple times can always introduce this sort of consistency issue. The only reason we should read the store here is to determine whether the update should be cancelled due to a new master/higher term, but that's not strictly necessary; it would just be an optimization, since that will be determined when the device is updated by the new master anyway.
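
In code, the suggestion amounts to something like the sketch below, where MastershipState, MastershipReader, and decideAndUpdate are assumed names: the mastership is read once, at the point where the update is decided, and passed down so retries cannot silently rebase onto a newer term.

```go
package synchronizer

// MastershipState is a stand-in for the mastership store's state (master + term).
type MastershipState struct {
	Term   uint64
	Master string
}

// MastershipReader is the single read allowed: at decision time, not per retry.
type MastershipReader interface {
	GetMastership(deviceID string) (MastershipState, error)
}

// decideAndUpdate reads the mastership exactly once and hands it to the update
// function; every retry of that update is then based on the same master/term.
func decideAndUpdate(reader MastershipReader, deviceID string,
	update func(deviceID string, state MastershipState) error) error {
	state, err := reader.GetMastership(deviceID)
	if err != nil {
		return err
	}
	return update(deviceID, state)
}
```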

s.mu.Lock()
s.connected = false
s.mu.Unlock()
state, _ := s.mastershipStore.GetMastership(s.device.ID)

Contributor:

The session should be created with a mastership state. It shouldn't need to read the mastership state at all. That creates another race where the mastership could change between the time the session was created and the time it was opened. Just remove the mastershipStore field.

Contributor Author:

@kuujo
Done. I hope I addressed it properly.
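
For reference, a sketch of the shape being asked for two comments above, with assumed field names: the mastership state is fixed at construction time and the session carries no mastership store to re-read later.

```go
package synchronizer

import "sync"

// MastershipState and Device are stand-ins for the real types.
type MastershipState struct {
	Term   uint64
	Master string
}

type Device struct {
	ID string
}

// Session holds the mastership state it was created under; there is no
// mastershipStore field left to re-read and race against.
type Session struct {
	mu         sync.RWMutex
	device     *Device
	mastership MastershipState
	connected  bool
}

// newSession captures the mastership state once, at creation time.
func newSession(device *Device, state MastershipState) *Session {
	return &Session{device: device, mastership: state}
}
```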

@adibrastegarnia removed the do not merge ⚠️ label on Sep 11, 2020
@adibrastegarnia merged commit aed547f into onosproject:master on Sep 11, 2020
@adibrastegarnia deleted the mastership_sb branch on September 11, 2020 at 22:03
Labels: enhancement (New feature or request)
3 participants