Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to configure Rqueue in jhipster Spring boot microservice application #25

Closed
chlegou opened this issue Jul 11, 2020 · 20 comments
Closed
Assignees
Labels
bug Something isn't working

Comments

@chlegou
Copy link

chlegou commented Jul 11, 2020

Describe the bug
I'm not able to configure Rqueue in a jHipster Spring boot microservice application. I have followed all your resources over the internet and also your tutorial projects, but without luck

this is my pom file:

<spring-boot.version>2.2.7.RELEASE</spring-boot.version>
...............
<dependency>
	<groupId>com.github.sonus21</groupId>
	<artifactId>rqueue-spring-boot-starter</artifactId>
	<version>2.0.1-RELEASE</version>
</dependency>
...............
# adding /removing this dependency didn't help.
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Here is my configuration file:

import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


@EnableRedisRepositories
@Configuration
public class RQueueConfiguration {


    @Bean()
    public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(RqueueMessageHandler rqueueMessageHandler) {
        SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
        factory.setRqueueMessageHandler(rqueueMessageHandler);

        // adding this ThreadPoolTaskExecutor didn't help either.
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor");
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setMaxPoolSize(50);
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        factory.setTaskExecutor(threadPoolTaskExecutor);

        return factory;
    }

}

How to Reproduce

generate a jHipster microservice project and integrate Rqueue in it.
here is the error i'm getting:

ERROR 2704 --- [  restartedMain] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'rqueueMetrics' could not be injected as a 'com.github.sonus21.rqueue.metrics.RqueueMetrics' because it is a JDK dynamic proxy that implements:
	org.springframework.context.ApplicationListener


Action:

Consider injecting the bean as one of its interfaces or forcing the use of CGLib-based proxies by setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.


Process finished with exit code 1

additional resources

use https://start.jhipster.tech/ to generate a jHipster app in just 1min. here is my configurations: https://i.ibb.co/rZ7mrsS/image.png

please help me with this.

@sonus21
Copy link
Owner

sonus21 commented Jul 12, 2020

Hi Chlegou
Can you please add @EnableCaching(proxyTargetClass = true) in in spring boot app?

@EnableCaching(proxyTargetClass = true)
@SpringBootApplication
public class Main{
...
}

Generally this should be able to fix the issue. Please share a sample app incase this could not fix your issue.

@chlegou
Copy link
Author

chlegou commented Jul 13, 2020

