Skip to content

Commit

Permalink
Merge pull request #81 from lukasmacko/kafka
Browse files Browse the repository at this point in the history
WIP: Refactor messaging + kafka
  • Loading branch information
jozef-slezak committed Aug 17, 2017
2 parents cda5aa4 + 39aa15e commit f4f0cfb
Show file tree
Hide file tree
Showing 13 changed files with 436 additions and 173 deletions.
62 changes: 24 additions & 38 deletions examples/kafka/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package main

import (
"encoding/json"
"fmt"
"github.com/ligato/cn-infra/core"
"github.com/ligato/cn-infra/examples/model"
"github.com/ligato/cn-infra/flavors/etcdkafka"
log "github.com/ligato/cn-infra/logging/logrus"
"github.com/ligato/cn-infra/messaging/kafka"
"github.com/ligato/cn-infra/messaging/kafka/client"
"github.com/ligato/cn-infra/messaging/kafka/mux"
"github.com/ligato/cn-infra/messaging"
"github.com/ligato/cn-infra/utils/safeclose"
"time"
)
Expand Down Expand Up @@ -67,35 +63,38 @@ const PluginID core.PluginName = "example-plugin"
// ConsumerHandle is required to read messages from a topic, and PluginConnection is needed to start consuming on
// the topic
type ExamplePlugin struct {
Kafka kafka.Mux
subscription chan (*client.ConsumerMessage)
kafkaByteConn *mux.Connection
kafkaProtoConn *mux.ProtoConnection
Kafka messaging.Mux
subscription chan (messaging.ProtoMessage)
kafkaSyncPublisher messaging.ProtoPublisher
kafkaAsyncPublisher messaging.ProtoPublisher
kafkaWatcher messaging.ProtoWatcher
// Successfully published kafka message is sent through the message channel, error channel otherwise
asyncMessageChannel chan (*client.ProducerMessage)
asyncErrorChannel chan (*client.ProducerError)
asyncMessageChannel chan (messaging.ProtoMessage)
asyncErrorChannel chan (messaging.ProtoMessageErr)
}

