Skip to content

Commit

Permalink
Merge pull request #72 from loopholelabs/staging
Browse files Browse the repository at this point in the history
Release v0.2.2
  • Loading branch information
ShivanshVij committed Mar 9, 2022
2 parents 908d072 + 3a3a567 commit 67310d3
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .trunk/trunk.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: 0.1
cli:
version: 0.8.0-beta
version: 0.8.1-beta
lint:
enabled:
- gitleaks@7.6.1
Expand Down
22 changes: 21 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,25 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [v0.2.2] - 2022-03-09 (Beta)

### Fixes

- Closing a connection doesn't mean the other side loses data it hasn't reacted to yet

### Changes

- A `Drain` function was added to the `internal/queue` package to allow the `killGoroutines` function in `async.go` to
drain the queue once it had closed it and once it had killed all existing goroutines
- The `heartbeat` function in `client.go` was modified to exit early if it detected that the underlying connection had
closed
- The `async` connection type was modified to hold `stale` data once a connection is closed. The `killGoroutines`
function will drain the `incoming` queue after killing all goroutines, and store those drained packets in
the `async.stale` variable - future and existing ReadPacket calls will first check whether there is data available in
the `stale` variable before they error out
- Refactored the `handlePacket` functions for both servers and clients to be clearer and avoid allocations
- The `close` function call in `Async` connections was modified to set a final write deadline for its final writer flush

## [v0.2.1] - 2022-03-02 (Beta)

### Fixes
Expand Down Expand Up @@ -122,7 +141,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

Initial Release of Frisbee

