-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
94 lines (78 loc) · 1.87 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package consumer
import (
"encoding/json"
"fmt"
"time"
"github.com/ishanmadhav/aetherq/api"
"github.com/valyala/fasthttp"
)
// BROKER_URL is the base URL of the broker. ReadMessage appends the
// endpoint path ("/consume") to it when building a request.
const BROKER_URL = "http://localhost:3000"
// Consumer keeps its own offset, which records how far it has already
// fetched messages for the topic partition it is subscribed to.
type Consumer struct {
// Offset is sent with every fetch request and is incremented by ReadMessage
// each time a message with a non-nil Value is received.
Offset int
// Topic is the topic name set by SubscribeToTopic.
Topic string
// Partition is the partition number set by SubscribeToTopic.
Partition int
// httpClient is the fasthttp client used to send requests to the broker.
httpClient *fasthttp.Client
}
// ConsumerConfig is the configuration type accepted by NewConsumer.
// It currently declares no fields.
type ConsumerConfig struct {
}
// NewConsumer builds a Consumer that starts at offset 0 and owns a fresh
// fasthttp client. The supplied configuration is currently not read.
func NewConsumer(conig ConsumerConfig) *Consumer {
	consumer := new(Consumer)
	consumer.Offset = 0
	consumer.httpClient = new(fasthttp.Client)
	return consumer
}
// SubscribeToTopic records the topic and partition on the consumer so that
// later reads target them. It only updates fields; no request is sent.
func (c *Consumer) SubscribeToTopic(topic string, Partition int) {
	c.Topic, c.Partition = topic, Partition
}
// ReadMessage fetches the message at the consumer's current offset from the
// broker's /consume endpoint for the subscribed topic and partition.
//
// On success it returns the decoded message, advances the offset by one if
// the message carries a non-nil Value, and sleeps for interval before
// returning. On any failure (encoding the request, transport error, non-200
// status, or an undecodable response body) it returns a zero api.Message and
// a non-nil error, leaving the offset untouched and skipping the sleep.
func (c *Consumer) ReadMessage(interval time.Duration) (api.Message, error) {
	fetchMessage := api.FetchMessage{
		Offset: c.Offset,
		TopicPartition: api.TopicPartition{
			Topic:     c.Topic,
			Partition: c.Partition,
			Metadata:  "none",
		},
	}

	// Encode before acquiring pooled objects so a marshalling failure has
	// nothing to clean up.
	payload, err := json.Marshal(fetchMessage)
	if err != nil {
		return api.Message{}, fmt.Errorf("encoding fetch request: %w", err)
	}

	// Pooled request/response objects must be handed back on every return
	// path, so release them via defer rather than only on success.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	// NOTE(review): the fetch parameters travel in the body of a GET request;
	// this mirrors the original behavior and presumably matches what the
	// broker expects — confirm against the broker's handler.
	req.SetRequestURI(BROKER_URL + "/consume")
	req.Header.SetMethod("GET")
	req.Header.SetContentType("application/json")
	req.SetBody(payload)

	if err := c.httpClient.Do(req, resp); err != nil {
		return api.Message{}, fmt.Errorf("requesting message from broker: %w", err)
	}

	// A non-200 reply must surface as an error; previously a nil error was
	// returned here, making failures indistinguishable from an empty message.
	if status := resp.StatusCode(); status != fasthttp.StatusOK {
		return api.Message{}, fmt.Errorf("broker returned status %d: %q", status, resp.Body())
	}

	var message api.Message
	if err := json.Unmarshal(resp.Body(), &message); err != nil {
		return api.Message{}, fmt.Errorf("decoding broker response: %w", err)
	}

	// Advance the offset only when a new message was actually received.
	if message.Value != nil {
		c.Offset++
	}

	time.Sleep(interval)
	return message, nil
}