-
Notifications
You must be signed in to change notification settings - Fork 14
/
messagebroker.go
80 lines (59 loc) · 2.82 KB
/
messagebroker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package messagebroker
import "context"
// Broker is the full broker contract. It embeds Admin, Producer and Consumer,
// so a type satisfies Broker only if it implements every method of all three.
type Broker interface {
	Admin
	Producer
	Consumer
}
// Admin groups the administrative operations of a broker. The methods declared
// here manage topics, their partitions and their configuration, report broker
// health, and list the topics of a project.
type Admin interface {
	// CreateTopic creates a new topic if it is not already available.
	CreateTopic(ctx context.Context, req CreateTopicRequest) (CreateTopicResponse, error)

	// DeleteTopic deletes an existing topic.
	DeleteTopic(ctx context.Context, req DeleteTopicRequest) (DeleteTopicResponse, error)

	// AddTopicPartitions adds partitions to an existing topic.
	AddTopicPartitions(ctx context.Context, req AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)

	// AlterTopicConfigs alters topic configuration.
	// NOTE(review): the meaning of the returned []string is not visible from
	// this declaration (presumably the affected topic names) — confirm against
	// an implementation.
	AlterTopicConfigs(ctx context.Context, req ModifyTopicConfigRequest) ([]string, error)

	// DescribeTopicConfigs describes topic configuration.
	// NOTE(review): topics is presumably a list of topic names and the result
	// presumably maps topic name -> config key -> config value — confirm
	// against an implementation.
	DescribeTopicConfigs(ctx context.Context, topics []string) (map[string]map[string]string, error)

	// IsHealthy checks the health of the underlying broker.
	IsHealthy(ctx context.Context) (bool, error)

	// FetchProjectTopics fetches a list of all topics for the given project.
	// NOTE(review): the result is presumably keyed by topic name; what the
	// bool value signifies is not visible here — confirm.
	FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
}
// Producer is the publishing side of a broker: it sends messages and exposes
// lifecycle controls (closed check, shutdown, flush) for the producer.
type Producer interface {
	// SendMessage sends a message on a topic.
	SendMessage(ctx context.Context, req SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)

	// IsClosed reports whether the producer has been closed.
	IsClosed(ctx context.Context) bool

	// Shutdown closes the producer. It has no return value, so it cannot
	// report a failure to the caller.
	Shutdown(ctx context.Context)

	// Flush flushes the producer buffer. Unlike the other methods it does not
	// take a context.Context.
	// NOTE(review): timeoutMs is presumably a timeout in milliseconds — confirm.
	Flush(timeoutMs int) error
}
// Consumer is the consuming side of a broker: it receives messages, commits
// them, and exposes metadata, lag and lifecycle controls for the consumer.
type Consumer interface {
	// ReceiveMessages tries to fetch the number of messages given by
	// "numOfMessages", starting from the previously committed offset. If fewer
	// messages are available in the queue, it returns as many as are available.
	// NOTE(review): "numOfMessages" is presumably carried in the request —
	// confirm against GetMessagesFromTopicRequest.
	ReceiveMessages(ctx context.Context, req GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)

	// CommitByPartitionAndOffset commits messages, if any: it commits the
	// messages consumed by all the previous receive calls.
	// NOTE(review): the earlier wording referred to "GetMessages", which is
	// not a method of this interface; ReceiveMessages is presumably meant —
	// confirm.
	CommitByPartitionAndOffset(ctx context.Context, req CommitOnTopicRequest) (CommitOnTopicResponse, error)

	// CommitByMsgID commits a message by ID. It takes the same request and
	// response types as CommitByPartitionAndOffset.
	CommitByMsgID(ctx context.Context, req CommitOnTopicRequest) (CommitOnTopicResponse, error)

	// GetTopicMetadata gets the topic metadata.
	GetTopicMetadata(ctx context.Context, req GetTopicMetadataRequest) (GetTopicMetadataResponse, error)

	// Pause pauses the consumer.
	Pause(ctx context.Context, req PauseOnTopicRequest) error

	// Resume resumes the consumer.
	Resume(ctx context.Context, req ResumeOnTopicRequest) error

	// FetchConsumerLag returns the watermark and current offset for a consumer.
	// NOTE(review): what the map keys identify is not visible from this
	// declaration — confirm against an implementation.
	FetchConsumerLag(ctx context.Context) (map[string]uint64, error)

	// Close closes the consumer.
	Close(ctx context.Context) error
}