From f62b3be8cda4ce01272556db8220cc59445d8abe Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Sun, 5 May 2019 15:20:36 +0800 Subject: [PATCH] Fallback to use v3 protocol for some types of requests --- Fixes #2071 *Motivation* A bookkeeper should fallback to use v3 protocol for some types of requests if they are not implemented in v2 *Modifications* - Add a client pool use v3 wire protocol - Obtain client by version --- .../bookkeeper/proto/BookieClientImpl.java | 14 +++++--- .../bookkeeper/proto/BookieProtocol.java | 5 +++ .../DefaultPerChannelBookieClientPool.java | 33 +++++++++++++++---- .../proto/PerChannelBookieClientFactory.java | 4 +-- .../proto/PerChannelBookieClientPool.java | 10 ++++++ 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index c772a97cdaa..fbd850811a5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -179,13 +179,17 @@ public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) { @Override public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool, - SecurityHandlerFactory shFactory) throws SecurityException { + SecurityHandlerFactory shFactory, int bookieProtocolVersion) throws SecurityException { StatsLogger statsLoggerForPCBC = statsLogger; if (conf.getLimitStatsLogging()) { statsLoggerForPCBC = NullStatsLogger.INSTANCE; } - return new PerChannelBookieClient(conf, executor, eventLoopGroup, allocator, address, statsLoggerForPCBC, - authProviderFactory, registry, pcbcPool, shFactory); + ClientConfiguration clientConfiguration = conf; + if (BookieProtocol.PROTOCOL_VERSION3 == bookieProtocolVersion) { + clientConfiguration.setUseV2WireProtocol(false); + } + return new PerChannelBookieClient(clientConfiguration, executor, eventLoopGroup, allocator, address, + statsLoggerForPCBC, authProviderFactory, registry, pcbcPool, shFactory); } public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) { @@ -467,7 +471,7 @@ public void readLac(final BookieSocketAddress addr, final long ledgerId, final R } else { pcbc.readLac(ledgerId, cb, ctx); } - }, ledgerId); + }, ledgerId, BookieProtocol.PROTOCOL_VERSION3); } public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId, @@ -546,7 +550,7 @@ public void getBookieInfo(final BookieSocketAddress addr, final long requested, } else { pcbc.getBookieInfo(requested, cb, ctx); } - }, requested); + }, requested, BookieProtocol.PROTOCOL_VERSION3); } private void monitorPendingOperations() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 9982cca712a..b71cd9b74bb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -47,6 +47,11 @@ public interface BookieProtocol { */ byte CURRENT_PROTOCOL_VERSION = 2; + /** + * Bookie protocol version 3. + */ + byte PROTOCOL_VERSION3 = 3; + /** * Entry Entry ID. To be used when no valid entry id can be assigned. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java index 04471d5f55e..0dff791006a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java @@ -48,6 +48,7 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, final BookieSocketAddress address; final PerChannelBookieClient[] clients; + final PerChannelBookieClient[] clientsV3; final ClientConfiguration conf; SecurityHandlerFactory shFactory; @@ -68,7 +69,12 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, this.clients = new PerChannelBookieClient[coreSize]; for (int i = 0; i < coreSize; i++) { - this.clients[i] = factory.create(address, this, shFactory); + this.clients[i] = factory.create(address, this, shFactory, BookieProtocol.CURRENT_PROTOCOL_VERSION); + } + + this.clientsV3 = new PerChannelBookieClient[coreSize]; + for (int i = 0; i < coreSize; i++) { + this.clientsV3[i] = factory.create(address, this, shFactory, BookieProtocol.PROTOCOL_VERSION3); } } @@ -85,16 +91,31 @@ public void intialize() { } private PerChannelBookieClient getClient(long key) { - if (1 == clients.length) { - return clients[0]; + return getClientByVersion(key, BookieProtocol.CURRENT_PROTOCOL_VERSION); + } + + private PerChannelBookieClient getClient(long key, PerChannelBookieClient[] pcbc) { + if (1 == pcbc.length) { + return pcbc[0]; } - int idx = MathUtils.signSafeMod(key, clients.length); - return clients[idx]; + int idx = MathUtils.signSafeMod(key, pcbc.length); + return pcbc[idx]; + } + + private PerChannelBookieClient getClientByVersion(long key, int version) { + if (version == BookieProtocol.PROTOCOL_VERSION3) { + return getClient(key, clientsV3); + } + return getClient(key, clients); } @Override public void obtain(GenericCallback callback, long key) { - getClient(key).connectIfNeededAndDoOp(callback); + obtain(callback, key, BookieProtocol.CURRENT_PROTOCOL_VERSION); + } + + public void obtain(GenericCallback callback, long key, int version) { + getClientByVersion(key, version).connectIfNeededAndDoOp(callback); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java index 17abb565827..9fc0418b323 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java @@ -36,6 +36,6 @@ interface PerChannelBookieClientFactory { * @return the client connected to address. * @throws SecurityException */ - PerChannelBookieClient create(BookieSocketAddress address, - PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory) throws SecurityException; + PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool, + SecurityHandlerFactory shFactory, int bookieProtocolVersion) throws SecurityException; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java index aa7a5e94492..e28af514563 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java @@ -40,6 +40,16 @@ public interface PerChannelBookieClientPool { */ void obtain(GenericCallback callback, long key); + /** + * Obtain a channel from channel pool by version to execute operations. + * + * @param callback + * callback to return channel from channel pool + * @param version + * get specify version channel from pool + */ + void obtain(GenericCallback callback, long key, int version); + /** * Returns status of a client. * It is suggested to delay/throttle requests to this channel if isWritable is false.