Skip to content

Commit

Permalink
Fallback to use v3 protocol for some types of requests
Browse files Browse the repository at this point in the history
---

Fixes apache#2071

*Motivation*

A bookkeeper should fallback to use v3 protocol for some types of requests if they are not implemented in v2

*Modifications*

- Add a client pool use v3 wire protocol
- Obtain client by version
  • Loading branch information
zymap committed May 5, 2019
1 parent fdd9670 commit f62b3be
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,17 @@ public long getNumPendingRequests(BookieSocketAddress address, long ledgerId) {

@Override
public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory) throws SecurityException {
SecurityHandlerFactory shFactory, int bookieProtocolVersion) throws SecurityException {
StatsLogger statsLoggerForPCBC = statsLogger;
if (conf.getLimitStatsLogging()) {
statsLoggerForPCBC = NullStatsLogger.INSTANCE;
}
return new PerChannelBookieClient(conf, executor, eventLoopGroup, allocator, address, statsLoggerForPCBC,
authProviderFactory, registry, pcbcPool, shFactory);
ClientConfiguration clientConfiguration = conf;
if (BookieProtocol.PROTOCOL_VERSION3 == bookieProtocolVersion) {
clientConfiguration.setUseV2WireProtocol(false);
}
return new PerChannelBookieClient(clientConfiguration, executor, eventLoopGroup, allocator, address,
statsLoggerForPCBC, authProviderFactory, registry, pcbcPool, shFactory);
}

public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
Expand Down Expand Up @@ -467,7 +471,7 @@ public void readLac(final BookieSocketAddress addr, final long ledgerId, final R
} else {
pcbc.readLac(ledgerId, cb, ctx);
}
}, ledgerId);
}, ledgerId, BookieProtocol.PROTOCOL_VERSION3);
}

public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
Expand Down Expand Up @@ -546,7 +550,7 @@ public void getBookieInfo(final BookieSocketAddress addr, final long requested,
} else {
pcbc.getBookieInfo(requested, cb, ctx);
}
}, requested);
}, requested, BookieProtocol.PROTOCOL_VERSION3);
}

private void monitorPendingOperations() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public interface BookieProtocol {
*/
byte CURRENT_PROTOCOL_VERSION = 2;

/**
* Bookie protocol version 3.
*/
byte PROTOCOL_VERSION3 = 3;

/**
* Entry Entry ID. To be used when no valid entry id can be assigned.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool,
final BookieSocketAddress address;

final PerChannelBookieClient[] clients;
final PerChannelBookieClient[] clientsV3;

final ClientConfiguration conf;
SecurityHandlerFactory shFactory;
Expand All @@ -68,7 +69,12 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool,

this.clients = new PerChannelBookieClient[coreSize];
for (int i = 0; i < coreSize; i++) {
this.clients[i] = factory.create(address, this, shFactory);
this.clients[i] = factory.create(address, this, shFactory, BookieProtocol.CURRENT_PROTOCOL_VERSION);
}

this.clientsV3 = new PerChannelBookieClient[coreSize];
for (int i = 0; i < coreSize; i++) {
this.clientsV3[i] = factory.create(address, this, shFactory, BookieProtocol.PROTOCOL_VERSION3);
}
}

Expand All @@ -85,16 +91,31 @@ public void intialize() {
}

private PerChannelBookieClient getClient(long key) {
if (1 == clients.length) {
return clients[0];
return getClientByVersion(key, BookieProtocol.CURRENT_PROTOCOL_VERSION);
}

private PerChannelBookieClient getClient(long key, PerChannelBookieClient[] pcbc) {
if (1 == pcbc.length) {
return pcbc[0];
}
int idx = MathUtils.signSafeMod(key, clients.length);
return clients[idx];
int idx = MathUtils.signSafeMod(key, pcbc.length);
return pcbc[idx];
}

private PerChannelBookieClient getClientByVersion(long key, int version) {
if (version == BookieProtocol.PROTOCOL_VERSION3) {
return getClient(key, clientsV3);
}
return getClient(key, clients);
}

@Override
public void obtain(GenericCallback<PerChannelBookieClient> callback, long key) {
getClient(key).connectIfNeededAndDoOp(callback);
obtain(callback, key, BookieProtocol.CURRENT_PROTOCOL_VERSION);
}

public void obtain(GenericCallback<PerChannelBookieClient> callback, long key, int version) {
getClientByVersion(key, version).connectIfNeededAndDoOp(callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ interface PerChannelBookieClientFactory {
* @return the client connected to address.
* @throws SecurityException
*/
PerChannelBookieClient create(BookieSocketAddress address,
PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory) throws SecurityException;
PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory, int bookieProtocolVersion) throws SecurityException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ public interface PerChannelBookieClientPool {
*/
void obtain(GenericCallback<PerChannelBookieClient> callback, long key);

/**
* Obtain a channel from channel pool by version to execute operations.
*
* @param callback
* callback to return channel from channel pool
* @param version
* get specify version channel from pool
*/
void obtain(GenericCallback<PerChannelBookieClient> callback, long key, int version);

/**
* Returns status of a client.
* It is suggested to delay/throttle requests to this channel if isWritable is false.
Expand Down

0 comments on commit f62b3be

Please sign in to comment.