following the changes you have suggested, i have tried that and what runs the project was@EnableAsync(proxyTargetClass = true) (@EnableCaching(proxyTargetClass = true) didn't worked for me, was throwing the same error.).

The app is running, but it's throwing another error that i couldn't fix. here is the error i'm receiving:

java.lang.IllegalAccessError: failed to access class com.github.sonus21.rqueue.core.MessageScheduler from class com.github.sonus21.rqueue.core.MessageScheduler$$FastClassBySpringCGLIB$$c8e81411 (com.github.sonus21.rqueue.core.MessageScheduler is in unnamed module of loader 'app'; com.github.sonus21.rqueue.core.MessageScheduler$$FastClassBySpringCGLIB$$c8e81411 is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @4c11e962)
	at com.github.sonus21.rqueue.core.MessageScheduler$$FastClassBySpringCGLIB$$c8e81411.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor.lambda$createWrappedRunnable$1(ExceptionHandlingAsyncTaskExecutor.java:78)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

As the app is running, i tried ignoring the problem, and seeing if it really works. the queues got scheduled, but they never runs! i'm attaching an image that's showing that:

i will create a draft app and post it here soon...

Thanks.

@sonus21
Copy link
Owner

sonus21 commented Jul 13, 2020

As an Immediate fix, you can disable devtools restart using spring.devtools.restart.enabled=false Please share a sample app where it can be reproduced easily.

@chlegou
Copy link
Author

chlegou commented Jul 13, 2020

I have created the sample app: https://github.com/chlegou/jhipster-rqueue
...Just to tell, @EnableAsync(proxyTargetClass = true) is set in this config class
by default, it's set to @EnableAsync
i have also made the logging.level.ROOT: INFO to simplify the logs. if you want to see more details, please change it to DEBUG.
Please consider visiting :
https://github.com/chlegou/jhipster-rqueue#bug-description
https://github.com/chlegou/jhipster-rqueue#how-to-reproduce-the-bug
to read what i have wrote about the problem. also, the things i have did in several commits.

btw, i have also tested spring.devtools.restart.enabled=false before but with no luck, same error is showing in console and no job got executed.

sonus21 added a commit that referenced this issue Jul 13, 2020
@sonus21
Copy link
Owner

sonus21 commented Jul 13, 2020

Thank you so much for providing simple app for debugging. I have released a fix, please use version 2.0.2-RELEASE, let me know if you are facing any other issue with Rqueue.

@chlegou
Copy link
Author

chlegou commented Jul 13, 2020

version 2.0.2-RELEASE isn't available in maven, could you please add it?
link: https://mvnrepository.com/artifact/com.github.sonus21/rqueue-spring-boot-starter

@sonus21
Copy link
Owner

sonus21 commented Jul 13, 2020

It will take time to reach there but it should be available to download can be verified at https://repo1.maven.org/maven2/com/github/sonus21/rqueue-spring-boot-starter.

@chlegou
Copy link
Author

chlegou commented Jul 13, 2020

i was able to run test the project with the new release you have created. the app is working finally! :)
With or without (proxyTargetClass = true) in @EnableAsync, the project is running.

talking now about performance, i have tested the app with the default 1 second timespan as i set it in the sample app i provided:

it's obvious that many messages was skipped. following the stats from the dashboard you provided, none are discarded. Instead, it's showing that they got all executed.

Testing again the app with the default 10 second timespan:

Only one job was skipped out of 6.

my question is about accuracy of the library, will every scheduled job will get successfully executed?

i hope you could test the lib performance as accuracy is so much important. also, to follow the job, as they was showing they're executed in the dashboard, but they really wasn't executed

Thank you again for the quick help you have provided. 🥇

@sonus21
Copy link
Owner

sonus21 commented Jul 14, 2020

it's obvious that many messages was skipped. following the stats from the dashboard you provided, none are discarded. Instead, it's showing that they got all executed.

You're running tasks using Async task executor, so the execution results might come later, since all are getting queued.

my question is about accuracy of the library, will every scheduled job will get successfully executed?

i hope you could test the lib performance as accuracy is so much important. also, to follow the job, as they was showing they're executed in the dashboard, but they really wasn't executed

It's guaranteed that each job would be executed successfully, you can verify the same by removing Async annotation from listener methods, though you don't need Async annotation since one dedicated thread is used for the listener's method execution.

To verify this you can add an atomic counter and assign an id to each job and write the job logs to a file. From logs we can see which jobs are skipped.

@chlegou
Copy link
Author

chlegou commented Jul 14, 2020

ah i see, i have added the @Async annotation before desperate to make it work. i forgot about removing it.

i will test again without it, and let you know.

Thanks again for pointing it to me.

@chlegou
Copy link
Author

chlegou commented Jul 14, 2020

i have removed the @Async annotation and tested it out, also, augmented the pool executor size, but still not working as expected. i have made a push to the project, please review the configuration and help me to get the best config. feel free to create a pull request with minimal configuration.
link: https://github.com/chlegou/jhipster-rqueue

i have simplified the logs, and even if i schedule 2 jobs (with same message), usually only one get executed.

@sonus21
Copy link
Owner

sonus21 commented Jul 14, 2020

I tried for 69 messages and all of them had successful execution.

Here's the patch to verify https://pastebin.com/UZ2DzAQg

Screenshot 2020-07-14 at 8 06 16 PM

Screenshot 2020-07-14 at 8 06 20 PM

