Skip to content

Commit

Permalink
Allocate lazily of array outputs targets #573
Browse files Browse the repository at this point in the history
Lettuce now allocates lazily array output targets (Maps, ArrayList) with the appropriate size initialization. Optimize Subscriber to pass in the target to avoid ListSubscriber instances and convert it to a class for improved CHA/omit call-devirtualization.
  • Loading branch information
mp911de committed Jul 23, 2017
1 parent fb962ac commit b44c1d2
Show file tree
Hide file tree
Showing 31 changed files with 639 additions and 154 deletions.
25 changes: 16 additions & 9 deletions src/main/java/io/lettuce/core/RedisPublisher.java
Expand Up @@ -16,7 +16,6 @@
package io.lettuce.core;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -123,7 +122,7 @@ public void subscribe(Subscriber<? super T> subscriber) {
*
* @param <T> data element type
*/
static class RedisSubscription<T> implements Subscription, StreamingOutput.Subscriber<T> {
static class RedisSubscription<T> extends StreamingOutput.Subscriber<T> implements Subscription {

static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);

Expand Down Expand Up @@ -162,8 +161,7 @@ static class RedisSubscription<T> implements Subscription, StreamingOutput.Subsc
StreamingOutput<T> streamingOutput = (StreamingOutput<T>) command.getOutput();

if (connection instanceof StatefulRedisConnection<?, ?> && ((StatefulRedisConnection) connection).isMulti()) {
streamingOutput.setSubscriber(new CompositeSubscriber<>(
Arrays.asList(this, streamingOutput.getSubscriber())));
streamingOutput.setSubscriber(new CompositeSubscriber<>(this, streamingOutput.getSubscriber()));
} else {
streamingOutput.setSubscriber(this);
}
Expand Down Expand Up @@ -699,17 +697,26 @@ private void onError(Throwable throwable) {
*
* @param <T> element type
*/
private static class CompositeSubscriber<T> implements StreamingOutput.Subscriber<T> {
private static class CompositeSubscriber<T> extends StreamingOutput.Subscriber<T> {

private final Collection<StreamingOutput.Subscriber<T>> subscribers;
private final StreamingOutput.Subscriber<T> first;
private final StreamingOutput.Subscriber<T> second;

CompositeSubscriber(Collection<StreamingOutput.Subscriber<T>> subscribers) {
this.subscribers = subscribers;
public CompositeSubscriber(StreamingOutput.Subscriber<T> first, StreamingOutput.Subscriber<T> second) {
this.first = first;
this.second = second;
}

@Override
public void onNext(T t) {
subscribers.forEach(subscriber -> subscriber.onNext(t));
throw new UnsupportedOperationException();
}

@Override
public void onNext(Collection<T> outputTarget, T t) {

first.onNext(outputTarget, t);
second.onNext(outputTarget, t);
}
}
}
17 changes: 12 additions & 5 deletions src/main/java/io/lettuce/core/output/ArrayOutput.java
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

Expand All @@ -29,15 +29,16 @@
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Mark Paluch
*/
public class ArrayOutput<K, V> extends CommandOutput<K, V, List<Object>> {

private boolean initialized;
private Deque<Integer> counts = new ArrayDeque<Integer>();
private Deque<List<Object>> stack = new ArrayDeque<List<Object>>();

public ArrayOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<>());
super(codec, Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -72,10 +73,16 @@ public void complete(int depth) {

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count);
initialized = true;
}

if (stack.isEmpty()) {
stack.push(output);
} else {
stack.push(new ArrayList<>(count));
stack.push(OutputFactory.newList(count));

}
counts.push(count);
Expand Down
21 changes: 16 additions & 5 deletions src/main/java/io/lettuce/core/output/BooleanListOutput.java
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
*/
package io.lettuce.core.output;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.lettuce.core.codec.RedisCodec;
Expand All @@ -27,19 +27,30 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Will Glozer
* @author Mark Paluch
*/
public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> implements StreamingOutput<Boolean> {

private boolean initialized;
private Subscriber<Boolean> subscriber;

public BooleanListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<>());
setSubscriber(ListSubscriber.of(output));
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
}

@Override
public void set(long integer) {
subscriber.onNext((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
subscriber.onNext(output, (integer == 1) ? Boolean.TRUE : Boolean.FALSE);
}

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count);
initialized = true;
}
}

@Override
Expand Down
38 changes: 29 additions & 9 deletions src/main/java/io/lettuce/core/output/GeoCoordinatesListOutput.java
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,10 @@
import static java.lang.Double.parseDouble;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.lettuce.core.GeoCoordinates;
import io.lettuce.core.Value;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.LettuceAssert;

Expand All @@ -31,13 +30,17 @@
*
* @author Mark Paluch
*/
public class GeoCoordinatesListOutput<K, V> extends CommandOutput<K, V, List<GeoCoordinates>> {
public class GeoCoordinatesListOutput<K, V> extends CommandOutput<K, V, List<GeoCoordinates>> implements
StreamingOutput<GeoCoordinates> {

private Double x;
private boolean initialized;
private Subscriber<GeoCoordinates> subscriber;

public GeoCoordinatesListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<>());
}
public GeoCoordinatesListOutput(RedisCodec<K, V> codec) {
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
}

@Override
public void set(ByteBuffer bytes) {
Expand All @@ -49,14 +52,31 @@ public void set(ByteBuffer bytes) {
return;
}

output.add(new GeoCoordinates(x, value));
subscriber.onNext(output, new GeoCoordinates(x, value));
x = null;
}

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count / 2);
initialized = true;
}

