Skip to content

Commit

Permalink
Merge branch 'master' into ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Nov 27, 2023
2 parents 3eed535 + 4e9240a commit c42f892
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 18 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ error = '''
redirect failed
'''

["PD:apiutil:ErrRedirectToNotLeader"]
error = '''
redirect to not leader
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
2 changes: 2 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ var (
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
)

// grpcutil errors
Expand Down
7 changes: 1 addition & 6 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ const (
// ForwardToMicroServiceHeader is used to mark the request is forwarded to micro service.
ForwardToMicroServiceHeader = "Forward-To-Micro-Service"

// ErrRedirectFailed is the error message for redirect failed.
ErrRedirectFailed = "redirect failed"
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = "redirect to not leader"

chunkSize = 4096
)

Expand Down Expand Up @@ -459,7 +454,7 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
return
}
http.Error(w, ErrRedirectFailed, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError)
}

// copyHeader duplicates the HTTP headers from the source `src` to the destination `dst`.
Expand Down
12 changes: 9 additions & 3 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,21 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
redirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
if !h.s.IsClosed() && (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService {

if h.s.IsClosed() {
http.Error(w, errs.ErrServerNotStarted.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}

if (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService {
next(w, r)
return
}

// Prevent more than one redirection.
if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
http.Error(w, apiutil.ErrRedirectToNotLeader, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}

Expand All @@ -189,7 +195,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
var clientUrls []string
if redirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
clientUrls = append(clientUrls, targetAddr)
Expand Down
16 changes: 13 additions & 3 deletions scripts/ci-subtask.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,22 @@ else
weight() {
[[ $1 == "github.com/tikv/pd/server/api" ]] && return 30
[[ $1 == "github.com/tikv/pd/pkg/schedule" ]] && return 30
[[ $1 == "pd/tests/server/api" ]] && return 30
[[ $1 == "github.com/tikv/pd/pkg/core" ]] && return 30
[[ $1 == "github.com/tikv/pd/tests/server/api" ]] && return 30
[[ $1 =~ "pd/tests" ]] && return 5
return 1
}

# Create an associative array to store the weight of each task.
declare -A task_weights
for t in ${tasks[@]}; do
weight $t
task_weights[$t]=$?
done

# Sort tasks by weight in descending order.
tasks=($(printf "%s\n" "${tasks[@]}" | sort -rn))

scores=($(seq "$1" | xargs -I{} echo 0))

res=()
Expand All @@ -42,8 +53,7 @@ else
for i in ${!scores[@]}; do
[[ ${scores[i]} -lt ${scores[$min_i]} ]] && min_i=$i
done
weight $t
scores[$min_i]=$((${scores[$min_i]} + $?))
scores[$min_i]=$((${scores[$min_i]} + ${task_weights[$t]}))
[[ $(($min_i + 1)) -eq $2 ]] && res+=($t)
done
printf "%s " "${res[@]}"
Expand Down
13 changes: 8 additions & 5 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@ import (
func Redirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet(ServerContextKey).(*server.Server)

if svr.IsClosed() {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrServerNotStarted.FastGenByArgs().Error())
return
}
allowFollowerHandle := len(c.Request.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
isLeader := svr.GetMember().IsLeader()
if !svr.IsClosed() && (allowFollowerHandle || isLeader) {
if allowFollowerHandle || svr.GetMember().IsLeader() {
c.Next()
return
}
Expand All @@ -46,12 +50,11 @@ func Redirector() gin.HandlerFunc {

c.Request.Header.Set(apiutil.PDRedirectorHeader, svr.Name())

leader := svr.GetMember().GetLeader()
if leader == nil {
if svr.GetMember().GetLeader() == nil {
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error())
return
}
clientUrls := leader.GetClientUrls()
clientUrls := svr.GetMember().GetLeader().GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
Expand Down
5 changes: 4 additions & 1 deletion tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
return code == http.StatusOK && kg != nil
}, testutil.WithWaitFor(time.Second*1))
suite.Equal(utils.DefaultKeyspaceGroupID, kg.ID)
suite.Len(kg.Members, utils.DefaultKeyspaceGroupReplicaCount)
// the allocNodesToAllKeyspaceGroups loop will run every 100ms.
testutil.Eventually(suite.Require(), func() bool {
return len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount
})
for _, member := range kg.Members {
suite.Contains(nodes, member.Address)
}
Expand Down

0 comments on commit c42f892

Please sign in to comment.