Skip to content

Commit

Permalink
pb namespace conflict fixed
Browse files Browse the repository at this point in the history
1. proto分包
2. NONE和兼容pb2的处理方式相同。否则启动时会panic
  • Loading branch information
jackey.pu authored and kdpujie committed Apr 16, 2021
1 parent 533d438 commit a39ef4b
Show file tree
Hide file tree
Showing 12 changed files with 1,756 additions and 1,973 deletions.
71 changes: 36 additions & 35 deletions client/simple_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"bytes"
"encoding/binary"
"fmt"
pb "github.com/withlin/canal-go/protocol"
"io"
"net"
"strconv"
"sync"

pb "github.com/withlin/canal-go/protocol"
pbp "github.com/withlin/canal-go/protocol/packet"

"github.com/golang/protobuf/proto"
)
Expand Down Expand Up @@ -128,7 +129,7 @@ func (c SimpleCanalConnector) doConnect() error {
}
conn = con

p := new(pb.Packet)
p := new(pbp.Packet)
data, err := readNextPacket()
if err != nil {
return err
Expand All @@ -142,27 +143,27 @@ func (c SimpleCanalConnector) doConnect() error {
return fmt.Errorf(versionErr)
}

if p.GetType() != pb.PacketType_HANDSHAKE {
if p.GetType() != pbp.PacketType_HANDSHAKE {
return fmt.Errorf(handshakeErr)
}

handshake := &pb.Handshake{}
handshake := &pbp.Handshake{}
seed := &handshake.Seeds
err = proto.Unmarshal(p.GetBody(), handshake)
if err != nil {
return err
}
bytePas :=[]byte(c.PassWord)
pas := []byte(ByteSliceToHexString(Scramble411(&bytePas,seed)))
ca := &pb.ClientAuth{
bytePas := []byte(c.PassWord)
pas := []byte(ByteSliceToHexString(Scramble411(&bytePas, seed)))
ca := &pbp.ClientAuth{
Username: c.UserName,
Password: pas,
NetReadTimeoutPresent: &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
NetReadTimeoutPresent: &pbp.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pbp.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
}
caByteArray, _ := proto.Marshal(ca)
packet := &pb.Packet{
Type: pb.PacketType_CLIENTAUTHENTICATION,
packet := &pbp.Packet{
Type: pbp.PacketType_CLIENTAUTHENTICATION,
Body: caByteArray,
}

Expand All @@ -174,18 +175,18 @@ func (c SimpleCanalConnector) doConnect() error {
if err != nil {
return err
}
pk := &pb.Packet{}
pk := &pbp.Packet{}

err = proto.Unmarshal(pp, pk)
if err != nil {
return err
}

if pk.Type != pb.PacketType_ACK {
if pk.Type != pbp.PacketType_ACK {
return fmt.Errorf(packetAckErr)
}

ackBody := &pb.Ack{}
ackBody := &pbp.Ack{}
err = proto.Unmarshal(pk.GetBody(), ackBody)
if err != nil {
return err
Expand Down Expand Up @@ -228,20 +229,20 @@ func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, un
if units == nil {
units = &i
}
get := new(pb.Get)
get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
get := new(pbp.Get)
get.AutoAckPresent = &pbp.Get_AutoAck{AutoAck: false}
get.Destination = c.ClientIdentity.Destination
get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
get.FetchSize = size
get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
get.UnitPresent = &pb.Get_Unit{Unit: *units}
get.TimeoutPresent = &pbp.Get_Timeout{Timeout: *time}
get.UnitPresent = &pbp.Get_Unit{Unit: *units}

getBody, err := proto.Marshal(get)
if err != nil {
return nil, err
}
packet := new(pb.Packet)
packet.Type = pb.PacketType_GET
packet := new(pbp.Packet)
packet.Type = pbp.PacketType_GET
packet.Body = getBody
pa, err := proto.Marshal(packet)
if err != nil {
Expand Down Expand Up @@ -275,7 +276,7 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
return nil
}

us := new(pb.Unsub)
us := new(pbp.Unsub)
us.Destination = c.ClientIdentity.Destination
us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)

Expand All @@ -284,8 +285,8 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
return err
}

pa := new(pb.Packet)
pa.Type = pb.PacketType_UNSUBSCRIPTION
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_UNSUBSCRIPTION
pa.Body = unSub

pack, err := proto.Marshal(pa)
Expand All @@ -300,7 +301,7 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
if err != nil {
return err
}
ack := new(pb.Ack)
ack := new(pbp.Ack)
err = proto.Unmarshal(pa.Body, ack)
if err != nil {
return err
Expand Down Expand Up @@ -328,7 +329,7 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
return nil
}

ca := new(pb.ClientAck)
ca := new(pbp.ClientAck)
ca.Destination = c.ClientIdentity.Destination
ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
ca.BatchId = batchId
Expand All @@ -337,8 +338,8 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
if err != nil {
return err
}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTACK
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_CLIENTACK
pa.Body = clientAck
pack, err := proto.Marshal(pa)
if err != nil {
Expand All @@ -352,7 +353,7 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
//RollBack 回滚操作
func (c *SimpleCanalConnector) RollBack(batchId int64) error {
c.waitClientRunning()
cb := new(pb.ClientRollback)
cb := new(pbp.ClientRollback)
cb.Destination = c.ClientIdentity.Destination
cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
cb.BatchId = batchId
Expand All @@ -362,8 +363,8 @@ func (c *SimpleCanalConnector) RollBack(batchId int64) error {
return err
}

pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTROLLBACK
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_CLIENTROLLBACK
pa.Body = clientBollBack
pack, err := proto.Marshal(pa)
if err != nil {
Expand Down Expand Up @@ -432,15 +433,15 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
if !c.Running {
return nil
}
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pb.Packet)
pack.Type = pb.PacketType_SUBSCRIPTION
body, _ := proto.Marshal(&pbp.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pbp.Packet)
pack.Type = pbp.PacketType_SUBSCRIPTION
pack.Body = body

packet, _ := proto.Marshal(pack)
WriteWithHeader(packet)

p := new(pb.Packet)
p := new(pbp.Packet)

paBytes, err := readNextPacket()
if err != nil {
Expand All @@ -450,7 +451,7 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
if err != nil {
return err
}
ack := new(pb.Ack)
ack := new(pbp.Ack)
err = proto.Unmarshal(p.Body, ack)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ module github.com/withlin/canal-go

require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.2.0
github.com/golang/protobuf v1.5.2
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect
)

go 1.13
14 changes: 10 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
25 changes: 13 additions & 12 deletions protocol/Message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol

import (
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
pbe "github.com/withlin/canal-go/protocol/entry"
pbp "github.com/withlin/canal-go/protocol/packet"
)

type Message struct {
Id int64
Entries []Entry
Entries []pbe.Entry
Raw bool
RawEntries interface{}
}
Expand All @@ -36,22 +37,22 @@ func NewMessage(id int64) *Message {
}

func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
p := new(Packet)
p := new(pbp.Packet)
err := proto.Unmarshal(data, p)
if err != nil {
return nil, err
}
messages := new(Messages)
messages := new(pbp.Messages)
message := new(Message)

length := len(messages.Messages)
message.Entries = make([]Entry, length)
ack := new(Ack)
var items []Entry
var entry Entry
message.Entries = make([]pbe.Entry, length)
ack := new(pbp.Ack)
var items []pbe.Entry
var entry pbe.Entry
switch p.Type {
case PacketType_MESSAGES:
if !(p.GetCompression() == Compression_NONE) {
case pbp.PacketType_MESSAGES:
if !(p.GetCompression() == pbp.Compression_NONE) && !(p.GetCompression() == pbp.Compression_COMPRESSIONCOMPATIBLEPROTO2) { // NONE和兼容pb2的处理方式相同
panic("compression is not supported in this connector")
}
err := proto.Unmarshal(p.Body, messages)
Expand All @@ -75,7 +76,7 @@ func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
message.Id = messages.GetBatchId()
return message, nil

case PacketType_ACK:
case pbp.PacketType_ACK:
err := proto.Unmarshal(p.Body, ack)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion protocol/client_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol

type ClientIdentity struct {
Destination string
Expand Down
2 changes: 1 addition & 1 deletion protocol/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol
Loading

0 comments on commit a39ef4b

Please sign in to comment.