Skip to content

Commit

Permalink
调整kafka默认配置参数, 去除new client
Browse files Browse the repository at this point in the history
  • Loading branch information
yumaojun03 committed Nov 27, 2020
1 parent f143226 commit ead727b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 19 deletions.
15 changes: 4 additions & 11 deletions bus/broker/kafka/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,20 @@ func (b *Publisher) Connect() error {
b.mux.Lock()
defer b.mux.Unlock()

b.l.Debugf("try connect: %v ...", b.conf.Hosts)

client, err := sarama.NewClient(b.conf.Hosts, b.kc)
if err != nil {
b.l.Errorf("new kafka client error, %s", err)
return err
}
b.l.Debugf("connect %v success", b.conf.Hosts)

// try to connect
producer, err := sarama.NewAsyncProducerFromClient(client)
b.l.Debugf("try connect: %v ...", b.conf.Hosts)
producer, err := sarama.NewAsyncProducer(b.conf.Hosts, b.kc)
if err != nil {
b.l.Errorf("new kafka producer fails with: %+v", err)
return err
}

b.producer = producer
b.pubChan = producer.Input()
b.l.Debugf("connect %v success", b.conf.Hosts)

go b.watchSuccess(producer.Successes())
go b.watchFailed(producer.Errors())

return nil
}

Expand Down
11 changes: 3 additions & 8 deletions bus/broker/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,15 @@ func (s *Subscriber) Connect() error {
s.mux.Lock()
defer s.mux.Unlock()

// try to connect
s.l.Debugf("try connect: %v ...", s.conf.Hosts)
client, err := sarama.NewClient(s.conf.Hosts, s.kc)
if err != nil {
s.l.Errorf("new kafka client error, %s", err)
return err
}
s.l.Debugf("connect %v success", s.conf.Hosts)

consumer, err := sarama.NewConsumerGroupFromClient(s.conf.GroupID, client)
consumer, err := sarama.NewConsumerGroup(s.conf.Hosts, s.conf.GroupID, s.kc)
if err != nil {
s.l.Errorf("Kafka consummer connect fails with: %+v", err)
return err
}
s.comsummer = consumer
s.l.Debugf("connect %v success", s.conf.Hosts)
return nil
}

Expand Down

0 comments on commit ead727b

Please sign in to comment.