Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic task add #51

Closed
yuluo-zy opened this issue Nov 27, 2020 · 16 comments
Closed

Periodic task add #51

yuluo-zy opened this issue Nov 27, 2020 · 16 comments
Labels
enhancement New feature or request

Comments

@yuluo-zy
Copy link

I think,
In some cases, the support of periodic tasks is required, and periodic tasks allow cancellation.
Hope to achieve this function.

Many thanks!!

@sonus21
Copy link
Owner

sonus21 commented Nov 27, 2020

Hi Yuluo
Thanks for the request, In the Spring bases apps tasks can be scheduled using
Scheduled. Are you trying to do something else apart from running some job at some time or your use-case can't be handled using Scheduled? It would be nice if you can provide some example usecase.

@yuluo-zy
Copy link
Author

hi, sonus
Thank you for your question. In my use, I have some periodic emails,i need to dynamic increase this periodic emails to redis,Now I want to add a periodic task that needs to be decomposed into subtasks and added to the requeue through a time array.If I want to cancel all subtasks of this periodic task, I have to manually delete them one by one.Can you provide an api to meet the addition and cancellation of periodic tasks.
Many thanks!!

@sonus21
Copy link
Owner

sonus21 commented Nov 27, 2020

Thanks for the explanation, sorry! I could not follow your example, what do you mean by

i need to dynamic increase this periodic emails to redis

Let's assume you're scheduling tasks at 30 seconds interval.

IIUC, you're scheduling N tasks at t, t+30, t+60, t+90 ... for some reason if you want to cancel this task then you manually delete all enqueued tasks related to this task.

Do you always delete all related tasks or only some of them?

Please provide some example data with timestamp what're you doing and what're trying to achieve using Rqueue. Maybe there's an easy way to achieve the same.

@yuluo-zy
Copy link
Author

ok, i use rqueue 2.0.4 ,so I have no way to delete the tasks in redis. i think this way to realize cyclic tasks.
this is my webapp

public RouterFunction<ServerResponse> emailRouterquest(EmailHandleFuction handler){
        RouterFunctions.Builder route = route();
        route.POST("/email/messageByPeriodic", accept(MediaType.APPLICATION_JSON).and(contentType(MediaType.APPLICATION_JSON)), handler::asynPeriodic);
        return route.build();  
      }

I use this api to accept periodic recurring tasks, it will accept a emailBean

public class EmailBean {
    private String sendto;
    // Used to identify tasks
    private String id;
    // Task start time
    private String start;
    // Period task end time
    private String end;
    // Time period
    private String periodic;

}

In my processing function, I will add many tasks based on the start time, end time, and time interval

public Mono<ServerResponse> asynPeriodic(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(EmailBean.class).flatMap((Function<EmailBean, Mono<ServerResponse>>) emailBean -> {
            if (emailBean.getEnd() != null && emailBean.getStart() != null && emailBean.getPeriodic() != null
                    && !emailBean.getEnd().equals("") && !emailBean.getStart().equals("") && !emailBean.getPeriodic().equals("")) {
    // Generate time list
                ArrayList<Date> timeList = dateTime.getPeriodicTime(
                        emailBean.getStart(),
                        emailBean.getEnd(),
                        emailBean.getPeriodic()
                );

                for (Date time : timeList) {
                    log.warn("Add periodic tasks",time);
                    rqueueMessageSender.enqueueAt("delay-queue", emailBean, time);
                }
            }
            return ServerResponse.ok()
                    .bodyValue(CommonResultImpl.success("Add periodic task"));
        });

    }

For example, a periodic task is Send email every 12 hours, from November 2020 to December 2020. so, it will add 124 task to rqueue. These tasks will have the same task id to indicate that they are the same cycle task.

@RqueueListener(
            value ="${rqueue.delay.queue.name}",
            numRetries = "5",
            visibilityTimeout = "5*60*1000",
            deadLetterQueue = "morgue-queue",
            deadLetterQueueListenerEnabled = "true",
            active = "true",
            concurrency = "2-3"

    )
    public void onMessage(EmailBean emailBean){
        /**
         * When I want to cancel sending mail
         * I will store the task id in the data table
         * If id exists, no mail will be sent
         */
        Mono<cancelList> info=  cancelRepository.findTopByMessageId(emailBean.getId());
        cancelList get = info.block();
        if(get != null){

        }else {
            log.info("send email!!"+emailBean.toString());
            emailUtil.send(emailBean);
        }

    }


