Skip to content

Commit

Permalink
DATAREDIS-589 - Move secondary index cleanup to MappingExpirationList…
Browse files Browse the repository at this point in the history
…ener.

We now no longer rely on ApplicationEvents captured in the RedisKeyValueAdapter for performing cleanup operations for expired keys, but do this along with the phantom key removal. This removes a flaw when initializing a non repository related KeyspaceEventListener publishing events that actually are unrelated to the Adapter.

Additionally upgraded test infrastructure to utilize Redis 3.2.6 with disabled protected-mode and enabled keyspace-events.

Original pull request: #232.
  • Loading branch information
christophstrobl authored and mp911de committed Jan 12, 2017
1 parent 0999c04 commit 0f7607e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 32 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

REDIS_VERSION:=3.2.0
REDIS_VERSION:=3.2.6
SPRING_PROFILE?=ci

#######
Expand All @@ -25,6 +25,8 @@ work/redis-%.conf:

echo port $* >> $@
echo daemonize yes >> $@
echo protected-mode no >> $@
echo notify-keyspace-events Ex >> $@
echo pidfile $(shell pwd)/work/redis-$*.pid >> $@
echo logfile $(shell pwd)/work/redis-$*.log >> $@
echo save \"\" >> $@
Expand All @@ -36,6 +38,8 @@ work/redis-6379.conf:

echo port 6379 >> $@
echo daemonize yes >> $@
echo protected-mode no >> $@
echo notify-keyspace-events Ex >> $@
echo pidfile $(shell pwd)/work/redis-6379.pid >> $@
echo logfile $(shell pwd)/work/redis-6379.log >> $@
echo save \"\" >> $@
Expand All @@ -57,6 +61,7 @@ work/sentinel-%.conf:

echo port $* >> $@
echo daemonize yes >> $@
echo protected-mode no >> $@
echo bind 0.0.0.0 >> $@
echo pidfile $(shell pwd)/work/sentinel-$*.pid >> $@
echo logfile $(shell pwd)/work/sentinel-$*.log >> $@
Expand All @@ -80,6 +85,7 @@ work/cluster-%.conf:
@mkdir -p $(@D)

