Skip to content

Commit

Permalink
add a context to Connection.ReceiveMessage (#3926)
Browse files Browse the repository at this point in the history
* add context to ReceiveMessage

* add newlines

---------

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
  • Loading branch information
Glonee and marten-seemann committed Jun 27, 2023
1 parent f387514 commit 435444a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 17 deletions.
4 changes: 2 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,11 +2302,11 @@ func (s *connection) SendMessage(p []byte) error {
return s.datagramQueue.AddAndWait(f)
}

func (s *connection) ReceiveMessage() ([]byte, error) {
func (s *connection) ReceiveMessage(ctx context.Context) ([]byte, error) {
if !s.config.EnableDatagrams {
return nil, errors.New("datagram support disabled")
}
return s.datagramQueue.Receive()
return s.datagramQueue.Receive(ctx)
}

func (s *connection) LocalAddr() net.Addr {
Expand Down
5 changes: 4 additions & 1 deletion datagram_queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package quic

import (
"context"
"sync"

"github.com/quic-go/quic-go/internal/protocol"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
}

// Receive gets a received DATAGRAM frame.
func (h *datagramQueue) Receive() ([]byte, error) {
func (h *datagramQueue) Receive(ctx context.Context) ([]byte, error) {
for {
h.rcvMx.Lock()
if len(h.rcvQueue) > 0 {
Expand All @@ -113,6 +114,8 @@ func (h *datagramQueue) Receive() ([]byte, error) {
continue
case <-h.closed:
return nil, h.closeErr
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
Expand Down
23 changes: 19 additions & 4 deletions datagram_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package quic

import (
"context"
"errors"

"github.com/quic-go/quic-go/internal/utils"
Expand Down Expand Up @@ -81,10 +82,10 @@ var _ = Describe("Datagram Queue", func() {
It("receives DATAGRAM frames", func() {
queue.HandleDatagramFrame(&wire.DatagramFrame{Data: []byte("foo")})
queue.HandleDatagramFrame(&wire.DatagramFrame{Data: []byte("bar")})
data, err := queue.Receive()
data, err := queue.Receive(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal([]byte("foo")))
data, err = queue.Receive()
data, err = queue.Receive(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal([]byte("bar")))
})
Expand All @@ -93,7 +94,7 @@ var _ = Describe("Datagram Queue", func() {
c := make(chan []byte, 1)
go func() {
defer GinkgoRecover()
data, err := queue.Receive()
data, err := queue.Receive(context.Background())
Expect(err).ToNot(HaveOccurred())
c <- data
}()
Expand All @@ -103,11 +104,25 @@ var _ = Describe("Datagram Queue", func() {
Eventually(c).Should(Receive(Equal([]byte("foobar"))))
})

It("blocks until context is done", func() {
ctx, cancel := context.WithCancel(context.Background())
errChan := make(chan error)
go func() {
defer GinkgoRecover()
_, err := queue.Receive(ctx)
errChan <- err
}()

Consistently(errChan).ShouldNot(Receive())
cancel()
Eventually(errChan).Should(Receive(Equal(context.Canceled)))
})

It("closes", func() {
errChan := make(chan error, 1)
go func() {
defer GinkgoRecover()
_, err := queue.Receive()
_, err := queue.Receive(context.Background())
errChan <- err
}()

Expand Down
2 changes: 1 addition & 1 deletion integrationtests/self/datagram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ var _ = Describe("Datagram test", func() {
for {
// Close the connection if no message is received for 100 ms.
timer := time.AfterFunc(scaleDuration(100*time.Millisecond), func() { conn.CloseWithError(0, "") })
if _, err := conn.ReceiveMessage(); err != nil {
if _, err := conn.ReceiveMessage(context.Background()); err != nil {
break
}
timer.Stop()
Expand Down
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type Connection interface {
// SendMessage sends a message as a datagram, as specified in RFC 9221.
SendMessage([]byte) error
// ReceiveMessage gets a message received in a datagram, as specified in RFC 9221.
ReceiveMessage() ([]byte, error)
ReceiveMessage(context.Context) ([]byte, error)
}

// An EarlyConnection is a connection that is handshaking.
Expand Down
8 changes: 4 additions & 4 deletions internal/mocks/quic/early_conn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions mock_quic_conn_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 435444a

Please sign in to comment.