Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XPending command cannot be called with IDLE param without defining CONSUMER #2132

Closed
domi-nika60 opened this issue Jul 1, 2022 · 3 comments
Labels
type: bug A general bug
Milestone

Comments

@domi-nika60
Copy link

domi-nika60 commented Jul 1, 2022

Bug Report

Current Behavior

Hi, according to the documentation of Redis XPending request can be invoked with obligatory parameters key and group. The rest of them are optional. I've tried to run it with lettuce-io API and define specific idle time. Unfortunately, with method of lettuce library I cannot do such thing without defining consumer name variable. For my use case, I don't want to specify it and I want to run this Redis request globally (per all consumer names).

The documentation for Redis clearly presents method for such use case:

> XPENDING mystream group55 IDLE 9000 - + 10

The method I wanted to use from lettuce-io API library was:
List<PendingMessage> xpending(K key, XPendingArgs<K> args);

The stack trace of my issue:

Stack trace
Error invoking scheduled task for bean [com.pega.vas.GetPendingMessage@1e0a746e] Cannot read field "group" because "this.consumer" is null
java.lang.NullPointerException: Cannot read field "group" because "this.consumer" is null
	at io.lettuce.core.XPendingArgs.build(XPendingArgs.java:118)
	at io.lettuce.core.RedisCommandBuilder.xpending(RedisCommandBuilder.java:2729)
	at io.lettuce.core.AbstractRedisAsyncCommands.xpending(AbstractRedisAsyncCommands.java:1923)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:63)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at jdk.proxy2/jdk.proxy2.$Proxy8.xpending(Unknown Source)
	at com.pega.vas.redis.CacheClient.getPendingMessagesFromStreamGroup(CacheClient.java:156)
	at com.pega.vas.redis.CacheClient.removePendingMessagesFromStream(CacheClient.java:179)
	at com.pega.vas.RemovePendingMsgJob.executeJob(RemovePendingMsgJob.java:40)
	at com.pega.vas.$RemovePendingMsgJob$Definition$Exec.dispatch(Unknown Source)
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:351)
	at io.micronaut.inject.DelegatingExecutableMethod.invoke(DelegatingExecutableMethod.java:76)
	at io.micronaut.scheduling.processor.ScheduledMethodProcessor.lambda$process$5(ScheduledMethodProcessor.java:125)
	at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedRunnable.run(InvocationInstrumenterWrappedRunnable.java:47)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:79)
	at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:160)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:831)

Input Code

I'm using it like:

Input Code
List<PendingMessage> pending = redis.xpending(myStreamItemKey,
                new XPendingArgs().idle(myIdleTime).limit(Limit.unlimited()).range(Range.unbounded())); 

Expected behavior/code

The code is expected to retrieve form Redis Stream Group a list of messages which are in pending state longer than idle time.
It is suppose to work for all consumer names as well, when not defined. In fact, it is failing, because, there is no option to provide the streamGroup. Library is trying to get it from consumer name which I do not want to specify.

Environment

  • Lettuce version(s): 6.1.1.RELEASEe
  • Redis version: 6.2.7

Possible Solution

My proposition of resolving this bug is to add possibility to separately define group parameter in XPendingArgs builder and remove notNullable checking for consumer paramether: LettuceAssert.notNull(consumer, "Consumer must not be null");

For my use case, I've created my custom XPendingArgs class and overrided build method:

@Override
    public <T, V> void build(CommandArgs<T, V> args) {
        args.add(this.group);

        if (this.idle != null) {
            args.add(CommandKeyword.IDLE).add(this.idle);
        }

        if (this.range.getLower().equals(Range.Boundary.unbounded())) {
            args.add("-");
        } else {
            args.add(this.range.getLower().getValue());
        }

        if (this.range.getUpper().equals(Range.Boundary.unbounded())) {
            args.add("+");
        } else {
            args.add(this.range.getUpper().getValue());
        }

        args.add(this.limit.isLimited() ? this.limit.getCount() : Long.MAX_VALUE);
    }

Now, I'm using it like:

pendingList = redis.xpending(myStreamItemKey, new CustomXPendingArgs<String>()
                    .group(myStreamGroup)
                    .idle(myIdleTime)
                    .limit(Limit.unlimited()).range(Range.unbounded())
            );
@mp911de mp911de added the type: bug A general bug label Jul 1, 2022
@mp911de
Copy link
Collaborator

mp911de commented Jul 1, 2022

Thanks for the report. Do you want to submit a pull request to fix the issue, now that you have investigated the cause?

@mp911de mp911de added this to the 6.1.9.RELEASE milestone Jul 7, 2022
@mp911de
Copy link
Collaborator

mp911de commented Jul 7, 2022

We're going to extend XPendingArgs with a group option as that one is missing right now.

mp911de added a commit that referenced this issue Jul 7, 2022
Support XPENDING calls with idle argument without a consumer name.
mp911de added a commit that referenced this issue Jul 7, 2022
Support XPENDING calls with idle argument without a consumer name.
@mp911de
Copy link
Collaborator

mp911de commented Jul 7, 2022

That's in place now.

@mp911de mp911de closed this as completed Jul 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A general bug
Projects
None yet
Development

No branches or pull requests

2 participants