Skip to content

teng231/kafclient

Repository files navigation

Kafclient

move by teng231/kafclient helper for create system using kafka to streaming and events driven base.

Go Go Reference

Install

 go get github.com/teng231/kafclient

have 3 env variables for config:

  • KAFKA_NUM_PARTITION: config number of partition( default: 3)
  • KAFKA_REPLICATION_FACTOR: config replication factor.(default: -1)
  • KAFKA_VERSION: for set kafka version for sarama config

you can overide 3 variables for cluster configs

Usage

Library have 2 modules publisher and consumer, or use can use both. For example to setup:

  • Both setup
func NewKafClient() error {
	ps := &kafclient.Client{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	ps.InitPublisher(brokers...)
	err := ps.InitConsumerGroup(cf.PubsubSubscription, brokers...)
	if err != nil {
		return err
	}
	return nil
}
  • Publisher setup
func NewKafClient() error {
	ps := &kafclient.Client{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	ps.InitPublisher(brokers...)
	if err != nil {
		return err
	}
	return nil
}


// using for publish event
func SendMessage(msg string) {
    sms := map[string]interface{}{
		Body: "hello everyone",
        App:  "company.vn",
		To:   []string{"0374140511"},
	}
    if err := ps.Publish("topic_send_example", sms); err != nil {
		log.Print(err)
	}
}
  • Consumer setup
func main() {

    // if this task running will block main thread
    onStreamRunning(...)

}


func NewKafClient() error {
	ps := &kafclient.Client{}
	brokers := strings.Split("localhost:9002,localhost:9003", ",")
	err := ps.InitConsumerGroup(cf.KafClientSubscription, brokers...)
	if err != nil {
		return err
	}
	return nil
}

// using for publish event
func SendMessage() error {
    sms := map[string]interface{}{
		Body: "hello everyone",
        App:  "company.vn",
		To:   []string{"0374140511"},
	}
    if err := ps.Publish("topic_send_example", sms); err != nil {
		log.Print(err)
        return err
	}
    return nil
}


func SendMessageExtraConfig() error {
    sms := map[string]interface{}{
		Body: "hello everyone",
        App:  "company.vn",
		To:   []string{"0374140511"},
	}
    if err := ps.PublishWithConfig(Topic{Name:"topic_send_example", AutoCommit: false}, &SenderConfig{
		Header: map[string]string{"sender_by": "containerX"},
		Metadata: "any_if_you_want",
	}, sms); err != nil {
		log.Print(err)
        return err
	}
    return nil
}

// send message help sync task
func SendMessageToSinglePartition() error {
    sms := map[string]interface{}{
		Body: "hello everyone",
        App:  "company.vn",
		To:   []string{"0374140511"},
	}
    if err := ps.PublishWithConfig(Topic{Name:"topic_send_example", AutoCommit: false, Partition: ToPInt32(1)}, &SenderConfig{
		Header: map[string]string{"sender_by": "containerX"},
		Metadata: "any_if_you_want",
	}, sms); err != nil {
		log.Print(err)
        return err
	}
    return nil
}


func onListenUserInsert(payload []byte, h *Account, fnCommit func()) {
	defer fnCommit()
	user := &User{}
    onCreateUser(user)
    ...
}


type MsgHandler struct {
	f     func([]byte, *Account, func())
	topic *KafClient.Topic
}

var (
	backend_user_insert_user = "backend_user_insert_user"
    numberOfConcurrents = 1
	subsList                   = map[string]*MsgHandler{
		backend_user_insert_user: {onListenUserInsert, &KafClient.Topic{
			Name:       backend_user_insert_user,
			AutoCommit: false,
		}},
	}
)

func onStreamRunning(ps *KafClient.KafClient, h *Account) {
	log.Print("Stream start")
	msgBuf := make(chan KafClient.Message, 1000)

	numberOfConcurrents := 1
	topics := make([]*KafClient.Topic, 0, 100)

	for _, msghandler := range kafkaTopics {
		topics = append(topics, msghandler.topic)
	}

	go func() {
		for {
			psMessage := <-msgBuf
			msghandler := subsList[psMessage.Topic]
			if msghandler.f != nil {
				msghandler.f(psMessage.Body, h, psMessage.Commit)
			}
		}
	}()
	err := ps.OnAsyncSubscribe(topics, numberOfConcurrents, msgBuf)
	if err != nil {
		log.Print(err)
	}
}

About

pubsub controller using kafka and base on sarama. Easy controll flow for actions streamming, event driven.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published