-
Notifications
You must be signed in to change notification settings - Fork 1
/
consume.go
80 lines (63 loc) · 1.66 KB
/
consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main
import (
"fmt"
"strconv"
"time"
"github.com/nikgalushko/kafka-drivers-competition/sarama"
"github.com/nikgalushko/kafka-drivers-competition/segmentio"
)
type Consumer interface {
Consume(func([]byte) bool) error
}
func initSarama(groupID string, addr []string, topic string) Consumer {
ret, err := sarama.NewConsumer(groupID, addr, topic)
if err != nil {
panic(fmt.Sprintf("sarama: %s", err.Error()))
}
return ret
}
func initSegmentio(groupID string, addr []string, topic string) Consumer {
ret, err := segmentio.NewConsumer(groupID, addr, topic)
if err != nil {
panic(fmt.Sprintf("segmentio: %s", err.Error()))
}
return ret
}
func consume() {
p, err := sarama.NewProducer([]string{brokers})
if err != nil {
panic(fmt.Sprintf("producer initialize failed %s", err))
}
goldenResult := 0
go func() {
time.Sleep(500 * time.Millisecond)
defer p.Close()
for i := 0; i < records; i++ {
goldenResult += i
s := strconv.Itoa(i)
_ = p.SendAsync([]byte(s), s, topic)
}
}()
consumers := map[string]func(string, []string, string) Consumer{
"sarama": initSarama,
"segmentio": initSegmentio,
}
PrintMemUsage("before test")
var (
consumer = consumers[driver](time.Now().GoString(), []string{brokers}, topic)
count = uint(0)
sum = 0
start time.Time
)
err = consumer.Consume(func(data []byte) bool {
if count == 0 {
start = time.Now()
}
count++
i, _ := strconv.Atoi(string(data))
sum += i
return count != uint(records)
})
PrintMemUsage(driver)
fmt.Printf("%s finished: %s; result error: %v; consumed: %d. Is result valid - %t\n", driver, time.Since(start), err, count, sum == goldenResult)
}