Skip to content

Commit

Permalink
apiv2(ticdc): implmenet the list changefeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
charleszheng44 committed Jan 31, 2023
1 parent 827ac33 commit 47f213c
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 4 deletions.
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup := v2.Group("/changefeeds")
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.GET("", api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo)
Expand Down
24 changes: 21 additions & 3 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
err error
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand All @@ -80,3 +82,19 @@ func (m *mockStatusProvider) GetChangeFeedInfo(ctx context.Context,
) (*model.ChangeFeedInfo, error) {
return m.changefeedInfo, m.err
}

// GetAllChangeFeedInfo returns a list of mock changefeed info.
func (m *mockStatusProvider) GetAllChangeFeedInfo(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo,
error,
) {
return m.changefeedInfos, m.err
}

// GetAllChangeFeedStatuses returns a list of mock changefeed status.
func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus,
error,
) {
return m.changefeedStatuses, m.err
}
87 changes: 86 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2
import (
"context"
"net/http"
"sort"
"strings"
"time"

Expand All @@ -31,10 +32,16 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

const apiOpVarChangefeedID = "changefeed_id"
const (
// apiOpVarChangefeedState is the key of changefeed state in HTTP API
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
)

// createChangefeed handles create changefeed request,
// it returns the changefeed's changefeedInfo that it just created
Expand Down Expand Up @@ -149,6 +156,84 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
c.JSON(http.StatusCreated, toAPIModel(info, true))
}

// listChangeFeeds lists all changgefeeds in cdc cluster
// @Summary List changefeed
// @Description list all changefeeds in cdc cluster
// @Tags changefeed
// @Accept json
// @Produce json
// @Param state query string false "state"
// @Success 200 {array} model.ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v2/changefeeds [get]
func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
statuses, err := h.capture.StatusProvider().GetAllChangeFeedStatuses(ctx)
if err != nil {
_ = c.Error(err)
return
}

infos, err := h.capture.StatusProvider().GetAllChangeFeedInfo(ctx)
if err != nil {
_ = c.Error(err)
return
}

commonInfos := make([]ChangefeedCommonInfo, 0)
changefeeds := make([]model.ChangeFeedID, 0)

for cfID := range statuses {
changefeeds = append(changefeeds, cfID)
}
sort.Slice(changefeeds, func(i, j int) bool {
if changefeeds[i].Namespace == changefeeds[j].Namespace {
return changefeeds[i].ID < changefeeds[j].ID
}

return changefeeds[i].Namespace < changefeeds[j].Namespace
})

for _, cfID := range changefeeds {
cfInfo, exist := infos[cfID]
if !exist {
continue
}
cfStatus := statuses[cfID]

if !cfInfo.State.IsNeeded(state) {
// if the value of `state` is not 'all', only return changefeed
// with state 'normal', 'stopped', 'failed'
continue
}

// return the common info only.
commonInfo := &ChangefeedCommonInfo{
UpstreamID: cfInfo.UpstreamID,
Namespace: cfID.Namespace,
ID: cfID.ID,
}

commonInfo.FeedState = cfInfo.State
commonInfo.RunningError = cfInfo.Error

if cfStatus != nil {
commonInfo.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
commonInfo.CheckpointTime = model.JSONTime(tm)
}

commonInfos = append(commonInfos, *commonInfo)
}
resp := &ListResponse[ChangefeedCommonInfo]{
Total: len(commonInfos),
Items: commonInfos,
}

c.JSON(http.StatusOK, resp)
}

// verifyTable verify table, return ineligibleTables and EligibleTables.
func (h *OpenAPIV2) verifyTable(c *gin.Context) {
cfg := getDefaultVerifyTableConfig()
Expand Down
79 changes: 79 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -393,6 +394,84 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusOK, w.Code)
}

