Skip to content

Commit

Permalink
[#7794] YSQL: Add option to forward pggate RPCs to local tserver
Browse files Browse the repository at this point in the history
Summary:
Each postgres process connects to all the Tservers in the cluster which
leads to a quadratic growth in the number of connections in the cluster.
To reduce the number of connections, we can forward all the rpcs to the
local Tserver which can then forward the rpc to the appropriate Tserver.

This feature involves 2 changes:
1. Forward all the rpcs from pggate to the local tserver.
2. Ensure the local tserver sends the rpcs to the appropriate tserver.

First change involves creating a RemoteTabletServer for the local tserver
and using that for all the PGSQL read/write rpcs. This needs the postgres
process to be passed the UUID of the tserver process so that it can identify
the HostPort of the local tserver from the list of all tservers.

The second change involves creating the ForwardRpc class which forwards
the read/write request protobufs to the appropriate tserver by using the
TabletInvoker. This class is invoked by the TabletServiceImpl on realizing that
the request is not meant for itself.

This feature is controlled by the GFLAG: ysql_forward_rpcs_to_local_tserver
which allows us to completely enable/disable the forwarding of rpcs to the
local tserver.

Test Plan: All java tests for pggate.

Reviewers: amitanand, rskannan, rsami, timur, dmitry, mihnea

Reviewed By: mihnea

Subscribers: dmitry, zyu, bogdan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D10274
  • Loading branch information
Sudheer committed May 21, 2021
1 parent 3933cbd commit 6077bc8
Show file tree
Hide file tree
Showing 20 changed files with 686 additions and 9 deletions.
111 changes: 111 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRpcForwarding.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.util.YBTestRunnerNonTsanOnly;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.sql.*;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.google.common.net.HostAndPort;
import org.yb.minicluster.MiniYBDaemon;
import org.postgresql.util.PSQLException;
import static org.yb.AssertionWrappers.*;

@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgRpcForwarding extends BasePgSQLTest {
private static final Logger LOG = LoggerFactory.getLogger(TestPgRpcForwarding.class);
private static final int SLEEP_DURATION = 5000;

@Override
protected Map<String, String> getTServerFlags() {
Map<String, String> flags = super.getTServerFlags();
flags.put("ysql_forward_rpcs_to_local_tserver", "true");
return flags;
}

private int getForwardRpcCount() throws Exception {
Map<HostAndPort, MiniYBDaemon> tservers = miniCluster.getTabletServers();
int count = 0;

for (Map.Entry<HostAndPort,MiniYBDaemon> entry : tservers.entrySet()) {
HostAndPort hostPort = entry.getKey();
int port = tservers.get(hostPort).getWebPort();

// Call the prometheus-metrics endpoint and grep for the multi touch hit metric in the list.
URL url = null;
BufferedReader br = null;
try {
url = new URL(String.format("http://%s:%d/prometheus-metrics", hostPort.getHost(), port));
br = new BufferedReader(new InputStreamReader(url.openStream()));;
} catch (Exception ex) {
LOG.error("Encountered error for reading metrics endpoint" + ex);
throw new InternalError(ex.getMessage());
}
String line = null;
while ((line = br.readLine()) != null) {
if (line.contains("TabletServerForwardService_Write_count") ||
line.contains("TabletServerForwardService_Read_count")) {
String inp[] = line.split(" ");
String x = inp[inp.length - 2].trim();
int val = Integer.parseInt(x);
count += val;
}
}
}
LOG.info("Forward service count is " + count);
return count;

}

@Test
public void testRpcForwarding() throws Exception {
int numRows = 100;
try (Statement statement = connection.createStatement()) {
statement.execute("create table t(a int primary key, b int)");

int count1 = getForwardRpcCount();
for (int i = 1; i <= numRows; ++i) {
statement.execute("insert into t values(" + i + "," + i + ")");
}
int count2 = getForwardRpcCount();
assertTrue(count2 > count1);

for (int i = 1; i <= numRows; ++i) {
ResultSet rs = statement.executeQuery(String.format("select * from t where a=%d", i));
while (rs.next()) {
int a = rs.getInt("a");
int b = rs.getInt("b");
assertTrue(a == b);
}
}
int count3 = getForwardRpcCount();
assertTrue(count3 > count2);
} catch (PSQLException ex) {
LOG.error("Unexpected exception:", ex);
throw ex;
}
}
}
2 changes: 2 additions & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(CLIENT_SRCS
client_utils.cc
error.cc
error_collector.cc
forward_rpc.cc
in_flight_op.cc
meta_cache.cc
meta_data_cache.cc
Expand Down Expand Up @@ -72,6 +73,7 @@ set(CLIENT_LIBS
server_process
tserver_proto
tserver_service_proto
tserver_forward_service_proto
tserver_util
yb_util
gutil
Expand Down
15 changes: 8 additions & 7 deletions src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ DEFINE_bool(detect_duplicates_for_retryable_requests, true,
"Enable tracking of write requests that prevents the same write from being applied "
"twice.");

DEFINE_bool(ysql_forward_rpcs_to_local_tserver, false,
"When true, forward the PGSQL rpcs to the local tServer.");


DEFINE_CAPABILITY(PickReadTimeAtTabletServer, 0x8284d67b);

using namespace std::placeholders;
Expand Down Expand Up @@ -306,7 +310,6 @@ void AsyncRpc::SendRpcToTserver(int attempt_num) {
if (async_rpc_metrics_) {
async_rpc_metrics_->time_to_send->Increment(ToMicroseconds(end_time - start_));
}

CallRemoteMethod();
}

Expand Down Expand Up @@ -491,9 +494,8 @@ void WriteRpc::CallRemoteMethod() {
TRACE_TO(trace, "SendRpcToTserver");
ADOPT_TRACE(trace.get());

tablet_invoker_.proxy()->WriteAsync(
req_, &resp_, PrepareController(),
std::bind(&WriteRpc::Finished, this, Status::OK()));
tablet_invoker_.WriteAsync(req_, &resp_, PrepareController(),
std::bind(&WriteRpc::Finished, this, Status::OK()));
TRACE_TO(trace, "RpcDispatched Asynchronously");
}

Expand Down Expand Up @@ -710,9 +712,8 @@ void ReadRpc::CallRemoteMethod() {
TRACE_TO(trace, "SendRpcToTserver");
ADOPT_TRACE(trace.get());

tablet_invoker_.proxy()->ReadAsync(
req_, &resp_, PrepareController(),
std::bind(&ReadRpc::Finished, this, Status::OK()));
tablet_invoker_.ReadAsync(req_, &resp_, PrepareController(),
std::bind(&ReadRpc::Finished, this, Status::OK()));
TRACE_TO(trace, "RpcDispatched Asynchronously");
}

Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/async_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ class AsyncRpc : public rpc::Rpc, public TabletRpc {
// Is this a local call?
bool IsLocalCall() const;

// Creates the Local node tablet invoker or remote tablet invoker based on the GFLAG
// 'FLAGS_ysql_forward_rpcs_to_local_tserver'.
TabletInvoker *GetTabletInvoker(AsyncRpcData *data, YBConsistencyLevel yb_consistency_level);

// Pointer back to the batcher. Processes the write response when it
// completes, regardless of success or failure.
scoped_refptr<Batcher> batcher_;
Expand Down
6 changes: 6 additions & 0 deletions src/yb/client/client-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,12 @@ class YBClient::Data {

std::atomic<int> tserver_count_cached_{0};

// The proxy for the node local tablet server.
std::shared_ptr<tserver::TabletServerForwardServiceProxy> node_local_forward_proxy_;

// The host port of the node local tserver.
HostPort node_local_tserver_host_port_;

private:
CHECKED_STATUS FlushTablesHelper(YBClient* client,
const CoarseTimePoint deadline,
Expand Down
22 changes: 22 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ using yb::master::GetCDCStreamRequestPB;
using yb::master::GetCDCStreamResponsePB;
using yb::master::ListCDCStreamsRequestPB;
using yb::master::ListCDCStreamsResponsePB;
using yb::master::TSInfoPB;
using yb::rpc::Messenger;
using yb::rpc::MessengerBuilder;
using yb::rpc::RpcController;
Expand Down Expand Up @@ -1474,6 +1475,27 @@ void YBClient::SetLocalTabletServer(const string& ts_uuid,
data_->meta_cache_->SetLocalTabletServer(ts_uuid, proxy, local_tserver);
}

internal::RemoteTabletServer* YBClient::GetLocalTabletServer() {
return data_->meta_cache_->local_tserver();
}

void YBClient::SetNodeLocalForwardProxy(
const shared_ptr<tserver::TabletServerForwardServiceProxy>& proxy) {
data_->node_local_forward_proxy_ = proxy;
}

std::shared_ptr<tserver::TabletServerForwardServiceProxy>& YBClient::GetNodeLocalForwardProxy() {
return data_->node_local_forward_proxy_;
}

void YBClient::SetNodeLocalTServerHostPort(const ::yb::HostPort& hostport) {
data_->node_local_tserver_host_port_ = hostport;
}

const ::yb::HostPort& YBClient::GetNodeLocalTServerHostPort() {
return data_->node_local_tserver_host_port_;
}

Result<bool> YBClient::IsLoadBalanced(uint32_t num_servers) {
IsLoadBalancedRequestPB req;
IsLoadBalancedResponsePB resp;
Expand Down
17 changes: 17 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class TabletLocationsPB;
namespace tserver {
class LocalTabletServer;
class TabletServerServiceProxy;
class TabletServerForwardServiceProxy;
}

namespace client {
Expand Down Expand Up @@ -562,6 +563,22 @@ class YBClient {
const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy,
const tserver::LocalTabletServer* local_tserver);

internal::RemoteTabletServer* GetLocalTabletServer();

// Sets the node local forward service proxy. This proxy is used to forward the rpcs to the
// appropriate tablet server.
void SetNodeLocalForwardProxy(
const std::shared_ptr<tserver::TabletServerForwardServiceProxy>& proxy);

// Returns the node local forward service proxy.
std::shared_ptr<tserver::TabletServerForwardServiceProxy>& GetNodeLocalForwardProxy();

// Sets the host port of the node local tserver.
void SetNodeLocalTServerHostPort(const ::yb::HostPort& hostport);

// Returns the host port of the node local tserver.
const ::yb::HostPort& GetNodeLocalTServerHostPort();

// List only those tables whose names pass a substring match on 'filter'.
//
// 'tables' is appended to only on success.
Expand Down

0 comments on commit 6077bc8

Please sign in to comment.