Skip to content

Commit

Permalink
Implement driver option binary_parameters
Browse files Browse the repository at this point in the history
If set, all parameters of type []byte will be sent over as binary data,
which allows the driver to avoid the second round-trip to the server
when not using a prepared statement proper.  This also allows pgbouncer
to be used with parameterized queries, which is impossible if
binary_parameters has not been enabled.

This patch is still missing documentation, but the code has been sitting
on my hard drive for long enough.  I'll commit the docs in a followup
commit.

Marko Tiikkaja, with some help from Chris Gilling
  • Loading branch information
johto committed Jul 1, 2015
1 parent e93ec67 commit 2997d16
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 27 deletions.
18 changes: 12 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ env:
- PQSSLCERTTEST_PATH=$PWD/certs
- PGHOST=127.0.0.1
matrix:
- PGVERSION=9.4
- PGVERSION=9.3
- PGVERSION=9.2
- PGVERSION=9.1
- PGVERSION=9.0
- PGVERSION=8.4
- PGVERSION=9.4 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=9.3 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=9.2 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=9.1 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=9.0 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=8.4 PQTEST_BINARY_PARAMETERS=yes
- PGVERSION=9.4 PQTEST_BINARY_PARAMETERS=no
- PGVERSION=9.3 PQTEST_BINARY_PARAMETERS=no
- PGVERSION=9.2 PQTEST_BINARY_PARAMETERS=no
- PGVERSION=9.1 PQTEST_BINARY_PARAMETERS=no
- PGVERSION=9.0 PQTEST_BINARY_PARAMETERS=no
- PGVERSION=8.4 PQTEST_BINARY_PARAMETERS=no

script:
- go test -v ./...
Expand Down
146 changes: 127 additions & 19 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ type conn struct {
// receiving query results from prepared statements. Only provided for
// debugging.
disablePreparedBinaryResult bool

// Whether to always send []byte parameters over as binary. Enables single
// round-trip mode for non-prepared Query calls.
binaryParameters bool
}