func TestListChangeFeeds(t *testing.T) {
t.Parallel()

cp := mock_capture.NewMockCapture(gomock.NewController(t))
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
router := newRouter(apiV2)
sorted := func(s []model.ChangefeedCommonInfo) bool {
return sort.SliceIsSorted(s, func(i, j int) bool {
cf1, cf2 := s[i], s[j]
if cf1.Namespace == cf2.Namespace {
return cf1.ID < cf2.ID
}
return cf1.Namespace < cf2.Namespace
})
}

// case 1: list all changefeeds regardless of the state
provider1 := &mockStatusProvider{
changefeedInfos: map[model.ChangeFeedID]*model.ChangeFeedInfo{
model.DefaultChangeFeedID("cf1"): {
State: model.StateNormal,
},
model.DefaultChangeFeedID("cf2"): {
State: model.StateError,
},
model.DefaultChangeFeedID("cf3"): {
State: model.StateStopped,
},
},
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatus{
model.DefaultChangeFeedID("cf1"): {},
model.DefaultChangeFeedID("cf2"): {},
model.DefaultChangeFeedID("cf3"): {},
},
}
cp.EXPECT().StatusProvider().Return(provider1).AnyTimes()
w := httptest.NewRecorder()
metaInfo := testCase{
url: "/api/v2/changefeeds?state=all",
method: "GET",
}
req, _ := http.NewRequestWithContext(
context.Background(),
metaInfo.method,
metaInfo.url,
nil,
)
router.ServeHTTP(w, req)
resp := ListResponse[model.ChangefeedCommonInfo]{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, 3, resp.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp.Items))

// case 2: only list changefeed with state 'normal', 'stopped' and 'failed'
metaInfo2 := testCase{
url: "/api/v2/changefeeds",
method: "GET",
}
req2, _ := http.NewRequestWithContext(
context.Background(),
metaInfo2.method,
metaInfo2.url,
nil,
)
router.ServeHTTP(w, req2)
resp2 := ListResponse[model.ChangefeedCommonInfo]{}
err = json.NewDecoder(w.Body).Decode(&resp2)
require.Nil(t, err)
require.Equal(t, 2, resp2.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp2.Items))
}

func TestGetChangeFeedMetaInfo(t *testing.T) {
t.Parallel()

Expand Down
11 changes: 11 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ type PDConfig struct {
CertAllowedCN []string `json:"cert_allowed_cn,omitempty"`
}

// ChangefeedCommonInfo holds some common usage information of a changefeed
type ChangefeedCommonInfo struct {
UpstreamID uint64 `json:"upstream_id"`
Namespace string `json:"namespace"`
ID string `json:"id"`
FeedState model.FeedState `json:"state"`
CheckpointTSO uint64 `json:"checkpoint_tso"`
CheckpointTime model.JSONTime `json:"checkpoint_time"`
RunningError *model.RunningError `json:"error"`
}

// ChangefeedConfig use by create changefeed api
type ChangefeedConfig struct {
Namespace string `json:"namespace"`
Expand Down
40 changes: 40 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,46 @@ var doc = `{
}
}
}
},
"/api/v2/changefeeds": {
"get": {
"description": "list all changefeeds in cdc cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"changefeed"
],
"summary": "List changefeed",
"parameters": [
{
"type": "string",
"description": "state",
"name": "state",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/model.ChangefeedCommonInfo"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/model.HTTPError"
}
}
}
}
}
},
"definitions": {
Expand Down
40 changes: 40 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,46 @@
}
}
}
},
"/api/v2/changefeeds": {
"get": {
"description": "list all changefeeds in cdc cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"changefeed"
],
"summary": "List changefeed",
"parameters": [
{
"type": "string",
"description": "state",
"name": "state",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/model.ChangefeedCommonInfo"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/model.HTTPError"
}
}
}
}
}
},
"definitions": {
Expand Down
26 changes: 26 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -725,4 +725,30 @@ paths:
summary: Get server status
tags:
- common
/api/v2/changefeeds:
get:
consumes:
- application/json
description: list all changefeeds in cdc cluster
parameters:
- description: state
in: query
name: state
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/model.ChangefeedCommonInfo'
type: array
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/model.HTTPError'
summary: List changefeed
tags:
- changefeed
swagger: "2.0"

0 comments on commit 47f213c

Please sign in to comment.