Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Added timeouts for channels #29

Merged
merged 2 commits into from
Feb 10, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 79 additions & 50 deletions services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var propagateShuffleToChildren = "PropShuffleToChildren"
type Service struct {
*onet.ServiceProcessor

Timeout time.Duration

shuffleGetData protocols.PropagationFunc
shufflePutData protocols.PropagationFunc

Expand All @@ -48,6 +50,7 @@ type Service struct {
// NewService constructor which registers the needed messages.
func NewService(c *onet.Context) (onet.Service, error) {
newUnLynxInstance := &Service{
Timeout: 20 * time.Minute,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout should be configurable. E.g. using the cli library, that way we will be able to pass this through the env for the docker image.

ServiceProcessor: onet.NewServiceProcessor(c),
MapSurveyKS: concurrent.NewConcurrentMap(),
MapSurveyShuffle: concurrent.NewConcurrentMap(),
Expand Down Expand Up @@ -300,31 +303,35 @@ func (s *Service) HandleSurveyShuffleRequest(ssr *SurveyShuffleRequest) (network
ssr.MessageSource = s.ServerIdentity()

// wait for root to be ready to send the local aggregate result
<-surveyShuffle.SurveyChannel
select {
case <-surveyShuffle.SurveyChannel:
// update the local survey with the shuffled results
surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID)
if err != nil {
return nil, xerrors.Errorf("%+v", err)
}

// update the local survey with the shuffled results
surveyShuffle, err = s.getSurveyShuffle(ssr.SurveyID)
if err != nil {
return nil, xerrors.Errorf("%+v", err)
}
// key switch the results
keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(ssr.SurveyID, ShuffleRequestName, &ssr.Roster)
if err != nil {
return nil, xerrors.Errorf("key switching error: %+v", err)
}

// key switch the results
keySwitchingResult, execTime, communicationTime, err := s.KeySwitchingPhase(ssr.SurveyID, ShuffleRequestName, &ssr.Roster)
if err != nil {
return nil, xerrors.Errorf("key switching error: %+v", err)
}
surveyShuffle.TR.MapTR[KSTimeExec] = execTime
surveyShuffle.TR.MapTR[KSTimeCommunication] = communicationTime

surveyShuffle.TR.MapTR[KSTimeExec] = execTime
surveyShuffle.TR.MapTR[KSTimeCommunication] = communicationTime
// get server index
index, _ := ssr.Roster.Search(s.ServerIdentity().ID)
if index < 0 {
return nil, xerrors.New("couldn't find this node in the roster")
}

// get server index
index, _ := ssr.Roster.Search(s.ServerIdentity().ID)
if index < 0 {
return nil, xerrors.New("couldn't find this node in the roster")
}
return &Result{Result: libunlynx.CipherVector{keySwitchingResult[index]},
TR: surveyShuffle.TR}, nil

return &Result{Result: libunlynx.CipherVector{keySwitchingResult[index]},
TR: surveyShuffle.TR}, nil
case <-time.After(s.Timeout):
return nil, xerrors.Errorf(s.ServerIdentity().String(), "didn't get a reply from the nodes in time.")
}
}

// HandleSurveyAggRequest handles the reception of the aggregate local result to be shared/shuffled/switched
Expand Down Expand Up @@ -592,19 +599,28 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance,
})
surveyShuffleChan := make(chan SurveyShuffle)
prop.RegisterOnDataToRoot(func() network.Message {
ss := <-surveyShuffleChan
return &ss.Request
select {
case ss := <-surveyShuffleChan:
return &ss.Request
case <-time.After(s.Timeout):
return errors.New(s.ServerIdentity().String() + "didn't get the data from the nodes in time.")
}
})
go func() {
surveyID := <-surveyIDChan
for {
surveyShuffle, err := s.getSurveyShuffle(surveyID)
if err != nil {
time.Sleep(100 * time.Millisecond)
} else {
surveyShuffleChan <- surveyShuffle
break
select {
case surveyID := <-surveyIDChan:
for {
surveyShuffle, err := s.getSurveyShuffle(surveyID)
if err != nil {
time.Sleep(100 * time.Millisecond)
} else {
surveyShuffleChan <- surveyShuffle
break
}
}
case <-time.After(s.Timeout):
log.Error(s.ServerIdentity().String() + "didn't get the survey notification in time.")
return
}
}()

Expand All @@ -631,6 +647,7 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance,
return xerrors.Errorf(
"couldn't store new surveyShuffle: %+v", err)
}

surveyShuffle.SurveyChannel <- 1
return nil
})
Expand Down Expand Up @@ -700,10 +717,13 @@ func (s *Service) TaggingPhase(targetSurvey *SurveyDDTRequest,
if err != nil {
return nil, 0, 0, fmt.Errorf("couldn't start protocol: %+v", err)
}
deterministicTaggingResult := <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel

execTime := pi.(*protocolsunlynx.DeterministicTaggingProtocol).ExecTime
return deterministicTaggingResult, execTime, time.Since(start) - execTime, nil
select {
case deterministicTaggingResult := <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel:
execTime := pi.(*protocolsunlynx.DeterministicTaggingProtocol).ExecTime
return deterministicTaggingResult, execTime, time.Since(start) - execTime, nil
case <-time.After(s.Timeout):
return nil, 0, 0, fmt.Errorf("couldn't finish tagging protocol in time")
}
}