// Handle driver-side settings in parsed connection string.
Expand All @@ -122,7 +126,7 @@ func (c *conn) handleDriverSettings(o values) (err error) {
} else if value == "no" {
*val = false
} else {
return fmt.Errorf("unrecognized value %q for disable_prepared_binary_result", value)
return fmt.Errorf("unrecognized value %q for %s", value, key)
}
}
return nil
Expand All @@ -132,6 +136,10 @@ func (c *conn) handleDriverSettings(o values) (err error) {
if err != nil {
return err
}
err = boolSetting("binary_parameters", &c.binaryParameters)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -234,6 +242,7 @@ func DialOpen(d Dialer, name string) (_ driver.Conn, err error) {
cn.ssl(o)
cn.buf = bufio.NewReader(cn.c)
cn.startup(o)

// reset the deadline, in case one was set (see dial)
if timeout := o.Get("connect_timeout"); timeout != "" && timeout != "0" {
err = cn.c.SetDeadline(time.Time{})
Expand Down Expand Up @@ -696,18 +705,29 @@ func (cn *conn) Query(query string, args []driver.Value) (_ driver.Rows, err err
return cn.simpleQuery(query)
}

st := cn.prepareTo(query, "")
st.exec(args)
return &rows{
cn: cn,
colNames: st.colNames,
colTyps: st.colTyps,
colFmts: st.colFmts,
}, nil
if cn.binaryParameters {
cn.sendBinaryModeQuery(query, args)

cn.readParseResponse()
cn.readBindResponse()
rows := &rows{cn: cn}
rows.colNames, rows.colFmts, rows.colTyps = cn.readPortalDescribeResponse()
cn.postExecuteWorkaround()
return rows, nil
} else {
st := cn.prepareTo(query, "")
st.exec(args)
return &rows{
cn: cn,
colNames: st.colNames,
colTyps: st.colTyps,
colFmts: st.colFmts,
}, nil
}
}

// Implement the optional "Execer" interface for one-shot queries
func (cn *conn) Exec(query string, args []driver.Value) (_ driver.Result, err error) {
func (cn *conn) Exec(query string, args []driver.Value) (res driver.Result, err error) {
if cn.bad {
return nil, driver.ErrBadConn
}
Expand All @@ -721,17 +741,26 @@ func (cn *conn) Exec(query string, args []driver.Value) (_ driver.Result, err er
return r, err
}

// Use the unnamed statement to defer planning until bind
// time, or else value-based selectivity estimates cannot be
// used.
st := cn.prepareTo(query, "")
if cn.binaryParameters {
cn.sendBinaryModeQuery(query, args)

r, err := st.Exec(args)
if err != nil {
panic(err)
cn.readParseResponse()
cn.readBindResponse()
cn.readPortalDescribeResponse()
cn.postExecuteWorkaround()
res, _, err = cn.readExecuteResponse("Execute")
return res, err
} else {
// Use the unnamed statement to defer planning until bind
// time, or else value-based selectivity estimates cannot be
// used.
st := cn.prepareTo(query, "")
r, err := st.Exec(args)
if err != nil {
panic(err)
}
return r, err
}

return r, err
}

func (cn *conn) send(m *writeBuf) {
Expand Down Expand Up @@ -1026,6 +1055,8 @@ func isDriverSetting(key string) bool {
return true
case "disable_prepared_binary_result":
return true
case "binary_parameters":
return true

default:
return false
Expand Down Expand Up @@ -1375,6 +1406,65 @@ func md5s(s string) string {
return fmt.Sprintf("%x", h.Sum(nil))
}

func (cn *conn) sendBinaryModeQuery(query string, args []driver.Value) {
if len(args) >= 65536 {
errorf("got %d parameters but PostgreSQL only supports 65535 parameters", len(args))
}

b := cn.writeBuf('P')
b.byte(0) // unnamed statement
b.string(query)
b.int16(0)

b.next('B')
b.int16(0) // unnamed portal and statement

// Do one pass over the parameters to see if we're going to send any of
// them over in binary. If we are, create a paramFormats array at the
// same time.
var paramFormats []int
for i, x := range args {
_, ok := x.([]byte)
if ok {
if paramFormats == nil {
paramFormats = make([]int, len(args))
}
paramFormats[i] = 1
}
}
if paramFormats == nil {
b.int16(0)
} else {
b.int16(len(paramFormats))
for _, x := range paramFormats {
b.int16(x)
}
}

b.int16(len(args))
for _, x := range args {
if x == nil {
b.int32(-1)
} else {
datum := binaryEncode(&cn.parameterStatus, x)
b.int32(len(datum))
b.bytes(datum)
}
}
b.int16(0)

b.next('D')
b.byte('P')
b.byte(0) // unnamed portal

b.next('E')
b.byte(0)
b.int32(0)

b.next('S')
cn.send(b)
}

func (c *conn) processParameterStatus(r *readBuf) {
var err error

Expand Down Expand Up @@ -1457,6 +1547,24 @@ func (cn *conn) readStatementDescribeResponse() (paramTyps []oid.Oid, colNames [
}
}

func (cn *conn) readPortalDescribeResponse() (colNames []string, colFmts []format, colTyps []oid.Oid) {
t, r := cn.recv1()
switch t {
case 'T':
return parsePortalRowDescribe(r)
case 'n':
return nil, nil, nil
case 'E':
err := parseError(r)
cn.readReadyForQuery()
panic(err)
default:
cn.bad = true
errorf("unexpected Describe response %q", t)
}
panic("not reached")
}

func (cn *conn) readBindResponse() {
t, r := cn.recv1()
switch t {
Expand Down
19 changes: 19 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"reflect"
"strings"
"testing"
"time"
)
Expand All @@ -15,6 +16,17 @@ type Fatalistic interface {
Fatal(args ...interface{})
}

func forceBinaryParameters() bool {
bp := os.Getenv("PQTEST_BINARY_PARAMETERS")
if bp == "yes" {
return true
} else if bp == "" || bp == "no" {
return false
} else {
panic("unexpected value for PQTEST_BINARY_PARAMETERS")
}
}

func openTestConnConninfo(conninfo string) (*sql.DB, error) {
defaultTo := func(envvar string, value string) {
if os.Getenv(envvar) == "" {
Expand All @@ -24,6 +36,13 @@ func openTestConnConninfo(conninfo string) (*sql.DB, error) {
defaultTo("PGDATABASE", "pqgotest")
defaultTo("PGSSLMODE", "disable")
defaultTo("PGCONNECT_TIMEOUT", "20")

if forceBinaryParameters() &&
!strings.HasPrefix(conninfo, "postgres://") &&
!strings.HasPrefix(conninfo, "postgresql://") {
conninfo = conninfo + " binary_parameters=yes"
}

return sql.Open("postgres", conninfo)
}

Expand Down
10 changes: 10 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ import (
"github.com/lib/pq/oid"
)

func binaryEncode(parameterStatus *parameterStatus, x interface{}) []byte {
switch v := x.(type) {
case []byte:
return v
default:
return encode(parameterStatus, x, oid.T_unknown)
}
panic("not reached")
}

func encode(parameterStatus *parameterStatus, x interface{}, pgtypOid oid.Oid) []byte {
switch v := x.(type) {
case int64:
Expand Down
Loading

0 comments on commit 2997d16

Please sign in to comment.