Skip to content

Commit

Permalink
apiv2(ticdc): implmenet the list changefeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
charleszheng44 committed Jan 26, 2023
1 parent 0914a56 commit 343af84
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 4 deletions.
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup := v2.Group("/changefeeds")
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.GET("", api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo)
Expand Down
24 changes: 21 additions & 3 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
err error
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand All @@ -80,3 +82,19 @@ func (m *mockStatusProvider) GetChangeFeedInfo(ctx context.Context,
) (*model.ChangeFeedInfo, error) {
return m.changefeedInfo, m.err
}

// GetAllChangeFeedInfo returns a list of mock changefeed info.
func (m *mockStatusProvider) GetAllChangeFeedInfo(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo,
error,
) {
return m.changefeedInfos, m.err
}

// GetAllChangeFeedStatuses returns a list of mock changefeed status.
func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus,
error,
) {
return m.changefeedStatuses, m.err
}
88 changes: 87 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2
import (
"context"
"net/http"
"sort"
"strings"
"time"

Expand All @@ -31,10 +32,16 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

const apiOpVarChangefeedID = "changefeed_id"
const (
// apiOpVarChangefeedState is the key of changefeed state in HTTP API
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
)

// createChangefeed handles create changefeed request,
// it returns the changefeed's changefeedInfo that it just created
Expand Down Expand Up @@ -149,6 +156,85 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
c.JSON(http.StatusCreated, toAPIModel(info, true))
}

// listChangeFeeds lists all changgefeeds in cdc cluster
// @Summary List changefeed
// @Description list all changefeeds in cdc cluster
// @Tags changefeed
// @Accept json
// @Produce json
// @Param state query string false "state"
// @Success 200 {array} model.ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v1/changefeeds [get]
func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
statuses, err := h.capture.StatusProvider().GetAllChangeFeedStatuses(ctx)
if err != nil {
_ = c.Error(err)
return
}

infos, err := h.capture.StatusProvider().GetAllChangeFeedInfo(ctx)
if err != nil {
_ = c.Error(err)
return
}

commonInfos := make([]model.ChangefeedCommonInfo, 0)
changefeeds := make([]model.ChangeFeedID, 0)

for cfID := range statuses {
changefeeds = append(changefeeds, cfID)
}
sort.Slice(changefeeds, func(i, j int) bool {
if changefeeds[i].Namespace == changefeeds[j].Namespace {
return changefeeds[i].ID < changefeeds[j].ID
}

return changefeeds[i].Namespace < changefeeds[j].Namespace
})

for _, cfID := range changefeeds {
cfInfo, exist := infos[cfID]
if !exist {
continue
}
cfStatus := statuses[cfID]

if !cfInfo.State.IsNeeded(state) {
// if the value of `state` is not 'all', only return changefeed
// with state 'normal', 'stopped', 'failed'
continue
}

// return the common info only.
commonInfo := &model.ChangefeedCommonInfo{
UpstreamID: cfInfo.UpstreamID,
Namespace: cfID.Namespace,
ID: cfID.ID,
}

commonInfo.FeedState = cfInfo.State
commonInfo.RunningError = cfInfo.Error

if cfStatus != nil {
commonInfo.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
commonInfo.CheckpointTime = model.JSONTime(tm)
}

commonInfos = append(commonInfos, *commonInfo)
}

resp := &ListResponse[model.ChangefeedCommonInfo]{
Total: len(commonInfos),
Items: commonInfos,
}

c.IndentedJSON(http.StatusOK, resp)
}

// verifyTable verify table, return ineligibleTables and EligibleTables.
func (h *OpenAPIV2) verifyTable(c *gin.Context) {
cfg := getDefaultVerifyTableConfig()
Expand Down
69 changes: 69 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -393,6 +394,74 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusOK, w.Code)
}

func TestListChangefeeds(t *testing.T) {
t.Parallel()

cp := mock_capture.NewMockCapture(gomock.NewController(t))
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
router := newRouter(apiV2)
sorted := func(s []model.ChangefeedCommonInfo) bool {
return sort.SliceIsSorted(s, func(i, j int) bool {
cf1, cf2 := s[i], s[j]
if cf1.Namespace == cf2.Namespace {
return cf1.ID < cf2.ID
}
return cf1.Namespace < cf2.Namespace
})
}

// case 1: list all changefeeds regardless of the state
provider1 := &mockStatusProvider{
changefeedInfos: map[model.ChangeFeedID]*model.ChangeFeedInfo{
model.DefaultChangeFeedID("cf1"): {
State: model.StateNormal,
},
model.DefaultChangeFeedID("cf2"): {
State: model.StateError,
},
model.DefaultChangeFeedID("cf3"): {
State: model.StateStopped,
},
},
}
cp.EXPECT().StatusProvider().Return(provider1).AnyTimes()
w := httptest.NewRecorder()
metaInfo := testCase{
url: "/api/v2/changefeeds",
method: "GET",
}
req, _ := http.NewRequestWithContext(
context.Background(),
metaInfo.method,
metaInfo.url,
nil,
)
req.URL.Query().Set(apiOpVarChangefeedState, "all")
router.ServeHTTP(w, req)
resp := ListResponse[model.ChangefeedCommonInfo]{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, resp.Total, 3)
require.Equal(t, sorted(resp.Items), true)

// case 2: only list changefeed with state 'normal', 'stopped' and 'failed'
req2, _ := http.NewRequestWithContext(
context.Background(),
metaInfo.method,
metaInfo.url,
nil,
)
router.ServeHTTP(w, req2)
resp2 := ListResponse[model.ChangefeedCommonInfo]{}
err = json.NewDecoder(w.Body).Decode(&resp2)
require.Nil(t, err)
require.Equal(t, resp.Total, 2)
require.Equal(t, sorted(resp.Items), true)
}

func TestGetChangeFeedMetaInfo(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 343af84

Please sign in to comment.