Permalink
Browse files

fix: improve multihost connection for preferSlave case (verify expire…

…d hosts before connecting to cached master) (#844)

The notable behavior change is related to targetServerType=preferSlave.
In that case, the driver has to check all the possible slaves first to find the right server.
The state of some servers might be expired due to hostRecheckSeconds,
so pgjdbc should try connecting to all the "known slaves"
and "expired" servers in an attempt to find a slave among them, and only then
it should try connecting to master.

That logic was not there, and preferSlave could easily connect to master even
at times when a slave was available.

Note: hostRecheckSeconds is still in place (default 10 seconds), and pgjdbc
trusts that status.

However, there's an exception: in case no servers match the criteria (e.g.
all the servers are marked with "can't connect" in cache), then pgjdbc
would still try connecting to all the hosts in the connection URL in order.
  • Loading branch information...
ChenHuajun authored and vlsi committed Jan 3, 2018
1 parent 1361c52 commit c6fec34661b51cd9cbee157d0c334a3ab29859e8
@@ -5,6 +5,8 @@ dist: trusty
before_script:
- test $(grep "after_n_builds" codecov.yml | tr -d '[:space:]' | cut -d":" -f2) -eq $(grep -e "COVERAGE=Y$" .travis.yml | wc -l) || exit 1
- export PG_DATADIR="/etc/postgresql/${PG_VERSION}/main"
- export PG_SLAVE1_DATADIR="/etc/postgresql/${PG_VERSION}/slave1"
- export PG_SLAVE2_DATADIR="/etc/postgresql/${PG_VERSION}/slave2"
- ./.travis/travis_install_postgres.sh
- test "x$XA" == 'x' || ./.travis/travis_configure_xa.sh
- test "x$REPLICATION" == 'x' || ./.travis/travis_configure_replication.sh
@@ -13,6 +15,7 @@ before_script:
- test "x$PG_VERSION" = 'xHEAD' || psql -U postgres -c "create user test with password 'test';"
- test "x$REPLICATION" == 'x' || psql -U postgres -c "alter user test with replication;"
- psql -c 'create database test owner test;' -U postgres
- test "x$REPLICATION" == 'x' || ./.travis/travis_create_slaves.sh
- echo "MAVEN_OPTS='-Xmx1g -Dgpg.skip=true'" > ~/.mavenrc
- test "x$PG_VERSION" == 'x' || test "x$NO_HSTORE" == 'xY' || psql test -c 'CREATE EXTENSION hstore;' -U postgres
- test "x$PG_VERSION" == 'x' || test "x$CREATE_PLPGSQL" == 'x' || createlang -U postgres plpgsql test
@@ -0,0 +1,76 @@
#!/usr/bin/env bash
#
# Creates and starts two hot-standby replicas ("slaves") of the local
# PostgreSQL server on ports 5433 and 5434.
#
# Required environment:
#   PG_VERSION         PostgreSQL version ("HEAD" selects /usr/local/pgsql)
#   PG_SLAVE1_DATADIR  data directory to (re)create for slave 1
#   PG_SLAVE2_DATADIR  data directory to (re)create for slave 2
#   PG_DATADIR         directory holding the master's postgresql.conf
#                      (only read when PG_VERSION is not "HEAD")
set -x -e

if [ -z "$PG_VERSION" ]
then
  # Without a version the tool paths below cannot be resolved; stop here
  # instead of failing later with an obscure "command not found".
  echo "env PG_VERSION not defined" >&2
  exit 1
elif [ "x${PG_VERSION}" = "xHEAD" ]
then
  PG_CTL="/usr/local/pgsql/bin/pg_ctl"
  PG_BASEBACKUP="/usr/local/pgsql/bin/pg_basebackup"
else
  PG_CTL="/usr/lib/postgresql/${PG_VERSION}/bin/pg_ctl"
  PG_BASEBACKUP="/usr/lib/postgresql/${PG_VERSION}/bin/pg_basebackup"
