forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoffsetdelete.go
106 lines (85 loc) · 2.78 KB
/
offsetdelete.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/offsetdelete"
)
// OffsetDelete identifies one topic partition whose offset should be deleted
// for a consumer group.
type OffsetDelete struct {
	// Topic the offset belongs to.
	Topic string

	// Partition of Topic the offset belongs to.
	Partition int
}
// OffsetDeleteRequest describes a request asking a kafka broker to delete the
// offsets that a consumer group holds for a set of topic partitions.
type OffsetDeleteRequest struct {
	// Addr is the address of the kafka broker the request is sent to.
	Addr net.Addr

	// GroupID is the ID of the consumer group whose offsets are deleted.
	GroupID string

	// Topics maps each topic name to the partitions whose offsets are
	// deleted.
	Topics map[string][]int
}
// OffsetDeleteResponse holds the reply of a kafka broker to a delete offset
// request.
type OffsetDeleteResponse struct {
	// Error is the request-level error reported by the broker, if any.
	// Errors that concern a single partition are found in Topics.
	Error error

	// Throttle is the amount of time that the broker throttled the request.
	Throttle time.Duration

	// Topics maps each topic name present in the broker's reply to the
	// per-partition results (partition ID and error) for that topic.
	Topics map[string][]OffsetDeletePartition
}
// OffsetDeletePartition reports the outcome of deleting the offsets of a
// single partition.
type OffsetDeletePartition struct {
	// Partition is the ID of the partition.
	Partition int

	// Error is the error that may have occurred while attempting to delete
	// the offset for this partition.
	Error error
}
// OffsetDelete sends a delete offset request to a kafka broker and returns the
// response.
//
// The request is sent to req.Addr and covers every topic partition listed in
// req.Topics for the consumer group req.GroupID.
//
// A non-nil error is returned when the round trip fails or when the reply is
// not an offset delete response; in both cases the returned response is nil.
// Error codes carried by the reply are not returned as the method's error:
// they are converted with makeError and stored in the Error field of the
// response and of each of its partitions.
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
	topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))

	for topicName, partitionIndexes := range req.Topics {
		partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))

		// The loop variable must not be named c, which would shadow the
		// method receiver.
		for i, partitionIndex := range partitionIndexes {
			partitions[i] = offsetdelete.RequestPartition{
				// The protocol type carries partition indexes as int32.
				PartitionIndex: int32(partitionIndex),
			}
		}

		topics = append(topics, offsetdelete.RequestTopic{
			Name:       topicName,
			Partitions: partitions,
		})
	}

	m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
		GroupID: req.GroupID,
		Topics:  topics,
	})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
	}

	// Use the checked form of the type assertion so that an unexpected reply
	// is reported to the caller as an error instead of panicking.
	r, ok := m.(*offsetdelete.Response)
	if !ok {
		return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: unexpected response type %T", m)
	}

	res := &OffsetDeleteResponse{
		Error:    makeError(r.ErrorCode, ""),
		Throttle: makeDuration(r.ThrottleTimeMs),
		Topics:   make(map[string][]OffsetDeletePartition, len(r.Topics)),
	}

	for _, topic := range r.Topics {
		partitions := make([]OffsetDeletePartition, len(topic.Partitions))

		for i, p := range topic.Partitions {
			partitions[i] = OffsetDeletePartition{
				Partition: int(p.PartitionIndex),
				Error:     makeError(p.ErrorCode, ""),
			}
		}

		res.Topics[topic.Name] = partitions
	}

	return res, nil
}