Skip to content

Commit

Permalink
ticdc : http & https support in pulsar schema for pulsar downstream t…
Browse files Browse the repository at this point in the history
…ype changefeed creation. (#11338)

close #11336
  • Loading branch information
SandeepPadhi committed Jun 25, 2024
1 parent c001638 commit a2e6219
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 130 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func New(
return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.PulsarScheme, sink.PulsarSSLScheme:
case sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme:
return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer)
default:
Expand Down
174 changes: 89 additions & 85 deletions cdc/sink/ddlsink/mq/pulsar_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package mq

import (
"context"
"fmt"
"net/url"
"testing"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
"github.com/stretchr/testify/require"
)
Expand All @@ -33,17 +35,17 @@ const (
MockPulsarTopic = "pulsar_test"
)

var pulsarSchemaList = []string{sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme}

// newPulsarConfig set config
func newPulsarConfig(t *testing.T) (*config.PulsarConfig, *url.URL) {
sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" +
func newPulsarConfig(t *testing.T, schema string) (*config.PulsarConfig, *url.URL) {
sinkURL := fmt.Sprintf("%s://127.0.0.1:6650/persistent://public/default/test?", schema) +
"protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" +
"authentication-token=eyJhbcGcixxxxxxxxxxxxxx"

sinkURI, err := url.Parse(sinkURL)
require.NoError(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI))
require.NoError(t, err)
c, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig)
require.NoError(t, err)
return c, sinkURI
Expand All @@ -52,110 +54,112 @@ func newPulsarConfig(t *testing.T) (*config.PulsarConfig, *url.URL) {
// TestNewPulsarDDLSink tests the NewPulsarDDLSink
func TestNewPulsarDDLSink(t *testing.T) {
t.Parallel()

_, sinkURI := newPulsarConfig(t)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx = context.WithValue(ctx, "testing.T", t)
ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig,
manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)

require.NoError(t, err)
require.NotNil(t, ddlSink)

checkpointTs := uint64(417318403368288260)
tables := []*model.TableInfo{
{
TableName: model.TableName{
Schema: "cdc",
Table: "person",
for _, schema := range pulsarSchemaList {
_, sinkURI := newPulsarConfig(t, schema)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx = context.WithValue(ctx, "testing.T", t)
ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig,
manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)

require.NoError(t, err)
require.NotNil(t, ddlSink)

checkpointTs := uint64(417318403368288260)
tables := []*model.TableInfo{
{
TableName: model.TableName{
Schema: "cdc",
Table: "person",
},
},
},
{
TableName: model.TableName{
Schema: "cdc",
Table: "person1",
{
TableName: model.TableName{
Schema: "cdc",
Table: "person1",
},
},
},
{
TableName: model.TableName{
Schema: "cdc",
Table: "person2",
{
TableName: model.TableName{
Schema: "cdc",
Table: "person2",
},
},
},
}
}

err = ddlSink.WriteCheckpointTs(ctx, checkpointTs, tables)
require.NoError(t, err)
err = ddlSink.WriteCheckpointTs(ctx, checkpointTs, tables)
require.NoError(t, err)

events := ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents()
require.Len(t, events, 1, "All topics and partitions should be broadcast")
events := ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents()
require.Len(t, events, 1, "All topics and partitions should be broadcast")
}
}

// TestPulsarDDLSinkNewSuccess tests the NewPulsarDDLSink write a event to pulsar
func TestPulsarDDLSinkNewSuccess(t *testing.T) {
t.Parallel()

_, sinkURI := newPulsarConfig(t)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx = context.WithValue(ctx, "testing.T", t)
s, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, manager.NewMockPulsarTopicManager,
pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)
require.NoError(t, err)
require.NotNil(t, s)
for _, schema := range pulsarSchemaList {
_, sinkURI := newPulsarConfig(t, schema)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx = context.WithValue(ctx, "testing.T", t)
s, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig, manager.NewMockPulsarTopicManager,
pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)
require.NoError(t, err)
require.NotNil(t, s)
}
}

func TestPulsarWriteDDLEventToZeroPartition(t *testing.T) {
t.Parallel()

_, sinkURI := newPulsarConfig(t)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx = context.WithValue(ctx, "testing.T", t)
ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig,
manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)

require.NoError(t, err)
require.NotNil(t, ddlSink)

ddl := &model.DDLEvent{
CommitTs: 417318403368288260,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "cdc", Table: "person",
for _, schema := range pulsarSchemaList {
_, sinkURI := newPulsarConfig(t, schema)
changefeedID := model.DefaultChangeFeedID("test")
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx = context.WithValue(ctx, "testing.T", t)
ddlSink, err := NewPulsarDDLSink(ctx, changefeedID, sinkURI, replicaConfig,
manager.NewMockPulsarTopicManager, pulsarConfig.NewMockCreatorFactory, ddlproducer.NewMockPulsarProducerDDL)

require.NoError(t, err)
require.NotNil(t, ddlSink)

ddl := &model.DDLEvent{
CommitTs: 417318403368288260,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "cdc", Table: "person",
},
},
},
Query: "create table person(id int, name varchar(32), primary key(id))",
Type: mm.ActionCreateTable,
}
err = ddlSink.WriteDDLEvent(ctx, ddl)
require.NoError(t, err)
Query: "create table person(id int, name varchar(32), primary key(id))",
Type: mm.ActionCreateTable,
}
err = ddlSink.WriteDDLEvent(ctx, ddl)
require.NoError(t, err)

