Skip to content

Commit

Permalink
Use named constants for special offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
jnjackins committed Sep 26, 2018
1 parent 48c37f7 commit f2ddea9
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 41 deletions.
27 changes: 13 additions & 14 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewConn(conn net.Conn, topic string, partition int) *Conn {
}

// NewConnWith returns a new kafka connection configured with config.
// The offset is initialized to FirstOffset.
func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
if len(config.ClientID) == 0 {
config.ClientID = DefaultClientID
Expand All @@ -116,7 +117,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
clientID: config.ClientID,
topic: config.Topic,
partition: int32(config.Partition),
offset: -2,
offset: FirstOffset,
requiredAcks: -1,
}

Expand Down Expand Up @@ -446,18 +447,16 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
//
// See Seek for more details about the offset and whence values.
func (c *Conn) Offset() (offset int64, whence int) {
c.mutex.Lock()
offset = c.offset
c.mutex.Unlock()
switch offset {
case -1:
offset = 0
whence = 2
case -2:
offset = 0
whence = 0
case FirstOffset:
whence = SeekStart
case LastOffset:
whence = SeekEnd
default:
whence = 1
c.mutex.Lock()
offset = c.offset
c.mutex.Unlock()
whence = SeekAbsolute
}
return
}
Expand All @@ -479,7 +478,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
switch whence {
case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
default:
return 0, fmt.Errorf("whence must be one of 0, 1, 2, 3, or 4. (whence = %d)", whence)
return 0, fmt.Errorf("whence must be one of 0, 1, 2, 3. (whence = %d)", whence)
}

if whence == SeekAbsolute {
Expand Down Expand Up @@ -642,12 +641,12 @@ func (c *Conn) ReadOffset(t time.Time) (int64, error) {

// ReadFirstOffset returns the first offset available on the connection.
func (c *Conn) ReadFirstOffset() (int64, error) {
return c.readOffset(-2)
return c.readOffset(FirstOffset)
}

// ReadLastOffset returns the last offset available on the connection.
func (c *Conn) ReadLastOffset() (int64, error) {
return c.readOffset(-1)
return c.readOffset(LastOffset)
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
Expand Down
28 changes: 14 additions & 14 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ type apiKey int16

const (
produceRequest apiKey = 0
fetchRequest apiKey = 1
listOffsetRequest apiKey = 2
metadataRequest apiKey = 3
offsetCommitRequest apiKey = 8
offsetFetchRequest apiKey = 9
groupCoordinatorRequest apiKey = 10
joinGroupRequest apiKey = 11
heartbeatRequest apiKey = 12
leaveGroupRequest apiKey = 13
syncGroupRequest apiKey = 14
describeGroupsRequest apiKey = 15
listGroupsRequest apiKey = 16
createTopicsRequest apiKey = 19
deleteTopicsRequest apiKey = 20
fetchRequest = 1
listOffsetRequest = 2
metadataRequest = 3
offsetCommitRequest = 8
offsetFetchRequest = 9
groupCoordinatorRequest = 10
joinGroupRequest = 11
heartbeatRequest = 12
leaveGroupRequest = 13
syncGroupRequest = 14
describeGroupsRequest = 15
listGroupsRequest = 16
createTopicsRequest = 19
deleteTopicsRequest = 20
)

type apiVersion int16
Expand Down
22 changes: 9 additions & 13 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

const (
firstOffset = -1
lastOffset = -2
LastOffset int64 = -1 // The most recent offset available for a partition.
FirstOffset = -2 // The least recent offset available for a partition.

This comment has been minimized.

Copy link
@yaoguais

yaoguais Nov 12, 2018

Why do you exchange the two value of firstOffset and lastOffset?
It's for some meanings?

This comment has been minimized.

Copy link
@achille-roussel
)

const (
Expand Down Expand Up @@ -958,6 +958,7 @@ type readerStats struct {
}

// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
func NewReader(config ReaderConfig) *Reader {
if len(config.Brokers) == 0 {
panic("cannot create a new kafka reader with an empty list of broker addresses")
Expand Down Expand Up @@ -1070,7 +1071,7 @@ func NewReader(config ReaderConfig) *Reader {
done: make(chan struct{}),
commits: make(chan commitRequest),
stop: stop,
offset: firstOffset,
offset: FirstOffset,
stctx: stctx,
stats: &readerStats{
dialTime: makeSummary(),
Expand Down Expand Up @@ -1309,10 +1310,10 @@ func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
select {
case off := <-offch:
switch cur := r.Offset(); {
case cur == firstOffset:
case cur == FirstOffset:
lag = off.last - off.first

case cur == lastOffset:
case cur == LastOffset:
lag = 0

default:
Expand Down Expand Up @@ -1354,12 +1355,7 @@ func (r *Reader) Lag() int64 {
}

// SetOffset changes the offset from which the next batch of messages will be
// read.
//
// Setting the offset ot -1 means to seek to the first offset.
// Setting the offset to -2 means to seek to the last offset.
//
// The method fails with io.ErrClosedPipe if the reader has already been closed.
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
func (r *Reader) SetOffset(offset int64) error {
if r.useConsumerGroup() {
return errNotAvailableWithGroup
Expand Down Expand Up @@ -1700,10 +1696,10 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
}

switch {
case offset == firstOffset:
case offset == FirstOffset:
offset = first

case offset == lastOffset:
case offset == LastOffset:
offset = last

case offset < first:
Expand Down

0 comments on commit f2ddea9

Please sign in to comment.