Skip to content

Commit

Permalink
Multihost support for High Availability setups (#158)
Browse files Browse the repository at this point in the history
* Allow multiple hosts
* Add support for target_session_attrs = 'read-write'
* Support multi host in all host/port parameters

Co-authored-by: Minigugus <43109623+Minigugus@users.noreply.github.com>
  • Loading branch information
porsager and Minigugus committed Mar 23, 2021
1 parent f90eb2c commit 16bc7db
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 57 deletions.
37 changes: 25 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ You can use either a `postgres://` url connection string or the options to defin

```js
const sql = postgres('postgres://username:password@host:port/database', {
host : '', // Postgres ip address or domain name
port : 5432, // Postgres server port
host : '', // Postgres ip address[s] or domain name[s]
port : 5432, // Postgres server port[s]
path : '', // unix socket path (usually '/tmp')
database : '', // Name of database to connect to
username : '', // Username of database user
Expand All @@ -68,10 +68,15 @@ const sql = postgres('postgres://username:password@host:port/database', {
connection : {
application_name : 'postgres.js', // Default application_name
... // Other connection parameters
}
},
target_session_attrs : null // Use 'read-write' with multiple hosts to
// ensure only connecting to primary
})
```

### SSL
More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options).

Although it is [vulnerable to MITM attacks](https://security.stackexchange.com/a/229297/174913), a common configuration for the `ssl` option for some cloud providers like Heroku is to set `rejectUnauthorized` to `false` (if `NODE_ENV` is `production`):

```js
Expand All @@ -83,23 +88,31 @@ const sql =
: postgres();
```

More info for the `ssl` option can be found in the [Node.js docs for tls connect options](https://nodejs.org/dist/latest-v10.x/docs/api/tls.html#tls_new_tls_tlssocket_socket_options).
### Multi host connections - High Availability (HA)

Connection uri strings with multiple hosts works like in [`psql multiple host uris`](https://www.postgresql.org/docs/13/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS)

Connecting to the specified hosts/ports will be tried in order, and on a successfull connection retries will be reset. This ensures that hosts can come up and down seamless to your application.

If you specify `target_session_attrs: 'read-write'` or `PGTARGETSESSIONATTRS=read-write` Postgres.js will only connect to a writeable host allowing for zero down time failovers.

### Environment Variables for Options

It is also possible to connect to the database without a connection string or options, which will read the options from the environment variables in the table below:
It is also possible to connect to the database without a connection string or any options. Postgres.js will fall back to the common environment variables used by `psql` as in the table below:

```js
const sql = postgres()
```

| Option | Environment Variables |
| ---------- | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| Option | Environment Variables |
| ----------------- | ------------------------ |
| `host` | `PGHOST` |
| `port` | `PGPORT` |
| `database` | `PGDATABASE` |
| `username` | `PGUSERNAME` or `PGUSER` |
| `password` | `PGPASSWORD` |
| `idle_timeout` | `PGIDLE_TIMEOUT` |
' `connect_timeout` | `PGCONNECT_TIMEOUT` |

## Query ```sql` ` -> Promise```

Expand Down
81 changes: 65 additions & 16 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ function Connection(options = {}) {
let ended
let open = false
let ready = false
let write = false
let next = false
let statements = {}
let connect_timer

Expand All @@ -41,7 +43,8 @@ function Connection(options = {}) {
ready,
data,
error,
close
close,
cleanup
})

const backend = Backend({
Expand Down Expand Up @@ -102,7 +105,7 @@ function Connection(options = {}) {
}

function destroy() {
error(errors.connection('CONNECTION_DESTROYED', options))
error(errors.connection('CONNECTION_DESTROYED', options, socket))
socket.destroy()
}

Expand Down Expand Up @@ -150,7 +153,7 @@ function Connection(options = {}) {
}

function connectTimedOut() {
error(errors.connection('CONNECT_TIMEOUT', options))
error(errors.connection('CONNECT_TIMEOUT', options, socket))
socket.destroy()
}

Expand Down Expand Up @@ -210,6 +213,9 @@ function Connection(options = {}) {
idle()

if (!open) {
if (multi())
return

messages.forEach(socket.write)
messages = []
open = true
Expand All @@ -220,6 +226,25 @@ function Connection(options = {}) {
ready && ended && ended()
}

function multi() {
if (next)
return (next = false, true)

if (!write && options.target_session_attrs === 'read-write') {
backend.query = {
origin: '',
result: [],
statement: {},
resolve: ([{ transaction_read_only }]) => transaction_read_only === 'on'
? (next = true, socket.destroy())
: (write = true, socket.success()),
reject: error
}
socket.write(frontend.Query('show transaction_read_only'))
return true
}
}

function data(x) {
buffer = buffer.length === 0
? x
Expand All @@ -237,54 +262,74 @@ function Connection(options = {}) {

function close() {
clearTimeout(connect_timer)
error(errors.connection('CONNECTION_CLOSED', options))
statements = {}
error(errors.connection('CONNECTION_CLOSED', options, socket))
messages = []
open = ready = false
onclose && onclose()
}

function cleanup() {
statements = {}
open = ready = write = false
}

/* c8 ignore next */
return connection
}

function postgresSocket(options, {
error,
close,
cleanup,
data
}) {
let socket
let closed = true
let succeeded = false
let next = null
let buffer
let i = 0

function onclose(err) {
oncleanup()
!succeeded && i < options.host.length
? connect()
: err instanceof Error
? error(err)
: close()
i >= options.host.length && (i = 0)
}

function onclose() {
function oncleanup() {
socket.removeListener('data', data)
socket.removeListener('error', error)
socket.removeListener('close', onclose)
socket.removeListener('error', onclose)
socket.removeListener('connect', ready)
socket.removeListener('secureConnect', ready)
closed = true
close()
cleanup()
}

function connect() {
if (!closed)
return

closed = false
closed = succeeded = false

socket = options.path
? net.connect(options.path)
: net.connect(options.port, options.host)
: net.connect(
x.port = options.port[i],
x.host = options.host[i++]
)

if (!options.ssl)
return attach(socket)

socket.once('connect', () => socket.write(Buffer.from('0000000804d2162f', 'hex')))
socket.once('error', error)
socket.once('connect', () => socket.write(frontend.SSLRequest))
socket.once('error', onclose)
socket.once('close', onclose)
socket.once('data', x => {
socket.removeListener('error', error)
socket.removeListener('error', onclose)
socket.removeListener('close', onclose)
x.toString() === 'S'
? attach(tls.connect(Object.assign({ socket }, ssl(options.ssl))))
Expand All @@ -303,22 +348,26 @@ function postgresSocket(options, {
function attach(x) {
socket = x
socket.on('data', data)
socket.once('error', error)
socket.once('error', onclose)
socket.once('connect', ready)
socket.once('secureConnect', ready)
socket.once('close', onclose)
}

function ready() {
try {
socket.write(frontend.connect(options))
socket.write(frontend.StartupMessage(options))
} catch (e) {
error(e)
socket.end()
}
}

const x = {
success: () => {
succeeded = true
i >= options.host.length && (i = 0)
},
write: x => {
buffer = buffer ? Buffer.concat([buffer, x]) : Buffer.from(x)
if (buffer.length >= 1024)
Expand Down
8 changes: 4 additions & 4 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ module.exports.errors = {
notSupported
}

function connection(x, options) {
function connection(x, options, socket) {
const error = Object.assign(
new Error(('write ' + x + ' ' + (options.path || (options.host + ':' + options.port)))),
new Error(('write ' + x + ' ' + (options.path || (socket.host + ':' + socket.port)))),
{
code: x,
errno: x,
address: options.path || options.host
}, options.path ? {} : { port: options.port }
address: options.path || socket.host
}, options.path ? {} : { port: socket.port }
)
Error.captureStackTrace(error, connection)
return error
Expand Down
8 changes: 5 additions & 3 deletions lib/frontend.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const execute = Buffer.concat([
bytes.S().end()
])

const SSLRequest = bytes.i32(8).i32(80877103).end(8)

const authNames = {
2 : 'KerberosV5',
3 : 'CleartextPassword',
Expand All @@ -33,9 +35,9 @@ const auths = {
12: SASLFinal
}


module.exports = {
connect,
StartupMessage,
SSLRequest,
auth,
Bind,
Parse,
Expand All @@ -44,7 +46,7 @@ module.exports = {
Execute
}

function connect({ user, database, connection }) {
function StartupMessage({ user, database, connection }) {
return bytes
.inc(4)
.i16(3)
Expand Down
32 changes: 24 additions & 8 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function Postgres(a, b) {
query.resolve = resolve
query.reject = reject
ended !== null
? reject(errors.connection('CONNECTION_ENDED', options))
? reject(errors.connection('CONNECTION_ENDED', options, options))
: ready
? send(connection, query, xs, args)
: fetchArrayTypes(connection).then(() => send(connection, query, xs, args)).catch(reject)
Expand Down Expand Up @@ -386,8 +386,8 @@ function Postgres(a, b) {
},
options
))
listener = { conn, result: {} };
all.push(conn);
listener = { conn, result: {} }
all.push(conn)
return listener
}

Expand Down Expand Up @@ -539,36 +539,52 @@ function Postgres(a, b) {

function parseOptions(a, b) {
const env = process.env // eslint-disable-line
, url = typeof a === 'string' ? Url.parse(a, true) : { query: {}, pathname: '' }
, o = (typeof a === 'string' ? b : a) || {}
, { url, multihost } = parseUrl(a, env)
, auth = (url.auth || '').split(':')
, host = o.hostname || o.host || url.hostname || env.PGHOST || 'localhost'
, host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost'
, port = o.port || url.port || env.PGPORT || 5432
, user = o.user || o.username || auth[0] || env.PGUSERNAME || env.PGUSER || osUsername()

return Object.assign({
host,
port,
host : host.split(',').map(x => x.split(':')[0]),
port : host.split(',').map(x => x.split(':')[1] || port),
path : o.path || host.indexOf('/') > -1 && host + '/.s.PGSQL.' + port,
database : o.database || o.db || (url.pathname || '').slice(1) || env.PGDATABASE || user,
user : user,
pass : o.pass || o.password || auth[1] || env.PGPASSWORD || '',
max : o.max || url.query.max || 10,
types : o.types || {},
ssl : o.ssl || url.sslmode || url.ssl || false,
ssl : o.ssl || url.query.sslmode || url.query.ssl || false,
idle_timeout : o.idle_timeout || url.query.idle_timeout || env.PGIDLE_TIMEOUT || warn(o.timeout),
connect_timeout : o.connect_timeout || url.query.connect_timeout || env.PGCONNECT_TIMEOUT || 30,
no_prepare : o.no_prepare,
onnotice : o.onnotice,
onparameter : o.onparameter,
transform : Object.assign({}, o.transform),
connection : Object.assign({ application_name: 'postgres.js' }, o.connection),
target_session_attrs: o.target_session_attrs || url.query.target_session_attrs || env.PGTARGETSESSIONATTRS,
debug : o.debug
},
mergeUserTypes(o.types)
)
}

function parseUrl(url) {
if (typeof url !== 'string')
return { url: { query: {} } }

let host = url
host = host.slice(host.indexOf('://') + 3)
host = host.split(/[?/]/)[0]
host = host.slice(host.indexOf('@') + 1)

return {
url: Url.parse(url.replace(host, host.split(',')[0]), true),
multihost: host.indexOf(',') > -1 && host
}
}

function warn(x) {
typeof x !== 'undefined' && console.log('The timeout option is deprecated, use idle_timeout instead') // eslint-disable-line
return x
Expand Down

0 comments on commit 16bc7db

Please sign in to comment.