echo port $* >> $@
echo protected-mode no >> $@
echo cluster-enabled yes >> $@
echo cluster-config-file $(shell pwd)/work/nodes-$*.conf >> $@
echo cluster-node-timeout 5 >> $@
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,24 @@ public RedisKeyExpiredEvent(byte[] key) {

/**
* Creates new {@link RedisKeyExpiredEvent}
*
*
* @param key
* @param value
*/
public RedisKeyExpiredEvent(byte[] key, Object value) {
super(key);
this(null, key, value);
}

/**
* Creates new {@link RedisKeyExpiredEvent}
*
* @pamam channel
* @param key
* @param value
* @since 1.8
*/
public RedisKeyExpiredEvent(String channel, byte[] key, Object value) {
super(channel, key);

args = ByteUtils.split(key, ':');
this.value = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.springframework.data.util.CloseableIterator;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
* Redis specific {@link KeyValueAdapter} implementation. Uses binary codec to read/write data from/to Redis. Objects
Expand Down Expand Up @@ -367,7 +368,6 @@ public Set<byte[]> doInRedis(RedisConnection connection) throws DataAccessExcept

List<byte[]> keys = new ArrayList<byte[]>(ids);


if (keys.isEmpty() || keys.size() < offset) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -701,28 +701,7 @@ public void destroy() throws Exception {
*/
@Override
public void onApplicationEvent(RedisKeyspaceEvent event) {

LOGGER.debug("Received %s .", event);

if (event instanceof RedisKeyExpiredEvent) {

final RedisKeyExpiredEvent expiredEvent = (RedisKeyExpiredEvent) event;

redisOps.execute(new RedisCallback<Void>() {

@Override
public Void doInRedis(RedisConnection connection) throws DataAccessException {

LOGGER.debug("Cleaning up expired key '%s' data structures in keyspace '%s'.", expiredEvent.getSource(),
expiredEvent.getKeyspace());

connection.sRem(toBytes(expiredEvent.getKeyspace()), expiredEvent.getId());
new IndexWriter(connection, converter).removeKeyFromIndexes(expiredEvent.getKeyspace(), expiredEvent.getId());
return null;
}
});

}
// just a customization hook
}

/*
Expand Down Expand Up @@ -814,12 +793,29 @@ public Map<byte[], byte[]> doInRedis(RedisConnection connection) throws DataAcce
if (!org.springframework.util.CollectionUtils.isEmpty(hash)) {
connection.del(phantomKey);
}

return hash;
}
});

Object value = converter.read(Object.class, new RedisData(hash));
publishEvent(new RedisKeyExpiredEvent(key, value));

String channel = !ObjectUtils.isEmpty(message.getChannel())
? converter.getConversionService().convert(message.getChannel(), String.class) : null;

final RedisKeyExpiredEvent event = new RedisKeyExpiredEvent(channel, key, value);

ops.execute(new RedisCallback<Void>() {
@Override
public Void doInRedis(RedisConnection connection) throws DataAccessException {

connection.sRem(converter.getConversionService().convert(event.getKeyspace(), byte[].class), event.getId());
new IndexWriter(connection, converter).removeKeyFromIndexes(event.getKeyspace(), event.getId());
return null;
}
});

publishEvent(event);
}

private boolean isKeyExpirationMessage(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,28 @@
*/
public class RedisKeyspaceEvent extends ApplicationEvent {

private final String channel;

/**
* Creates new {@link RedisKeyspaceEvent}.
*
*
* @param key The key that expired. Must not be {@literal null}.
*/
public RedisKeyspaceEvent(byte[] key) {
this(null, key);
}

/**
* Creates new {@link RedisKeyspaceEvent}.
*
* @param channel The source channel aka subscription topic. Can be {@literal null}.
* @param key The key that expired. Must not be {@literal null}.
* @since 1.8
*/
public RedisKeyspaceEvent(String channel, byte[] key) {

super(key);
this.channel = channel;
}

/*
Expand All @@ -42,4 +57,13 @@ public byte[] getSource() {
return (byte[]) super.getSource();
}

/**
*
* @return can be {@literal null}.
* @since 1.8
*/
public String getChannel() {
return this.channel;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -43,6 +44,7 @@
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisKeyValueAdapter.EnableKeyspaceEvents;
import org.springframework.data.redis.core.convert.Bucket;
import org.springframework.data.redis.core.convert.KeyspaceConfiguration;
import org.springframework.data.redis.core.convert.MappingConfiguration;
Expand Down Expand Up @@ -92,6 +94,7 @@ public void setUp() {
mappingContext.afterPropertiesSet();

adapter = new RedisKeyValueAdapter(template, mappingContext);
adapter.setEnableKeyspaceEvents(EnableKeyspaceEvents.ON_STARTUP);
adapter.afterPropertiesSet();

template.execute(new RedisCallback<Void>() {
Expand Down Expand Up @@ -239,26 +242,67 @@ public void deleteCleansIndexedDataCorrectly() {
assertThat(template.opsForSet().members("persons:firstname:rand"), not(hasItem("1")));
}

@Test // DATAREDIS-425
public void keyExpiredEventShouldRemoveHelperStructures() {
/**
* @see DATAREDIS-425
*/
@Test
public void keyExpiredEventShouldRemoveHelperStructures() throws InterruptedException {

Map<String, String> map = new LinkedHashMap<String, String>();
map.put("_class", Person.class.getName());
map.put("firstname", "rand");
map.put("address.country", "Andor");

template.opsForHash().putAll("persons:1", map);
template.expire("persons:1", 1, TimeUnit.SECONDS);

template.opsForSet().add("persons", "1");
template.opsForSet().add("persons:firstname:rand", "1");
template.opsForSet().add("persons:1:idx", "persons:firstname:rand");

adapter.onApplicationEvent(new RedisKeyExpiredEvent("persons:1".getBytes(Bucket.CHARSET)));
int iterationCount = 0;
while (template.hasKey("persons:1") && iterationCount++ < 3) { // ci might be a little slow
Thread.sleep(2000);
}

assertThat(template.hasKey("persons:1"), is(false));
assertThat(template.hasKey("persons:firstname:rand"), is(false));
assertThat(template.hasKey("persons:1:idx"), is(false));
assertThat(template.opsForSet().members("persons"), not(hasItem("1")));
}

@Test // DATAREDIS-512
/**
* @see DATAREDIS-589
*/
@Test
public void keyExpiredEventWithoutKeyspaceShouldBeIgnored() throws InterruptedException {

Map<String, String> map = new LinkedHashMap<String, String>();
map.put("_class", Person.class.getName());
map.put("firstname", "rand");
map.put("address.country", "Andor");

template.opsForHash().putAll("persons:1", map);
template.opsForHash().putAll("1", map);

template.expire("1", 1, TimeUnit.SECONDS);

template.opsForSet().add("persons", "1");
template.opsForSet().add("persons:firstname:rand", "1");
template.opsForSet().add("persons:1:idx", "persons:firstname:rand");

Thread.sleep(2000);

assertThat(template.hasKey("persons:1"), is(true));
assertThat(template.hasKey("persons:firstname:rand"), is(true));
assertThat(template.hasKey("persons:1:idx"), is(true));
assertThat(template.opsForSet().members("persons"), hasItem("1"));
}

/**
* @see DATAREDIS-512
*/
@Test
public void putWritesIndexDataCorrectly() {

Person rand = new Person();
Expand Down

0 comments on commit 0f7607e

Please sign in to comment.