Skip to content

Commit

Permalink
Async profiler and JDK dynamic proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Jul 13, 2020
1 parent 4e7586a commit 1cb7e7c
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 26 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -67,7 +67,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.0.1-RELEASE'
version = '2.0.2-RELEASE'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
Expand Up @@ -23,11 +23,15 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableRedisRepositories
@EnableAsync
@EnableCaching
public class RQueueApplication {
@Value("${workers.count:3}")
private int workersCount;
Expand Down
Expand Up @@ -20,6 +20,8 @@
import com.github.sonus21.rqueue.metrics.QueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetrics;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetricsRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.lang.reflect.Method;
Expand All @@ -41,7 +43,7 @@
@Import(RqueueMetricsProperties.class)
public class RqueueMetricsAutoConfig {
@Bean
public RqueueMetrics rqueueMetrics(
public RqueueMetricsRegistry rqueueMetricsRegistry(
MetricsProperties metricsProperties,
@Qualifier("stringRqueueRedisTemplate") RqueueRedisTemplate<String> rqueueRedisTemplate,
RqueueMetricsProperties rqueueMetricsProperties) {
Expand All @@ -68,7 +70,7 @@ private Map<String, String> getTags(MetricsProperties metricsProperties) {
}

@Bean
public RqueueCounter rqueueCounter(RqueueMetrics rqueueMetrics) {
return new RqueueCounter(rqueueMetrics.getQueueCounter());
public RqueueMetricsCounter rqueueMetricsCounter(RqueueMetricsRegistry rqueueMetricsRegistry) {
return new RqueueCounter(rqueueMetricsRegistry.getQueueCounter());
}
}
Expand Up @@ -27,6 +27,8 @@
import com.github.sonus21.rqueue.metrics.QueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetrics;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetricsRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand Down Expand Up @@ -76,15 +78,15 @@ public RqueueMessageSender rqueueMessageSender(RqueueMessageTemplate rqueueMessa
@Bean
@Conditional(MetricsEnabled.class)
@DependsOn({"meterRegistry", "rqueueMetricsProperties"})
public RqueueMetrics rqueueMetrics(
public RqueueMetricsRegistry rqueueMetricsRegistry(
@Qualifier("stringRqueueRedisTemplate") RqueueRedisTemplate<String> rqueueRedisTemplate) {
QueueCounter queueCounter = new QueueCounter();
return new RqueueMetrics(rqueueRedisTemplate, queueCounter);
}

@Bean
@Conditional(MetricsEnabled.class)
public RqueueCounter rqueueCounter(RqueueMetrics rqueueMetrics) {
return new RqueueCounter(rqueueMetrics.getQueueCounter());
public RqueueMetricsCounter rqueueMetricsCounter(RqueueMetricsRegistry rqueueMetricsRegistry) {
return new RqueueCounter(rqueueMetricsRegistry.getQueueCounter());
}
}
Expand Up @@ -51,7 +51,7 @@
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

abstract class MessageScheduler
public abstract class MessageScheduler
implements DisposableBean, ApplicationListener<RqueueBootstrapEvent> {
@Autowired protected RqueueSchedulerConfig rqueueSchedulerConfig;
private RedisScript<Long> redisScript;
Expand Down
Expand Up @@ -22,7 +22,7 @@
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.db.TaskStatus;
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
Expand Down Expand Up @@ -122,14 +122,15 @@ private void callMessageProcessor(TaskStatus status, RqueueMessage rqueueMessage
}

private void updateCounter(boolean fail) {
RqueueCounter rqueueCounter = Objects.requireNonNull(container.get()).getRqueueCounter();
if (rqueueCounter == null) {
RqueueMetricsCounter counter =
Objects.requireNonNull(container.get()).getRqueueMetricsCounter();
if (counter == null) {
return;
}
if (fail) {
rqueueCounter.updateFailureCount(queueDetail.getName());
counter.updateFailureCount(queueDetail.getName());
} else {
rqueueCounter.updateExecutionCount(queueDetail.getName());
counter.updateExecutionCount(queueDetail.getName());
}
}

Expand Down
Expand Up @@ -26,7 +26,7 @@
import com.github.sonus21.rqueue.core.QueueRegistry;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.metrics.RqueueCounter;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.Concurrency;
import com.github.sonus21.rqueue.models.enums.PriorityMode;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class RqueueMessageListenerContainer
@Autowired private RqueueConfig rqueueConfig;

@Autowired(required = false)
private RqueueCounter rqueueCounter;
private RqueueMetricsCounter rqueueMetricsCounter;

@Autowired private RqueueMessageMetadataService rqueueMessageMetadataService;
private AsyncTaskExecutor taskExecutor;
Expand Down Expand Up @@ -594,8 +594,8 @@ public void setPriorityMode(PriorityMode priorityMode) {
this.priorityMode = priorityMode;
}

RqueueCounter getRqueueCounter() {
return rqueueCounter;
RqueueMetricsCounter getRqueueMetricsCounter() {
return rqueueMetricsCounter;
}

RqueueWebConfig getRqueueWebConfig() {
Expand Down
Expand Up @@ -21,17 +21,19 @@
* many messages have been processed and how many of them have been failed. In the case of failure
* count increases.
*/
public class RqueueCounter {
public class RqueueCounter implements RqueueMetricsCounter {
private final QueueCounter queueCounter;

public RqueueCounter(QueueCounter queueCounter) {
this.queueCounter = queueCounter;
}

@Override
public void updateFailureCount(String queueName) {
queueCounter.updateFailureCount(queueName);
}

@Override
public void updateExecutionCount(String queueName) {
queueCounter.updateExecutionCount(queueName);
}
Expand Down
Expand Up @@ -26,7 +26,6 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;

/**
Expand All @@ -35,16 +34,16 @@
* queue messages can be in delayed queue because time has not reached. Some messages can be in dead
* letter queue if dead letter queue is configured.
*/
public class RqueueMetrics implements ApplicationListener<RqueueBootstrapEvent> {
public class RqueueMetrics implements RqueueMetricsRegistry {
static final String QUEUE_KEY = "key";
private static final String QUEUE_SIZE = "queue.size";
private static final String DELAYED_QUEUE_SIZE = "delayed.queue.size";
private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size";
private static final String DEAD_LETTER_QUEUE_SIZE = "dead.letter.queue.size";
private RqueueRedisTemplate<String> rqueueMessageTemplate;
private final RqueueRedisTemplate<String> rqueueMessageTemplate;
private final QueueCounter queueCounter;
@Autowired private MetricsProperties metricsProperties;
@Autowired private MeterRegistry meterRegistry;
private QueueCounter queueCounter;

public RqueueMetrics(
RqueueRedisTemplate<String> rqueueMessageTemplate, QueueCounter queueCounter) {
Expand Down Expand Up @@ -107,6 +106,7 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
}
}

@Override
public QueueCounter getQueueCounter() {
return this.queueCounter;
}
Expand Down
@@ -0,0 +1,23 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.metrics;

public interface RqueueMetricsCounter {
void updateFailureCount(String queueName);

void updateExecutionCount(String queueName);
}
@@ -0,0 +1,24 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.metrics;

import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import org.springframework.context.ApplicationListener;

public interface RqueueMetricsRegistry extends ApplicationListener<RqueueBootstrapEvent> {
QueueCounter getQueueCounter();
}
Expand Up @@ -17,11 +17,13 @@
package com.github.sonus21.rqueue.web.service;

import com.github.sonus21.rqueue.models.db.QueueConfig;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.models.response.BaseResponse;
import java.util.Collection;
import java.util.List;
import org.springframework.context.ApplicationListener;

public interface RqueueSystemManagerService {
public interface RqueueSystemManagerService extends ApplicationListener<RqueueBootstrapEvent> {
BaseResponse deleteQueue(String queueName);

List<String> getQueues();
Expand Down
Expand Up @@ -37,14 +37,12 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service
public class RqueueSystemManagerServiceImpl
implements RqueueSystemManagerService, ApplicationListener<RqueueBootstrapEvent> {
public class RqueueSystemManagerServiceImpl implements RqueueSystemManagerService {
private final RqueueConfig rqueueConfig;
private final RqueueRedisTemplate<String> stringRqueueRedisTemplate;
private final RqueueSystemConfigDao rqueueSystemConfigDao;
Expand Down

0 comments on commit 1cb7e7c

Please sign in to comment.