err = ddlSink.WriteDDLEvent(ctx, ddl)
require.NoError(t, err)
err = ddlSink.WriteDDLEvent(ctx, ddl)
require.NoError(t, err)

require.Len(t, ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents(),
2, "Write DDL 2 Events")
require.Len(t, ddlSink.producer.(*ddlproducer.PulsarMockProducers).GetAllEvents(),
2, "Write DDL 2 Events")
}
}
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func New(
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
case sink.PulsarScheme, sink.PulsarSSLScheme:
case sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme:
mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
Expand Down
86 changes: 46 additions & 40 deletions cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ package dmlproducer

import (
"context"
"fmt"
"net/url"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
"github.com/stretchr/testify/require"
)

var pulsarSchemaList = []string{sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme}

// newPulsarConfig set config
func newPulsarConfig(t *testing.T) (sinkURI *url.URL, replicaConfig *config.ReplicaConfig) {
sinkURL := "pulsar://127.0.0.1:6650/persistent://public/default/test?" +
func newPulsarConfig(t *testing.T, schema string) (sinkURI *url.URL, replicaConfig *config.ReplicaConfig) {
sinkURL := fmt.Sprintf("%s://127.0.0.1:6650/persistent://public/default/test?", schema) +
"protocol=canal-json&pulsar-version=v2.10.0&enable-tidb-extension=true&" +
"authentication-token=eyJhbcGcixxxxxxxxxxxxxx"
var err error
Expand All @@ -45,59 +49,61 @@ func newPulsarConfig(t *testing.T) (sinkURI *url.URL, replicaConfig *config.Repl

func TestNewPulsarDMLProducer(t *testing.T) {
t.Parallel()

sinkURI, rc := newPulsarConfig(t)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}
t.Logf(sinkURI.String())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errCh := make(chan error, 1)
for _, schema := range pulsarSchemaList {
sinkURI, rc := newPulsarConfig(t, schema)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}
t.Logf(sinkURI.String())

ctx = context.WithValue(ctx, "testing.T", t)
changefeed := model.DefaultChangeFeedID("changefeed-test")
errCh := make(chan error, 1)

failpointCh := make(chan error, 1)
ctx = context.WithValue(ctx, "testing.T", t)
changefeed := model.DefaultChangeFeedID("changefeed-test")

client, err := pulsarConfig.NewMockCreatorFactory(rc.Sink.PulsarConfig, changefeed, rc.Sink)
require.NoError(t, err)
dml, err := NewPulsarDMLProducerMock(ctx, changefeed, client, rc.Sink, errCh, failpointCh)
require.NoError(t, err)
require.NotNil(t, dml)
failpointCh := make(chan error, 1)

client, err := pulsarConfig.NewMockCreatorFactory(rc.Sink.PulsarConfig, changefeed, rc.Sink)
require.NoError(t, err)
dml, err := NewPulsarDMLProducerMock(ctx, changefeed, client, rc.Sink, errCh, failpointCh)
require.NoError(t, err)
require.NotNil(t, dml)
}
}

func Test_pulsarDMLProducer_AsyncSendMessage(t *testing.T) {
t.Parallel()

_, rc := newPulsarConfig(t)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errCh := make(chan error, 1)
for _, schema := range pulsarSchemaList {
_, rc := newPulsarConfig(t, schema)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink = &config.SinkConfig{
Protocol: aws.String("canal-json"),
}

ctx = context.WithValue(ctx, "testing.T", t)
changefeed := model.DefaultChangeFeedID("changefeed-test")
errCh := make(chan error, 1)

failpointCh := make(chan error, 1)
ctx = context.WithValue(ctx, "testing.T", t)
changefeed := model.DefaultChangeFeedID("changefeed-test")

client, err := pulsarConfig.NewMockCreatorFactory(rc.Sink.PulsarConfig, changefeed, rc.Sink)
require.NoError(t, err)
dml, err := NewPulsarDMLProducerMock(ctx, changefeed, client, rc.Sink, errCh, failpointCh)
require.NoError(t, err)
require.NotNil(t, dml)
failpointCh := make(chan error, 1)

err = dml.AsyncSendMessage(ctx, "test", 0, &common.Message{
Value: []byte("this value for test input data"),
PartitionKey: str2Pointer("test_key"),
})
require.NoError(t, err)
client, err := pulsarConfig.NewMockCreatorFactory(rc.Sink.PulsarConfig, changefeed, rc.Sink)
require.NoError(t, err)
dml, err := NewPulsarDMLProducerMock(ctx, changefeed, client, rc.Sink, errCh, failpointCh)
require.NoError(t, err)
require.NotNil(t, dml)

err = dml.AsyncSendMessage(ctx, "test", 0, &common.Message{
Value: []byte("this value for test input data"),
PartitionKey: str2Pointer("test_key"),
})
require.NoError(t, err)
}
}
2 changes: 1 addition & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ func (s *SinkConfig) ValidateProtocol(scheme string) error {
switch scheme {
case sink.KafkaScheme, sink.KafkaSSLScheme:
outputRawChangeEvent = s.KafkaConfig.GetOutputRawChangeEvent()
case sink.PulsarScheme, sink.PulsarSSLScheme:
case sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme:
outputRawChangeEvent = s.PulsarConfig.GetOutputRawChangeEvent()
default:
outputRawChangeEvent = s.CloudStorageConfig.GetOutputRawChangeEvent()
Expand Down
Loading

0 comments on commit a2e6219

Please sign in to comment.