if (count == -1) {
output.add(null);
subscriber.onNext(output, null);
}
}

@Override
public void setSubscriber(Subscriber<GeoCoordinates> subscriber) {
LettuceAssert.notNull(subscriber, "Subscriber must not be null");
this.subscriber = subscriber;
}

@Override
public Subscriber<GeoCoordinates> getSubscriber() {
return subscriber;
}
}
Expand Up @@ -18,7 +18,7 @@
import static java.lang.Double.parseDouble;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.lettuce.core.GeoCoordinates;
Expand All @@ -31,15 +31,16 @@
*
* @author Mark Paluch
*/
public class GeoCoordinatesValueListOutput<K, V> extends CommandOutput<K, V, List<Value<GeoCoordinates>>>
implements StreamingOutput<Value<GeoCoordinates>> {
public class GeoCoordinatesValueListOutput<K, V> extends CommandOutput<K, V, List<Value<GeoCoordinates>>> implements
StreamingOutput<Value<GeoCoordinates>> {

private Double x;
private boolean initialized;
private Subscriber<Value<GeoCoordinates>> subscriber;

public GeoCoordinatesValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<>());
setSubscriber(ListSubscriber.of(output));
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
}

@Override
Expand All @@ -52,14 +53,20 @@ public void set(ByteBuffer bytes) {
return;
}

subscriber.onNext(Value.fromNullable(new GeoCoordinates(x, value)));
subscriber.onNext(output, Value.fromNullable(new GeoCoordinates(x, value)));
x = null;
}

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count / 2);
initialized = true;
}

if (count == -1) {
subscriber.onNext(Value.empty());
subscriber.onNext(output, Value.empty());
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/lettuce/core/output/GeoWithinListOutput.java
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@
import static java.lang.Double.parseDouble;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import io.lettuce.core.GeoCoordinates;
Expand All @@ -31,8 +30,7 @@
*
* @author Mark Paluch
*/
public class GeoWithinListOutput<K, V> extends CommandOutput<K, V, List<GeoWithin<V>>>
implements StreamingOutput<GeoWithin<V>> {
public class GeoWithinListOutput<K, V> extends CommandOutput<K, V, List<GeoWithin<V>>> implements StreamingOutput<GeoWithin<V>> {

private V member;
private Double distance;
Expand All @@ -47,15 +45,16 @@ public class GeoWithinListOutput<K, V> extends CommandOutput<K, V, List<GeoWithi
private Subscriber<GeoWithin<V>> subscriber;

public GeoWithinListOutput(RedisCodec<K, V> codec, boolean withDistance, boolean withHash, boolean withCoordinates) {
super(codec, new ArrayList<>());
super(codec, OutputFactory.newList(16));
this.withDistance = withDistance;
this.withHash = withHash;
this.withCoordinates = withCoordinates;
setSubscriber(ListSubscriber.of(output));
setSubscriber(ListSubscriber.instance());
}

@Override
public void set(long integer) {

if (member == null) {
member = (V) (Long) integer;
return;
Expand Down Expand Up @@ -94,8 +93,9 @@ public void set(ByteBuffer bytes) {

@Override
public void complete(int depth) {

if (depth == 1) {
subscriber.onNext(new GeoWithin<V>(member, distance, geohash, coordinates));
subscriber.onNext(output, new GeoWithin<V>(member, distance, geohash, coordinates));

member = null;
distance = null;
Expand Down
22 changes: 16 additions & 6 deletions src/main/java/io/lettuce/core/output/KeyListOutput.java
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2016 the original author or authors.
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@
package io.lettuce.core.output;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.lettuce.core.codec.RedisCodec;
Expand All @@ -27,21 +27,31 @@
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
* @author Mark Paluch
*/
public class KeyListOutput<K, V> extends CommandOutput<K, V, List<K>> implements StreamingOutput<K> {

private boolean initialized;
private Subscriber<K> subscriber;

public KeyListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<>());
setSubscriber(ListSubscriber.of(output));
super(codec, Collections.emptyList());
setSubscriber(ListSubscriber.instance());
}

@Override
public void set(ByteBuffer bytes) {
subscriber.onNext(codec.decodeKey(bytes));
subscriber.onNext(output, codec.decodeKey(bytes));
}

@Override
public void multi(int count) {

if (!initialized) {
output = OutputFactory.newList(count);
initialized = true;
}
}

@Override
Expand Down

0 comments on commit b44c1d2

Please sign in to comment.