Skip to content

Commit

Permalink
Merge branch 'master' into apiv2/getchangefeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
charleszheng44 committed Jan 31, 2023
2 parents 28402e4 + c83ba5d commit 3d0ad98
Show file tree
Hide file tree
Showing 25 changed files with 244 additions and 4,444 deletions.
5 changes: 0 additions & 5 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -95,7 +94,6 @@ type captureImpl struct {
// createEtcdClient used to create etcd client when capture restarts
createEtcdClient createEtcdClientFunc
EtcdClient etcd.CDCEtcdClient
tableActorSystem *system.System

// useSortEngine indicates whether to use the new pull based sort engine or
// the old push based sorter system. the latter will be removed after all sorter
Expand Down Expand Up @@ -135,7 +133,6 @@ type captureImpl struct {
func NewCapture(pdEndpoints []string,
createEtcdClient createEtcdClientFunc,
grpcService *p2p.ServerWrapper,
tableActorSystem *system.System,
sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
) Capture {
Expand All @@ -146,7 +143,6 @@ func NewCapture(pdEndpoints []string,
grpcService: grpcService,
cancel: func() {},
pdEndpoints: pdEndpoints,
tableActorSystem: tableActorSystem,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},
Expand Down Expand Up @@ -330,7 +326,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
TableActorSystem: c.tableActorSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
SorterSystem: c.sorterSystem,
Expand Down
39 changes: 14 additions & 25 deletions cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,26 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
)

type managerTester struct {
manager *managerImpl
state *orchestrator.GlobalReactorState
tester *orchestrator.ReactorStateTester
manager *managerImpl
state *orchestrator.GlobalReactorState
tester *orchestrator.ReactorStateTester
//nolint:unused
liveness model.Liveness
}

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
t *testing.T,
createTablePipeline func(
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error),
liveness *model.Liveness,
) *managerImpl {
captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"}
Expand All @@ -60,23 +56,14 @@ func NewManager4Test(
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) *processor {
return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness, cfg)
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg)
}
return m
}

//nolint:unused
func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
s.manager = NewManager4Test(t, func(
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error) {
return &mockTablePipeline{
span: span,
name: fmt.Sprintf("`test`.`table%d`", span),
state: tablepb.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
}, &s.liveness)
s.manager = NewManager4Test(t, &s.liveness)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
Expand All @@ -88,6 +75,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
}

func TestChangefeed(t *testing.T) {
t.Skip("FIXME: Use pull-based-sink")
ctx := cdcContext.NewBackendContext4Test(false)
s := &managerTester{}
s.resetSuit(ctx, t)
Expand Down Expand Up @@ -141,6 +129,7 @@ func TestChangefeed(t *testing.T) {
}

func TestDebugInfo(t *testing.T) {
t.Skip("FIXME: Use pull-based-sink")
ctx := cdcContext.NewBackendContext4Test(false)
s := &managerTester{}
s.resetSuit(ctx, t)
Expand Down Expand Up @@ -196,6 +185,7 @@ func TestDebugInfo(t *testing.T) {
}

func TestClose(t *testing.T) {
t.Skip("FIXME: Use pull-based-sink")
ctx := cdcContext.NewBackendContext4Test(false)
s := &managerTester{}
s.resetSuit(ctx, t)
Expand Down Expand Up @@ -237,6 +227,7 @@ func TestClose(t *testing.T) {
}

func TestSendCommandError(t *testing.T) {
t.Skip("FIXME: Use pull-based-sink")
liveness := model.LivenessCaptureAlive
cfg := config.NewDefaultSchedulerConfig()
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness, cfg).(*managerImpl)
Expand All @@ -255,6 +246,7 @@ func TestSendCommandError(t *testing.T) {
}

func TestManagerLiveness(t *testing.T) {
t.Skip("FIXME: Use pull-based-sink")
ctx := cdcContext.NewBackendContext4Test(false)
s := &managerTester{}
s.resetSuit(ctx, t)
Expand Down Expand Up @@ -300,15 +292,12 @@ func TestManagerLiveness(t *testing.T) {
}

func TestQueryTableCount(t *testing.T) {
t.Skip("FIXME: add tables")
liveness := model.LivenessCaptureAlive
cfg := config.NewDefaultSchedulerConfig()
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness, cfg).(*managerImpl)
ctx := context.TODO()
// Add some tables to processor.
tables := spanz.NewHashMap[tablepb.TablePipeline]()
tables.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), nil)
tables.ReplaceOrInsert(spanz.TableIDToComparableSpan(2), nil)
m.processors[model.ChangeFeedID{ID: "test"}] = &processor{tableSpans: tables}
m.processors[model.ChangeFeedID{ID: "test"}] = &processor{}

done := make(chan error, 1)
tableCh := make(chan int, 1)
Expand Down
2 changes: 0 additions & 2 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package processor

import (
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -94,6 +93,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(tableMemoryHistogram)
registry.MustRegister(processorMemoryGauge)
registry.MustRegister(remainKVEventsGauge)
pipeline.InitMetrics(registry)
sinkmanager.InitMetrics(registry)
}
98 changes: 0 additions & 98 deletions cdc/processor/pipeline/actor_node.go

This file was deleted.

0 comments on commit 3d0ad98

Please sign in to comment.