Skip to content

Commit

Permalink
Add PgConn.SyncConn
Browse files Browse the repository at this point in the history
This provides a way to ensure it is safe to directly read or write to
the underlying net.Conn.

#1673
  • Loading branch information
jackc committed Jul 12, 2023
1 parent 05440f9 commit f512b96
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 24 deletions.
47 changes: 27 additions & 20 deletions pgconn/internal/bgreader/bgreader.go
Expand Up @@ -9,18 +9,18 @@ import (
)

const (
bgReaderStatusStopped = iota
bgReaderStatusRunning
bgReaderStatusStopping
StatusStopped = iota
StatusRunning
StatusStopping
)

// BGReader is an io.Reader that can optionally buffer reads in the background. It is safe for concurrent use.
type BGReader struct {
r io.Reader

cond *sync.Cond
bgReaderStatus int32
readResults []readResult
cond *sync.Cond
status int32
readResults []readResult
}

type readResult struct {
Expand All @@ -34,14 +34,14 @@ func (r *BGReader) Start() {
r.cond.L.Lock()
defer r.cond.L.Unlock()

switch r.bgReaderStatus {
case bgReaderStatusStopped:
r.bgReaderStatus = bgReaderStatusRunning
switch r.status {
case StatusStopped:
r.status = StatusRunning
go r.bgRead()
case bgReaderStatusRunning:
case StatusRunning:
// no-op
case bgReaderStatusStopping:
r.bgReaderStatus = bgReaderStatusRunning
case StatusStopping:
r.status = StatusRunning
}
}

Expand All @@ -51,16 +51,23 @@ func (r *BGReader) Stop() {
r.cond.L.Lock()
defer r.cond.L.Unlock()

switch r.bgReaderStatus {
case bgReaderStatusStopped:
switch r.status {
case StatusStopped:
// no-op
case bgReaderStatusRunning:
r.bgReaderStatus = bgReaderStatusStopping
case bgReaderStatusStopping:
case StatusRunning:
r.status = StatusStopping
case StatusStopping:
// no-op
}
}

// Status returns the current status of the background reader.
func (r *BGReader) Status() int32 {
r.cond.L.Lock()
defer r.cond.L.Unlock()
return r.status
}

func (r *BGReader) bgRead() {
keepReading := true
for keepReading {
Expand All @@ -70,8 +77,8 @@ func (r *BGReader) bgRead() {

r.cond.L.Lock()
r.readResults = append(r.readResults, readResult{buf: buf, err: err})
if r.bgReaderStatus == bgReaderStatusStopping || err != nil {
r.bgReaderStatus = bgReaderStatusStopped
if r.status == StatusStopping || err != nil {
r.status = StatusStopped
keepReading = false
}
r.cond.L.Unlock()
Expand All @@ -89,7 +96,7 @@ func (r *BGReader) Read(p []byte) (int, error) {
}

// There are no unread background read results and the background reader is stopped.
if r.bgReaderStatus == bgReaderStatusStopped {
if r.status == StatusStopped {
return r.r.Read(p)
}

Expand Down
33 changes: 29 additions & 4 deletions pgconn/pgconn.go
Expand Up @@ -556,7 +556,8 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
return msg, nil
}

// Conn returns the underlying net.Conn. This rarely necessary.
// Conn returns the underlying net.Conn. This rarely necessary. If the connection will be directly used for reading or
// writing then SyncConn should usually be called before Conn.
func (pgConn *PgConn) Conn() net.Conn {
return pgConn.conn
}
Expand Down Expand Up @@ -1740,6 +1741,30 @@ func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
return err
}

// SyncConn prepares the underlying net.Conn for direct use. PgConn may internally buffer reads or use goroutines for
// background IO. This means that any direct use of the underlying net.Conn may be corrupted if a read is already
// buffered or a read is in progress. SyncConn drains read buffers and stops background IO. In some cases this may
// require sending a ping to the server. ctx can be used to cancel this operation. This should be called before any
// operation that will use the underlying net.Conn directly. e.g. Before Conn() or Hijack().
//
// This should not be confused with the PostgreSQL protocol Sync message.
func (pgConn *PgConn) SyncConn(ctx context.Context) error {
for i := 0; i < 10; i++ {
if pgConn.bgReader.Status() == bgreader.StatusStopped && pgConn.frontend.ReadBufferLen() == 0 {
return nil
}

err := pgConn.Ping(ctx)
if err != nil {
return fmt.Errorf("SyncConn: Ping failed while syncing conn: %w", err)
}
}

// This should never happen. Only way I can imagine this occuring is if the server is constantly sending data such as
// LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
return errors.New("SyncConn: conn never synchronized")
}

// HijackedConn is the result of hijacking a connection.
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
Expand All @@ -1754,9 +1779,9 @@ type HijackedConn struct {
Config *Config
}

// Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking.
// Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the
// raw connection after that (e.g. a load balancer or proxy).
// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately
// before Hijack. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish
// a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy).
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
Expand Down
3 changes: 3 additions & 0 deletions pgconn/pgconn_test.go
Expand Up @@ -2319,6 +2319,9 @@ func TestHijackAndConstruct(t *testing.T) {
origConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)

err = origConn.SyncConn(ctx)
require.NoError(t, err)

hc, err := origConn.Hijack()
require.NoError(t, err)

Expand Down
4 changes: 4 additions & 0 deletions pgproto3/frontend.go
Expand Up @@ -361,3 +361,7 @@ func (f *Frontend) findAuthenticationMessageType(src []byte) (BackendMessage, er
func (f *Frontend) GetAuthType() uint32 {
return f.authType
}

func (f *Frontend) ReadBufferLen() int {
return f.cr.wp - f.cr.rp
}

0 comments on commit f512b96

Please sign in to comment.