fi

# The data directories are passed to "rm -rf"; refuse to run with empty values.
: "${PG_SLAVE1_DATADIR:?env PG_SLAVE1_DATADIR not defined}"
: "${PG_SLAVE2_DATADIR:?env PG_SLAVE2_DATADIR not defined}"

# Creates a standby from a base backup of the master and starts it.
#   $1 - data directory of the standby (removed and recreated)
#   $2 - TCP port the standby listens on
#   $3 - log file written by pg_ctl
create_slave() {
  local datadir="$1"
  local port="$2"
  local logfile="$3"

  # Start from an empty directory owned by postgres with mode 700.
  sudo rm -rf "${datadir}"
  sudo mkdir -p "${datadir}"
  sudo chmod 700 "${datadir}"
  sudo chown -R postgres:postgres "${datadir}"

  sudo su postgres -c "$PG_BASEBACKUP -Upostgres -D ${datadir} -X stream"

  # recovery.conf makes the copy run as a standby that follows the master on port 5432.
  sudo su postgres -c "echo standby_mode = \'on\' >${datadir}/recovery.conf"
  sudo su postgres -c "echo primary_conninfo = \'host=localhost port=5432 user=test password=test\' >>${datadir}/recovery.conf"
  sudo su postgres -c "echo recovery_target_timeline = \'latest\' >>${datadir}/recovery.conf"

  # NOTE(review): the HEAD case skips this step, presumably because that
  # installation already keeps usable configuration files inside the data
  # directory copied by pg_basebackup -- confirm.
  if [[ "x${PG_VERSION}" != "xHEAD" ]]
  then
    # Trust authentication for local socket and loopback connections.
    sudo su postgres -c "echo 'local all all trust' > ${datadir}/pg_hba.conf"
    sudo su postgres -c "echo 'host all all 127.0.0.1/32 trust' >> ${datadir}/pg_hba.conf"
    sudo su postgres -c "echo 'host all all ::1/128 trust' >> ${datadir}/pg_hba.conf"

    sudo su postgres -c "touch ${datadir}/pg_ident.conf"

    # Reuse the master's postgresql.conf, dropping the settings that point at
    # the master's own files and enabling hot_standby.
    sudo su postgres -c "cp -f ${PG_DATADIR}/postgresql.conf ${datadir}/postgresql.conf"
    sudo sed -i -e "/^[ \t]*data_directory.*/d" "${datadir}/postgresql.conf"
    sudo sed -i -e "/^[ \t]*hba_file.*/d" "${datadir}/postgresql.conf"
    sudo sed -i -e "/^[ \t]*ident_file.*/d" "${datadir}/postgresql.conf"
    sudo sed -i -e "s/^#\?hot_standby.*/hot_standby = on/g" "${datadir}/postgresql.conf"
  fi

  # Start the standby; on failure show the tail of its log before aborting.
  sudo su postgres -c "$PG_CTL -D ${datadir} -w -t 300 -c -o '-p ${port}' -l ${logfile} start" || (sudo tail "${logfile}" ; exit 1)
  sudo tail "${logfile}"
}

#Create and start Slave 1
create_slave "${PG_SLAVE1_DATADIR}" 5433 /tmp/postgres_slave1.log

#Create and start Slave 2
create_slave "${PG_SLAVE2_DATADIR}" 5434 /tmp/postgres_slave2.log
@@ -141,7 +141,7 @@ In addition to the standard connection parameters the driver supports a number o
| assumeMinServerVersion | String | null | Assume the server is at least that version |
| currentSchema | String | null | Specify the schema to be set in the search-path |
| targetServerType | String | any | Specifies what kind of server to connect, possible values: any, master, slave, preferSlave |
| hostRecheckSeconds | Integer | 10 | Specifies period (seconds) after host statuses are checked again in case they have changed |
| hostRecheckSeconds | Integer | 10 | Specifies period (seconds) after which the host status is checked again in case it has changed |
| loadBalanceHosts | Boolean | false | If disabled hosts are connected in the given order. If enabled hosts are chosen randomly from the set of suitable candidates |
| socketFactory | String | null | Specify a socket factory for socket creation |
| socketFactoryArg | String | null | Argument forwarded to constructor of SocketFactory class. |
@@ -5,6 +5,10 @@