Screenshot 2020-07-14 at 8 06 11 PM

@chlegou
Copy link
Author

chlegou commented Jul 14, 2020

ok, have you get all echo messages in console? because sometimes it wasn't written there.

could you please provide the source code with the changes you have made?

@sonus21
Copy link
Owner

sonus21 commented Jul 14, 2020

I didn't go through all the logs, but I have added code to verify if it's working or not.

Please find the updated code at https://github.com/sonus21/jhipster-rqueue/tree/demo

@chlegou
Copy link
Author

chlegou commented Jul 16, 2020

hi, as i'm working on the integration, the alpha tests are promising so far. the RQueue is working as expected so far. :)
i will keep you updated after running the beta tests.

i have a question: (i know that this thread is for bug tracking, i'm ok to move my question to stackoverflow if wanted)

  • is there a way to see if a message object with id: 124 (having the property id) is queued already or not? ideally, a queued message should have a unique tag that helps filtering queued messages for uniqueness and return a message status (... i'm inspired by Android Work Manager ). ...If this isn't implemented already in RQueue, i'm thinking of supporting it in my internal source code. (by checking the status of objects in @RqueueListener) and reschedule it from the listener if needed to wait for more before executing it. what do you think?

@sonus21
Copy link
Owner

sonus21 commented Jul 16, 2020

is there a way to see if a message object with id: 124 (having the property id) is queued already or not?

No, currently there's no way to identify this. As you said you can implement this easily by making a lookup in the listener.
The reason this is not implemented is some of the users might be interesting in using id fields to uniquely identify the message while others user might be considering one or more fields to uniquely identify the message. Also we need to consider whether the new message should be run at the oldest scheduled time or at the new time, it makes more sense to run this as per the latest time, it's more of like ignore previous message, we need to also consider parallel consumption of the same message ids.

and reschedule it from the listener if needed to wait for more before executing it

I won't recommend you to schedule messages from listener as it can cause duplicate message queueing problem, now you need to work on idempotency. I would suggest you use pre execution message processor. Using pre execution processor, you skip the execution if you find there's a new message for the given id, it would be as simple as tracking the enqueue time corresponding to a message.

@chlegou
Copy link
Author

chlegou commented Jul 16, 2020

Also we need to consider whether the new message should be run at the oldest scheduled time or at the new time, it makes more sense to run this as per the latest time,

Well, scheduling with a unique tag, would discard old messages having same tag. so we're always talking about a single message. In Android workmanager, there's 2 concepts: unique tags and normal tags. messages could have many identification tags, but unique messages are having a unique tag and unique message in the queue.

I would suggest you use pre execution message processor.

could you provide an example of it? i didn't understand how to implement it.

i might go with the lookup in the listener, as it seems to be the most easy and logical as it's related to the app logic not to the rqueue library. and from the listener i chose what code i need to execute.

Thanks for your time and help. they're really appreciated. :)

@sonus21
Copy link
Owner

sonus21 commented Jul 17, 2020

Well, scheduling with a unique tag, would discard old messages having same tag. so we're always talking about a single message. In Android workmanager, there's 2 concepts: unique tags and normal tags. messages could have many identification tags, but unique messages are having a unique tag and unique message in the queue.

This looks to be promising, what about the execution time? For example Message M1 having tag t was scheduled at 5:20AM to run in 10 minutest that's 5:30AM, at time 5:25AM we need to update message M1 with new data but it will have same tag t, as per the application logic the code would just enqueue the message to run in 10 minutes, so now new execution time would be 5:35AM, which should be considered as final time. Should we execute the new Message at 5:30AM or 5:35AM?

could you provide an example of it? i didn't understand how to implement it.

interface MessageRepository {
    Long getLatestEnqueueAt(String messageId);
}
 
class UniqueMessageProcessor implements MessageProcessor{
  @Autowired MessageRepository messageRepository;
  @Override 
  boolean process(Object message, RqueueMessage rqueueMessage){
     if(message instanceof SimpleMessage){
       // here you can get id using tags
      String messageId = ((SimpleMessage)message).getId();
      Long latestEnqueueTime= messageRepository.getLatestEnqueueAt(messageId) ;
      if( latestEnqueueTime != null && latestEnqueueTime > rqueueMessage.getQueuedTime() ) {
          return false; 
      }
    }
   return true;
  }
}

class RqueueConfiguration{
 
 @Bean
  public MessageProcessor preExecutorMessageProcessor(){
     return new UniqueMessageProcessor()
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(){
      SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); 
       MessageProcessor preExecutorMessageProcessor = preExecutorMessageProcessor();
     factory.setPreExecutionMessageProcessor(preExecutorMessageProcessor);
    return factory;
  } 
}

I'll add this in the wiki in a new section "Message deduplication section".

@chlegou
Copy link
Author

chlegou commented Jul 17, 2020

Well in our case, i believe that the most logical use case is Cancel the existing chain and REPLACE it with the new one.
This means, if we post a message M1 with unique tag M_USER_BOB_TAG at 05:20 and needs to run in 05:30 (after 10 minutes). and in 05:25 we decided to queue a new message M2 with the same unique tag M_USER_BOB_TAG. since there's a message queued already, we remove it instantly (moving it to trash-queue, which is internal preconfigured queue, in 05:25 ) and schedule the message M2 which should run in 05:35.

With this being said, let's say we have a mechanism that allow as to fetch a message details with a specific unique tag: M_USER_BOB_TAG:

EXEC_TIME: 05:20 :: rqueueMessageSender.enqueueUniqueIn( uniqueTag, queue, messageObject, time, timeUnit); // enqueue unique  message "M1" with unique tag. no old messages having this uniqueTag, so we just enqueue it.
EXEC_TIME: 05:23 :: messageRepository.getUniqueMessage(uniqueTag); // should return "M1" message with his details and execution time of 05:30.
EXEC_TIME: 05:25 :: rqueueMessageSender.enqueueUniqueIn( uniqueTag, queue, messageObject, time, timeUnit); // enqueue unique  message with unique tag. having old message having this uniqueTag, we cancel/ move it to "trash-queue" instantly, and enqueue this new message "M2".
EXEC_TIME: 05:28 :: messageRepository.getUniqueMessage(uniqueTag); // should return "M2" message with his details and execution time of 05:35.
EXEC_TIME: 05:35 :: // the job is executed
EXEC_TIME: 05:40 :: messageRepository.getUniqueMessage(uniqueTag); // return null, since no job are enqueued with this unique tag.

Following the Android Work Manager Unique Work policies, it support 3 conflict resolution policies:

  • Cancel the existing chain and REPLACE it with the new one.
  • KEEP the existing sequence and ignore your new request.
  • APPEND your new sequence to the existing one, running the new sequence's first task after the existing sequence's last task finishes. You cannot use APPEND with PeriodicWorkRequests.

Since RQueue is a server implementation which should handle/have access to data directly, i believe that the most logical use case is by replacing old messages by the new one. since enqueueing same job messages comes after fresh data check which is not the same case with Android Work Manager. (... let's see if other users might address different use cases in the future that requires old queued unique jobs mandatory execution...)

As i also recommend adding the feature of fetching queued messages info by these 2 methods: messageRepository.getUniqueMessage(uniqueTag) and messageRepository.getMessage(tag) they're really helpful. :)

for the preExecutorMessageProcessor, i will test it later. thanks for providing this implementation :)

@sonus21
Copy link
Owner

sonus21 commented Jul 17, 2020

I have created another ticket for this, let's track this over there. I'm happy to accept a PR.

@sonus21 sonus21 closed this as completed Jul 17, 2020
Repository owner locked as resolved and limited conversation to collaborators Jul 17, 2020
@sonus21 sonus21 added the bug Something isn't working label Sep 16, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants