Skip to content

Commit

Permalink
Downgraded delete topics request/response to v0 (#205)
Browse files Browse the repository at this point in the history
This change makes the DeleteTopics function work with Kafka 0.10.0
  • Loading branch information
stevevls committed Feb 13, 2019
1 parent a3bd32e commit 9863bcf
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 42 deletions.
2 changes: 1 addition & 1 deletion conn.go
Expand Up @@ -151,7 +151,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {

// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
_, err := c.deleteTopics(deleteTopicsRequestV1{
_, err := c.deleteTopics(deleteTopicsRequestV0{
Topics: topics,
})
return err
Expand Down
14 changes: 6 additions & 8 deletions conn_test.go
Expand Up @@ -233,15 +233,13 @@ func TestConn(t *testing.T) {
},

{
scenario: "test delete topics",
function: testDeleteTopics,
minVersion: "0.11.0",
scenario: "test delete topics",
function: testDeleteTopics,
},

{
scenario: "test delete topics with an invalid topic",
function: testDeleteTopicsInvalidTopic,
minVersion: "0.11.0",
scenario: "test delete topics with an invalid topic",
function: testDeleteTopicsInvalidTopic,
},
}

Expand Down Expand Up @@ -925,7 +923,7 @@ func testDeleteTopicsInvalidTopic(t *testing.T, conn *Conn) {
if err != nil {
t.Fatalf("bad CreateTopics: %v", err)
}
conn.SetDeadline(time.Now().Add(time.Second))
conn.SetDeadline(time.Now().Add(5 * time.Second))
err = conn.DeleteTopics("invalid-topic", topic)
if err != UnknownTopicOrPartition {
t.Fatalf("expected UnknownTopicOrPartition error, but got %v", err)
Expand All @@ -935,7 +933,7 @@ func testDeleteTopicsInvalidTopic(t *testing.T, conn *Conn) {
t.Fatalf("bad ReadPartitions: %v", err)
}
if len(partitions) != 0 {
t.Fatal("exepected partitions to be empty")
t.Fatal("expected partitions to be empty")
}
}

Expand Down
48 changes: 19 additions & 29 deletions deletetopics.go
Expand Up @@ -6,7 +6,7 @@ import (
)

// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
type deleteTopicsRequestV1 struct {
type deleteTopicsRequestV0 struct {
// Topics holds the topic names
Topics []string

Expand All @@ -16,68 +16,58 @@ type deleteTopicsRequestV1 struct {
Timeout int32
}

func (t deleteTopicsRequestV1) size() int32 {
func (t deleteTopicsRequestV0) size() int32 {
return sizeofStringArray(t.Topics) +
sizeofInt32(t.Timeout)
}

func (t deleteTopicsRequestV1) writeTo(w *bufio.Writer) {
func (t deleteTopicsRequestV0) writeTo(w *bufio.Writer) {
writeStringArray(w, t.Topics)
writeInt32(w, t.Timeout)
}

type deleteTopicsResponseV1 struct {
// ThrottleTimeMS holds the duration in milliseconds for which the request
// was throttled due to quota violation (Zero if the request did not violate
// any quota)
ThrottleTimeMS int32

type deleteTopicsResponseV0 struct {
// TopicErrorCodes holds per topic error codes
TopicErrorCodes []deleteTopicsResponseV1TopicErrorCode
TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
}

func (t deleteTopicsResponseV1) size() int32 {
return sizeofInt32(t.ThrottleTimeMS) +
sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
func (t deleteTopicsResponseV0) size() int32 {
return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
}

func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil {
return
}
func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
var item deleteTopicsResponseV1TopicErrorCode
var item deleteTopicsResponseV0TopicErrorCode
if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
return
}
t.TopicErrorCodes = append(t.TopicErrorCodes, item)
return
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
if remain, err = readArrayWith(r, size, fn); err != nil {
return
}
return
}

func (t deleteTopicsResponseV1) writeTo(w *bufio.Writer) {
writeInt32(w, t.ThrottleTimeMS)
func (t deleteTopicsResponseV0) writeTo(w *bufio.Writer) {
writeArray(w, len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(w) })
}

type deleteTopicsResponseV1TopicErrorCode struct {
type deleteTopicsResponseV0TopicErrorCode struct {
// Topic holds the topic name
Topic string

// ErrorCode holds the error code
ErrorCode int16
}

func (t deleteTopicsResponseV1TopicErrorCode) size() int32 {
func (t deleteTopicsResponseV0TopicErrorCode) size() int32 {
return sizeofString(t.Topic) +
sizeofInt16(t.ErrorCode)
}

func (t *deleteTopicsResponseV1TopicErrorCode) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *deleteTopicsResponseV0TopicErrorCode) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.Topic); err != nil {
return
}
Expand All @@ -87,24 +77,24 @@ func (t *deleteTopicsResponseV1TopicErrorCode) readFrom(r *bufio.Reader, size in
return
}

func (t deleteTopicsResponseV1TopicErrorCode) writeTo(w *bufio.Writer) {
func (t deleteTopicsResponseV0TopicErrorCode) writeTo(w *bufio.Writer) {
writeString(w, t.Topic)
writeInt16(w, t.ErrorCode)
}

// deleteTopics deletes the specified topics.
//
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponseV1, error) {
var response deleteTopicsResponseV1
func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
var response deleteTopicsResponseV0
err := c.writeOperation(
func(deadline time.Time, id int32) error {
if request.Timeout == 0 {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
return c.writeRequest(deleteTopicsRequest, v1, id, request)
return c.writeRequest(deleteTopicsRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -113,7 +103,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponse
},
)
if err != nil {
return deleteTopicsResponseV1{}, err
return deleteTopicsResponseV0{}, err
}
for _, c := range response.TopicErrorCodes {
if c.ErrorCode != 0 {
Expand Down
7 changes: 3 additions & 4 deletions deletetopics_test.go
Expand Up @@ -8,9 +8,8 @@ import (
)

func TestDeleteTopicsResponseV1(t *testing.T) {
item := deleteTopicsResponseV1{
ThrottleTimeMS: 123,
TopicErrorCodes: []deleteTopicsResponseV1TopicErrorCode{
item := deleteTopicsResponseV0{
TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{
{
Topic: "a",
ErrorCode: 7,
Expand All @@ -23,7 +22,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) {
item.writeTo(w)
w.Flush()

var found deleteTopicsResponseV1
var found deleteTopicsResponseV0
remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 9863bcf

Please sign in to comment.