Jobs after creation are not adding to the database #23

Closed
kostysh opened this issue May 2, 2024 · 19 comments
Labels: enhancement (New feature or request), question (Further information is requested)

Comments

@kostysh
Contributor

kostysh commented May 2, 2024

Hi,
please clarify the job workflow.
I am adding jobs using pulse.create('<job_name>', '{job_data}').
The call completes without errors, but the job does not appear in the jobs collection (I checked via the MongoDB console), and the job's _id property is undefined.
If the server goes down at this moment, all the added jobs will be lost after the restart.
How do you manage jobs so that all unfinished jobs can be finished after a server restart?

By the way, here is the configuration I use to start the queue:

const queue = new Pulse({
  name: 'MyQueue',
  db: {
    address: databaseUrl, // a valid connection URL
    collection: 'jobs',
  },
  defaultConcurrency: 3,
  maxConcurrency: 5,
  processEvery: '10 seconds',
});

@code-xhyun
Contributor

Thank you for the issue!
I will look into this and get back to you as soon as possible.

@kostysh
Contributor Author

kostysh commented May 2, 2024

Maybe a job must be saved explicitly right after creation using a save() method of a job?
This is not quite obvious. Could you please clarify the desired behavior?

@kostysh
Contributor Author

kostysh commented May 2, 2024

Just an idea. Maybe setting up the Discussions feature for the repository as a place for questions is worth it?

@code-xhyun code-xhyun self-assigned this May 3, 2024
@code-xhyun
Contributor

> Just an idea. Maybe setting up the Discussions feature for the repository as a place for questions is worth it?

I'm not using the Discussions feature, but I'm going to open a Discord server as soon as possible. @kostysh

@code-xhyun code-xhyun added the question label May 3, 2024
@code-xhyun
Contributor

code-xhyun commented May 3, 2024

> Maybe a job must be saved explicitly right after creation using a save() method of a job? This is not quite obvious. Could you please clarify the desired behavior?

The create() method does NOT save the job to the database.
When you create a job with create(), you must explicitly call save() if you want to persist it.

https://docs-pulse.pulsecron.com/docs/creating-jobs/create#example-usage
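
The difference between building a job and persisting it can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: ToyStore and ToyJob are hypothetical names made up for this example and are not Pulse classes, and save() is synchronous here for brevity, whereas job.save() in this thread is awaited.

```typescript
// Toy model of "create builds, save persists". Not Pulse's implementation.
type ToyRecord = { _id: string; name: string; data: unknown };

// Stands in for the MongoDB collection.
class ToyStore {
  private records = new Map<string, ToyRecord>();
  private nextId = 1;

  insert(name: string, data: unknown): ToyRecord {
    const record: ToyRecord = { _id: String(this.nextId++), name, data };
    this.records.set(record._id, record);
    return record;
  }

  count(): number {
    return this.records.size;
  }
}

class ToyJob {
  _id: string | undefined; // stays undefined until save() runs
  private readonly store: ToyStore;
  readonly name: string;
  readonly data: unknown;

  constructor(store: ToyStore, name: string, data: unknown) {
    this.store = store;
    this.name = name;
    this.data = data;
  }

  // Writes the job to the store once; later calls are no-ops in this toy model.
  save(): this {
    if (this._id === undefined) {
      this._id = this.store.insert(this.name, this.data)._id;
    }
    return this;
  }
}

const toyStore = new ToyStore();

// Created but never saved: nothing is written and _id is undefined.
const unsaved = new ToyJob(toyStore, 'processData', { userId: 1 });

// Created and saved: the store now holds one record and _id is set.
const saved = new ToyJob(toyStore, 'processData', { userId: 2 }).save();
```

In this model, a process crash right after the first constructor call would lose the unsaved job, which matches the symptom described at the top of the thread.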

@kostysh
Contributor Author

kostysh commented May 3, 2024

> When creating a job using create(), you must explicitly declare save() if you want to save it

Thank you. Currently, jobs are being added to the database.
But the workflow of a job is not yet clear.

What is the use case for not storing jobs in the database?
In my opinion, all new jobs should be saved to the database automatically when they are created; the queue manager should then take jobs from there one by one (or in batches, according to configured rules) for processing. This way, if the queue instance goes offline immediately after a job is created (for example, the server goes down), all saved jobs can be restored and processed when the queue comes back up.
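
The persist-first workflow described above could be sketched roughly as follows. This is a minimal, self-contained illustration, not how Pulse works internally: the StoredJob shape, the jobTable array standing in for the database, and the enqueue and claimDueJobs helpers are all hypothetical.

```typescript
// Hypothetical persist-then-poll queue. Names are illustrative, not Pulse's API.
interface StoredJob {
  id: number;
  name: string;
  nextRunAt: number;         // epoch milliseconds
  lockedAt: number | null;   // set while a worker owns the job
  finishedAt: number | null; // set when processing completes
}

// Stands in for the database collection.
const jobTable: StoredJob[] = [];

// Persist the job before anything tries to run it.
function enqueue(name: string, now: number): StoredJob {
  const job: StoredJob = {
    id: jobTable.length + 1,
    name,
    nextRunAt: now,
    lockedAt: null,
    finishedAt: null,
  };
  jobTable.push(job);
  return job;
}

// Claim up to batchSize due jobs, oldest first, and mark them as locked
// so that a second poll does not hand out the same job again.
function claimDueJobs(now: number, batchSize: number): StoredJob[] {
  const due = jobTable
    .filter((j) => j.finishedAt === null && j.lockedAt === null && j.nextRunAt <= now)
    .sort((a, b) => a.nextRunAt - b.nextRunAt)
    .slice(0, batchSize);
  for (const job of due) {
    job.lockedAt = now;
  }
  return due;
}
```

Because every job lives in the table before it is claimed, a worker that starts later only needs to call claimDueJobs to pick up whatever was enqueued earlier.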

I am not familiar with the code yet, but I could not find where saved jobs are restored from the database to the queue at startup.

If I am right, jobs must be populated in the _definitions property of the main class to be processed at start time. But this property starts as an empty array.

@code-xhyun
Contributor

code-xhyun commented May 3, 2024

> When creating a job using create(), you must explicitly declare save() if you want to save it
>
> Thank you. Currently, jobs are being added to the database. But the workflow of a job is not yet clear.
>
> Which is a use case when you do not need to store jobs in a database? In my opinion, all new jobs should initially (and automatically) be saved to the database, and then, the queue manager should take jobs from there one by one (or in batches, according to configured rules) for processing. This way, if the queue instance goes offline immediately after a job is created (for example, the server goes down), all saved jobs can be restored and processed when the queue comes back up.
>
> I am not familiar with the code yet but I was not able to find a place where saved jobs are restored from the database to the queue at the start.
>
> As I am right, to be processed at the start time jobs must be populated in the _definitions property of the main class. But this property starts as an empty array.

If you are looking for a method that saves immediately, why not refer to this?
https://docs-pulse.pulsecron.com/docs/creating-jobs

Example:

    await pulse.start();
    await pulse.every('1 minutes', 'delete old users', { description: 'test' }); 
    // or   await pulse.schedule('in 1 minutes', 'delete old users', { to: 'admin@example.com' });

@kostysh
Contributor Author

kostysh commented May 3, 2024

In my case, I have to process a series of tasks, each of which can take approximately 30 seconds to 2 minutes. These tasks are not recurrent and should not be scheduled. I just want to be sure that once a task has been enqueued, it is guaranteed to be processed and the processing result is logged.
BullMQ satisfies this use case, but I do not want to add one more database to my project stack.

@kostysh
Contributor Author

kostysh commented May 3, 2024

Also, keeping all the jobs in memory is not a good idea from a scalability perspective.
I do not want to impose a different approach to task management than the one you intended. I just want to understand the idea behind the Pulse queue.

@code-xhyun
Contributor

> Also, having all the jobs in the memory is not a good idea from a scalability perspective.
> I do not want to impose a different approach to task management than the one you intended. Just want to understand the idea under the Pulse queue.

This design makes the application’s logic and data flow easier to manage and gives users explicit control over the state of their data, which leads to more stable and predictable applications.

  • Explicit Change Management: By requiring users to explicitly call the save() method, it ensures clearer control over what changes are committed to the database. This helps in tracking modifications and preventing unintended data changes.
  • Flexibility and Scalability: Developers have the freedom to decide when and how data is saved, allowing them to tailor data management strategies to the specific needs of the application.

> In my case, I have to process a series of tasks, each of which can take approximately 30 seconds to 2 minutes. These tasks are not recurrent and should not be scheduled. I just want to be sure, that if this task has been enqueued it will be guaranteed to be processed and the processing result will be logged. BullMQ satisfied this use case but I do not want to have one more database in my project stack.

I don't fully understand your situation, but would the following example work for you?

    await pulse.start();
    const job = pulse.create('delete old users', { to: 'pulsecron@gmail.com' });
    await job.save();
    // write your own logic...
    job.repeatEvery('10 minutes');
    job.unique({ 'data.type': 'email', 'data.userId': '12345' });
    await job.save();

@code-xhyun
Contributor

> Also, having all the jobs in the memory is not a good idea from a scalability perspective.
> I do not want to impose a different approach to task management than the one you intended. Just want to understand the idea under the Pulse queue.

You can also set the 'concurrency' option to a small number so that only a limited number of jobs are held in memory at a time.

https://docs-pulse.pulsecron.com/docs/defining-job-processors#parameters
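
As a general illustration of what a concurrency cap does, here is a minimal worker-pool sketch. It is not Pulse's scheduler: runWithConcurrency is a hypothetical helper written for this example, and it only demonstrates the generic technique of keeping at most a fixed number of tasks in flight.

```typescript
// Run the given tasks with at most `limit` of them in flight at once.
// Results are returned in the same order as the input tasks.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  if (limit < 1) {
    throw new RangeError('limit must be at least 1');
  }

  const results: T[] = new Array(tasks.length);
  let next = 0; // index of the next task that has not been started yet

  // Each worker pulls the next unstarted task until none are left.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++; // no race: this runs synchronously between awaits
      const task = tasks[index];
      if (task !== undefined) {
        results[index] = await task();
      }
    }
  }

  const workerCount = Math.min(limit, tasks.length);
  const workers = Array.from({ length: workerCount }, () => worker());
  await Promise.all(workers);
  return results;
}
```

With a limit of 2, a third task starts only after one of the first two has finished, so the number of jobs being processed at once stays bounded no matter how many are waiting.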

@kostysh
Contributor Author

kostysh commented May 3, 2024

OK, I will try to clarify with an example:

interface MyDataType {
//...
}

pulse.define<MyDataType>('processData', /***/); // job processor

// This helper can be called by an API method at any time
async function createJob<T extends MyDataType>(pulse: Pulse, data: T): Promise<void> {
  const job = pulse.create<T>('processData', data);
  await job.save(); // OK as it is: the job is persisted explicitly
}

await pulse.start();

// ^--- unprocessed jobs must be restored and processed here

I expect that after a server restart, any unprocessed jobs created prior to the restart will be restored from the database and processed. This is common practice for queues with persisted jobs.
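
The expected recovery step could look roughly like the sketch below. It is a hypothetical illustration only: the PersistedJob fields and the recoverUnfinished helper are assumptions made for this example and do not describe what Pulse does on startup.

```typescript
// Hypothetical startup recovery pass. Field names are illustrative assumptions.
interface PersistedJob {
  id: number;
  lockedAt: number | null;   // epoch ms when a worker claimed the job
  finishedAt: number | null; // epoch ms when processing completed
  nextRunAt: number | null;  // epoch ms when the job should run next
}

// Release jobs that were claimed but never finished (for example because the
// server went down mid-run) and make them due immediately. Returns their ids.
function recoverUnfinished(jobs: PersistedJob[], now: number): number[] {
  const recovered: number[] = [];
  for (const job of jobs) {
    if (job.finishedAt === null && job.lockedAt !== null) {
      job.lockedAt = null; // the worker that held the lock is gone
      job.nextRunAt = now; // run again as soon as possible
      recovered.push(job.id);
    }
  }
  return recovered;
}
```

Jobs that were saved but never claimed need no special handling in this model, since they are still due and unlocked when the queue starts polling again.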

@code-xhyun
Contributor

> I expect that after a server restart, any unprocessed jobs created prior to the restart will be restored from a database and processed. This is a usual practice for queues with persisted jobs.

With the current behavior, if the server goes down mid-run and then restarts, nextRunAt is set to the next scheduled time according to the job's schedule rule.

But do you want the job to run again as soon as the server restarts?

@kostysh
Contributor Author

kostysh commented May 3, 2024

> But Do you want the job to run again as soon as the server restarts?

If the job has not been processed, yes.

@code-xhyun
Contributor

> But Do you want the job to run again as soon as the server restarts?
>
> If this job is not been processed - yes.

I'll implement this as soon as possible and get back to you.
As the BullMQ example shows, I think this feature is essential, but it is currently missing.
Thank you for your insight @kostysh

@kostysh
Contributor Author

kostysh commented May 3, 2024

> I'll implement this as soon as possible and get it.
> As in the example of bullMq, I think this feature is essential, but it is missing.

Thank you! I guess we can close this issue.
In the future, we can move such long discussions to Discord.

@kostysh kostysh closed this as completed May 3, 2024
@code-xhyun
Contributor

#25

@code-xhyun code-xhyun reopened this May 3, 2024
@code-xhyun
Contributor

code-xhyun commented May 3, 2024

#25

I brought you what you wanted! @kostysh

I also enabled Discussions in response to your comment, and I will add Discord later.

@code-xhyun code-xhyun added the bug and enhancement labels and removed the bug label May 3, 2024
@kostysh
Contributor Author

kostysh commented May 4, 2024

> #25
>
> I brought you what you wanted! @kostysh
>
> I also added discussions in response to your comments. and I will add discord later.

Nice! I see, resumeOnRestart.