When these task is executing, I sometimes need to cancel it. The task id of my assigned task will stored on the data table,When each subtask is executed, it will first query the database.If this id exists, the subtask will not be executed.

in this way, I implemented a cycle task, but This method is too bad.so Is it possible to add cycle task function.Or can you tell me how to solve similar problems.If you can tell me how to implement this feature, I can also help requeue implement it.

My English is not very good, please forgive me if I offend. This project is really great!!!

@sonus21
Copy link
Owner

sonus21 commented Nov 28, 2020

in this way, I implemented a cycle task, but This method is too bad.

Why do you think so?

This is one of the easiest way to solve this problem. Other way could be you can use MessageProcessor (this can be set in SimpleRqueueListenerContainerFactory via setPreExecutionMessageProcessor) that should return false for tasks those can be ignored. Again here you can check id of the EmailBean to ignore such tasks.

@yuluo-zy
Copy link
Author

Because it will generate a lot of subtasks,and When I need to cancel, I need to rely on a third-party database.

My opinion is,rqueueMessageSender.enqueueBycycle("delay-queue", emailBean, timeInterval); ,When the task execution is completed, the task will re-enter zset,To achieve cyclic tasks.

If so, how can I achieve.

Thank you very much!

@sonus21
Copy link
Owner

sonus21 commented Nov 28, 2020

Interesting, this can be achieved by setting isPeriodic and interval in RqueueMessage, but this leaves us with some lingering questions related to retry and execution time.

For example

Task t was enqueued at 7:30:00PM with period 30 seconds

  • If execution of task t at 7:30:30PM fails then it will be retried later at 7:30:30+PM => this could potentially miss next task if it's scheduled at 7:31:00PM.
  • Success full execution of task t took 45 seconds than again it will miss the next task that should be scheduled at 7:31:00PM
  • If we enqueue message as soon the task is dequeued than this will create another problem of duplicate tasks due to retry e.g task was dequeued at 7:30:30PM no matter what happens we'll enqueue a new task that should run at 7:31:00PM

You can use similar approach in your listener code, like as soon as your listener is invoked you enqueue a new task at next interval. But what will you do when this message is retried, we can't identify exactly if a message is being retried, one way to identify this is whenever your listener is invoked you can check RqueueMessage#failureCount but this is not 100% reliable.

@sonus21 sonus21 closed this as completed Nov 28, 2020
@sonus21 sonus21 reopened this Nov 28, 2020
@sonus21
Copy link
Owner

sonus21 commented Dec 1, 2020

@yuluo-zy did this make sense or you would like to have such features in Rqueue?

@yuluo-zy
Copy link
Author

yuluo-zy commented Dec 1, 2020

I am currently working on the source code,I want to achieve it,I initially considered whether to mark it as a cyclic task by adding attributes to RqueueMessage.

@sonus21
Copy link
Owner

sonus21 commented Dec 1, 2020

Cool, let me know if you would like to see this feature in Rqueue, we would need to find some ways accurately find failure execution messages.

@yuluo-zy
Copy link
Author

yuluo-zy commented Dec 1, 2020

I haven't finished reading the part of the code about the exception retry,but i think,If we enqueue message as soon the task is dequeued,By judging whether id is in zset.We can add without repeating.The retry after the task fails should not be related to the re-adding of the task.

@sonus21
Copy link
Owner

sonus21 commented Dec 1, 2020

The problem here is we don;'t store id in zset instead we store entire RqueueMessage in zset, this makes it a difficult soln.

RqueueExecutor is the place to find relavant retry code.

sonus21 added a commit that referenced this issue Dec 3, 2020
@sonus21
Copy link
Owner

sonus21 commented Dec 4, 2020

Hi
I have made changes to support periodic job via #50

Would you mind giving a shot, I can provide you a release candidate or snapshot.

@yuluo-zy
Copy link
Author

yuluo-zy commented Dec 5, 2020

Thank you very much, I didn't expect you to complete it so well, again, this project is great!I can download the source code directly and use it!

@sonus21
Copy link
Owner

sonus21 commented Dec 5, 2020

Please try and let me know, if something is breaking though I have tested using integration test.

@sonus21
Copy link
Owner

sonus21 commented Dec 6, 2020

Thanks for the request, this feature has been added in 2.2.0 and it should be available in maven central. Open new bug for any issue etc.

@sonus21 sonus21 closed this as completed Dec 6, 2020
@sonus21 sonus21 added the enhancement New feature or request label Dec 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants