Skip to content

Commit

Permalink
Add non-blocking IO
Browse files Browse the repository at this point in the history
This eliminates an edge case that can cause a deadlock and is a
prerequisite to cheaply testing connection liveness and to recoving a
connection after a timeout.

jackc/pgconn#27

Squashed commit of the following:

commit 0d7b0dd
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 25 13:15:05 2022 -0500

    Add test for non-blocking IO preventing deadlock

commit 79d68d2
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 18 18:23:24 2022 -0500

    Release CopyFrom buf when done

commit 95a4313
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 18 18:22:32 2022 -0500

    Avoid allocations with non-blocking write

commit 6b63cee
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 18 17:46:49 2022 -0500

    Simplify iobufpool usage

commit 60ecdda
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 18 11:51:59 2022 -0500

    Add true non-blocking IO

commit 7dd26a3
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 20:28:23 2022 -0500

    Fix block when reading more than buffered

commit afa7022
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 20:10:23 2022 -0500

    More TLS support

commit 51655bf
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 17:46:00 2022 -0500

    Steps toward TLS

commit 2b80beb
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 13:06:29 2022 -0500

    Litle more TLS support

commit 765b2c6
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 12:29:30 2022 -0500

    Add testing of TLS

commit 5b64432
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 09:48:19 2022 -0500

    Introduce testVariants in prep for TLS

commit ecebd7b
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 09:32:14 2022 -0500

    Handle and test read of previously buffered data

commit 09c64d8
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 09:04:48 2022 -0500

    Rename nbbconn to nbconn

commit 73398bc
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 08:59:53 2022 -0500

    Remove backup files

commit f1df39a
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 08:58:05 2022 -0500

    Initial passing tests

commit ea3cdab
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat Jun 4 08:38:57 2022 -0500

    Fix connect timeout

commit ca22396
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Thu Jun 2 19:32:55 2022 -0500

    wip

commit 2e7b46d
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Mon May 30 08:32:43 2022 -0500

    Update comments

commit 7d04dc5
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat May 28 19:43:23 2022 -0500

    Fix broken test

commit bf1edc7
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat May 28 19:40:33 2022 -0500

    fixed putting wrong size bufs

commit 1f7a855
Author: Jack Christensen <jack@jackchristensen.com>
Date:   Sat May 28 18:13:47 2022 -0500

    initial not quite working non-blocking conn
  • Loading branch information
jackc committed Jun 25, 2022
1 parent c0a4d1b commit 811d855
Show file tree
Hide file tree
Showing 8 changed files with 1,327 additions and 145 deletions.
37 changes: 24 additions & 13 deletions internal/iobufpool/iobufpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,44 @@ func init() {
}
}

// Get gets a []byte with len >= size and len <= size*2.
// Get gets a []byte of len size with cap <= size*2.
func Get(size int) []byte {
i := poolIdx(size)
i := getPoolIdx(size)
if i >= len(pools) {
return make([]byte, size)
}
return pools[i].Get().([]byte)
return pools[i].Get().([]byte)[:size]
}

func getPoolIdx(size int) int {
size--
size >>= minPoolExpOf2
i := 0
for size > 0 {
size >>= 1
i++
}

return i
}

