Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send concrete messages instead of random ones #48

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ While using this tool, keep in mind that benchmarking is always an examination o

### Example

#### With random messages
Sangrenel takes [configurable](https://github.com/jamiealquiza/sangrenel#usage) message/batch sizing, concurrency and other settings and writes messages to a reference topic. Message throughput, batch write latency (p99, harmonic mean, min, max) and a latency histogram are dumped every 5 seconds.

![img_0856](https://user-images.githubusercontent.com/4108044/27497484-20821454-5818-11e7-81c9-9773597753d1.gif)

#### With concrete messages
`sangrenel -topic test --interval 30 -produce-rate 10 --messages-data "{\"field1\":{\"field2\":\"value\"}}" -messages-data "{\"field3\":\"value2\"}"`

### Installation

Assuming Go is installed (tested with 1.7+) and $GOPATH is set:
Expand Down Expand Up @@ -47,6 +51,8 @@ Usage of sangrenel:
Messages per batch (default 500)
-message-size int
Message size (bytes) (default 300)
-messages-data string
Concrete message to be sent. If this parameter is defined, it overrides the randomly generated one (based on the message-size). This parameter can be repeated as many time as needed. The messages will be sent in a round robin fashion.
-noop
Test message generation performance (does not connect to Kafka)
-produce-rate uint
Expand All @@ -63,7 +69,7 @@ Usage of sangrenel:

Sangrenel uses the Kafka client library [Sarama](https://github.com/Shopify/sarama). Sangrenel starts one or more workers, each of which instantiate a unique Kafka client connection to the target cluster. Each worker has a number of writers which generate and send message data to Kafka, sharing the parent worker client connection. The number of workers is configurable via the `-workers` flag, the number of writers per worker via the `-writers-per-worker`. This is done for scaling purposes; while a single Sarama client can be used for multiple writers (which live in separate goroutines), performance begins to flatline at some point. It's best to leave the writers-per-worker at the default 5 and scaling the worker count as needed, but the option is exposed for more control. Left as a technical exercise for the user, there's a different between 2 workers with 5 writers each and 1 worker with 10 writers.

The `-topic` flag specifies which topic is used, allowing configs such as parition count and replication factor to be prepared ahead of time for performance comparisons (by switching which topic Sangrenel is using). The `-message-batch-size`, `-message-size` and `-produce-rate` flags can be used to dictate message size, number of messages to batch per write, and the total Sangrenel write rate. `-required-acks` sets the Sarama [RequiredAcks](https://godoc.org/github.com/Shopify/sarama#RequiredAcks) config. The `-api-version` flag allows Sarama to be configured at a specific API level (See the `version` field of the Sarama [Config](https://godoc.org/github.com/Shopify/sarama#Config) struct). The supported API versions are: "0.8.2.0", "0.8.2.1", "0.8.2.2", "0.9.0.0", "0.9.0.1", "0.10.0.0", "0.10.0.1", "0.10.1.0", "0.10.2.0", "0.11.0.0", "1.0.0.0".
The `-topic` flag specifies which topic is used, allowing configs such as parition count and replication factor to be prepared ahead of time for performance comparisons (by switching which topic Sangrenel is using). The `-message-batch-size`, `-message-size`, `-messages-data` and `-produce-rate` flags can be used to dictate number of messages to batch per write, message size, concrete messages to send, and the total Sangrenel write rate. `-required-acks` sets the Sarama [RequiredAcks](https://godoc.org/github.com/Shopify/sarama#RequiredAcks) config. The `-api-version` flag allows Sarama to be configured at a specific API level (See the `version` field of the Sarama [Config](https://godoc.org/github.com/Shopify/sarama#Config) struct). The supported API versions are: "0.8.2.0", "0.8.2.1", "0.8.2.2", "0.9.0.0", "0.9.0.1", "0.10.0.0", "0.10.0.1", "0.10.1.0", "0.10.2.0", "0.11.0.0", "1.0.0.0".

Two important factors to note:
- Sangrenel uses Sarama's [SyncProducer](https://godoc.org/github.com/Shopify/sarama#SyncProducer), meaning messages are written synchronously
Expand Down
51 changes: 41 additions & 10 deletions sangrenel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ import (
"gopkg.in/Shopify/sarama.v1"
)

type stringArray []string

type config struct {
brokers []string
topic string
msgSize int
msgRate uint64
messagesData stringArray
msgDataCounter int
batchSize int
compression sarama.CompressionCodec
compressionName string
Expand All @@ -45,9 +49,20 @@ var (
errCnt uint64
)

func (i *stringArray) String() string {
return "my string representation"
}

func (i *stringArray) Set(value string) error {
*i = append(*i, value)
return nil
}


func init() {
flag.StringVar(&Config.topic, "topic", "sangrenel", "Kafka topic to produce to")
flag.IntVar(&Config.msgSize, "message-size", 300, "Message size (bytes)")
flag.IntVar(&Config.msgSize, "message-size", 300, "Message size (bytes): size of the random generated message")
flag.Var(&Config.messagesData, "messages-data", "Messages data: messages to be sent. Can be repeated as needed. If this parameter is defined, it will override the message-size")
flag.Uint64Var(&Config.msgRate, "produce-rate", 100000000, "Global write rate limit (messages/sec)")
flag.IntVar(&Config.batchSize, "message-batch-size", 500, "Messages per batch")
flag.StringVar(&Config.compressionName, "compression", "none", "Message compression: none, gzip, snappy")
Expand Down Expand Up @@ -268,6 +283,7 @@ func writer(c sarama.Client, t *tachymeter.Tachymeter) {
}
defer producer.Close()

var msgData []byte
source := rand.NewSource(time.Now().UnixNano())
generator := rand.New(source)
msgBatch := make([]*sarama.ProducerMessage, 0, Config.batchSize)
Expand Down Expand Up @@ -295,8 +311,7 @@ func writer(c sarama.Client, t *tachymeter.Tachymeter) {

for i := 0; i < n; i++ {
// Gen message.
msgData := make([]byte, Config.msgSize)
randMsg(msgData, *generator)
msgData = getMessageToSend(*generator)
msg := &sarama.ProducerMessage{Topic: Config.topic, Value: sarama.ByteEncoder(msgData)}
// Append to batch.
msgBatch = append(msgBatch, msg)
Expand Down Expand Up @@ -338,15 +353,14 @@ func writer(c sarama.Client, t *tachymeter.Tachymeter) {
// but doesn't connect to / attempt to send anything to Kafka. This is used
// purely for testing message generation performance.
func dummyWriter(t *tachymeter.Tachymeter) {
var msgData []byte
source := rand.NewSource(time.Now().UnixNano())
generator := rand.New(source)
msgBatch := make([]*sarama.ProducerMessage, 0, Config.batchSize)

for {
for i := 0; i < Config.batchSize; i++ {
// Gen message.
msgData := make([]byte, Config.msgSize)
randMsg(msgData, *generator)
msgData = getMessageToSend(*generator)
msg := &sarama.ProducerMessage{Topic: Config.topic, Value: sarama.ByteEncoder(msgData)}
// Append to batch.
msgBatch = append(msgBatch, msg)
Expand All @@ -359,12 +373,29 @@ func dummyWriter(t *tachymeter.Tachymeter) {
}
}

func getMessageToSend(generator rand.Rand) []byte {
if len(Config.messagesData) > 0 {
result := []byte(Config.messagesData[Config.msgDataCounter])

if Config.msgDataCounter + 1 >= len(Config.messagesData) {
Config.msgDataCounter = 0
} else {
Config.msgDataCounter = Config.msgDataCounter + 1
}
return result
} else {
return randMsg(generator)
}
}

// randMsg returns a random message generated from the chars byte slice.
// Message length of m bytes as defined by Config.msgSize.
func randMsg(m []byte, generator rand.Rand) {
for i := range m {
m[i] = chars[generator.Intn(len(chars))]
// Message length of msgData bytes as defined by Config.msgSize.
func randMsg(generator rand.Rand) []byte {
msgData := make([]byte, Config.msgSize)
for i := range msgData {
msgData[i] = chars[generator.Intn(len(chars))]
}
return msgData
}

// calcOutput takes a duration t and messages sent
Expand Down