Skip to content

Commit

Permalink
refactor how sessions are deleted
Browse files Browse the repository at this point in the history
Replacing sessions with different structs representing a closed session
doesn't work if a session is using multiple connection IDs.
  • Loading branch information
marten-seemann committed Sep 25, 2019
1 parent 9e6bff0 commit 03483d5
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 179 deletions.
101 changes: 51 additions & 50 deletions closed_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,58 @@ package quic
import (
"sync"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)

type closedSession interface {
destroy()
}

// A closedLocalSession is a session that we closed locally.
// When receiving packets for such a session, we need to retransmit the packet containing the CONNECTION_CLOSE frame,
// with an exponential backoff.
type closedLocalSession struct {
conn connection
connClosePacket []byte

type closedBaseSession struct {
closeOnce sync.Once
closeChan chan struct{} // is closed when the session is closed or destroyed

receivedPackets chan *receivedPacket
counter uint64 // number of packets received
receivedPackets <-chan *receivedPacket
}

perspective protocol.Perspective
func (s *closedBaseSession) destroy() {
s.closeOnce.Do(func() {
close(s.closeChan)
})
}

logger utils.Logger
func newClosedBaseSession(receivedPackets <-chan *receivedPacket) closedBaseSession {
return closedBaseSession{
receivedPackets: receivedPackets,
closeChan: make(chan struct{}),
}
}

var _ packetHandler = &closedLocalSession{}
type closedLocalSession struct {
closedBaseSession

conn connection
connClosePacket []byte
counter uint64 // number of packets received

logger utils.Logger
}

// newClosedLocalSession creates a new closedLocalSession and runs it.
func newClosedLocalSession(
conn connection,
receivedPackets <-chan *receivedPacket,
connClosePacket []byte,
perspective protocol.Perspective,
logger utils.Logger,
) packetHandler {
) closedSession {
s := &closedLocalSession{
conn: conn,
connClosePacket: connClosePacket,
perspective: perspective,
logger: logger,
closeChan: make(chan struct{}),
receivedPackets: make(chan *receivedPacket, 64),
closedBaseSession: newClosedBaseSession(receivedPackets),
conn: conn,
connClosePacket: connClosePacket,
logger: logger,
}
go s.run()
return s
Expand All @@ -50,21 +64,14 @@ func (s *closedLocalSession) run() {
for {
select {
case p := <-s.receivedPackets:
s.handlePacketImpl(p)
s.handlePacket(p)
case <-s.closeChan:
return
}
}
}

func (s *closedLocalSession) handlePacket(p *receivedPacket) {
select {
case s.receivedPackets <- p:
default:
}
}

func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) {
func (s *closedLocalSession) handlePacket(_ *receivedPacket) {
s.counter++
// exponential backoff
// only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving
Expand All @@ -79,35 +86,29 @@ func (s *closedLocalSession) handlePacketImpl(_ *receivedPacket) {
}
}

func (s *closedLocalSession) Close() error {
s.destroy(nil)
return nil
}

func (s *closedLocalSession) destroy(error) {
s.closeOnce.Do(func() {
close(s.closeChan)
})
}

func (s *closedLocalSession) getPerspective() protocol.Perspective {
return s.perspective
}

// A closedRemoteSession is a session that was closed remotely.
// For such a session, we might receive reordered packets that were sent before the CONNECTION_CLOSE.
// We can just ignore those packets.
type closedRemoteSession struct {
perspective protocol.Perspective
closedBaseSession
}

var _ packetHandler = &closedRemoteSession{}
var _ closedSession = &closedRemoteSession{}

func newClosedRemoteSession(pers protocol.Perspective) packetHandler {
return &closedRemoteSession{perspective: pers}
func newClosedRemoteSession(receivedPackets <-chan *receivedPacket) closedSession {
s := &closedRemoteSession{
closedBaseSession: newClosedBaseSession(receivedPackets),
}
go s.run()
return s
}

func (s *closedRemoteSession) handlePacket(*receivedPacket) {}
func (s *closedRemoteSession) Close() error { return nil }
func (s *closedRemoteSession) destroy(error) {}
func (s *closedRemoteSession) getPerspective() protocol.Perspective { return s.perspective }
func (s *closedRemoteSession) run() {
for {
select {
case <-s.receivedPackets: // discard packets
case <-s.closeChan:
return
}
}
}
56 changes: 37 additions & 19 deletions closed_session_test.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,71 @@
package quic

import (
"errors"
"time"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Closed local session", func() {
var _ = Describe("closed local session", func() {
var (
sess packetHandler
mconn *mockConnection
sess closedSession
mconn *mockConnection
receivedPackets chan *receivedPacket
)

BeforeEach(func() {
mconn = newMockConnection()
sess = newClosedLocalSession(mconn, []byte("close"), protocol.PerspectiveClient, utils.DefaultLogger)
})

AfterEach(func() {
Eventually(areClosedSessionsRunning).Should(BeFalse())
})

It("tells its perspective", func() {
Expect(sess.getPerspective()).To(Equal(protocol.PerspectiveClient))
// stop the session
Expect(sess.Close()).To(Succeed())
receivedPackets = make(chan *receivedPacket, 10)
sess = newClosedLocalSession(mconn, receivedPackets, []byte("close"), utils.DefaultLogger)
})

It("repeats the packet containing the CONNECTION_CLOSE frame", func() {
for i := 1; i <= 20; i++ {
sess.handlePacket(&receivedPacket{})
receivedPackets <- &receivedPacket{}
if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 {
Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE
} else {
Consistently(mconn.written, 10*time.Millisecond).Should(HaveLen(0))
}
}
// stop the session
Expect(sess.Close()).To(Succeed())
sess.destroy()
Eventually(areClosedSessionsRunning).Should(BeFalse())
})

It("destroys sessions", func() {
Expect(areClosedSessionsRunning()).To(BeTrue())
sess.destroy()
Eventually(areClosedSessionsRunning).Should(BeFalse())
})
})

var _ = Describe("closed remote session", func() {
var (
sess closedSession
receivedPackets chan *receivedPacket
)

BeforeEach(func() {
receivedPackets = make(chan *receivedPacket, 10)
sess = newClosedRemoteSession(receivedPackets)
})

It("discards packets", func() {
for i := 0; i < 1000; i++ {
receivedPackets <- &receivedPacket{}
}
// stop the session
sess.destroy()
Eventually(areClosedSessionsRunning).Should(BeFalse())
})

It("destroys sessions", func() {
Expect(areClosedSessionsRunning()).To(BeTrue())
sess.destroy(errors.New("destroy"))
sess.destroy()
Eventually(areClosedSessionsRunning).Should(BeFalse())
})
})
12 changes: 0 additions & 12 deletions mock_packet_handler_manager_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 0 additions & 12 deletions mock_session_runner_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions packet_handler_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,14 @@ func newPacketHandlerMap(

func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) {
h.mutex.Lock()
h.addLocked(id, handler)
h.mutex.Unlock()
}

func (h *packetHandlerMap) addLocked(id protocol.ConnectionID, handler packetHandler) {
h.handlers[string(id)] = handler
}

func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
h.mutex.Lock()
h.removeByConnectionIDAsString(string(id))
h.mutex.Unlock()
}

func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) {
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
h.mutex.Lock()
h.removeByConnectionIDAsString(string(id))
h.addLocked(id, handler)
h.mutex.Unlock()
h.retireByConnectionIDAsString(string(id))
}

func (h *packetHandlerMap) removeByConnectionIDAsString(id string) {
Expand Down
18 changes: 18 additions & 0 deletions quic_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package quic

import (
"bytes"
"runtime/pprof"
"strings"
"sync"

"github.com/golang/mock/gomock"
Expand All @@ -24,6 +27,21 @@ var _ = BeforeEach(func() {
connMuxerOnce = *new(sync.Once)
})

func areSessionsRunning() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "quic-go.(*session).run")
}

func areClosedSessionsRunning() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run") ||
strings.Contains(b.String(), "quic-go.(*closedRemoteSession).run")
}

var _ = AfterEach(func() {
mockCtrl.Finish()
Eventually(areSessionsRunning).Should(BeFalse())
Eventually(areClosedSessionsRunning).Should(BeFalse())
})
2 changes: 0 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type packetHandlerManager interface {
Add(protocol.ConnectionID, packetHandler)
Retire(protocol.ConnectionID)
Remove(protocol.ConnectionID)
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
AddResetToken([16]byte, packetHandler)
RemoveResetToken([16]byte)
GetStatelessResetToken(protocol.ConnectionID) [16]byte
Expand All @@ -60,7 +59,6 @@ type quicSession interface {
type sessionRunner interface {
Retire(protocol.ConnectionID)
Remove(protocol.ConnectionID)
ReplaceWithClosed(protocol.ConnectionID, packetHandler)
AddResetToken([16]byte, packetHandler)
RemoveResetToken([16]byte)
}
Expand Down
Loading

0 comments on commit 03483d5

Please sign in to comment.