Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lint errors #78

Merged
merged 1 commit into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 4 additions & 7 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,9 @@ jobs:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed

steps:
- name: Checkout repository
uses: actions/checkout@v2

# Initializes the CodeQL tools for scanning.
- uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -54,7 +51,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -68,4 +65,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
10 changes: 5 additions & 5 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.18.x
check-latest: true
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
- uses: golangci/golangci-lint-action@v3
with:
version: v1.45
only-new-issues: true
version: latest
only-new-issues: false
6 changes: 4 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ jobs:
runs-on: windows-latest
strategy:
matrix:
go-version: [1.16.x, 1.17.x, 1.18.x]
go-version: [1.17.x, 1.18.x]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Cache installers
uses: actions/cache@v3
with:
Expand All @@ -34,7 +35,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.16.x, 1.17.x, 1.18.x]
go-version: [1.17.x, 1.18.x]
services:
rabbitmq:
image: rabbitmq
Expand All @@ -45,5 +46,6 @@ jobs:
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Tests
run: make tests
6 changes: 4 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ func (ch *Channel) dispatch(msg message) {
// to avoid unexpected interleaving with basic.publish frames if
// publishing is happening concurrently
ch.m.Lock()
ch.send(&channelCloseOk{})
// TODO check error case
ch.send(&channelCloseOk{}) //nolint
ch.m.Unlock()
ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))

Expand All @@ -288,7 +289,8 @@ func (ch *Channel) dispatch(msg message) {
c <- m.Active
}
ch.notifyM.RUnlock()
ch.send(&channelFlowOk{Active: m.Active})
// TODO check error case
ch.send(&channelFlowOk{Active: m.Active}) //nolint

case *basicCancel:
ch.notifyM.RLock()
Expand Down
88 changes: 66 additions & 22 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *server) expectBytes(b []byte) {
t.Fatalf("io error expecting bytes: %v", err)
}

