Skip to content

Commit

Permalink
Merge pull request #171 from loopholelabs/staging
Browse files Browse the repository at this point in the history
Updating Master
  • Loading branch information
ShivanshVij committed Mar 27, 2024
2 parents a3733d0 + acf1d47 commit d6a7a8a
Show file tree
Hide file tree
Showing 20 changed files with 210 additions and 157 deletions.
24 changes: 12 additions & 12 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.22"
check-latest: true
cache: true
- name: Run Tests
Expand All @@ -20,11 +20,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.22"
check-latest: true
cache: true
- name: Test with Race Conditions
Expand All @@ -34,11 +34,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.22"
check-latest: true
cache: true
- name: Benchmark
Expand All @@ -47,11 +47,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version: "1.22"
check-latest: true
cache: true
- name: Benchmark with Race Conditions
Expand Down
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [v0.7.3] - 2024-03-26

### Dependencies

- Updated `polyglot` to `v1.2.2`

### Fix

- Fixed bugs where test cases would not generate random data properly
- Fixed a bug where `ConnContext` would not get called until the first packet was read

## [v0.7.2] - 2023-08-26

### Features
Expand Down Expand Up @@ -363,7 +374,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

Initial Release of Frisbee

[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.7.2...HEAD
[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.7.3...HEAD
[v0.7.3]: https://github.com/loopholelabs/frisbee/compare/v0.7.1...v0.7.3
[v0.7.2]: https://github.com/loopholelabs/frisbee/compare/v0.7.1...v0.7.2
[v0.7.1]: https://github.com/loopholelabs/frisbee/compare/v0.7.0...v0.7.1
[v0.7.0]: https://github.com/loopholelabs/frisbee/compare/v0.6.0...v0.7.0
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This is the [Go](http://golang.org) implementation of **Frisbee**, a bring-your-
protocol messaging framework designed for performance and
stability.

**This library requires Go1.18 or later.**
**This library requires Go1.20 or later.**

## Important note about releases and stability

Expand Down
56 changes: 33 additions & 23 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"net"
"sync"
"time"

"github.com/loopholelabs/common/pkg/queue"
"github.com/loopholelabs/frisbee-go/internal/dialer"
"github.com/loopholelabs/frisbee-go/pkg/metadata"
"github.com/loopholelabs/frisbee-go/pkg/packet"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.uber.org/atomic"
"net"
"sync"
"time"
)

// Async is the underlying asynchronous frisbee connection which has extremely efficient read and write logic and
Expand Down Expand Up @@ -182,7 +183,7 @@ func (c *Async) WritePacket(p *packet.Packet) error {
if p.Metadata.Operation <= RESERVED9 {
return InvalidOperation
}
return c.writePacket(p)
return c.writePacket(p, true)
}

// ReadPacket is a blocking function that will wait until a Frisbee packet is available and then return it (and its content).
Expand Down Expand Up @@ -299,8 +300,8 @@ func (c *Async) Close() error {
}

// write packet is the internal write packet function that does not check for reserved operations.
func (c *Async) writePacket(p *packet.Packet) error {
if int(p.Metadata.ContentLength) != len(*p.Content) {
func (c *Async) writePacket(p *packet.Packet, closeOnErr bool) error {
if int(p.Metadata.ContentLength) != p.Content.Len() {
return InvalidContentLength
}

Expand All @@ -322,7 +323,10 @@ func (c *Async) writePacket(p *packet.Packet) error {
return ConnectionClosed
}
c.Logger().Debug().Err(err).Uint16("Packet ID", p.Metadata.Id).Msg("error while setting write deadline before writing packet")
return c.closeWithError(err)
if closeOnErr {
return c.closeWithError(err)
}
return err
}
_, err = c.writer.Write(encodedMetadata[:])
metadata.PutBuffer(encodedMetadata)
Expand All @@ -333,18 +337,24 @@ func (c *Async) writePacket(p *packet.Packet) error {
return ConnectionClosed
}
c.Logger().Debug().Err(err).Uint16("Packet ID", p.Metadata.Id).Msg("error while writing encoded metadata")
return c.closeWithError(err)
if closeOnErr {
return c.closeWithError(err)
}
return err
}
if p.Metadata.ContentLength != 0 {
_, err = c.writer.Write((*p.Content)[:p.Metadata.ContentLength])
_, err = c.writer.Write(p.Content.Bytes()[:p.Metadata.ContentLength])
if err != nil {
c.Unlock()
if c.closed.Load() {
c.Logger().Debug().Err(ConnectionClosed).Uint16("Packet ID", p.Metadata.Id).Msg("error while writing packet content")
return ConnectionClosed
}
c.Logger().Debug().Err(err).Uint16("Packet ID", p.Metadata.Id).Msg("error while writing packet content")
return c.closeWithError(err)
if closeOnErr {
return c.closeWithError(err)
}
return err
}
}

Expand Down Expand Up @@ -456,7 +466,7 @@ func (c *Async) pingLoop() {
c.wg.Done()
return
case <-ticker.C:
err = c.writePacket(PINGPacket)
err = c.writePacket(PINGPacket, false)
if err != nil {
c.wg.Done()
_ = c.closeWithError(err)
Expand Down Expand Up @@ -515,7 +525,7 @@ func (c *Async) readLoop() {
switch p.Metadata.Operation {
case PING:
c.Logger().Debug().Msg("PING Packet received by read loop, sending back PONG packet")
err = c.writePacket(PONGPacket)
err = c.writePacket(PONGPacket, false)
if err != nil {
c.wg.Done()
_ = c.closeWithError(err)
Expand All @@ -540,13 +550,13 @@ func (c *Async) readLoop() {
default:
if p.Metadata.ContentLength > 0 {
if n-index < int(p.Metadata.ContentLength) {
min := int(p.Metadata.ContentLength) - p.Content.Write(buf[index:n])
minSize := int(p.Metadata.ContentLength) - p.Content.Write(buf[index:n])
n = 0
for cap(buf) < min {
for cap(buf) < minSize {
buf = append(buf[:cap(buf)], 0)
}
buf = buf[:cap(buf)]
for n < min {
for n < minSize {
var nn int
err = c.conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if err != nil {
Expand All @@ -557,16 +567,16 @@ func (c *Async) readLoop() {
nn, err = c.conn.Read(buf[n:])
n += nn
if err != nil {
if n < min {
if n < minSize {
c.wg.Done()
_ = c.closeWithError(err)
return
}
break
}
}
p.Content.Write(buf[:min])
index = min
p.Content.Write(buf[:minSize])
index = minSize
} else {
index += p.Content.Write(buf[index : index+int(p.Metadata.ContentLength)])
}
Expand Down Expand Up @@ -648,14 +658,14 @@ func (c *Async) readLoop() {
index = n

buf = buf[:cap(buf)]
min := metadata.Size - index
if len(buf) < min {
minSize := metadata.Size - index
if len(buf) < minSize {
c.wg.Done()
_ = c.closeWithError(InvalidBufferLength)
return
}
n = 0
for n < min {
for n < minSize {
var nn int
err = c.conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if err != nil {
Expand All @@ -666,7 +676,7 @@ func (c *Async) readLoop() {
nn, err = c.conn.Read(buf[index+n:])
n += nn
if err != nil {
if n < min {
if n < minSize {
c.wg.Done()
_ = c.closeWithError(err)
return
Expand Down
34 changes: 21 additions & 13 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestNewAsync(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

data := make([]byte, packetSize)
_, _ = rand.Read(data)
Expand All @@ -77,8 +77,10 @@ func TestNewAsync(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(packetSize), p.Metadata.ContentLength)
assert.Equal(t, len(data), len(*p.Content))
assert.Equal(t, polyglot.Buffer(data), *p.Content)
assert.Equal(t, len(data), p.Content.Len())
expected := polyglot.NewBufferFromBytes(data)
expected.MoveOffset(len(data))
assert.Equal(t, expected.Bytes(), p.Content.Bytes())

packet.Put(p)

Expand Down Expand Up @@ -124,8 +126,10 @@ func TestAsyncLargeWrite(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(packetSize), p.Metadata.ContentLength)
assert.Equal(t, len(randomData[i]), len(*p.Content))
assert.Equal(t, polyglot.Buffer(randomData[i]), *p.Content)
assert.Equal(t, len(randomData[i]), p.Content.Len())
expected := polyglot.NewBufferFromBytes(randomData[i])
expected.MoveOffset(len(randomData[i]))
assert.Equal(t, expected.Bytes(), p.Content.Bytes())
packet.Put(p)
}

Expand Down Expand Up @@ -172,8 +176,10 @@ func TestAsyncRawConn(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(packetSize), p.Metadata.ContentLength)
assert.Equal(t, packetSize, len(*p.Content))
assert.Equal(t, polyglot.Buffer(randomData), *p.Content)
assert.Equal(t, packetSize, p.Content.Len())
expected := polyglot.NewBufferFromBytes(randomData)
expected.MoveOffset(len(randomData))
assert.Equal(t, expected.Bytes(), p.Content.Bytes())
}

rawReaderConn := readerConn.Raw()
Expand Down Expand Up @@ -226,7 +232,7 @@ func TestAsyncReadClose(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

err = readerConn.conn.Close()
assert.NoError(t, err)
Expand Down Expand Up @@ -277,15 +283,15 @@ func TestAsyncReadAvailableClose(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

p, err = readerConn.ReadPacket()
require.NoError(t, err)
require.NotNil(t, p.Metadata)
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

_, err = readerConn.ReadPacket()
require.Error(t, err)
Expand Down Expand Up @@ -324,7 +330,7 @@ func TestAsyncWriteClose(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

err = writerConn.WritePacket(p)
assert.NoError(t, err)
Expand Down Expand Up @@ -377,7 +383,7 @@ func TestAsyncTimeout(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

time.Sleep(DefaultDeadline * 2)

Expand Down Expand Up @@ -408,7 +414,7 @@ func TestAsyncTimeout(t *testing.T) {
assert.Equal(t, uint16(64), p.Metadata.Id)
assert.Equal(t, uint16(32), p.Metadata.Operation)
assert.Equal(t, uint32(0), p.Metadata.ContentLength)
assert.Equal(t, 0, len(*p.Content))
assert.Equal(t, 0, p.Content.Len())

_, err = readerConn.ReadPacket()
require.ErrorIs(t, err, ConnectionClosed)
Expand Down Expand Up @@ -478,12 +484,14 @@ func BenchmarkAsyncThroughputNetworkMultiple(b *testing.B) {
var err error

randomData := make([]byte, packetSize)
_, _ = rand.Read(randomData)

p := packet.Get()
p.Metadata.Id = 64
p.Metadata.Operation = 32
p.Content.Write(randomData)
p.Metadata.ContentLength = packetSize

for i := 0; i < b.N; i++ {
done := make(chan struct{}, 1)
errCh := make(chan error, 1)
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *Client) handleConn() {
packetCtx = c.PacketContext(packetCtx, p)
}
outgoing, action = handlerFunc(packetCtx, p)
if outgoing != nil && outgoing.Metadata.ContentLength == uint32(len(*outgoing.Content)) {
if outgoing != nil && outgoing.Metadata.ContentLength == uint32(outgoing.Content.Len()) {
err = c.conn.WritePacket(outgoing)
if outgoing != p {
packet.Put(outgoing)
Expand Down
Loading

0 comments on commit d6a7a8a

Please sign in to comment.