Skip to content

Commit

Permalink
refactor: Removed SetBlockingQueue and replaced with set in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 27, 2024
1 parent 2362a49 commit 2d4d64a
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 723 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

public class KeyEvent<K> {

private final Wrapper<K> key;
private final K key;
private final String event;

public KeyEvent(K key, String event) {
this.key = new Wrapper<>(key);
this.key = key;
this.event = event;
}

public K getKey() {
return key.getValue();
return key;
}

public String getEvent() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package com.redis.spring.batch.reader;

import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import com.redis.spring.batch.KeyValue.DataType;
import com.redis.spring.batch.util.BatchUtils;
import com.redis.spring.batch.util.SetBlockingQueue;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
Expand Down Expand Up @@ -49,8 +48,9 @@ public class KeyNotificationItemReader<K, V> extends AbstractPollableItemReader<
private String keyPattern;
private String keyType;

private BlockingQueue<KeyEvent<K>> queue;
private BlockingQueue<K> queue;
private AutoCloseable publisher;
private HashSet<Wrapper<K>> keySet;

public KeyNotificationItemReader(AbstractRedisClient client, RedisCodec<K, V> codec) {
setName(ClassUtils.getShortName(getClass()));
Expand All @@ -61,7 +61,7 @@ public KeyNotificationItemReader(AbstractRedisClient client, RedisCodec<K, V> co
this.valueDecoder = BatchUtils.toStringValueFunction(codec);
}

public BlockingQueue<KeyEvent<K>> getQueue() {
public BlockingQueue<K> getQueue() {
return queue;
}

Expand All @@ -84,9 +84,11 @@ public boolean isRunning() {
@Override
protected synchronized void doOpen() throws Exception {
Assert.notNull(client, "Redis client not set");
if (keySet == null) {
keySet = new HashSet<>(queueCapacity);
}
if (queue == null) {
BlockingQueue<KeyEvent<K>> actualQueue = new LinkedBlockingQueue<>(queueCapacity);
queue = new SetBlockingQueue<>(actualQueue, queueCapacity);
queue = new LinkedBlockingQueue<>(queueCapacity);
}
if (publisher == null) {
publisher = publisher();
Expand All @@ -105,11 +107,13 @@ private void keyEventNotification(K channel, V message) {
private void addEvent(K key, String event) {
DataType type = keyType(event);
if (keyType == null || keyType.equalsIgnoreCase(type.getString())) {
try {
queue.put(new KeyEvent<>(key, event));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ItemStreamException("Interrupted while queueing key event", e);
Wrapper<K> wrapper = new Wrapper<>(key);
if (keySet.contains(wrapper)) {
return;
}
boolean added = queue.offer(key);
if (added) {
keySet.add(wrapper);
}
}
}
Expand Down Expand Up @@ -189,15 +193,17 @@ protected synchronized void doClose() throws Exception {
}
queue = null;
}
keySet = null;
}

@Override
protected K doPoll(long timeout, TimeUnit unit) throws InterruptedException {
KeyEvent<K> wrapper = queue.poll(timeout, unit);
if (wrapper == null) {
K key = queue.poll(timeout, unit);
if (key == null) {
return null;
}
return wrapper.getKey();
keySet.remove(new Wrapper<>(key));
return key;
}

private DataType keyType(String event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.redis.lettucemod.api.async.RedisModulesAsyncCommands;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.OperationExecutor;
import com.redis.spring.batch.util.Predicates;
import com.redis.spring.batch.util.BatchUtils;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisFuture;
Expand Down Expand Up @@ -87,7 +87,7 @@ public long getAsLong() {
List<RedisFuture<String>> typeFutures = keys.stream().map(commands::type).collect(Collectors.toList());
connection.flushCommands();
List<String> types = OperationExecutor.getAll(connection.getTimeout(), typeFutures);
Predicate<String> matchPredicate = Predicates.glob(keyPattern);
Predicate<String> matchPredicate = BatchUtils.globPredicate(keyPattern);
Predicate<String> typePredicate = typePredicate();
int total = 0;
int matchCount = 0;
Expand Down Expand Up @@ -121,7 +121,7 @@ private Predicate<String> typePredicate() {
if (StringUtils.hasLength(keyType)) {
return keyType::equalsIgnoreCase;
}
return Predicates.isTrue();
return s -> true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@

import com.redis.lettucemod.api.async.RedisModulesAsyncCommands;
import com.redis.lettucemod.search.Suggestion;
import com.redis.spring.batch.util.Predicates;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;

public class Sugadd<K, V, T> extends AbstractKeyValueOperation<K, V, Suggestion<V>, T> {

private Predicate<T> incrPredicate = Predicates.isFalse();
private Predicate<T> incrPredicate = t -> false;

public Sugadd(Function<T, K> keyFunction, Function<T, Suggestion<V>> suggestionFunction) {
super(keyFunction, suggestionFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
Expand Down Expand Up @@ -105,7 +105,7 @@ public JobExecution getLastJobExecution(String jobName, JobParameters jobParamet
@Test
void testSetters() {
JobFactory factory = new JobFactory();
JobLauncher launcher = new SimpleJobLauncher();
JobLauncher launcher = new TaskExecutorJobLauncher();
SimpleJobRepository repository = new SimpleJobRepository();
String name = "name";
ResourcelessTransactionManager transactionManager = new ResourcelessTransactionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ void readKeyNotificationsDedupe() throws Exception {
redisCommands.zadd(key, 2, "member2");
redisCommands.zadd(key, 3, "member3");
awaitUntil(() -> keyReader.getQueue().size() == 1);
Assertions.assertEquals(key, keyReader.getQueue().take().getKey());
Assertions.assertEquals(key, keyReader.getQueue().take());
} finally {
keyReader.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.springframework.util.FileCopyUtils;
import org.springframework.util.StringUtils;

import com.hrakaroo.glob.GlobPattern;
import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
Expand Down Expand Up @@ -118,4 +121,11 @@ public static <K, V> StatefulRedisModulesConnection<K, V> connection(RedisModule
}
return connection;
}

public static Predicate<String> globPredicate(String match) {
if (!StringUtils.hasLength(match)) {
return s -> true;
}
return GlobPattern.compile(match)::matches;
}
}

This file was deleted.

0 comments on commit 2d4d64a

Please sign in to comment.