Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

namespace conflict fixed #70

Merged
merged 1 commit into from Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 36 additions & 35 deletions client/simple_canal_connector.go
Expand Up @@ -21,12 +21,13 @@ import (
"bytes"
"encoding/binary"
"fmt"
pb "github.com/withlin/canal-go/protocol"
"io"
"net"
"strconv"
"sync"

pb "github.com/withlin/canal-go/protocol"
pbp "github.com/withlin/canal-go/protocol/packet"

"github.com/golang/protobuf/proto"
)
Expand Down Expand Up @@ -128,7 +129,7 @@ func (c SimpleCanalConnector) doConnect() error {
}
conn = con

p := new(pb.Packet)
p := new(pbp.Packet)
data, err := readNextPacket()
if err != nil {
return err
Expand All @@ -142,27 +143,27 @@ func (c SimpleCanalConnector) doConnect() error {
return fmt.Errorf(versionErr)
}

if p.GetType() != pb.PacketType_HANDSHAKE {
if p.GetType() != pbp.PacketType_HANDSHAKE {
return fmt.Errorf(handshakeErr)
}

handshake := &pb.Handshake{}
handshake := &pbp.Handshake{}
seed := &handshake.Seeds
err = proto.Unmarshal(p.GetBody(), handshake)
if err != nil {
return err
}
bytePas :=[]byte(c.PassWord)
pas := []byte(ByteSliceToHexString(Scramble411(&bytePas,seed)))
ca := &pb.ClientAuth{
bytePas := []byte(c.PassWord)
pas := []byte(ByteSliceToHexString(Scramble411(&bytePas, seed)))
ca := &pbp.ClientAuth{
Username: c.UserName,
Password: pas,
NetReadTimeoutPresent: &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
NetReadTimeoutPresent: &pbp.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
NetWriteTimeoutPresent: &pbp.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
}
caByteArray, _ := proto.Marshal(ca)
packet := &pb.Packet{
Type: pb.PacketType_CLIENTAUTHENTICATION,
packet := &pbp.Packet{
Type: pbp.PacketType_CLIENTAUTHENTICATION,
Body: caByteArray,
}

Expand All @@ -174,18 +175,18 @@ func (c SimpleCanalConnector) doConnect() error {
if err != nil {
return err
}
pk := &pb.Packet{}
pk := &pbp.Packet{}

err = proto.Unmarshal(pp, pk)
if err != nil {
return err
}

if pk.Type != pb.PacketType_ACK {
if pk.Type != pbp.PacketType_ACK {
return fmt.Errorf(packetAckErr)
}

ackBody := &pb.Ack{}
ackBody := &pbp.Ack{}
err = proto.Unmarshal(pk.GetBody(), ackBody)
if err != nil {
return err
Expand Down Expand Up @@ -228,20 +229,20 @@ func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, un
if units == nil {
units = &i
}
get := new(pb.Get)
get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false}
get := new(pbp.Get)
get.AutoAckPresent = &pbp.Get_AutoAck{AutoAck: false}
get.Destination = c.ClientIdentity.Destination
get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
get.FetchSize = size
get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time}
get.UnitPresent = &pb.Get_Unit{Unit: *units}
get.TimeoutPresent = &pbp.Get_Timeout{Timeout: *time}
get.UnitPresent = &pbp.Get_Unit{Unit: *units}

getBody, err := proto.Marshal(get)
if err != nil {
return nil, err
}
packet := new(pb.Packet)
packet.Type = pb.PacketType_GET
packet := new(pbp.Packet)
packet.Type = pbp.PacketType_GET
packet.Body = getBody
pa, err := proto.Marshal(packet)
if err != nil {
Expand Down Expand Up @@ -275,7 +276,7 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
return nil
}

us := new(pb.Unsub)
us := new(pbp.Unsub)
us.Destination = c.ClientIdentity.Destination
us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)

Expand All @@ -284,8 +285,8 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
return err
}

pa := new(pb.Packet)
pa.Type = pb.PacketType_UNSUBSCRIPTION
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_UNSUBSCRIPTION
pa.Body = unSub

pack, err := proto.Marshal(pa)
Expand All @@ -300,7 +301,7 @@ func (c *SimpleCanalConnector) UnSubscribe() error {
if err != nil {
return err
}
ack := new(pb.Ack)
ack := new(pbp.Ack)
err = proto.Unmarshal(pa.Body, ack)
if err != nil {
return err
Expand Down Expand Up @@ -328,7 +329,7 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
return nil
}

ca := new(pb.ClientAck)
ca := new(pbp.ClientAck)
ca.Destination = c.ClientIdentity.Destination
ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
ca.BatchId = batchId
Expand All @@ -337,8 +338,8 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
if err != nil {
return err
}
pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTACK
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_CLIENTACK
pa.Body = clientAck
pack, err := proto.Marshal(pa)
if err != nil {
Expand All @@ -352,7 +353,7 @@ func (c *SimpleCanalConnector) Ack(batchId int64) error {
//RollBack 回滚操作
func (c *SimpleCanalConnector) RollBack(batchId int64) error {
c.waitClientRunning()
cb := new(pb.ClientRollback)
cb := new(pbp.ClientRollback)
cb.Destination = c.ClientIdentity.Destination
cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId)
cb.BatchId = batchId
Expand All @@ -362,8 +363,8 @@ func (c *SimpleCanalConnector) RollBack(batchId int64) error {
return err
}

pa := new(pb.Packet)
pa.Type = pb.PacketType_CLIENTROLLBACK
pa := new(pbp.Packet)
pa.Type = pbp.PacketType_CLIENTROLLBACK
pa.Body = clientBollBack
pack, err := proto.Marshal(pa)
if err != nil {
Expand Down Expand Up @@ -432,15 +433,15 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
if !c.Running {
return nil
}
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pb.Packet)
pack.Type = pb.PacketType_SUBSCRIPTION
body, _ := proto.Marshal(&pbp.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pbp.Packet)
pack.Type = pbp.PacketType_SUBSCRIPTION
pack.Body = body

packet, _ := proto.Marshal(pack)
WriteWithHeader(packet)

p := new(pb.Packet)
p := new(pbp.Packet)

paBytes, err := readNextPacket()
if err != nil {
Expand All @@ -450,7 +451,7 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
if err != nil {
return err
}
ack := new(pb.Ack)
ack := new(pbp.Ack)
err = proto.Unmarshal(p.Body, ack)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Expand Up @@ -2,9 +2,8 @@ module github.com/withlin/canal-go

require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.2.0
github.com/golang/protobuf v1.5.2
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect
)

go 1.13
14 changes: 10 additions & 4 deletions go.sum
@@ -1,11 +1,17 @@
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
25 changes: 13 additions & 12 deletions protocol/Message.go
Expand Up @@ -14,18 +14,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol

import (
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
pbe "github.com/withlin/canal-go/protocol/entry"
pbp "github.com/withlin/canal-go/protocol/packet"
)

type Message struct {
Id int64
Entries []Entry
Entries []pbe.Entry
Raw bool
RawEntries interface{}
}
Expand All @@ -36,22 +37,22 @@ func NewMessage(id int64) *Message {
}

func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
p := new(Packet)
p := new(pbp.Packet)
err := proto.Unmarshal(data, p)
if err != nil {
return nil, err
}
messages := new(Messages)
messages := new(pbp.Messages)
message := new(Message)

length := len(messages.Messages)
message.Entries = make([]Entry, length)
ack := new(Ack)
var items []Entry
var entry Entry
message.Entries = make([]pbe.Entry, length)
ack := new(pbp.Ack)
var items []pbe.Entry
var entry pbe.Entry
switch p.Type {
case PacketType_MESSAGES:
if !(p.GetCompression() == Compression_NONE) {
case pbp.PacketType_MESSAGES:
if !(p.GetCompression() == pbp.Compression_NONE) && !(p.GetCompression() == pbp.Compression_COMPRESSIONCOMPATIBLEPROTO2) { // NONE和兼容pb2的处理方式相同
panic("compression is not supported in this connector")
}
err := proto.Unmarshal(p.Body, messages)
Expand All @@ -75,7 +76,7 @@ func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
message.Id = messages.GetBatchId()
return message, nil

case PacketType_ACK:
case pbp.PacketType_ACK:
err := proto.Unmarshal(p.Body, ack)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion protocol/client_identity.go
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol

type ClientIdentity struct {
Destination string
Expand Down
2 changes: 1 addition & 1 deletion protocol/doc.go
Expand Up @@ -14,4 +14,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com_alibaba_otter_canal_protocol
package protocol