// Init is the entry point into the plugin that is called by Agent Core when the Agent is coming up.
// The Go native plugin mechanism that was introduced in Go 1.8
func (plugin *ExamplePlugin) Init() (err error) {
// Create new kafka connection. The connection allows to consume topic/partition and to publish
// messages in plugin
plugin.kafkaByteConn = plugin.Kafka.NewConnection("example-plugin")

// Create a new kafka connection that allows easily process proto-modelled messages.
plugin.kafkaProtoConn = plugin.Kafka.NewProtoConnection("example-plugin-proto")
// Create a synchronous publisher for the selected topic.
plugin.kafkaSyncPublisher = plugin.Kafka.NewSyncPublisher("example-plugin")

// Create an asynchronous publisher for the selected topic.
plugin.kafkaAsyncPublisher = plugin.Kafka.NewAsyncPublisher("example-plugin", messaging.ToProtoMsgChan(plugin.asyncMessageChannel), messaging.ToProtoMsgErrChan(plugin.asyncErrorChannel))

plugin.kafkaWatcher = plugin.Kafka.NewWatcher("example-plugin")

// ConsumePartition is called to start consuming a topic/partition.
topic := "example-topic"
plugin.subscription = make(chan *client.ConsumerMessage)
err = plugin.kafkaByteConn.ConsumeTopic(plugin.subscription, topic)
plugin.subscription = make(chan messaging.ProtoMessage)
err = plugin.kafkaWatcher.Watch(messaging.ToProtoMsgChan(plugin.subscription), topic)
if err != nil {
log.Error(err)
}
// Init channels required for async handler
plugin.asyncMessageChannel = make(chan *client.ProducerMessage, 0)
plugin.asyncErrorChannel = make(chan *client.ProducerError, 0)
plugin.asyncMessageChannel = make(chan messaging.ProtoMessage, 0)
plugin.asyncErrorChannel = make(chan messaging.ProtoMessageErr, 0)

log.Info("Initialization of the custom plugin for the Kafka example is completed")

Expand Down Expand Up @@ -125,18 +124,6 @@ func (plugin *ExamplePlugin) Close() error {
// Send Kafka notifications
func (plugin *ExamplePlugin) producer() {
time.Sleep(4 * time.Second)
exampleFile, _ := json.Marshal("{}")

log.Info("Sending Kafka notification (string)")
// Synchronous message with string encoded-message. The SendSyncMessage() call
// returns when the message has been successfully sent to Kafka.
offset, err := plugin.kafkaByteConn.SendSyncString("example-topic", fmt.Sprintf("%s", "string-key"),
string(exampleFile))
if err != nil {
log.Errorf("Failed to sync-send a string message, error %v", err)
} else {
log.Debugf("Sent sync string message, offset: %v", offset)
}

// Synchronous message with protobuf-encoded message
enc := &etcd_example.EtcdExample{
Expand All @@ -145,19 +132,18 @@ func (plugin *ExamplePlugin) producer() {
BoolVal: true,
}
log.Info("Sending Kafka notification (protobuf)")
offset, err = plugin.kafkaProtoConn.SendSyncMessage("example-topic", "proto-key", enc)
err := plugin.kafkaSyncPublisher.Publish("proto-key", enc)
if err != nil {
log.Errorf("Failed to sync-send a proto message, error %v", err)
} else {
log.Debugf("Sent sync proto message, offset: %v", offset)
log.Debugf("Sent sync proto message.")
}

// Asynchronous message with protobuf encoded message. A success event is sent to the app asynchronously
// on an event channel when the message has been successfully sent to Kafka. An error message is sent to
// the app asynchronously if the message could not be sent.
log.Info("Sending async Kafka notification (protobuf)")
plugin.kafkaProtoConn.SendAsyncMessage("example-topic", "async-proto-key", enc, "metadata",
plugin.asyncMessageChannel, plugin.asyncErrorChannel)
plugin.kafkaAsyncPublisher.Publish("async-proto-key", enc)
}

/************
Expand All @@ -171,7 +157,7 @@ func (plugin *ExamplePlugin) syncEventHandler() {

// Watch on message channel for sync kafka events
for message := range plugin.subscription {
log.Infof("Received Kafka Message, topic '%s', key: '%s', ", message.Topic, message.Key)
log.Infof("Received Kafka Message, topic '%s', key: '%s', ", message.GetTopic(), message.GetKey())
}
}

Expand All @@ -181,7 +167,7 @@ func (plugin *ExamplePlugin) asyncEventHandler() {
for {
select {
case message := <-plugin.asyncMessageChannel:
log.Infof("Received async Kafka Message, topic '%s', key: '%s', ", message.Topic, message.Key)
log.Infof("Received async Kafka Message, topic '%s', key: '%s', ", message.GetTopic(), message.GetKey())
case err := <-plugin.asyncErrorChannel:
log.Errorf("Failed to publish async message, %v", err)
}
Expand Down
89 changes: 89 additions & 0 deletions messaging/chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2017 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package messaging

import (
"github.com/ligato/cn-infra/logging"
"github.com/ligato/cn-infra/logging/logroot"
"time"
)

// DefaultMsgTimeout for delivery of notification
const DefaultMsgTimeout = 2 * time.Second

// ToProtoMsgChan allows to receive messages through channel instead of callback.
func ToProtoMsgChan(ch chan ProtoMessage, opts ...interface{}) func(ProtoMessage) {

timeout, logger := ParseOpts(opts...)

return func(msg ProtoMessage) {
select {
case ch <- msg:
case <-time.After(timeout):
logger.Warn("Unable to deliver message")
}
}
}

// ToProtoMsgErrChan allows to receive error messages through channel instead of callback.
func ToProtoMsgErrChan(ch chan ProtoMessageErr, opts ...interface{}) func(ProtoMessageErr) {

timeout, logger := ParseOpts(opts...)

return func(msg ProtoMessageErr) {
select {
case ch <- msg:
case <-time.After(timeout):
logger.Warn("Unable to deliver message")
}
}
}

// ParseOpts returns timeout and logger to be used based on the given options.
func ParseOpts(opts ...interface{}) (time.Duration, logging.Logger) {
timeout := DefaultMsgTimeout
logger := logroot.Logger()

for _, opt := range opts {
switch opt.(type) {
case *WithLoggerOpt:
logger = opt.(*WithLoggerOpt).logger
case *WithTimeoutOpt:
timeout = opt.(*WithTimeoutOpt).timeout
}
}
return timeout, logger

}

// WithTimeoutOpt defines the maximum time that is attempted to deliver notification.
type WithTimeoutOpt struct {
timeout time.Duration
}

// WithTimeout creates an option for ToChan function that defines a timeout for notification delivery.
func WithTimeout(timeout time.Duration) *WithTimeoutOpt {
return &WithTimeoutOpt{timeout: timeout}
}

// WithLoggerOpt defines a logger that logs if delivery of notification is unsuccessful.
type WithLoggerOpt struct {
logger logging.Logger
}

// WithLogger creates an option for ToChan function that specifies a logger to be used.
func WithLogger(logger logging.Logger) *WithLoggerOpt {
return &WithLoggerOpt{logger: logger}
}
4 changes: 2 additions & 2 deletions messaging/kafka/client/asyncproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func (ref *AsyncProducer) errorHandler(in <-chan *sarama.ProducerError) {
Partition: msg.Partition,
}
perr2 := &ProducerError{
Msg: pmsg,
Err: err,
ProducerMessage: pmsg,
Err: err,
}
val, _ := msg.Value.Encode()
ref.Errorf("message %s errored in topic(%s)/partition(%d)/offset(%d)\n", string(val), pmsg.Topic, pmsg.Partition, pmsg.Offset)
Expand Down
99 changes: 81 additions & 18 deletions messaging/kafka/client/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@ type ConsumerMessage struct {
Timestamp time.Time
}

// GetKey returns key associated with the message
// GetTopic return topic associated with the message
func (cm *ConsumerMessage) GetTopic() string {
return cm.Topic
}

// GetKey returns key associated with the message.
func (cm *ConsumerMessage) GetKey() string {
return string(cm.Key)
}

// GetValue return value associated with the message
// GetValue return value associated with the message.
func (cm *ConsumerMessage) GetValue() []byte {
return cm.Value
}
Expand All @@ -59,12 +64,12 @@ func NewProtoConsumerMessage(msg *ConsumerMessage, serializer keyval.Serializer)
return &ProtoConsumerMessage{msg, serializer}
}

// GetKey returns key associated with the message
// GetKey returns key associated with the message.
func (cm *ProtoConsumerMessage) GetKey() string {
return string(cm.Key)
}

// GetValue return value associated with the message
// GetValue return value associated with the message.
func (cm *ProtoConsumerMessage) GetValue(msg proto.Message) error {
err := cm.serializer.Unmarshal(cm.ConsumerMessage.GetValue(), msg)
if err != nil {
Expand All @@ -73,6 +78,11 @@ func (cm *ProtoConsumerMessage) GetValue(msg proto.Message) error {
return nil
}

// GetTopic returns topic associated with the message.
func (cm *ProtoConsumerMessage) GetTopic() string {
return cm.Topic
}

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
// The Kafka topic for this message.
Expand Down Expand Up @@ -101,24 +111,26 @@ type ProducerMessage struct {
Partition int32
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
Msg *ProducerMessage
Err error
// GetTopic returns topic associated with the message.
func (pm *ProducerMessage) GetTopic() string {
return pm.Topic
}

func (ref *ProducerError) Error() string {
return ref.Err.Error()
// GetKey returns key associated with the message.
func (pm *ProducerMessage) GetKey() string {
key, _ := pm.Key.Encode()
return string(key)
}

func (ref *ProducerError) String() string {
return fmt.Sprintf("ProducerError: %s, error: %v\n", ref.Msg, ref.Err)
// GetValue returns the content of the message.
func (pm *ProducerMessage) GetValue() []byte {
val, _ := pm.Value.Encode()
return val
}

func (ref *ProducerMessage) String() string {
func (pm *ProducerMessage) String() string {
var meta string
switch t := ref.Metadata.(type) {
switch t := pm.Metadata.(type) {
default:
meta = fmt.Sprintf("unexpected type %T", t) // %T prints whatever type t has
case string:
Expand All @@ -137,8 +149,59 @@ func (ref *ProducerMessage) String() string {
meta = fmt.Sprintf("%d", *t) // t has type *int
}

key, _ := ref.Key.Encode()
val, _ := ref.Value.Encode()
key, _ := pm.Key.Encode()
val, _ := pm.Value.Encode()

return fmt.Sprintf("ProducerMessage - Topic: %s, Key: %s, Value: %s, Meta: %v, Offset: %d, Partition: %d\n", pm.Topic, string(key), string(val), meta, pm.Offset, pm.Partition)
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
*ProducerMessage
Err error
}

func (ref *ProducerError) Error() error {
return ref.Err
}

func (ref *ProducerError) String() string {
return fmt.Sprintf("ProducerError: %s, error: %v\n", ref.ProducerMessage, ref.Err.Error())
}

// ProtoProducerMessage is wrapper of a producer message that simplify work with proto-modelled data.
type ProtoProducerMessage struct {
*ProducerMessage
Serializer keyval.Serializer
}

// GetTopic returns topic associated with the message.
func (ppm *ProtoProducerMessage) GetTopic() string {
return ppm.Topic
}

// GetKey return key associated with the message.
func (ppm *ProtoProducerMessage) GetKey() string {
key, _ := ppm.Key.Encode()
return string(key)
}

// GetValue unmarshalls the content of the msg into provided structure.
func (ppm *ProtoProducerMessage) GetValue(msg proto.Message) error {
err := ppm.Serializer.Unmarshal(ppm.ProducerMessage.GetValue(), msg)
if err != nil {
return err
}
return nil
}

// ProtoProducerMessageErr represents a proto-modelled message that was not published successfully.
type ProtoProducerMessageErr struct {
*ProtoProducerMessage
Err error
}

return fmt.Sprintf("ProducerMessage - Topic: %s, Key: %s, Value: %s, Meta: %v, Offset: %d, Partition: %d\n", ref.Topic, string(key), string(val), meta, ref.Offset, ref.Partition)
func (pme *ProtoProducerMessageErr) Error() error {
return pme.Err
}
Loading

0 comments on commit f4f0cfb

Please sign in to comment.