// Put returns buf to the pool.
func Put(buf []byte) {
i := poolIdx(len(buf))
if i >= len(pools) {
i := putPoolIdx(cap(buf))
if i < 0 {
return
}

pools[i].Put(buf)
}

func poolIdx(size int) int {
size--
size >>= minPoolExpOf2
i := 0
for size > 0 {
size >>= 1
i++
func putPoolIdx(size int) int {
minPoolSize := 1 << minPoolExpOf2
for i := range pools {
if size == minPoolSize<<i {
return i
}
}

return i
return -1
}
2 changes: 1 addition & 1 deletion internal/iobufpool/iobufpool_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestPoolIdx(t *testing.T) {
{size: 8388609, expected: 16},
}
for _, tt := range tests {
idx := poolIdx(tt.size)
idx := getPoolIdx(tt.size)
assert.Equalf(t, tt.expected, idx, "size: %d", tt.size)
}
}
75 changes: 59 additions & 16 deletions internal/iobufpool/iobufpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,74 @@ import (

"github.com/jackc/pgx/v5/internal/iobufpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGet(t *testing.T) {
func TestGetCap(t *testing.T) {
tests := []struct {
requestedLen int
expectedLen int
expectedCap int
}{
{requestedLen: 0, expectedLen: 256},
{requestedLen: 128, expectedLen: 256},
{requestedLen: 255, expectedLen: 256},
{requestedLen: 256, expectedLen: 256},
{requestedLen: 257, expectedLen: 512},
{requestedLen: 511, expectedLen: 512},
{requestedLen: 512, expectedLen: 512},
{requestedLen: 513, expectedLen: 1024},
{requestedLen: 1023, expectedLen: 1024},
{requestedLen: 1024, expectedLen: 1024},
{requestedLen: 33554431, expectedLen: 33554432},
{requestedLen: 33554432, expectedLen: 33554432},
{requestedLen: 0, expectedCap: 256},
{requestedLen: 128, expectedCap: 256},
{requestedLen: 255, expectedCap: 256},
{requestedLen: 256, expectedCap: 256},
{requestedLen: 257, expectedCap: 512},
{requestedLen: 511, expectedCap: 512},
{requestedLen: 512, expectedCap: 512},
{requestedLen: 513, expectedCap: 1024},
{requestedLen: 1023, expectedCap: 1024},
{requestedLen: 1024, expectedCap: 1024},
{requestedLen: 33554431, expectedCap: 33554432},
{requestedLen: 33554432, expectedCap: 33554432},

// Above 32 MiB skip the pool and allocate exactly the requested size.
{requestedLen: 33554433, expectedLen: 33554433},
{requestedLen: 33554433, expectedCap: 33554433},
}
for _, tt := range tests {
buf := iobufpool.Get(tt.requestedLen)
assert.Equalf(t, tt.expectedLen, len(buf), "requestedLen: %d", tt.requestedLen)
assert.Equalf(t, tt.requestedLen, len(buf), "bad len for requestedLen: %d", len(buf), tt.requestedLen)
assert.Equalf(t, tt.expectedCap, cap(buf), "bad cap for requestedLen: %d", tt.requestedLen)
}
}

func TestPutHandlesWrongSizedBuffers(t *testing.T) {
for putBufSize := range []int{0, 1, 128, 250, 256, 257, 1023, 1024, 1025, 1 << 28} {
putBuf := make([]byte, putBufSize)
iobufpool.Put(putBuf)

tests := []struct {
requestedLen int
expectedCap int
}{
{requestedLen: 0, expectedCap: 256},
{requestedLen: 128, expectedCap: 256},
{requestedLen: 255, expectedCap: 256},
{requestedLen: 256, expectedCap: 256},
{requestedLen: 257, expectedCap: 512},
{requestedLen: 511, expectedCap: 512},
{requestedLen: 512, expectedCap: 512},
{requestedLen: 513, expectedCap: 1024},
{requestedLen: 1023, expectedCap: 1024},
{requestedLen: 1024, expectedCap: 1024},
{requestedLen: 33554431, expectedCap: 33554432},
{requestedLen: 33554432, expectedCap: 33554432},

// Above 32 MiB skip the pool and allocate exactly the requested size.
{requestedLen: 33554433, expectedCap: 33554433},
}
for _, tt := range tests {
getBuf := iobufpool.Get(tt.requestedLen)
assert.Equalf(t, tt.requestedLen, len(getBuf), "len(putBuf): %d, requestedLen: %d", len(putBuf), tt.requestedLen)
assert.Equalf(t, tt.expectedCap, cap(getBuf), "cap(putBuf): %d, requestedLen: %d", cap(putBuf), tt.requestedLen)
}
}
}

func TestPutGetBufferReuse(t *testing.T) {
buf := iobufpool.Get(4)
buf[0] = 1
iobufpool.Put(buf)
buf = iobufpool.Get(4)
require.Equal(t, byte(1), buf[0])
}
70 changes: 70 additions & 0 deletions internal/nbconn/bufferqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package nbconn

import (
"sync"
)

const minBufferQueueLen = 8

type bufferQueue struct {
lock sync.Mutex
queue [][]byte
r, w int
}

func (bq *bufferQueue) pushBack(buf []byte) {
bq.lock.Lock()
defer bq.lock.Unlock()

if bq.w >= len(bq.queue) {
bq.growQueue()
}
bq.queue[bq.w] = buf
bq.w++
}

func (bq *bufferQueue) pushFront(buf []byte) {
bq.lock.Lock()
defer bq.lock.Unlock()

if bq.w >= len(bq.queue) {
bq.growQueue()
}
copy(bq.queue[bq.r+1:bq.w+1], bq.queue[bq.r:bq.w])
bq.queue[bq.r] = buf
bq.w++
}

func (bq *bufferQueue) popFront() []byte {
bq.lock.Lock()
defer bq.lock.Unlock()

if bq.r == bq.w {
return nil
}

buf := bq.queue[bq.r]
bq.queue[bq.r] = nil // Clear reference so it can be garbage collected.
bq.r++

if bq.r == bq.w {
bq.r = 0
bq.w = 0
if len(bq.queue) > minBufferQueueLen {
bq.queue = make([][]byte, minBufferQueueLen)
}
}

return buf
}

func (bq *bufferQueue) growQueue() {
desiredLen := (len(bq.queue) + 1) * 3 / 2
if desiredLen < minBufferQueueLen {
desiredLen = minBufferQueueLen
}

newQueue := make([][]byte, desiredLen)
copy(newQueue, bq.queue)
bq.queue = newQueue
}
Loading

0 comments on commit 811d855

Please sign in to comment.