Skip to content

Commit

Permalink
Merge pull request #78 from lukebakken/fix-golangci-lint
Browse files Browse the repository at this point in the history
Fix lint errors
  • Loading branch information
lukebakken committed May 2, 2022
2 parents bec03c7 + 1fa17b7 commit b221bfd
Show file tree
Hide file tree
Showing 15 changed files with 406 additions and 198 deletions.
11 changes: 4 additions & 7 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,9 @@ jobs:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed

steps:
- name: Checkout repository
uses: actions/checkout@v2

# Initializes the CodeQL tools for scanning.
- uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -54,7 +51,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -68,4 +65,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
10 changes: 5 additions & 5 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.18.x
check-latest: true
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
- uses: golangci/golangci-lint-action@v3
with:
version: v1.45
only-new-issues: true
version: latest
only-new-issues: false
6 changes: 4 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ jobs:
runs-on: windows-latest
strategy:
matrix:
go-version: [1.16.x, 1.17.x, 1.18.x]
go-version: [1.17.x, 1.18.x]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Cache installers
uses: actions/cache@v3
with:
Expand All @@ -34,7 +35,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.16.x, 1.17.x, 1.18.x]
go-version: [1.17.x, 1.18.x]
services:
rabbitmq:
image: rabbitmq
Expand All @@ -45,5 +46,6 @@ jobs:
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Tests
run: make tests
6 changes: 4 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ func (ch *Channel) dispatch(msg message) {
// to avoid unexpected interleaving with basic.publish frames if
// publishing is happening concurrently
ch.m.Lock()
ch.send(&channelCloseOk{})
// TODO check error case
ch.send(&channelCloseOk{}) //nolint
ch.m.Unlock()
ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))

Expand All @@ -288,7 +289,8 @@ func (ch *Channel) dispatch(msg message) {
c <- m.Active
}
ch.notifyM.RUnlock()
ch.send(&channelFlowOk{Active: m.Active})
// TODO check error case
ch.send(&channelFlowOk{Active: m.Active}) //nolint

case *basicCancel:
ch.notifyM.RLock()
Expand Down
88 changes: 66 additions & 22 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *server) expectBytes(b []byte) {
t.Fatalf("io error expecting bytes: %v", err)
}