server=localhost
port=5432
slaveServer=localhost
slavePort=5433
slaveServer2=localhost
slavePort2=5434
database=test
username=test
password=test
@@ -359,7 +359,7 @@
"If disabled hosts are connected in the given order. If enabled hosts are chosen randomly from the set of suitable candidates"),

HOST_RECHECK_SECONDS("hostRecheckSeconds", "10",
"Specifies period (seconds) after host statuses are checked again in case they have changed"),
"Specifies period (seconds) after which the host status is checked again in case it has changed"),

/**
* Specifies which mode is used to execute queries to database: simple means ('Q' execute, no parse, no bind, text mode only),
@@ -15,6 +15,7 @@
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.Utils;
import org.postgresql.core.Version;
import org.postgresql.hostchooser.CandidateHost;
import org.postgresql.hostchooser.GlobalHostStatusTracker;
import org.postgresql.hostchooser.HostChooser;
import org.postgresql.hostchooser.HostChooserFactory;
@@ -32,11 +33,14 @@
import java.net.ConnectException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

import javax.net.SocketFactory;
@@ -121,15 +125,30 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
}

long lastKnownTime = System.currentTimeMillis() - PGProperty.HOST_RECHECK_SECONDS.getInt(info) * 1000;

SocketFactory socketFactory = SocketFactoryFactory.getSocketFactory(info);

HostChooser hostChooser =
HostChooserFactory.createHostChooser(hostSpecs, targetServerType, info);
Iterator<HostSpec> hostIter = hostChooser.iterator();
Iterator<CandidateHost> hostIter = hostChooser.iterator();
Map<HostSpec, HostStatus> knownStates = new HashMap<HostSpec, HostStatus>();
while (hostIter.hasNext()) {
HostSpec hostSpec = hostIter.next();
CandidateHost candidateHost = hostIter.next();
HostSpec hostSpec = candidateHost.hostSpec;
LOGGER.log(Level.FINE, "Trying to establish a protocol version 3 connection to {0}", hostSpec);

// Note: per-connect-attempt status map is used here instead of GlobalHostStatusTracker
// for the case when "no good hosts" match (e.g. all the hosts are known as "connectfail")
// In that case, the system tries to connect to each host in order, thus it should not look into
// GlobalHostStatusTracker
HostStatus knownStatus = knownStates.get(hostSpec);
if (knownStatus != null && !candidateHost.targetServerType.allowConnectingTo(knownStatus)) {
LOGGER.log(Level.FINER, "Known status of host {0} is {1}, and required status was {2}. Will try next host",
new Object[]{hostSpec, knownStatus, candidateHost.targetServerType});
continue;
}

//
// Establish a connection.
//
@@ -195,19 +214,14 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin

// Check Master or Slave
HostStatus hostStatus = HostStatus.ConnectOK;
if (targetServerType != HostRequirement.any) {
if (candidateHost.targetServerType != HostRequirement.any) {
hostStatus = isMaster(queryExecutor) ? HostStatus.Master : HostStatus.Slave;
}
GlobalHostStatusTracker.reportHostStatus(hostSpec, hostStatus);
if (!targetServerType.allowConnectingTo(hostStatus)) {
knownStates.put(hostSpec, hostStatus);
if (!candidateHost.targetServerType.allowConnectingTo(hostStatus)) {
queryExecutor.close();
if (hostIter.hasNext()) {
// still more addresses to try
continue;
}
throw new PSQLException(GT
.tr("Could not find a server with specified targetServerType: {0}", targetServerType),
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
continue;
}

runInitialQueries(queryExecutor, info);
@@ -216,14 +230,16 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
return queryExecutor;
} catch (UnsupportedProtocolException upe) {
// Swallow this and return null so ConnectionFactory tries the next protocol.
LOGGER.log(Level.SEVERE, "Protocol not supported, abandoning connection.");
LOGGER.log(Level.SEVERE, "Protocol not supported, abandoning connection.", upe);
closeStream(newStream);
return null;
} catch (ConnectException cex) {
// Added by Peter Mount <peter@retep.org.uk>
// ConnectException is thrown when the connection cannot be made.
// we trap this an return a more meaningful message for the end user
GlobalHostStatusTracker.reportHostStatus(hostSpec, HostStatus.ConnectFail);
knownStates.put(hostSpec, HostStatus.ConnectFail);
log(Level.WARNING, "ConnectException occurred while connecting to {0}", cex, hostSpec);
if (hostIter.hasNext()) {
// still more addresses to try
continue;
@@ -234,6 +250,8 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
} catch (IOException ioe) {
closeStream(newStream);
GlobalHostStatusTracker.reportHostStatus(hostSpec, HostStatus.ConnectFail);
knownStates.put(hostSpec, HostStatus.ConnectFail);
log(Level.WARNING, "IOException occurred while connecting to {0}", ioe, hostSpec);
if (hostIter.hasNext()) {
// still more addresses to try
continue;
@@ -242,14 +260,18 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
PSQLState.CONNECTION_UNABLE_TO_CONNECT, ioe);
} catch (SQLException se) {
closeStream(newStream);
log(Level.WARNING, "SQLException occurred while connecting to {0}", se, hostSpec);
GlobalHostStatusTracker.reportHostStatus(hostSpec, HostStatus.ConnectFail);
knownStates.put(hostSpec, HostStatus.ConnectFail);
if (hostIter.hasNext()) {
// still more addresses to try
continue;
}
throw se;
}
}
throw new PSQLException(GT.tr("The connection url is invalid."),
throw new PSQLException(GT
.tr("Could not find a server with specified targetServerType: {0}", targetServerType),
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
}

@@ -287,6 +309,16 @@ public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String user, Strin
return paramList;
}

/**
 * Logs a message with both format parameters and an attached exception.
 *
 * <p>Does nothing when {@code level} is not loggable by {@code LOGGER}.</p>
 *
 * @param level  severity of the message
 * @param msg    message text; {@code params} are attached as the record parameters
 * @param thrown exception to attach to the log record
 * @param params parameters for the message
 */
