-
Notifications
You must be signed in to change notification settings - Fork 714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server push schedule commands. #669
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the rest LGTM! this approach is more clear than I do.
server/grpc_service.go
Outdated
@@ -290,27 +295,13 @@ func (s *Server) RegionHeartbeat(server pdpb.PD_RegionHeartbeatServer) error { | |||
continue | |||
} | |||
|
|||
var resp *pdpb.RegionHeartbeatResponse | |||
resp, err = cluster.handleRegionHeartbeat(region) | |||
err = cluster.handleRegionHeartbeat(region) | |||
if err != nil { | |||
msg := errors.Trace(err).Error() | |||
err = sendErrorRegionHeartbeatResponse(server, s.clusterID, pdpb.ErrorType_UNKNOWN, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should send error by hbstreams in here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok, server can be accessed by both here and there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
function name is too long.
Btw, do we need to close the stream in the heartbeat loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no need to drop the stream. The stream itself is in good condition, only some requests are not able to respond.
for { | ||
select { | ||
case update := <-s.streamCh: | ||
s.streams[update.storeID] = update.stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to close old one if have?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to. ServerStream
does not have Close
function, it will close automatically after eof.
server/coordinator.go
Outdated
delete(s.streams, storeID) | ||
} | ||
} else { | ||
log.Debugf("heartbeat stream not found for store %v, skip send message", storeID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should add a metric for it.
server/coordinator.go
Outdated
} else { | ||
log.Debugf("heartbeat stream not found for store %v, skip send message", storeID) | ||
} | ||
case <-s.ctx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should use a sub context to avoid using the same context in different goroutines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay.
PTAL @overvenus |
|
||
if region := c.cluster.getRegion(op.GetRegionID()); region != nil { | ||
if msg, _ := op.Do(region); msg != nil { | ||
c.hbStreams.sendMsg(region, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to put sendMsg
in dispatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a scheduler calls addOperator, I want the operator be sent to tikv ASAP.
PTAL @huachaohuang |
# Conflicts: # server/cluster_worker_test.go
server/grpc_service.go
Outdated
return errors.Trace(err) | ||
} | ||
storeID := request.GetLeader().GetStoreId() | ||
cluster.coordinator.hbStreams.bindStream(storeID, server) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to send with server
concurrently?
Seems server
will still be used in this function.
PTAL @huachaohuang |
server/grpc_service.go
Outdated
// RegionHeartbeat implements gRPC PDServer. | ||
func (s *Server) RegionHeartbeat(server pdpb.PD_RegionHeartbeatServer) error { | ||
syncServer := &syncHeartbeatServer{server: server} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the server
reused?
If it is, it will be protected by different locks on each call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think send the error message with hbstreams same as a normal response better than use lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
server/coordinator.go
Outdated
delete(s.streams, storeID) | ||
regionHeartbeatCounter.WithLabelValues("down", "err") | ||
} else { | ||
regionHeartbeatCounter.WithLabelValues("down", "ok") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why down here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe push?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, "up" and "down" seem strange.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
push for server -> client, and what for client -> server? @huachaohuang @siddontang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
server push operator to client, and client pull operator from server, make senses?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not pull here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"report" and "schedule"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
report may be better
/cc @huachaohuang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with "push" and "report".
Rest LGTM PTAL @huachaohuang @nolouch |
server/grpc_service.go
Outdated
cluster := s.GetRaftCluster() | ||
if cluster == nil { | ||
resp := &pdpb.RegionHeartbeatResponse{ | ||
Header: &pdpb.ResponseHeader{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the same as s.notBootstrappedHeader
?
server/grpc_service.go
Outdated
storeID := request.GetLeader().GetStoreId() | ||
cluster.coordinator.hbStreams.bindStream(storeID, syncServer) | ||
hbStreams.bindStream(storeID, server) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we bind stream before the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. We don't know the storeID yet.
server/coordinator.go
Outdated
delete(s.streams, storeID) | ||
regionHeartbeatCounter.WithLabelValues("down", "err") | ||
} else { | ||
regionHeartbeatCounter.WithLabelValues("down", "ok") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with "push" and "report".
server/grpc_service.go
Outdated
cluster := s.GetRaftCluster() | ||
if cluster == nil { | ||
resp := &pdpb.RegionHeartbeatResponse{ | ||
Header: &pdpb.ResponseHeader{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the same as s.notBootstrappedHeader()
?
|
||
if isNew { | ||
storeID := request.GetLeader().GetStoreId() | ||
hbStreams.bindStream(storeID, server) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we bind stream before the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM |
Another approach of #661.
Now
dispatch()
does not return responses, all responses transfers throughheartbeatStream
.