if bytes.Compare(b, in) != 0 {
if !bytes.Equal(b, in) {
t.Fatalf("failed bytes: expected: %s got: %s", string(b), string(in))
}
}
Expand All @@ -83,25 +83,33 @@ func (t *server) send(channel int, m message) {
if msg, ok := m.(messageWithContent); ok {
props, body := msg.getContent()
class, _ := msg.id()
t.w.WriteFrame(&methodFrame{
if err := t.w.WriteFrame(&methodFrame{
ChannelId: uint16(channel),
Method: msg,
})
t.w.WriteFrame(&headerFrame{
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
if err := t.w.WriteFrame(&headerFrame{
ChannelId: uint16(channel),
ClassId: class,
Size: uint64(len(body)),
Properties: props,
})
t.w.WriteFrame(&bodyFrame{
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
if err := t.w.WriteFrame(&bodyFrame{
ChannelId: uint16(channel),
Body: body,
})
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
} else {
t.w.WriteFrame(&methodFrame{
if err := t.w.WriteFrame(&methodFrame{
ChannelId: uint16(channel),
Method: m,
})
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
}
}

Expand Down Expand Up @@ -435,13 +443,25 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {

confirm := ch.NotifyPublish(make(chan Confirmation))

ch.Confirm(false)
err = ch.Confirm(false)
if err != nil {
t.Fatalf("channel error setting confirm mode: %v (%s)", ch, err)
}

go func() {
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")})
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()

// received out of order, consumed in order
Expand All @@ -452,10 +472,19 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
}

go func() {
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")})
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()

for i, tag := range []uint64{5, 6, 7, 8} {
Expand Down Expand Up @@ -493,7 +522,10 @@ func TestDeferredConfirmations(t *testing.T) {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}

ch.Confirm(false)
err = ch.Confirm(false)
if err != nil {
t.Fatalf("channel error setting confirm mode: %v (%s)", ch, err)
}

var results []*DeferredConfirmation
for i := 1; i < 5; i++ {
Expand Down Expand Up @@ -665,7 +697,11 @@ func TestPublishBodySliceIssue74(t *testing.T) {
}

for i := 0; i < publishings; i++ {
go ch.Publish("", "q", false, false, Publishing{Body: base[0:i]})
go func(ii int) {
if err := ch.Publish("", "q", false, false, Publishing{Body: base[0:ii]}); err != nil {
t.Errorf("publish error: %v", err)
}
}(i)
}

<-done
Expand Down Expand Up @@ -709,7 +745,11 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {
}

for i := 0; i < publishings; i++ {
go ch.Publish("", "q", false, false, Publishing{Body: []byte("anything")})
go func() {
if err := ch.Publish("", "q", false, false, Publishing{Body: []byte("anything")}); err != nil {
t.Errorf("publish error: %v", err)
}
}()
}

<-done
Expand Down Expand Up @@ -813,7 +853,11 @@ func TestLeakClosedConsumersIssue264(t *testing.T) {
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}
ch.Qos(2, 0, false)

err = ch.Qos(2, 0, false)
if err != nil {
t.Fatalf("channel Qos error: %v (%s)", ch, err)
}

consumer, err := ch.Consume("queue", tag, false, false, false, false, nil)
if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package amqp091
import (
"bufio"
"crypto/tls"
"errors"
"io"
"net"
"reflect"
Expand Down Expand Up @@ -465,11 +466,9 @@ func (c *Connection) dispatch0(f frame) {
switch m := mf.Method.(type) {
case *connectionClose:
// Send immediately as shutdown will close our side of the writer.
c.send(&methodFrame{
ChannelId: 0,
Method: &connectionCloseOk{},
})

// TODO check error case
f := &methodFrame{ChannelId: 0, Method: &connectionCloseOk{}}
c.send(f) //nolint
c.shutdown(newError(m.ReplyCode, m.ReplyText))
case *connectionBlocked:
for _, c := range c.blocks {
Expand All @@ -486,7 +485,8 @@ func (c *Connection) dispatch0(f frame) {
// kthx - all reads reset our deadline. so we can drop this
default:
// lolwat - channel0 only responds to methods and heartbeats
c.closeWith(ErrUnexpectedFrame)
// TODO check error case
c.closeWith(ErrUnexpectedFrame) //nolint
}
}

Expand Down Expand Up @@ -518,15 +518,15 @@ func (c *Connection) dispatchClosed(f frame) {
if mf, ok := f.(*methodFrame); ok {
switch mf.Method.(type) {
case *channelClose:
c.send(&methodFrame{
ChannelId: f.channel(),
Method: &channelCloseOk{},
})
// TODO check error case
f := &methodFrame{ChannelId: f.channel(), Method: &channelCloseOk{}}
c.send(f) //nolint
case *channelCloseOk:
// we are already closed, so do nothing
default:
// unexpected method on closed channel
c.closeWith(ErrClosed)
// TODO check error case
c.closeWith(ErrClosed) //nolint
}
}
}
Expand Down Expand Up @@ -600,7 +600,13 @@ func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
// When reading, reset our side of the deadline, if we've negotiated one with
// a deadline that covers at least 2 server heartbeats
if interval > 0 {
conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
if err := conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)); err != nil {
var opErr *net.OpError
if !errors.As(err, &opErr) {
// TODO check error case
return
}
}
}

case <-done:
Expand Down
35 changes: 31 additions & 4 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,37 @@ func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
// See https://github.com/streadway/amqp/pull/253#issuecomment-292464811 for
// more details - thanks to jmalloc again.
func TestRaceBetweenChannelShutdownAndSend(t *testing.T) {
const concurrency = 10
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

conn := integrationConnection(t, "channel close/send race")
defer conn.Close()

ch, _ := conn.Channel()

go ch.Close()
for i := 0; i < 10; i++ {

errs := make(chan error, concurrency)
wg := sync.WaitGroup{}
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
// ch.Ack calls ch.send() internally.
ch.Ack(42, false)
if err := ch.Ack(42, false); err != nil {
errs <- err
}
}()
}

wg.Wait()
close(errs)

for err := range errs {
if err != nil {
t.Logf("[INFO] %#v (%s) of type %T", err, err, err)
}
}
}

func TestQueueDeclareOnAClosedConnectionFails(t *testing.T) {
Expand All @@ -127,8 +144,10 @@ func TestConcurrentClose(t *testing.T) {
conn := integrationConnection(t, "concurrent close")
defer conn.Close()

errs := make(chan error, concurrency)
wg := sync.WaitGroup{}
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
Expand Down Expand Up @@ -156,11 +175,19 @@ func TestConcurrentClose(t *testing.T) {

// A different/protocol error occurred indicating a race or missed condition
if _, other := err.(*Error); other {
t.Fatalf("Expected no error, or ErrClosed, or a net.OpError from conn.Close(), got %#v (%s) of type %T", err, err, err)
errs <- err
}
}()
}

wg.Wait()
close(errs)

for err := range errs {
if err != nil {
t.Fatalf("Expected no error, or ErrClosed, or a net.OpError from conn.Close(), got %#v (%s) of type %T", err, err, err)
}
}
}

// TestPlaintextDialTLS ensures amqp:// connections succeed when using DialTLS.
Expand Down

0 comments on commit b221bfd

Please sign in to comment.