// CollectiveAggregationPhase performs a collective aggregation between the participating nodes
Expand All @@ -714,15 +734,18 @@ func (s *Service) CollectiveAggregationPhase(targetSurvey SurveyID, roster *onet
if err != nil {
return libunlynx.CipherText{}, 0, err
}
aggregationResult := <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel

// in the resulting map there is only one element
var finalResult libunlynx.CipherText
for _, v := range aggregationResult.GroupedData {
finalResult = v.AggregatingAttributes[0]
break
select {
case aggregationResult := <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel:
// in the resulting map there is only one element
var finalResult libunlynx.CipherText
for _, v := range aggregationResult.GroupedData {
finalResult = v.AggregatingAttributes[0]
break
}
return finalResult, time.Since(start), nil
case <-time.After(s.Timeout):
return libunlynx.CipherText{}, 0, fmt.Errorf("couldn't finish collective aggregation protocol in time")
}
return finalResult, time.Since(start), nil
}

// ShufflingPhase performs the shuffling aggregated results from each of the nodes
Expand All @@ -733,10 +756,13 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID, roster *onet.Roster) ([]
if err != nil {
return nil, 0, 0, err
}
shufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel

execTime := pi.(*protocolsunlynx.ShufflingProtocol).ExecTime
return shufflingResult, execTime, time.Since(start) - execTime, nil
select {
case shufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel:
execTime := pi.(*protocolsunlynx.ShufflingProtocol).ExecTime
return shufflingResult, execTime, time.Since(start) - execTime, nil
case <-time.After(s.Timeout):
return nil, 0, 0, fmt.Errorf("couldn't finish shuffling protocol in time")
}
}

// KeySwitchingPhase performs the switch to the querier key on the currently aggregated data.
Expand All @@ -747,10 +773,13 @@ func (s *Service) KeySwitchingPhase(targetSurvey SurveyID, typeQ string, roster
if err != nil {
return nil, 0, 0, err
}
keySwitchedAggregatedResponses := <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel

execTime := pi.(*protocolsunlynx.KeySwitchingProtocol).ExecTime
return keySwitchedAggregatedResponses, execTime, time.Since(start) - execTime, nil
select {
case keySwitchedAggregatedResponses := <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel:
execTime := pi.(*protocolsunlynx.KeySwitchingProtocol).ExecTime
return keySwitchedAggregatedResponses, execTime, time.Since(start) - execTime, nil
case <-time.After(s.Timeout):
return nil, 0, 0, fmt.Errorf("couldn't finish key switching protocol in time")
}
}

// Support functions
Expand Down