Skip to content

Commit

Permalink
* Small reformatting
Browse files Browse the repository at this point in the history
* Use serializable interface instead of Object in RqueueJob
  • Loading branch information
sonus21 committed Dec 27, 2020
1 parent 05e25d0 commit 7c76f0b
Show file tree
Hide file tree
Showing 18 changed files with 24 additions and 26 deletions.
Expand Up @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.JobStatus;
import java.io.Serializable;

public interface Job {

Expand All @@ -43,7 +44,7 @@ public interface Job {
* @param message a serializable message, it could be a simple string like PING or some context
* data that says about the current execution state.
*/
void checkIn(Object message);
void checkIn(Serializable message);

/**
* A message that was enqueued
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
Expand Down Expand Up @@ -282,19 +283,13 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
}

@ToString
@AllArgsConstructor
private class MessageMoverTask implements Runnable {
private final String name;
private final String queueName;
private final String zsetName;
private final boolean processingQueue;

MessageMoverTask(String name, String queueName, String zsetName, boolean processingQueue) {
this.name = name;
this.queueName = queueName;
this.zsetName = zsetName;
this.processingQueue = processingQueue;
}

@Override
public void run() {
getLogger().debug("Running {}", this);
Expand Down
Expand Up @@ -30,7 +30,7 @@
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.time.Duration;
Expand Down
Expand Up @@ -23,11 +23,12 @@
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.Execution;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.db.RqueueJob;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.JobStatus;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.io.Serializable;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.RedisSystemException;
Expand Down Expand Up @@ -93,7 +94,7 @@ public RqueueMessage getRqueueMessage() {
}

@Override
public void checkIn(Object message) {
public void checkIn(Serializable message) {
if (isPeriodicJob) {
throw new UnsupportedOperationException("CheckIn is not supported for periodic job");
}
Expand Down
Expand Up @@ -18,7 +18,7 @@

import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.utils.PrefixLogger;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.github.sonus21.rqueue.core.impl.JobImpl;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.Execution;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.github.sonus21.rqueue.models.db;

import com.github.sonus21.rqueue.models.SerializableBase;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -31,6 +32,6 @@
public class CheckinMessage extends SerializableBase {

private static final long serialVersionUID = 4727068901984917510L;
private Object message;
private Serializable message;
private long at;
}
Expand Up @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.models.SerializableBase;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.github.sonus21.rqueue.models.SerializableBase;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.JobStatus;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -78,7 +79,7 @@ public RqueueJob(
}
}

public void checkIn(Object message) {
public void checkIn(Serializable message) {
synchronized (this) {
long checkInTime = System.currentTimeMillis();
this.checkins.add(new CheckinMessage(message, checkInTime));
Expand Down
Expand Up @@ -14,10 +14,8 @@
* limitations under the License.
*/

package com.github.sonus21.rqueue.models.db;
package com.github.sonus21.rqueue.models.enums;

import com.github.sonus21.rqueue.models.enums.JobStatus;
import com.github.sonus21.rqueue.models.enums.TaskStatus;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down
Expand Up @@ -20,7 +20,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.dao.RqueueMessageMetadataDao;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.time.Duration;
import java.util.Collection;
Expand Down
Expand Up @@ -36,7 +36,7 @@
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.Execution;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.JobStatus;
import com.github.sonus21.rqueue.utils.TestUtils;
Expand Down
Expand Up @@ -41,7 +41,7 @@
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.utils.TestUtils;
import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
Expand Down
Expand Up @@ -34,7 +34,7 @@
import com.github.sonus21.rqueue.dao.RqueueJobDao;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import io.lettuce.core.RedisCommandExecutionException;
import java.util.Map;
Expand Down
Expand Up @@ -31,7 +31,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.dao.RqueueMessageMetadataDao;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.web.service.impl.RqueueMessageMetadataServiceImpl;
import java.time.Duration;
import java.util.Arrays;
Expand Down
Expand Up @@ -34,7 +34,7 @@
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.models.db.DeadLetterQueue;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.enums.ActionType;
import com.github.sonus21.rqueue.models.enums.DataType;
Expand Down
Expand Up @@ -37,7 +37,7 @@
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.aggregator.TasksStat;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.MessageStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.models.db.QueueStatistics;
import com.github.sonus21.rqueue.models.db.QueueStatisticsTest;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
Expand Down

0 comments on commit 7c76f0b

Please sign in to comment.