-
Notifications
You must be signed in to change notification settings - Fork 2
/
004-HBASE-25336-2.4.12.patch
342 lines (320 loc) · 16.6 KB
/
004-HBASE-25336-2.4.12.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
From e04956f7bb5d95a54612a99905ee2d8e7f0de23a Mon Sep 17 00:00:00 2001
From: Duo Zhang <zhangduo@apache.org>
Date: Mon, 7 Dec 2020 21:49:04 +0800
Subject: [PATCH] HBASE-25336 Use Address instead of InetSocketAddress in
RpcClient implementation (#2716)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
(cherry picked from commit f8134795109bc380b53ec814561e1abdb56b2b58)
---
.../hadoop/hbase/ipc/AbstractRpcClient.java | 60 ++++---------------
.../hbase/ipc/BlockingRpcConnection.java | 25 ++------
.../hadoop/hbase/ipc/NettyRpcConnection.java | 27 ++-------
.../hadoop/hbase/ipc/RpcConnection.java | 24 +++++++-
4 files changed, 39 insertions(+), 97 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index e4f0a7a36f4..3c41aadc852 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -320,7 +318,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
* @return A pair with the Message response and the Cell data (if any).
*/
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
- Message param, Message returnType, final User ticket, final InetSocketAddress isa)
+ Message param, Message returnType, final User ticket, final Address isa)
throws ServiceException {
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
callMethod(md, hrc, param, returnType, ticket, isa, done);
@@ -392,7 +390,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
- final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
+ final Address addr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
@@ -406,7 +404,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
cs.setNumActionsPerServer(numActions);
}
- final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -522,13 +519,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final Address addr;
- // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
- // per method call on the channel. If the remote target is removed or reprovisioned and
- // its identity changes a new channel with a newly resolved InetSocketAddress will be
- // created as part of retry, so caching here is fine.
- // Normally, caching an InetSocketAddress is an anti-pattern.
- protected InetSocketAddress isa;
-
protected final AbstractRpcClient<?> rpcClient;
protected final User ticket;
@@ -578,23 +568,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType) throws ServiceException {
- // Look up remote address upon first call
- if (isa == null) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- isa = Address.toSocketAddress(addr);
- if (isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- isa = null;
- throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
- }
- }
- return rpcClient.callBlockingMethod(md, configureRpcController(controller),
- param, returnType, ticket, isa);
+ Message param, Message returnType) throws ServiceException {
+ return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
+ ticket, addr);
}
}
@@ -610,29 +586,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
@Override
- public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType, RpcCallback<Message> done) {
- HBaseRpcController configuredController =
- configureRpcController(Preconditions.checkNotNull(controller,
- "RpcController can not be null for async rpc call"));
- // Look up remote address upon first call
- if (isa == null || isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- isa = Address.toSocketAddress(addr);
- if (isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- isa = null;
- controller.setFailed(addr + " could not be resolved");
- return;
- }
- }
+ public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
+ Message returnType, RpcCallback<Message> done) {
+ HBaseRpcController configuredController = configureRpcController(
+ Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
// This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions.
- this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
+ this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
}
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index ce2bd11f960..cd8035fd58e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
@@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.rpcClient.localAddr != null) {
this.socket.bind(this.rpcClient.localAddr);
}
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
- if (remoteAddr.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
+ InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
this.socket.setSoTimeout(this.rpcClient.readTO);
return;
@@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.metrics != null) {
this.metrics.incrNsLookups();
}
- InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
- if (serverAddr.isUnresolved()) {
- if (this.metrics != null) {
- this.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
- serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
+ socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 609d2c12cea..d0a13ca33d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
-import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
final NettyHBaseSaslRpcClientHandler saslHandler;
try {
- if (this.metrics != null) {
- this.metrics.incrNsLookups();
- }
- InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
- if (serverAddr.isUnresolved()) {
- if (this.metrics != null) {
- this.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
- serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
+ ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
+ rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) {
failInit(ch, e);
return;
@@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection {
private void connect() throws UnknownHostException {
assert eventLoop.inEventLoop();
LOG.trace("Connecting to {}", remoteId.getAddress());
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
- if (remoteAddr.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
+ InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index a87f0e4dbb8..c9444668018 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@@ -122,7 +125,7 @@ abstract class RpcConnection {
this.remoteId = remoteId;
}
- protected void scheduleTimeoutTask(final Call call) {
+ protected final void scheduleTimeoutTask(final Call call) {
if (call.timeout > 0) {
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
@@ -137,7 +140,7 @@ abstract class RpcConnection {
}
}
- protected byte[] getConnectionHeaderPreamble() {
+ protected final byte[] getConnectionHeaderPreamble() {
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
// they are getting sent across piecemeal according to wireshark and then server is messing
// up the reading on occasion (the passed in stream is not buffered yet).
@@ -153,7 +156,7 @@ abstract class RpcConnection {
return preamble;
}
- protected ConnectionHeader getConnectionHeader() {
+ protected final ConnectionHeader getConnectionHeader() {
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setServiceName(remoteId.getServiceName());
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
@@ -176,6 +179,21 @@ abstract class RpcConnection {
return builder.build();
}
+ protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
+ throws UnknownHostException {
+ if (metrics != null) {
+ metrics.incrNsLookups();
+ }
+ InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+ if (remoteAddr.isUnresolved()) {
+ if (metrics != null) {
+ metrics.incrNsLookupsFailed();
+ }
+ throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+ }
+ return remoteAddr;
+ }
+
protected abstract void callTimeout(Call call);
public ConnectionId remoteId() {
--
2.42.1