6 changes: 4 additions & 2 deletions pom.xml
@@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.5.0-SNAPSHOT</version>
<version>2.5.0-DATAREDIS-1230-SNAPSHOT</version>

<name>Spring Data Redis</name>

@@ -416,7 +416,8 @@ private HV deserializeHashValue(ByteBuffer buffer) {
return (HV) serializationContext.getHashValueSerializationPair().read(buffer);
}

private MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record) {
@Override
public MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record) {
return record.map(it -> it.mapEntries(this::deserializeRecordFields).withStreamKey(readKey(record.getStream())));
}

@@ -347,6 +347,11 @@ public <V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType) {
return objectMapper.getHashMapper(targetType);
}

@Override
public MapRecord<K, HK, HV> deserializeRecord(ByteRecord record) {
return record.deserialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
}

protected byte[] serializeHashValueIfRequires(HV value) {
return hashValueSerializerPresent() ? serialize(value, hashValueSerializer())
: objectMapper.getConversionService().convert(value, byte[].class);
@@ -386,7 +391,7 @@ public final List<MapRecord<K, HK, HV>> doInRedis(RedisConnection connection) {

List<MapRecord<K, HK, HV>> result = new ArrayList<>();
for (ByteRecord record : raw) {
result.add(record.deserialize(keySerializer(), hashKeySerializer(), hashValueSerializer()));
result.add(deserializeRecord(record));
}

return result;
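The imperative side gains the same hook, and the read path above now funnels raw results through it, so deserialization can be overridden in one place. A hedged sketch of calling the new method directly; the class and parameter names are illustrative assumptions:

import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;

class RawRecordMapper {

	MapRecord<String, String, String> toMapRecord(RedisTemplate<String, String> template, ByteRecord raw) {

		StreamOperations<String, String, String> ops = template.opsForStream();
		// uses the key, hash-key and hash-value serializers configured on the template
		return ops.deserializeRecord(raw);
	}
}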
@@ -22,23 +22,13 @@
import java.util.Map;

import org.reactivestreams.Publisher;

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -354,7 +344,7 @@ default <V> Flux<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<Str

Assert.notNull(targetType, "Target type must not be null");

return range(key, range, limit).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
return range(key, range, limit).map(it -> map(it, targetType));
}

/**
@@ -435,7 +425,7 @@ default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions

Assert.notNull(targetType, "Target type must not be null");

return read(readOptions, streams).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
return read(readOptions, streams).map(it -> map(it, targetType));
}

/**
@@ -489,7 +479,7 @@ default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer

Assert.notNull(targetType, "Target type must not be null");

return read(consumer, readOptions, streams).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
return read(consumer, readOptions, streams).map(it -> map(it, targetType));
}

/**
@@ -543,7 +533,7 @@ default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra

Assert.notNull(targetType, "Target type must not be null");

return reverseRange(key, range, limit).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
return reverseRange(key, range, limit).map(it -> map(it, targetType));
}

/**
@@ -577,4 +567,29 @@ default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
*/
@Override
<V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType);

/**
* Map a record from {@link MapRecord} to {@link ObjectRecord}.
*
* @param record the stream record to map.
* @param targetType the target type of the payload.
* @return the mapped {@link ObjectRecord}.
* @since 2.x
*/
default <V> ObjectRecord<K, V> map(MapRecord<K, HK, HV> record, Class<V> targetType) {

Assert.notNull(record, "Records must not be null");
Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.toObjectRecord(record, this, targetType);
}

/**
* Deserialize a {@link ByteBufferRecord} into a {@link MapRecord} using the configured serialization context.
*
* @param record the stream record to map.
* @return deserialized {@link MapRecord}.
* @since 2.x
*/
MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record);
}
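A short, hedged sketch of the new map(...) default in a reactive pipeline; the LoginEvent payload type and variable names are assumptions for illustration:

import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import reactor.core.publisher.Flux;

class ReactiveRecordMapping {

	static class LoginEvent {
		String user;
	}

	Flux<ObjectRecord<String, LoginEvent>> toObjects(ReactiveStreamOperations<String, String, String> ops,
			Flux<MapRecord<String, String, String>> records) {

		// the default method looks up the HashMapper for LoginEvent and converts each record
		return records.map(record -> ops.map(record, LoginEvent.class));
	}
}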
@@ -128,13 +128,13 @@ static <K, V, HK, HV> MapRecord<K, HK, HV> toMapRecord(HashMapperProvider<HK, HV
/**
* Convert the given {@link Record} into an {@link ObjectRecord}.
*
* @param provider provider for {@link HashMapper} to apply mapping for {@link ObjectRecord}.
* @param source the source value.
* @param provider provider for {@link HashMapper} to apply mapping for {@link ObjectRecord}.
* @param targetType the desired target type.
* @return the converted {@link ObjectRecord}.
*/
static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(HashMapperProvider<HK, HV> provider,
MapRecord<K, HK, HV> source, Class<V> targetType) {
static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(MapRecord<K, HK, HV> source,
HashMapperProvider<HK, HV> provider, Class<V> targetType) {
return source.toObjectRecord(provider.getHashMapper(targetType));
}

@@ -149,7 +149,7 @@ static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(HashMapperProvider<HK, H
* {@literal null}.
*/
@Nullable
static <K, V, HK, HV> List<ObjectRecord<K, V>> map(@Nullable List<MapRecord<K, HK, HV>> records,
static <K, V, HK, HV> List<ObjectRecord<K, V>> toObjectRecords(@Nullable List<MapRecord<K, HK, HV>> records,
HashMapperProvider<HK, HV> hashMapperProvider, Class<V> targetType) {

if (records == null) {
@@ -161,7 +161,7 @@ static <K, V, HK, HV> List<ObjectRecord<K, V>> map(@Nullable List<MapRecord<K, H
}

if (records.size() == 1) {
return Collections.singletonList(toObjectRecord(hashMapperProvider, records.get(0), targetType));
return Collections.singletonList(toObjectRecord(records.get(0), hashMapperProvider, targetType));
}

List<ObjectRecord<K, V>> transformed = new ArrayList<>(records.size());
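StreamObjectMapper appears to be a package-level helper, so the reordered toObjectRecord arguments (source first, provider second) and the map-to-toObjectRecords rename only affect same-package call sites such as the default methods shown above. A hedged sketch of the adjusted call shape; the caller class is an illustrative assumption:

package org.springframework.data.redis.core;

import java.util.List;

import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.hash.HashMapperProvider;

class StreamObjectMapperCaller {

	<K, HK, HV, V> List<ObjectRecord<K, V>> convertAll(List<MapRecord<K, HK, HV>> records,
			HashMapperProvider<HK, HV> provider, Class<V> targetType) {

		// source records first, provider second, target type last
		return StreamObjectMapper.toObjectRecords(records, provider, targetType);
	}
}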
@@ -23,21 +23,10 @@

import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -356,7 +345,7 @@ default <V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<Str

Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.map(range(key, range, limit), this, targetType);
return map(range(key, range, limit), targetType);
}

/**
@@ -409,7 +398,7 @@ default <V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions

Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.map(read(readOptions, streams), this, targetType);
return map(read(readOptions, streams), targetType);
}

/**
@@ -467,7 +456,7 @@ default <V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer

Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.map(read(consumer, readOptions, streams), this, targetType);
return map(read(consumer, readOptions, streams), targetType);
}

/**
@@ -523,7 +512,7 @@ default <V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra

Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.map(reverseRange(key, range, limit), this, targetType);
return map(reverseRange(key, range, limit), targetType);
}

/**
@@ -560,4 +549,45 @@ default <V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
@Override
<V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType);

/**
* Map a record from {@link MapRecord} to {@link ObjectRecord}.
*
* @param record the stream record to map.
* @param targetType the target type of the payload.
* @return the mapped {@link ObjectRecord}.
* @since 2.x
*/
default <V> ObjectRecord<K, V> map(MapRecord<K, HK, HV> record, Class<V> targetType) {

Assert.notNull(record, "Record must not be null");
Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.toObjectRecord(record, this, targetType);
}

/**
* Map records from {@link MapRecord} to {@link ObjectRecord}s.
*
* @param records the stream records to map.
* @param targetType the target type of the payload.
* @return the mapped {@link ObjectRecord object records}.
* @since 2.x
*/
default <V> List<ObjectRecord<K, V>> map(List<MapRecord<K, HK, HV>> records, Class<V> targetType) {

Assert.notNull(records, "Records must not be null");
Assert.notNull(targetType, "Target type must not be null");

return StreamObjectMapper.toObjectRecords(records, this, targetType);
}

/**
* Deserialize a {@link ByteRecord} into a {@link MapRecord} using the configured serializers.
*
* @param record the stream record to map.
* @return deserialized {@link MapRecord}.
* @since 2.x
*/
MapRecord<K, HK, HV> deserializeRecord(ByteRecord record);
}
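Taken together, deserializeRecord and map let callers move from raw ByteRecords to typed ObjectRecords in two explicit steps without touching the internal mapper. A hedged sketch; the SensorReading type and the class name are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;

import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.StreamOperations;

class TypedStreamRead {

	static class SensorReading {
		double value;
	}

	List<ObjectRecord<String, SensorReading>> toTyped(StreamOperations<String, String, String> ops,
			List<ByteRecord> raw) {

		List<ObjectRecord<String, SensorReading>> result = new ArrayList<>(raw.size());
		for (ByteRecord record : raw) {
			// first apply the configured serializers, then the HashMapper for the target type
			result.add(ops.map(ops.deserializeRecord(record), SensorReading.class));
		}
		return result;
	}
}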
@@ -19,19 +19,25 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.convert.TypeDescriptor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;
@@ -79,8 +85,8 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
this.template = createRedisTemplate(connectionFactory, containerOptions);
this.containerOptions = containerOptions;

if (containerOptions.getHashMapper() != null) {
this.streamOperations = this.template.opsForStream(containerOptions.getHashMapper());
if (containerOptions.hasHashMapper()) {
this.streamOperations = this.template.opsForStream(containerOptions.getRequiredHashMapper());
} else {
this.streamOperations = this.template.opsForStream();
}
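hasHashMapper() and getRequiredHashMapper() are new accessors on StreamMessageListenerContainerOptions that are not part of this excerpt; a hedged sketch of the shape they presumably have (return and field types are assumptions):

// presumed shape of the new accessors on StreamMessageListenerContainerOptions (not shown in this diff)
public boolean hasHashMapper() {
	return this.hashMapper != null;
}

public HashMapper<Object, Object, Object> getRequiredHashMapper() {

	if (!hasHashMapper()) {
		throw new IllegalStateException("No HashMapper configured");
	}

	return this.hashMapper;
}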
@@ -207,16 +213,39 @@ public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<
return doRegister(getReadTask(streamRequest, listener));
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {

BiFunction<K, ReadOffset, List<? extends Record<?, ?>>> readFunction = getReadFunction(streamRequest);
Function<ReadOffset, List<ByteRecord>> readFunction = getReadFunction(streamRequest);
Function<ByteRecord, V> deserializerToUse = getDeserializer();

return new StreamPollTask<>(streamRequest, listener, errorHandler, (BiFunction) readFunction);
TypeDescriptor targetType = TypeDescriptor
.valueOf(containerOptions.hasHashMapper() ? containerOptions.getTargetType() : MapRecord.class);

return new StreamPollTask<>(streamRequest, listener, errorHandler, targetType, readFunction, deserializerToUse);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private Function<ByteRecord, V> getDeserializer() {

Function<ByteRecord, MapRecord<K, Object, Object>> deserializer = streamOperations::deserializeRecord;

if (containerOptions.getHashMapper() == null) {
return (Function) deserializer;
}

return source -> {

MapRecord<K, Object, Object> intermediate = deserializer.apply(source);
return (V) streamOperations.map(intermediate, this.containerOptions.getTargetType());
};
}

@SuppressWarnings("unchecked")
private BiFunction<K, ReadOffset, List<? extends Record<?, ?>>> getReadFunction(StreamReadRequest<K> streamRequest) {
private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamReadRequest<K> streamRequest) {

byte[] rawKey = ((RedisSerializer<K>) template.getKeySerializer())
.serialize(streamRequest.getStreamOffset().getKey());

if (streamRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest) {

@@ -226,20 +255,12 @@ private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, Str
: this.readOptions;
Consumer consumer = consumerStreamRequest.getConsumer();

if (this.containerOptions.getHashMapper() != null) {
return (key, offset) -> streamOperations.read(this.containerOptions.getTargetType(), consumer, readOptions,
StreamOffset.create(key, offset));
}

return (key, offset) -> streamOperations.read(consumer, readOptions, StreamOffset.create(key, offset));
}

if (this.containerOptions.getHashMapper() != null) {
return (key, offset) -> streamOperations.read(this.containerOptions.getTargetType(), readOptions,
StreamOffset.create(key, offset));
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset)));
}

return (key, offset) -> streamOperations.read(readOptions, StreamOffset.create(key, offset));
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
.xRead(readOptions, StreamOffset.create(rawKey, offset)));
}

private Subscription doRegister(Task task) {
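Functionally the refactoring keeps the consumer-facing API unchanged: the container now reads raw ByteRecords through the connection and applies deserialization itself, plus hash mapping when a target type is configured. A hedged end-to-end sketch of the configuration that exercises the new deserializer path; the LoginEvent type, stream key and names are illustrative assumptions:

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.data.redis.stream.Subscription;

class TypedListenerExample {

	static class LoginEvent {
		String user;
	}

	Subscription listen(RedisConnectionFactory connectionFactory) {

		StreamMessageListenerContainerOptions<String, ObjectRecord<String, LoginEvent>> options =
				StreamMessageListenerContainerOptions.builder()
						.targetType(LoginEvent.class) // exercises the hash-mapper branch in getDeserializer()
						.build();

		StreamMessageListenerContainer<String, ObjectRecord<String, LoginEvent>> container =
				StreamMessageListenerContainer.create(connectionFactory, options);

		Subscription subscription = container.receive(StreamOffset.fromStart("login-events"),
				record -> System.out.println(record.getValue().user));

		container.start(); // polling starts once the container is running
		return subscription;
	}
}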