RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue #162

Closed
pcastroadc opened this issue Aug 5, 2022 · 5 comments
Labels
not-a-bug This is not a bug

Comments

@pcastroadc

pcastroadc commented Aug 5, 2022

What's not working?

RqueueMessageManager.deleteMessage(queueName, id) does not delete the message from the queue. After I call it, RqueueMessageManager.getMessage() still returns the message.

By contrast, RqueueMessageManager.deleteAllMessages(queueName) does work.

What are the application dependencies?

  • Rqueue Version: 2.10.2-RELEASE
  • Spring Boot Version: 2.6.8
  • Spring Data Redis Version: 2.6.4

Sample code:

@Slf4j
public abstract class AbstractTaskScheduler {
    @Value("${auction-segment.online.rqueue.enabled:true}")
    private boolean isRqueueEnabled;
    @Value("${auction-segment.online.rqueue.slack-offset:100ms}")
    private Duration slackOffset;

    private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
    private final RqueueMessageManager rqueueMessageManager;
    private final String queueName;

    protected final TaskScheduler scheduler;
    protected Map<Long, ScheduledFuture<?>> jobsMap = new HashMap<>();

    public AbstractTaskScheduler(
        TaskScheduler scheduler,
        RqueueMessageEnqueuer rqueueMessageEnqueuer,
        RqueueMessageManager rqueueMessageManager,
        String queueName
    ) {
        this.scheduler = scheduler;
        this.rqueueMessageEnqueuer = rqueueMessageEnqueuer;
        this.rqueueMessageManager = rqueueMessageManager;
        this.queueName = queueName;
    }

    @SuppressWarnings("PMD.DoNotUseThreads")
    protected abstract Runnable getTask(Long id);

    public List<Long> getTaskIds() {
        if (isRqueueEnabled) {
            List<Object> taskIds = rqueueMessageManager.getAllMessages(queueName);
            return !CollectionUtils.isEmpty(taskIds)
                ? taskIds.stream().map(t -> Long.valueOf(t.toString()))
                .collect(Collectors.toList())
                : Collections.emptyList();
        } else {
            return jobsMap != null
                ? new ArrayList<>(jobsMap.keySet())
                : Collections.emptyList();
        }
    }

    public RqueueMessage getTaskById(Long id) {
        if (isRqueueEnabled) {
            return rqueueMessageManager.getRqueueMessage(queueName, String.valueOf(id));
        }
        return null;
    }

    public boolean taskExists(Long id) {
        return getTaskById(id) != null;
    }

    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 30),
        value = {Exception.class}
    )
    protected void addTaskToScheduler(long id, Instant startDateTime) {
        if (isRqueueEnabled) {
            log.info("adding message to {} queue - message id {} - to be started at {}", queueName, id, startDateTime);
            boolean success = rqueueMessageEnqueuer.enqueueAt(
                queueName,
                String.valueOf(id),
                id,
                startDateTime.minusMillis(slackOffset.toMillis())
            );

            if (!success) {
                throw new IllegalStateException(String.format("Failed to enqueue message %s for listing %s", queueName, id));
            }
        } else {
            ScheduledFuture<?> scheduledTask = scheduler.schedule(getTask(id), startDateTime);
            jobsMap.put(id, scheduledTask);
        }
    }

    public void removeTaskFromScheduler(long id) {
        if (isRqueueEnabled) {
            log.info("removing message from {} queue - message id {}", queueName, id);
            rqueueMessageManager.deleteMessage(queueName, String.valueOf(id));
        } else {
            // Remove the entry so getTaskIds() no longer reports a cancelled task.
            ScheduledFuture<?> scheduledTask = jobsMap.remove(id);
            if (scheduledTask != null) {
                scheduledTask.cancel(true);
            }
        }
    }

    @EventListener({ ContextRefreshedEvent.class })
    protected abstract void contextRefreshedEvent();
}

I am not sure if this is related to this part of your code:

  @Override
  public boolean deleteMessage(String queueName, String messageId, Duration duration) {
    String lockValue = UUID.randomUUID().toString();
    try {
      if (lockManager.acquireLock(messageId, lockValue, Duration.ofSeconds(1))) {
        String id = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
        MessageMetadata messageMetadata = rqueueMessageMetadataDao.get(id);
        if (messageMetadata == null) {
          messageMetadata = new MessageMetadata(id, MessageStatus.DELETED);
        }
        messageMetadata.setDeleted(true);
        messageMetadata.setDeletedOn(System.currentTimeMillis());
        save(messageMetadata, duration);
        return true;
      }
    } finally {
      lockManager.releaseLock(messageId, lockValue);
    }
    return false;
  }

You are not releasing the lock here when execution enters the if block.

@sonus21
Owner

sonus21 commented Aug 5, 2022

"You are not releasing the lock here when execution enters the if block."

The finally block always runs, even when the method returns from inside the if, so the lock is released.
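
To illustrate the point about finally, here is a minimal standalone sketch with the same control flow as the quoted deleteMessage: a return inside the if, and cleanup in finally. The class and method names (FinallyDemo, deleteWithLock) are hypothetical and no Rqueue classes are involved; the "release" event stands in for lockManager.releaseLock.

```java
import java.util.ArrayList;
import java.util.List;

class FinallyDemo {
    // Records the order in which the branches run.
    static final List<String> events = new ArrayList<>();

    // Same shape as the quoted method: return inside the if, cleanup in finally.
    static boolean deleteWithLock(boolean lockAcquired) {
        try {
            if (lockAcquired) {
                events.add("work");
                return true; // finally still runs before the method returns
            }
        } finally {
            events.add("release"); // stands in for releasing the lock
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(deleteWithLock(true) + " " + events);  // true [work, release]
        events.clear();
        System.out.println(deleteWithLock(false) + " " + events); // false [release]
    }
}
```

In both cases "release" is recorded, so the early return does not skip the cleanup.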

Can you read the MessageMetadata? It should have the deleted flag set to true. This is not a bug; it is just an inconsistency in how MessageMetadata is used.
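
The behavior described in this thread resembles a soft delete. The following is a toy in-memory model, not Rqueue's implementation: it assumes, based only on the snippet quoted above and the reporter's observations, that deleteMessage flags metadata while deleteAllMessages removes the stored entries. All names here (SoftDeleteDemo, Meta, isDeleted) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SoftDeleteDemo {
    // Hypothetical stand-in for message metadata; not Rqueue's class.
    static final class Meta {
        boolean deleted;
        long deletedOn;
    }

    private final Map<String, String> messages = new HashMap<>();
    private final Map<String, Meta> metadata = new HashMap<>();

    void enqueue(String id, String payload) {
        messages.put(id, payload);
    }

    // Soft delete: only the metadata is flagged; the stored message is untouched.
    boolean deleteMessage(String id) {
        Meta meta = metadata.computeIfAbsent(id, k -> new Meta());
        meta.deleted = true;
        meta.deletedOn = System.currentTimeMillis();
        return true;
    }

    // Hard delete: the stored messages and their metadata are dropped.
    void deleteAllMessages() {
        messages.clear();
        metadata.clear();
    }

    String getMessage(String id) {
        return messages.get(id);
    }

    boolean isDeleted(String id) {
        Meta meta = metadata.get(id);
        return meta != null && meta.deleted;
    }

    public static void main(String[] args) {
        SoftDeleteDemo queue = new SoftDeleteDemo();
        queue.enqueue("42", "payload");
        queue.deleteMessage("42");
        System.out.println(queue.getMessage("42")); // payload (still stored)
        System.out.println(queue.isDeleted("42"));  // true (flag lives in metadata)
        queue.deleteAllMessages();
        System.out.println(queue.getMessage("42")); // null
    }
}
```

In this model, a caller that wants to know whether a message was deleted has to check the metadata flag rather than rely on the lookup returning null.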

@pcastroadc
Author

pcastroadc commented Aug 5, 2022

How is it not a bug if calling RqueueMessageManager.getMessage(queueName, id) after deleting the message still returns it?

Also, if you call deleteAllMessages, it does delete the message, and RqueueMessageManager.getMessage() no longer returns it. That's not consistent behavior.

And what exactly do you mean by reading MessageMetadata? Using another one of your classes to do it? Can you provide a quick example?

@sonus21
Owner

sonus21 commented Aug 5, 2022 via email

@pcastroadc
Author

One more question: will messages marked as deleted still be picked up by the listeners?

@sonus21
Owner

sonus21 commented Aug 5, 2022 via email

@sonus21 sonus21 added the not-a-bug This is not a bug label Aug 22, 2022
@sonus21 sonus21 closed this as completed May 27, 2023