[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.2.1...HEAD
[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.2.2...HEAD
[v0.2.2]: https://github.com/loopholelabs/frisbee/compare/v0.2.1...v0.2.2
[v0.2.1]: https://github.com/loopholelabs/frisbee/compare/v0.2.0...v0.2.1
[v0.2.0]: https://github.com/loopholelabs/frisbee/compare/v0.1.6...v0.2.0
[v0.1.6]: https://github.com/loopholelabs/frisbee/compare/v0.1.5...v0.1.6
Expand Down
29 changes: 28 additions & 1 deletion async.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Async struct {
logger *zerolog.Logger
wg sync.WaitGroup
error *atomic.Error
staleMu sync.Mutex
stale []*packet.Packet
pongCh chan struct{}
closeCh chan struct{}
}
Expand Down Expand Up @@ -232,12 +234,29 @@ func (c *Async) WritePacket(p *packet.Packet) error {
// In the event that the connection is closed, ReadPacket will return an error.
func (c *Async) ReadPacket() (*packet.Packet, error) {
if c.closed.Load() {
c.staleMu.Lock()
if len(c.stale) > 0 {
var p *packet.Packet
p, c.stale = c.stale[0], c.stale[1:]
c.staleMu.Unlock()
return p, nil
}
c.staleMu.Unlock()
c.Logger().Error().Err(ConnectionClosed).Msg("error while popping from packet queue")
return nil, ConnectionClosed
}

readPacket, err := c.incoming.Pop()
if err != nil {
if c.closed.Load() {
c.staleMu.Lock()
if len(c.stale) > 0 {
var p *packet.Packet
p, c.stale = c.stale[0], c.stale[1:]
c.staleMu.Unlock()
return p, nil
}
c.staleMu.Unlock()
c.Logger().Error().Err(ConnectionClosed).Msg("error while popping from packet queue")
return nil, ConnectionClosed
}
Expand Down Expand Up @@ -336,20 +355,26 @@ func (c *Async) killGoroutines() {
_ = c.conn.SetDeadline(emptyTime)
close(c.closeCh)
c.Logger().Debug().Msg("error channel closed, goroutines killed")

c.stale = c.incoming.Drain()
}

func (c *Async) close() error {
c.staleMu.Lock()
if c.closed.CAS(false, true) {
c.Logger().Debug().Msg("connection close called, killing goroutines")
c.killGoroutines()
c.staleMu.Unlock()
c.Lock()
if c.writer.Buffered() > 0 {
_ = c.conn.SetWriteDeadline(emptyTime)
_ = c.conn.SetWriteDeadline(time.Now().Add(defaultDeadline))
_ = c.writer.Flush()
_ = c.conn.SetWriteDeadline(emptyTime)
}
c.Unlock()
return nil
}
c.staleMu.Unlock()
return ConnectionClosed
}

Expand Down Expand Up @@ -384,6 +409,8 @@ func (c *Async) waitForPONG() {
timer := time.NewTimer(defaultDeadline * 10)
defer timer.Stop()
select {
case <-c.closeCh:
c.wg.Done()
case <-timer.C:
c.Logger().Error().Err(os.ErrDeadlineExceeded).Msg("timed out waiting for PONG, connection is not alive")
c.wg.Done()
Expand Down
52 changes: 51 additions & 1 deletion async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestAsyncReadClose(t *testing.T) {

p, err = readerConn.ReadPacket()
require.NoError(t, err)
assert.NotNil(t, p.Metadata)
require.NotNil(t, p.Metadata)
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
Expand All @@ -245,6 +245,56 @@ func TestAsyncReadClose(t *testing.T) {
assert.NoError(t, err)
}

func TestAsyncReadAvailableClose(t *testing.T) {
t.Parallel()

reader, writer := net.Pipe()

emptyLogger := zerolog.New(ioutil.Discard)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)

p := packet.Get()
p.Metadata.Id = 64
p.Metadata.Operation = 32

err := writerConn.WritePacket(p)
require.NoError(t, err)

err = writerConn.WritePacket(p)
require.NoError(t, err)

packet.Put(p)

err = writerConn.Close()
require.NoError(t, err)

p, err = readerConn.ReadPacket()
require.NoError(t, err)
require.NotNil(t, p.Metadata)
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(p.Content.B))

p, err = readerConn.ReadPacket()
require.NoError(t, err)
require.NotNil(t, p.Metadata)
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(p.Content.B))

p, err = readerConn.ReadPacket()
require.Error(t, err)

err = readerConn.Close()
assert.NoError(t, err)
err = writerConn.Close()
assert.NoError(t, err)
}

func TestAsyncWriteClose(t *testing.T) {
t.Parallel()

Expand Down
93 changes: 50 additions & 43 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,33 @@ func (c *Client) Logger() *zerolog.Logger {
return c.options.Logger
}

func (c *Client) handlePacket(ctx context.Context, conn *Async, p *packet.Packet) {
handlerFunc := c.handlerTable[p.Metadata.Operation]
func (c *Client) handleConn() {
var p *packet.Packet
var outgoing *packet.Packet
var action Action
var err error
var handlerFunc Handler
LOOP:
if c.closed.Load() {
c.wg.Done()
return
}
p, err = c.conn.ReadPacket()
if err != nil {
c.Logger().Error().Err(err).Msg("error while getting packet frisbee connection")
c.wg.Done()
_ = c.Close()
return
}
handlerFunc = c.handlerTable[p.Metadata.Operation]
if handlerFunc != nil {
packetCtx := ctx
packetCtx := c.ctx
if c.PacketContext != nil {
packetCtx = c.PacketContext(packetCtx, p)
}
outgoing, action := handlerFunc(packetCtx, p)
outgoing, action = handlerFunc(packetCtx, p)
if outgoing != nil && outgoing.Metadata.ContentLength == uint32(len(outgoing.Content.B)) {
err := conn.WritePacket(outgoing)
err = c.conn.WritePacket(outgoing)
if outgoing != p {
packet.Put(outgoing)
}
Expand All @@ -193,55 +210,45 @@ func (c *Client) handlePacket(ctx context.Context, conn *Async, p *packet.Packet
} else {
packet.Put(p)
}
}

func (c *Client) handleConn() {
var p *packet.Packet
var err error
LOOP:
if c.closed.Load() {
c.wg.Done()
return
}
p, err = c.conn.ReadPacket()
if err != nil {
c.Logger().Error().Err(err).Msg("error while getting packet frisbee connection")
c.wg.Done()
_ = c.Close()
return
}
c.handlePacket(c.ctx, c.conn, p)
goto LOOP
}

func (c *Client) heartbeat() {
for {
<-time.After(c.options.Heartbeat)
if c.closed.Load() {
select {
case <-c.CloseChannel():
c.wg.Done()
return
}
if c.conn.WriteBufferSize() == 0 {
err := c.WritePacket(HEARTBEATPacket)
if err != nil {
c.Logger().Error().Err(err).Msg("error while writing to frisbee conn")
case <-time.After(c.options.Heartbeat):
if c.closed.Load() {
c.wg.Done()
_ = c.Close()
return
}
start := time.Now()
c.Logger().Debug().Msgf("Heartbeat sent at %s", start)
select {
case <-c.heartbeatChannel:
c.Logger().Debug().Msgf("Heartbeat Received with RTT: %d", time.Since(start))
case <-time.After(c.options.Heartbeat):
c.Logger().Error().Msg("Heartbeat not received within timeout period")
c.wg.Done()
_ = c.Close()
return
if c.conn.WriteBufferSize() == 0 {
err := c.WritePacket(HEARTBEATPacket)
if err != nil {
c.Logger().Error().Err(err).Msg("error while writing to frisbee conn")
c.wg.Done()
_ = c.Close()
return
}
start := time.Now()
c.Logger().Debug().Msgf("Heartbeat sent at %s", start)
select {
case <-c.heartbeatChannel:
c.Logger().Debug().Msgf("Heartbeat Received with RTT: %d", time.Since(start))
case <-time.After(c.options.Heartbeat):
c.Logger().Error().Msg("Heartbeat not received within timeout period")
c.wg.Done()
_ = c.Close()
return
case <-c.CloseChannel():
c.wg.Done()
return
}
} else {
c.Logger().Debug().Msgf("Skipping heartbeat because write buffer size > 0")
}
} else {
c.Logger().Debug().Msgf("Skipping heartbeat because write buffer size > 0")
}
}
}
65 changes: 65 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,71 @@ func TestClientRaw(t *testing.T) {
assert.NoError(t, err)
}

func TestClientStaleClose(t *testing.T) {
t.Parallel()

const testSize = 100
const packetSize = 512

clientHandlerTable := make(HandlerTable)
serverHandlerTable := make(HandlerTable)

finished := make(chan struct{}, 1)

serverHandlerTable[metadata.PacketPing] = func(_ context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action Action) {
if incoming.Metadata.Id == testSize-1 {
outgoing = incoming
action = CLOSE
}
return
}

clientHandlerTable[metadata.PacketPing] = func(_ context.Context, _ *packet.Packet) (outgoing *packet.Packet, action Action) {
finished <- struct{}{}
return
}

emptyLogger := zerolog.New(ioutil.Discard)
s, err := NewServer(":0", serverHandlerTable, WithLogger(&emptyLogger))
require.NoError(t, err)

err = s.Start()
require.NoError(t, err)

c, err := NewClient(s.listener.Addr().String(), clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
assert.NoError(t, err)
_, err = c.Raw()
assert.ErrorIs(t, ConnectionNotInitialized, err)

err = c.Connect()
require.NoError(t, err)

data := make([]byte, packetSize)
_, _ = rand.Read(data)

p := packet.Get()
p.Metadata.Operation = metadata.PacketPing
p.Content.Write(data)
p.Metadata.ContentLength = packetSize

for q := 0; q < testSize; q++ {
p.Metadata.Id = uint16(q)
err := c.WritePacket(p)
assert.NoError(t, err)
}
packet.Put(p)
<-finished

_, err = c.conn.ReadPacket()
assert.ErrorIs(t, err, ConnectionClosed)

err = c.Close()
assert.NoError(t, err)

err = s.Shutdown()
assert.NoError(t, err)
}

func BenchmarkThroughputClient(b *testing.B) {
const testSize = 1<<16 - 1
const packetSize = 512
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/loopholelabs/frisbee
go 1.15

require (
github.com/loopholelabs/packet v0.2.0
github.com/loopholelabs/packet v0.2.2
github.com/loopholelabs/testing v0.2.3
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.20.0
Expand Down
Loading

0 comments on commit 67310d3

Please sign in to comment.