Skip to content

Commit

Permalink
Queue priority
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed May 9, 2020
1 parent 059f65a commit f1fd0da
Show file tree
Hide file tree
Showing 57 changed files with 785 additions and 362 deletions.
9 changes: 3 additions & 6 deletions CHANGELOG.md
Expand Up @@ -4,11 +4,6 @@ All notable changes to Rqueue project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.2.0] - TBD
- Global queue priority
- Queue level priority
- Strict or weighted algorithm for message execution


## [2.0.0] - TBD
### Added
Expand All @@ -20,7 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Single or multiple execution of polled messages
- Queue level concurrency
- BackOff for failed message, linear or exponential
-
- Group level queue priority
- Multi level queue priority
- Strict or weighted algorithm for message execution

### Breaking Changes
- Queue names are prefixed, that can lead to error. 1.x users set REDIS key `__rq::version` to `1`. It does try to find the version using key prefix, but if all queues are empty or no key exist in REDIS with prefix `rqueue-` then it will consider version 2.
Expand Down
Expand Up @@ -17,9 +17,13 @@
package com.github.sonus21.rqueue.test;

import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.test.dto.ChatIndexing;
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.FeedGeneration;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.Otp;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
import lombok.NonNull;
Expand Down Expand Up @@ -73,4 +77,55 @@ public void onMessage(Email email) throws Exception {
}
consumedMessageService.save(email);
}

@RqueueListener(
value = "${otp.queue}",
active = "${otp.queue.active}",
priority = "critical:5,high:3, low:2")
public void onMessage(Otp otp) throws Exception {
log.info("Otp: {}", otp);
if (failureManager.shouldFail(otp.getId())) {
throw new Exception("Failing otp task to be retried" + otp);
}
consumedMessageService.save(otp);
}

@RqueueListener(
value = "${chat.indexing.queue}",
active = "${chat.indexing.queue.active}",
priority = "10",
priorityGroup = "test")
public void onMessage(ChatIndexing chatIndexing) throws Exception {
log.info("ChatIndexing: {}", chatIndexing);
if (failureManager.shouldFail(chatIndexing.getId())) {
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
}
consumedMessageService.save(chatIndexing);
}

@RqueueListener(
value = "${feed.generation.queue}",
active = "${feed.generation.queue.active}",
priority = "50",
priorityGroup = "test")
public void onMessage(FeedGeneration feedGeneration) throws Exception {
log.info("FeedGeneration: {}", feedGeneration);
if (failureManager.shouldFail(feedGeneration.getId())) {
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
}
consumedMessageService.save(feedGeneration);
}

@RqueueListener(
value = "${reservation.queue}",
active = "${reservation.queue.active}",
priority = "100",
priorityGroup = "test2")
public void onMessage(Reservation reservation) throws Exception {
log.info("Reservation: {}", reservation);
if (failureManager.shouldFail(reservation.getId())) {
throw new Exception("Failing reservation task to be retried" + reservation);
}
consumedMessageService.save(reservation);
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.RandomUtils;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class ChatIndexing extends BaseQueueMessage {
private int userId;
private Long timestamp;

public static ChatIndexing newInstance() {
ChatIndexing chatIndexing =
new ChatIndexing(RandomUtils.nextInt(100000, 1000000), System.currentTimeMillis());
chatIndexing.setId(UUID.randomUUID().toString());
return chatIndexing;
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.RandomUtils;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@EqualsAndHashCode(callSuper = true)
public class FeedGeneration extends BaseQueueMessage {
private int userId;
private Long timestamp;

public static FeedGeneration newInstance() {
FeedGeneration feedGeneration =
new FeedGeneration(RandomUtils.nextInt(1000, 1000000), System.currentTimeMillis());
feedGeneration.setId(UUID.randomUUID().toString());
return feedGeneration;
}
}
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class OrderConfirmation extends BaseQueueMessage {
private String orderId;
private String userId;
private long timestamp;

public static OrderConfirmation newInstance() {
OrderConfirmation orderConfirmation =
new OrderConfirmation(
UUID.randomUUID().toString(), UUID.randomUUID().toString(), System.currentTimeMillis());
orderConfirmation.setId(UUID.randomUUID().toString());
return orderConfirmation;
}
}
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.Random;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.RandomStringUtils;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class Otp extends BaseQueueMessage {
private String phoneNumber;
private String otp;

public static Otp newInstance() {
Otp otp =
new Otp(
"+91" + RandomStringUtils.randomNumeric(10),
String.valueOf(new Random().nextInt(100000)));
otp.setId(UUID.randomUUID().toString());
return otp;
}
}
@@ -0,0 +1,53 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class Reservation extends BaseQueueMessage {
private int requestId;
private Map<String, String> otherDetails;

public static Reservation newInstance() {
Map<String, String> other = new HashMap<>();
int count = RandomUtils.nextInt(1, 10);
for (int i = 0; i < count; i++) {
String key = RandomStringUtils.randomAlphabetic(10);
String value = RandomStringUtils.randomAlphabetic(10);
other.put(key, value);
}
Reservation reservation = new Reservation(RandomUtils.nextInt(), other);
reservation.setId(UUID.randomUUID().toString());
return reservation;
}
}
9 changes: 9 additions & 0 deletions rqueue-common-test/src/main/resources/application.properties
Expand Up @@ -13,3 +13,12 @@ email.queue.active=true
mysql.db.name=test
rqueue.metrics.tags.rqueue=test
email.execution.time=15*60*1000
otp.queue=otp
otp.queue.active=false
chat.indexing.queue=chat-indexing
chat.indexing.queue.active=false
feed.generation.queue=feed-generation
feed.generation.queue.active=false
reservation.queue=reservation
reservation.queue.active=false

Expand Up @@ -55,9 +55,9 @@ public void consumeMessage(String message) {

@RqueueListener(
value = {"${rqueue.delay.queue}", "${rqueue.delay2.queue}"},
delayedQueue = "${rqueue.delay.queue.delayed-queue}",
numRetries = "${rqueue.delay.queue.retries}",
visibilityTimeout = "60*60*1000")
visibilityTimeout = "60*60*1000",
active = "true")
@NewSpan
public void onMessage(String message) {
execute("delay: {}", message);
Expand All @@ -68,10 +68,10 @@ public void onMessage(String message) {

@RqueueListener(
value = "job-queue",
delayedQueue = "true",
deadLetterQueue = "job-morgue",
numRetries = "2",
concurrency = "5")
concurrency = "5-10",
active = "true")
@NewSpan
public void onMessage(Job job) {
execute("job-queue: {}", job);
Expand Down
Expand Up @@ -32,11 +32,13 @@
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;

@RunWith(RqueueSpringTestRunner.class)
@ContextConfiguration(classes = Application.class)
@SpringBootTest
@Slf4j
@TestPropertySource(properties = {"rqueue.retry.per.poll=1000", "spring.redis.port=8001"})
public class ApplicationTest extends SpringTestBase {

@Test
Expand Down
Expand Up @@ -33,13 +33,14 @@
@Slf4j
@TestPropertySource(
properties = {
"rqueue.retry.per.poll=20",
"rqueue.scheduler.auto.start=false",
"spring.redis.port=6384",
"spring.redis.port=8004",
"mysql.db.name=test4",
"rqueue.metrics.count.failure=true",
"rqueue.metrics.count.execution=true",
})
public class MetricsTest extends MetricTestBase {
public class BootMetricsTest extends MetricTestBase {

@Test
public void delayedQueueStatus() throws TimedOutException {
Expand Down

0 comments on commit f1fd0da

Please sign in to comment.