Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apiv2(ticdc): implmenet the list changefeeds #8116

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.GET("/:changefeed_id", api.getChangeFeed)
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.GET("", api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo)
Expand Down
28 changes: 23 additions & 5 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
err error
changefeedStatus *model.ChangeFeedStatus
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand Down Expand Up @@ -101,3 +103,19 @@ func (m *mockStatusProvider) GetAllTaskStatuses(
) {
return m.taskStatus, m.err
}

// GetAllChangeFeedInfo returns a list of mock changefeed info.
func (m *mockStatusProvider) GetAllChangeFeedInfo(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo,
error,
) {
return m.changefeedInfos, m.err
}

// GetAllChangeFeedStatuses returns a list of mock changefeed status.
func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus,
error,
) {
return m.changefeedStatuses, m.err
}
85 changes: 84 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2
import (
"context"
"net/http"
"sort"
"strings"
"time"

Expand All @@ -35,7 +36,12 @@ import (
"go.uber.org/zap"
)

const apiOpVarChangefeedID = "changefeed_id"
const (
// apiOpVarChangefeedState is the key of changefeed state in HTTP API
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
)

// createChangefeed handles create changefeed request,
// it returns the changefeed's changefeedInfo that it just created
Expand Down Expand Up @@ -150,6 +156,83 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
c.JSON(http.StatusCreated, toAPIModel(info, true))
}

// listChangeFeeds lists all changgefeeds in cdc cluster
// @Summary List changefeed
// @Description list all changefeeds in cdc cluster
// @Tags changefeed
// @Accept json
// @Produce json
// @Param state query string false "state"
// @Success 200 {array} model.ChangefeedCommonInfo
// @Failure 500 {object} model.HTTPError
// @Router /api/v2/changefeeds [get]
func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
statuses, err := h.capture.StatusProvider().GetAllChangeFeedStatuses(ctx)
if err != nil {
_ = c.Error(err)
return
}

infos, err := h.capture.StatusProvider().GetAllChangeFeedInfo(ctx)
if err != nil {
_ = c.Error(err)
return
}

commonInfos := make([]ChangefeedCommonInfo, 0)
changefeeds := make([]model.ChangeFeedID, 0)

for cfID := range statuses {
changefeeds = append(changefeeds, cfID)
}
sort.Slice(changefeeds, func(i, j int) bool {
if changefeeds[i].Namespace == changefeeds[j].Namespace {
return changefeeds[i].ID < changefeeds[j].ID
}

return changefeeds[i].Namespace < changefeeds[j].Namespace
})

for _, cfID := range changefeeds {
cfInfo, exist := infos[cfID]
if !exist {
continue
}
cfStatus := statuses[cfID]

if !cfInfo.State.IsNeeded(state) {
// if the value of `state` is not 'all', only return changefeed
// with state 'normal', 'stopped', 'failed'
continue
}

// return the common info only.
commonInfo := &ChangefeedCommonInfo{
UpstreamID: cfInfo.UpstreamID,
Namespace: cfID.Namespace,
ID: cfID.ID,
FeedState: cfInfo.State,
RunningError: cfInfo.Error,
}

if cfStatus != nil {
commonInfo.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
commonInfo.CheckpointTime = model.JSONTime(tm)
}

commonInfos = append(commonInfos, *commonInfo)
}
resp := &ListResponse[ChangefeedCommonInfo]{
Total: len(commonInfos),
Items: commonInfos,
}

c.JSON(http.StatusOK, resp)
}

// verifyTable verify table, return ineligibleTables and EligibleTables.
func (h *OpenAPIV2) verifyTable(c *gin.Context) {
cfg := getDefaultVerifyTableConfig()
Expand Down
79 changes: 79 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sort"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -478,6 +479,84 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusOK, w.Code)
}