private static void log(Level level, String msg, Throwable thrown, Object... params) {
  if (LOGGER.isLoggable(level)) {
    // Build the record by hand so it carries the parameters and the throwable together.
    LogRecord entry = new LogRecord(level, msg);
    entry.setThrown(thrown);
    entry.setParameters(params);
    LOGGER.log(entry);
  }
}

/**
* Convert Java time zone to postgres time zone. All others stay the same except that GMT+nn
* changes to GMT-nn and vise versa.
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.hostchooser;

import org.postgresql.util.HostSpec;


/**
 * A host that may be tried for a connection, paired with the server type
 * requirement that applies to that particular attempt.
 */
public class CandidateHost {
  /** Host to connect to. */
  public final HostSpec hostSpec;

  /** Requirement the host has to satisfy for this attempt. */
  public final HostRequirement targetServerType;

  /**
   * Creates a candidate.
   *
   * @param hostSpec         host to connect to
   * @param targetServerType requirement the host has to satisfy
   */
  public CandidateHost(HostSpec hostSpec, HostRequirement targetServerType) {
    this.targetServerType = targetServerType;
    this.hostSpec = hostSpec;
  }
}
@@ -30,23 +30,16 @@
public static void reportHostStatus(HostSpec hostSpec, HostStatus hostStatus) {
long now = currentTimeMillis();
synchronized (hostStatusMap) {
HostSpecStatus oldStatus = hostStatusMap.get(hostSpec);
if (oldStatus == null || updateStatusFromTo(oldStatus.status, hostStatus)) {
hostStatusMap.put(hostSpec, new HostSpecStatus(hostSpec, hostStatus, now));
HostSpecStatus hostSpecStatus = hostStatusMap.get(hostSpec);
if (hostSpecStatus == null) {
hostSpecStatus = new HostSpecStatus(hostSpec);
hostStatusMap.put(hostSpec, hostSpecStatus);
}
hostSpecStatus.status = hostStatus;
hostSpecStatus.lastUpdated = now;
}
}

private static boolean updateStatusFromTo(HostStatus oldStatus, HostStatus newStatus) {
if (oldStatus == null) {
return true;
}
if (newStatus == HostStatus.ConnectOK) {
return oldStatus != HostStatus.Master && oldStatus != HostStatus.Slave;
}
return true;
}

/**
* Returns a list of candidate hosts that have the required targetServerType.
*
@@ -55,38 +48,31 @@ private static boolean updateStatusFromTo(HostStatus oldStatus, HostStatus newSt
* @param hostRecheckMillis How stale information is allowed.
* @return candidate hosts to connect to.
*/
static List<HostSpecStatus> getCandidateHosts(HostSpec[] hostSpecs,
static List<HostSpec> getCandidateHosts(HostSpec[] hostSpecs,
HostRequirement targetServerType, long hostRecheckMillis) {
List<HostSpecStatus> candidates = new ArrayList<HostSpecStatus>(hostSpecs.length);
List<HostSpec> candidates = new ArrayList<HostSpec>(hostSpecs.length);
long latestAllowedUpdate = currentTimeMillis() - hostRecheckMillis;
synchronized (hostStatusMap) {
for (HostSpec hostSpec : hostSpecs) {
HostSpecStatus hostInfo = hostStatusMap.get(hostSpec);
// return null status wrapper if if the current value is not known or is too old
if (hostInfo == null || hostInfo.lastUpdated < latestAllowedUpdate) {
hostInfo = new HostSpecStatus(hostSpec, null, Long.MAX_VALUE);
}
// candidates are nodes we do not know about and the nodes with correct type
if (hostInfo.status == null || targetServerType.allowConnectingTo(hostInfo.status)) {
candidates.add(hostInfo);
if (hostInfo == null
|| hostInfo.lastUpdated < latestAllowedUpdate
|| targetServerType.allowConnectingTo(hostInfo.status)) {
candidates.add(hostSpec);
}
}
}
return candidates;
}

/**
* Immutable structure of known status of one HostSpec.
*/
static class HostSpecStatus {
final HostSpec host;
final HostStatus status;
final long lastUpdated;
HostStatus status;
long lastUpdated;

HostSpecStatus(HostSpec host, HostStatus hostStatus, long lastUpdated) {
HostSpecStatus(HostSpec host) {
this.host = host;
this.status = hostStatus;
this.lastUpdated = lastUpdated;
}

@Override
@@ -5,19 +5,17 @@

package org.postgresql.hostchooser;

import org.postgresql.util.HostSpec;

import java.util.Iterator;

/**
* Lists connections in preferred order.
*/
public interface HostChooser extends Iterable<HostSpec> {
public interface HostChooser extends Iterable<CandidateHost> {
/**
* Lists connection hosts in preferred order.
*
* @return connection hosts in preferred order.
*/
@Override
Iterator<HostSpec> iterator();
Iterator<CandidateHost> iterator();
}
@@ -17,7 +17,7 @@
public static HostChooser createHostChooser(HostSpec[] hostSpecs,
HostRequirement targetServerType, Properties info) {
if (hostSpecs.length == 1) {
return new SingleHostChooser(hostSpecs[0]);
return new SingleHostChooser(hostSpecs[0], targetServerType);
}
return new MultiHostChooser(hostSpecs, targetServerType, info);
}
Oops, something went wrong.

0 comments on commit c6fec34

Please sign in to comment.