if bytes.Compare(b, in) != 0 {
if !bytes.Equal(b, in) {
t.Fatalf("failed bytes: expected: %s got: %s", string(b), string(in))
}
}
Expand All @@ -83,25 +83,33 @@ func (t *server) send(channel int, m message) {
if msg, ok := m.(messageWithContent); ok {
props, body := msg.getContent()
class, _ := msg.id()
t.w.WriteFrame(&methodFrame{
if err := t.w.WriteFrame(&methodFrame{
ChannelId: uint16(channel),
Method: msg,
})
t.w.WriteFrame(&headerFrame{
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
if err := t.w.WriteFrame(&headerFrame{
ChannelId: uint16(channel),
ClassId: class,
Size: uint64(len(body)),
Properties: props,
})
t.w.WriteFrame(&bodyFrame{
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
if err := t.w.WriteFrame(&bodyFrame{
ChannelId: uint16(channel),
Body: body,
})
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
} else {
t.w.WriteFrame(&methodFrame{
if err := t.w.WriteFrame(&methodFrame{
ChannelId: uint16(channel),
Method: m,
})
}); err != nil {
t.Fatalf("WriteFrame error: %v", err)
}
}
}

Expand Down Expand Up @@ -435,13 +443,25 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {

confirm := ch.NotifyPublish(make(chan Confirmation))

ch.Confirm(false)
err = ch.Confirm(false)
if err != nil {
t.Fatalf("channel error setting confirm mode: %v (%s)", ch, err)
}

go func() {
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")})
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 1")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 2")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 3")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 4")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()

// received out of order, consumed in order
Expand All @@ -452,10 +472,19 @@ func TestConfirmMultipleOrdersDeliveryTags(t *testing.T) {
}

go func() {
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")})
ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")})
var e error
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 5")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 6")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 7")}); e != nil {
t.Errorf("publish error: %v", err)
}
if e = ch.Publish("", "q", false, false, Publishing{Body: []byte("pub 8")}); e != nil {
t.Errorf("publish error: %v", err)
}
}()

for i, tag := range []uint64{5, 6, 7, 8} {
Expand Down Expand Up @@ -493,7 +522,10 @@ func TestDeferredConfirmations(t *testing.T) {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}

ch.Confirm(false)
err = ch.Confirm(false)
if err != nil {
t.Fatalf("channel error setting confirm mode: %v (%s)", ch, err)
}

var results []*DeferredConfirmation
for i := 1; i < 5; i++ {
Expand Down Expand Up @@ -665,7 +697,11 @@ func TestPublishBodySliceIssue74(t *testing.T) {
}

for i := 0; i < publishings; i++ {
go ch.Publish("", "q", false, false, Publishing{Body: base[0:i]})
go func(ii int) {
if err := ch.Publish("", "q", false, false, Publishing{Body: base[0:ii]}); err != nil {
t.Errorf("publish error: %v", err)
}
}(i)
}

<-done
Expand Down Expand Up @@ -709,7 +745,11 @@ func TestPublishZeroFrameSizeIssue161(t *testing.T) {
}

for i := 0; i < publishings; i++ {
go ch.Publish("", "q", false, false, Publishing{Body: []byte("anything")})
go func() {
if err := ch.Publish("", "q", false, false, Publishing{Body: []byte("anything")}); err != nil {
t.Errorf("publish error: %v", err)
}
}()
}

<-done
Expand Down Expand Up @@ -813,7 +853,11 @@ func TestLeakClosedConsumersIssue264(t *testing.T) {
if err != nil {
t.Fatalf("could not open channel: %v (%s)", ch, err)
}
ch.Qos(2, 0, false)

err = ch.Qos(2, 0, false)
if err != nil {
t.Fatalf("channel Qos error: %v (%s)", ch, err)
}

consumer, err := ch.Consume("queue", tag, false, false, false, false, nil)
if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package amqp091
import (
"bufio"
"crypto/tls"
"errors"
"io"
"net"
"reflect"
Expand Down Expand Up @@ -465,11 +466,9 @@ func (c *Connection) dispatch0(f frame) {
switch m := mf.Method.(type) {
case *connectionClose:
// Send immediately as shutdown will close our side of the writer.
c.send(&methodFrame{
ChannelId: 0,
Method: &connectionCloseOk{},
})

// TODO check error case
f := &methodFrame{ChannelId: 0, Method: &connectionCloseOk{}}
c.send(f) //nolint
c.shutdown(newError(m.ReplyCode, m.ReplyText))
case *connectionBlocked:
for _, c := range c.blocks {
Expand All @@ -486,7 +485,8 @@ func (c *Connection) dispatch0(f frame) {
// kthx - all reads reset our deadline. so we can drop this
default:
// lolwat - channel0 only responds to methods and heartbeats
c.closeWith(ErrUnexpectedFrame)
// TODO check error case
c.closeWith(ErrUnexpectedFrame) //nolint
}
}

Expand Down Expand Up @@ -518,15 +518,15 @@ func (c *Connection) dispatchClosed(f frame) {
if mf, ok := f.(*methodFrame); ok {
switch mf.Method.(type) {
case *channelClose:
c.send(&methodFrame{
ChannelId: f.channel(),
Method: &channelCloseOk{},
})
// TODO check error case
f := &methodFrame{ChannelId: f.channel(), Method: &channelCloseOk{}}
c.send(f) //nolint
case *channelCloseOk:
// we are already closed, so do nothing
default:
// unexpected method on closed channel
c.closeWith(ErrClosed)
// TODO check error case
c.closeWith(ErrClosed) //nolint
}
}
}
Expand Down Expand Up @@ -600,7 +600,13 @@ func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
// When reading, reset our side of the deadline, if we've negotiated one with
// a deadline that covers at least 2 server heartbeats
if interval > 0 {
conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
if err := conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)); err != nil {
var opErr *net.OpError
if !errors.As(err, &opErr) {
// TODO check error case
return
}
}
}

case <-done:
Expand Down
35 changes: 31 additions & 4 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,37 @@ func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
// See https://github.com/streadway/amqp/pull/253#issuecomment-292464811 for
// more details - thanks to jmalloc again.
func TestRaceBetweenChannelShutdownAndSend(t *testing.T) {
const concurrency = 10
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

conn := integrationConnection(t, "channel close/send race")
defer conn.Close()

ch, _ := conn.Channel()

go ch.Close()
for i := 0; i < 10; i++ {

errs := make(chan error, concurrency)
wg := sync.WaitGroup{}
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
// ch.Ack calls ch.send() internally.
ch.Ack(42, false)
if err := ch.Ack(42, false); err != nil {
errs <- err
}
}()
}

wg.Wait()
close(errs)

for err := range errs {
if err != nil {
t.Logf("[INFO] %#v (%s) of type %T", err, err, err)
}
}
}

func TestQueueDeclareOnAClosedConnectionFails(t *testing.T) {
Expand All @@ -127,8 +144,10 @@ func TestConcurrentClose(t *testing.T) {
conn := integrationConnection(t, "concurrent close")
defer conn.Close()

errs := make(chan error, concurrency)
wg := sync.WaitGroup{}
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
Expand Down Expand Up @@ -156,11 +175,19 @@ func TestConcurrentClose(t *testing.T) {

// A different/protocol error occurred indicating a race or missed condition
if _, other := err.(*Error); other {
t.Fatalf("Expected no error, or ErrClosed, or a net.OpError from conn.Close(), got %#v (%s) of type %T", err, err, err)
errs <- err
}
}()
}

wg.Wait()
close(errs)

for err := range errs {
if err != nil {
t.Fatalf("Expected no error, or ErrClosed, or a net.OpError from conn.Close(), got %#v (%s) of type %T", err, err, err)
}
}
}

// TestPlaintextDialTLS ensures amqp:// connections succeed when using DialTLS.
Expand Down