Skip to content
Permalink
Browse files

apiserver: fix hanging stream updates

  • Loading branch information...
birdayz committed May 1, 2019
1 parent 98246ea commit e13b5d0a75b2166b0224596d038d645e002e0439
@@ -64,7 +64,7 @@ func (s *shadowAPI) StreamReportedStateChanges(request *shadowpb.StreamReportedS
return status.Error(codes.Unauthenticated, "Unauthenticated")
}

resp, err := s.accountClient.IsAuthorized(context.TODO(), &nodepb.IsAuthorizedRequest{
resp, err := s.accountClient.IsAuthorized(srv.Context(), &nodepb.IsAuthorizedRequest{
Node: request.Id,
Account: account,
Action: nodepb.Action_READ,
@@ -76,7 +76,7 @@ func (s *shadowAPI) StreamReportedStateChanges(request *shadowpb.StreamReportedS
return status.Error(codes.PermissionDenied, "Permission denied")
}

c, err := s.client.StreamReportedStateChanges(context.Background(), request)
c, err := s.client.StreamReportedStateChanges(srv.Context(), request)
if err != nil {
return err
}
@@ -108,7 +108,6 @@ func (s *Server) PatchDesiredState(context context.Context, req *shadowpb.PatchD
}

func (s *Server) StreamReportedStateChanges(request *shadowpb.StreamReportedStateChangesRequest, srv shadowpb.Shadows_StreamReportedStateChangesServer) (err error) {
fmt.Println("only", request.OnlyDelta)
// TODO validate request/Id

var subPathReported string
@@ -118,8 +117,22 @@ func (s *Server) StreamReportedStateChanges(request *shadowpb.StreamReportedStat
subPathReported = "/reported/full"
}

events := s.PubSub.Sub(request.Id + subPathReported)
defer s.PubSub.Unsub(events)
topicEvents := request.Id + subPathReported
events := s.PubSub.Sub(topicEvents)
defer func() {
fmt.Println("Dferer")
go func() {
s.PubSub.Unsub(events)
}()

// Drain

for range events {

}

fmt.Println("Drained channel")
}()

var subPathDesired string
if request.OnlyDelta {
@@ -128,11 +141,26 @@ func (s *Server) StreamReportedStateChanges(request *shadowpb.StreamReportedStat
subPathDesired = "/desired/full"
}

eventsDesired := s.PubSub.Sub(request.Id + subPathDesired)
defer s.PubSub.Unsub(eventsDesired)
topicEventsDesired := request.Id + subPathDesired
eventsDesired := s.PubSub.Sub(topicEventsDesired)
defer func() {
fmt.Println("defer2")
go func() {
s.PubSub.Unsub(eventsDesired)
}()

// Drain

for range eventsDesired {

}

fmt.Println("Drained channel")
}()
outer:
for {

fmt.Println("Vor select")
select {
case reportedEvent := <-events:
value, err := toProto(reportedEvent)
@@ -162,6 +190,10 @@ outer:
fmt.Println(err)
break outer
}
case <-srv.Context().Done():
fmt.Println("DONE")
break outer

}

}

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit e13b5d0

Please sign in to comment.
You can’t perform that action at this time.