func TestListChangeFeeds(t *testing.T) {
t.Parallel()

cp := mock_capture.NewMockCapture(gomock.NewController(t))
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
router := newRouter(apiV2)
sorted := func(s []model.ChangefeedCommonInfo) bool {
return sort.SliceIsSorted(s, func(i, j int) bool {
cf1, cf2 := s[i], s[j]
if cf1.Namespace == cf2.Namespace {
return cf1.ID < cf2.ID
}
return cf1.Namespace < cf2.Namespace
})
}

// case 1: list all changefeeds regardless of the state
provider1 := &mockStatusProvider{
changefeedInfos: map[model.ChangeFeedID]*model.ChangeFeedInfo{
model.DefaultChangeFeedID("cf1"): {
State: model.StateNormal,
},
model.DefaultChangeFeedID("cf2"): {
State: model.StateError,
},
model.DefaultChangeFeedID("cf3"): {
State: model.StateStopped,
},
},
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatus{
model.DefaultChangeFeedID("cf1"): {},
model.DefaultChangeFeedID("cf2"): {},
model.DefaultChangeFeedID("cf3"): {},
},
}
cp.EXPECT().StatusProvider().Return(provider1).AnyTimes()
w := httptest.NewRecorder()
metaInfo := testCase{
url: "/api/v2/changefeeds?state=all",
method: "GET",
}
req, _ := http.NewRequestWithContext(
context.Background(),
metaInfo.method,
metaInfo.url,
nil,
)
router.ServeHTTP(w, req)
resp := ListResponse[model.ChangefeedCommonInfo]{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, 3, resp.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp.Items))

// case 2: only list changefeed with state 'normal', 'stopped' and 'failed'
metaInfo2 := testCase{
url: "/api/v2/changefeeds",
method: "GET",
}
req2, _ := http.NewRequestWithContext(
context.Background(),
metaInfo2.method,
metaInfo2.url,
nil,
)
router.ServeHTTP(w, req2)
resp2 := ListResponse[model.ChangefeedCommonInfo]{}
err = json.NewDecoder(w.Body).Decode(&resp2)
require.Nil(t, err)
require.Equal(t, 2, resp2.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp2.Items))
}

func TestGetChangeFeedMetaInfo(t *testing.T) {
t.Parallel()

Expand Down
11 changes: 11 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ func (c ChangeFeedDetail) MarshalJSON() ([]byte, error) {
})
}

// ChangefeedCommonInfo holds some common usage information of a changefeed
type ChangefeedCommonInfo struct {
UpstreamID uint64 `json:"upstream_id"`
Namespace string `json:"namespace"`
ID string `json:"id"`
FeedState model.FeedState `json:"state"`
CheckpointTSO uint64 `json:"checkpoint_tso"`
CheckpointTime model.JSONTime `json:"checkpoint_time"`
RunningError *model.RunningError `json:"error"`
}

// ChangefeedConfig use by create changefeed api
type ChangefeedConfig struct {
Namespace string `json:"namespace"`
Expand Down
40 changes: 40 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,46 @@ var doc = `{
}
}
},
"/api/v2/changefeeds": {
"get": {
"description": "list all changefeeds in cdc cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"changefeed"
],
"summary": "List changefeed",
"parameters": [
{
"type": "string",
"description": "state",
"name": "state",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/model.ChangefeedCommonInfo"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/model.HTTPError"
}
}
}
}
},
"/api/v2/changefeeds/{changefeed_id}": {
"get": {
"description": "get detail information of a changefeed",
Expand Down
40 changes: 40 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,46 @@
}
}
},
"/api/v2/changefeeds": {
"get": {
"description": "list all changefeeds in cdc cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"changefeed"
],
"summary": "List changefeed",
"parameters": [
{
"type": "string",
"description": "state",
"name": "state",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/model.ChangefeedCommonInfo"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/model.HTTPError"
}
}
}
}
},
"/api/v2/changefeeds/{changefeed_id}": {
"get": {
"description": "get detail information of a changefeed",
Expand Down
26 changes: 26 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,32 @@ paths:
summary: Get server status
tags:
- common
/api/v2/changefeeds:
get:
consumes:
- application/json
description: list all changefeeds in cdc cluster
parameters:
- description: state
in: query
name: state
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/model.ChangefeedCommonInfo'
type: array
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/model.HTTPError'
summary: List changefeed
tags:
- changefeed
/api/v2/changefeeds/{changefeed_id}:
get:
consumes:
Expand Down