Skip to content

Commit

Permalink
Use request signatures for caching mappers
Browse files Browse the repository at this point in the history
This change actually enables mapping request parameters to the
corresponding argument and result mappers. There is still some overhead
produced from the anonymous context-bound Function instances, but it
should be constant and significantly less than the overhead of building
and storing the mapper stacks on each request.
  • Loading branch information
akudiyar committed Aug 27, 2023
1 parent f29b134 commit 4892004
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 13 deletions.
76 changes: 63 additions & 13 deletions src/main/java/io/tarantool/driver/core/AbstractTarantoolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -69,6 +71,8 @@ public abstract class AbstractTarantoolClient<T extends Packable, R extends Coll
private final TarantoolConnectionListeners listeners;
private final AtomicReference<TarantoolMetadata> metadataHolder = new AtomicReference<>();
private final ResultMapperFactoryFactoryImpl mapperFactoryFactory;
private final Map<TarantoolRequestSignature, MessagePackObjectMapper> argumentsMapperCache;
private final Map<TarantoolRequestSignature, MessagePackValueMapper> resultMapperCache;

private final SpacesMetadataProvider metadataProvider;
private final ScheduledExecutorService timeoutScheduler;
Expand Down Expand Up @@ -114,6 +118,8 @@ public AbstractTarantoolClient(TarantoolClientConfig config, TarantoolConnection

this.config = config;
this.mapperFactoryFactory = new ResultMapperFactoryFactoryImpl();
this.argumentsMapperCache = new ConcurrentHashMap<>();
this.resultMapperCache = new ConcurrentHashMap<>();
this.eventLoopGroup = new NioEventLoopGroup(config.getEventLoopThreadsNumber());
this.bootstrap = new Bootstrap()
.group(eventLoopGroup)
Expand Down Expand Up @@ -242,7 +248,8 @@ public CompletableFuture<List<?>> call(String functionName, List<?> arguments)

private CompletableFuture<List<?>> call(String functionName, List<?> arguments, MessagePackMapper mapper)
throws TarantoolClientException {
return makeRequest(functionName, arguments, null, () -> mapper, () -> mapper);
TarantoolRequestSignature signature = TarantoolRequestSignature.create(functionName, arguments, List.class);
return makeRequest(functionName, arguments, signature, () -> mapper, () -> mapper);
}

@Override
Expand Down Expand Up @@ -282,7 +289,8 @@ public <T> CompletableFuture<TarantoolResult<T>> callForTupleResult(
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Class<T> tupleClass)
throws TarantoolClientException {
return call(functionName, arguments, argumentsMapperSupplier,
TarantoolRequestSignature signature = TarantoolRequestSignature.create(functionName, arguments, tupleClass);
return callForSingleResult(functionName, arguments, signature, argumentsMapperSupplier,
() -> mapperFactoryFactory.getTarantoolResultMapper(config.getMessagePackMapper(), tupleClass));
}

Expand Down Expand Up @@ -348,8 +356,10 @@ public <S> CompletableFuture<S> callForSingleResult(
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Class<S> resultClass)
throws TarantoolClientException {
return callForSingleResult(functionName, arguments, argumentsMapperSupplier,
() -> mapperFactoryFactory.getDefaultSingleValueMapper(config.getMessagePackMapper(), resultClass));
TarantoolRequestSignature signature = TarantoolRequestSignature.create(functionName, arguments, resultClass);
return callForSingleResult(
functionName, arguments, signature, argumentsMapperSupplier,
() -> mapperFactoryFactory.getDefaultSingleValueMapper(config.getMessagePackMapper(), resultClass));
}

@Override
Expand All @@ -359,18 +369,33 @@ public <S> CompletableFuture<S> callForSingleResult(
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
ValueConverter<Value, S> valueConverter)
throws TarantoolClientException {
return callForSingleResult(functionName, arguments, argumentsMapperSupplier,
TarantoolRequestSignature signature = TarantoolRequestSignature.create(
functionName, arguments, valueConverter.getClass());
return callForSingleResult(functionName, arguments, signature, argumentsMapperSupplier,
() -> mapperFactoryFactory.getSingleValueResultMapper(valueConverter));
}

private <S> CompletableFuture<S> callForSingleResult(
String functionName,
List<?> arguments,
TarantoolRequestSignature signature,
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Supplier<CallResultMapper<S, SingleValueCallResult<S>>> resultMapperSupplier)
throws TarantoolClientException {
return makeRequestForSingleResult(
functionName, arguments, signature, argumentsMapperSupplier, resultMapperSupplier)
.thenApply(CallResult::value);
}

@Override
public <S> CompletableFuture<S> callForSingleResult(
String functionName,
List<?> arguments,
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Supplier<CallResultMapper<S, SingleValueCallResult<S>>> resultMapperSupplier)
throws TarantoolClientException {
return makeRequestForSingleResult(functionName, arguments, null, argumentsMapperSupplier, resultMapperSupplier)
return makeRequestForSingleResult(
functionName, arguments, null, argumentsMapperSupplier, resultMapperSupplier)
.thenApply(CallResult::value);
}

Expand Down Expand Up @@ -438,7 +463,8 @@ public <T, R extends List<T>> CompletableFuture<R> callForMultiResult(
Supplier<R> resultContainerSupplier,
Class<T> resultClass)
throws TarantoolClientException {
return callForMultiResult(functionName, arguments, argumentsMapperSupplier,
TarantoolRequestSignature signature = TarantoolRequestSignature.create(functionName, arguments, resultClass);
return callForMultiResult(functionName, arguments, signature, argumentsMapperSupplier,
() -> mapperFactoryFactory.getDefaultMultiValueMapper(config.getMessagePackMapper(), resultClass));
}

Expand All @@ -450,10 +476,24 @@ public <T, R extends List<T>> CompletableFuture<R> callForMultiResult(
Supplier<R> resultContainerSupplier,
ValueConverter<Value, T> valueConverter)
throws TarantoolClientException {
return callForMultiResult(functionName, arguments, argumentsMapperSupplier,
TarantoolRequestSignature signature = TarantoolRequestSignature.create(
functionName, arguments, valueConverter.getClass());
return callForMultiResult(functionName, arguments, signature, argumentsMapperSupplier,
() -> mapperFactoryFactory.getMultiValueResultMapper(resultContainerSupplier, valueConverter));
}

private <T, R extends List<T>> CompletableFuture<R> callForMultiResult(
String functionName,
List<?> arguments,
TarantoolRequestSignature signature,
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Supplier<CallResultMapper<R, MultiValueCallResult<T, R>>> resultMapperSupplier)
throws TarantoolClientException {
return makeRequestForMultiResult(
functionName, arguments, signature, argumentsMapperSupplier, resultMapperSupplier)
.thenApply(CallResult::value);
}

@Override
public <T, R extends List<T>> CompletableFuture<R> callForMultiResult(
String functionName,
Expand Down Expand Up @@ -498,9 +538,14 @@ private <S> CompletableFuture<S> makeRequest(
builder.withArguments(arguments);
}
builder.withSignature(requestSignature);
MessagePackValueMapper resultMapper = resultMapperSupplier.get();

TarantoolCallRequest request = builder.build(argumentsMapperSupplier.get());
MessagePackObjectMapper argumentsMapper = requestSignature != null ?
argumentsMapperCache.computeIfAbsent(requestSignature, s -> argumentsMapperSupplier.get()) :
argumentsMapperSupplier.get();
MessagePackValueMapper resultMapper = requestSignature != null ?
resultMapperCache.computeIfAbsent(requestSignature, s -> resultMapperSupplier.get()) :
resultMapperSupplier.get();

TarantoolCallRequest request = builder.build(argumentsMapper);
return connectionManager().getConnection()
.thenCompose(c -> c.sendRequest(request).getFuture())
.thenApply(resultMapper::fromValue);
Expand Down Expand Up @@ -540,11 +585,16 @@ public CompletableFuture<List<?>> eval(
Supplier<MessagePackObjectMapper> argumentsMapperSupplier,
Supplier<MessagePackValueMapper> resultMapperSupplier) throws TarantoolClientException {
try {
TarantoolRequestSignature signature = TarantoolRequestSignature.create(expression, arguments, List.class);
MessagePackObjectMapper argumentsMapper = argumentsMapperCache.computeIfAbsent(
signature, s -> argumentsMapperSupplier.get());
MessagePackValueMapper resultMapper = resultMapperCache.computeIfAbsent(
signature, s -> resultMapperSupplier.get());
TarantoolEvalRequest request = new TarantoolEvalRequest.Builder()
.withExpression(expression)
.withArguments(arguments)
.build(argumentsMapperSupplier.get());
MessagePackValueMapper resultMapper = resultMapperSupplier.get();
.withSignature(signature)
.build(argumentsMapper);
return connectionManager().getConnection()
.thenCompose(c -> c.sendRequest(request).getFuture())
.thenApply(resultMapper::fromValue);
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/tarantool/driver/core/space/TarantoolSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.tarantool.driver.protocol.TarantoolIndexQuery;
import io.tarantool.driver.protocol.TarantoolProtocolException;
import io.tarantool.driver.protocol.TarantoolRequest;
import io.tarantool.driver.protocol.TarantoolRequestSignature;
import io.tarantool.driver.protocol.requests.TarantoolCallRequest;
import io.tarantool.driver.protocol.requests.TarantoolDeleteRequest;
import io.tarantool.driver.protocol.requests.TarantoolInsertRequest;
Expand All @@ -28,7 +29,9 @@
import org.msgpack.value.ArrayValue;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
Expand All @@ -47,6 +50,7 @@ public abstract class TarantoolSpace<T extends Packable, R extends Collection<T>
private final TarantoolConnectionManager connectionManager;
private final TarantoolSpaceMetadata spaceMetadata;
private final TarantoolMetadataOperations metadataOperations;
private final Map<String, TarantoolRequestSignature> methodSignatures;

public TarantoolSpace(
TarantoolClientConfig config,
Expand All @@ -58,6 +62,28 @@ public TarantoolSpace(
this.connectionManager = connectionManager;
this.spaceMetadata = spaceMetadata;
this.metadataOperations = metadataOperations;
this.methodSignatures = new HashMap<>();
String spaceIdStr = String.valueOf(this.spaceId);
methodSignatures.put(
TarantoolDeleteRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolDeleteRequest.class.getName()));
methodSignatures.put(
TarantoolInsertRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolInsertRequest.class.getName()));
methodSignatures.put(
TarantoolReplaceRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolReplaceRequest.class.getName()));
methodSignatures.put(
TarantoolSelectRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolSelectRequest.class.getName()));
methodSignatures.put(
TarantoolUpdateRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolUpdateRequest.class.getName()));
methodSignatures.put(
TarantoolUpsertRequest.class.getName(),
new TarantoolRequestSignature(spaceIdStr, TarantoolUpsertRequest.class.getName()));
methodSignatures.put(
"truncate", new TarantoolRequestSignature(spaceIdStr, "truncate", TarantoolCallRequest.class.getName()));
}

@Override
Expand All @@ -74,6 +100,7 @@ private CompletableFuture<R> delete(Conditions conditions, Supplier<MessagePackV
.withSpaceId(spaceId)
.withIndexId(indexQuery.getIndexId())
.withKeyValues(indexQuery.getKeyValues())
.withSignature(methodSignatures.get(TarantoolDeleteRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand Down Expand Up @@ -101,6 +128,7 @@ private CompletableFuture<R> insert(T tuple, Supplier<MessagePackValueMapper> re
TarantoolInsertRequest request = new TarantoolInsertRequest.Builder()
.withSpaceId(spaceId)
.withTuple(tuple)
.withSignature(methodSignatures.get(TarantoolInsertRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand Down Expand Up @@ -128,6 +156,7 @@ private CompletableFuture<R> replace(T tuple, Supplier<MessagePackValueMapper> r
TarantoolReplaceRequest request = new TarantoolReplaceRequest.Builder()
.withSpaceId(spaceId)
.withTuple(tuple)
.withSignature(methodSignatures.get(TarantoolReplaceRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand All @@ -152,6 +181,7 @@ private CompletableFuture<R> select(Conditions conditions, Supplier<MessagePackV
.withKeyValues(indexQuery.getKeyValues())
.withLimit(conditions.getLimit())
.withOffset(conditions.getOffset())
.withSignature(methodSignatures.get(TarantoolSelectRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand Down Expand Up @@ -198,6 +228,7 @@ private CompletableFuture<R> update(
.withIndexId(indexQuery.getIndexId())
.withKeyValues(indexQuery.getKeyValues())
.withTupleOperations(fillFieldIndexFromMetadata(operations))
.withSignature(methodSignatures.get(TarantoolUpdateRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand Down Expand Up @@ -225,6 +256,7 @@ private CompletableFuture<R> upsert(
.withKeyValues(indexQuery.getKeyValues())
.withTuple(tuple)
.withTupleOperations(fillFieldIndexFromMetadata(operations))
.withSignature(methodSignatures.get(TarantoolUpsertRequest.class.getName()))
.build(config.getMessagePackMapper());

return sendRequest(request, resultMapperSupplier);
Expand All @@ -244,6 +276,7 @@ private CompletableFuture<Void> truncate(Supplier<MessagePackValueMapper> result
String spaceName = spaceMetadata.getSpaceName();
TarantoolCallRequest request = new TarantoolCallRequest.Builder()
.withFunctionName("box.space." + spaceName + ":truncate")
.withSignature(methodSignatures.get("truncate"))
.build(config.getMessagePackMapper());
return sendRequest(request, resultMapperSupplier)
.thenApply(v -> TarantoolVoidResult.INSTANCE.value());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.tarantool.driver.protocol;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -69,4 +70,21 @@ public String toString() {
sb.append("]");
return sb.toString();
}

/**
* Factory method for a typical RPC usage
*
* @param functionName name of the remote function
* @param arguments list of arguments for the remote function
* @param resultClass type of the expected result. It's necessary for polymorphic functions, e.g. accepting a
* Tarantool space as an argument
* @return new request signature
*/
public static TarantoolRequestSignature create(String functionName, List<?> arguments, Class<?> resultClass) {
List<Object> components = new ArrayList<>(arguments.size() + 2);
components.add(functionName);
components.addAll(arguments);
components.add(resultClass.getName());
return new TarantoolRequestSignature(components.toArray(new Object[]{}));
}
}

0 comments on commit 4892004

Please sign in to comment.