Skip to content

Commit

Permalink
Provide ThreadPoolTaskExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
raynigon committed Sep 14, 2022
1 parent f5b7b65 commit 8b44108
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import com.raynigon.ecs.logging.async.config.AsyncLoggingConfiguration;
import com.raynigon.ecs.logging.async.executor.DefaultMdcForkJoinPool;
import com.raynigon.ecs.logging.async.model.MdcRunnable;
import com.raynigon.ecs.logging.async.scheduler.MdcScheduledExecutorService;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -37,4 +39,16 @@ public DefaultMdcForkJoinPool mdcForkJoinPool() {
public TaskScheduler mdcTaskScheduler() {
return new MdcScheduledExecutorService(taskScheduler);
}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("Async-");
executor.setTaskDecorator(MdcRunnable::new);
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.raynigon.ecs.logging.async.scheduler;
package com.raynigon.ecs.logging.async.model;

import com.raynigon.ecs.logging.async.scheduler.MdcScheduledRunnableException;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,9 +13,9 @@
import static com.raynigon.ecs.logging.async.executor.MdcConcurrentExecutionHelper.beforeExecution;

@RequiredArgsConstructor
public class MdcScheduledRunnable implements Runnable {
public class MdcRunnable implements Runnable {

private final Logger log = LoggerFactory.getLogger(MdcScheduledRunnable.class);
private final Logger log = LoggerFactory.getLogger(MdcRunnable.class);

private final Runnable runnable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.raynigon.ecs.logging.async.scheduler;

import com.raynigon.ecs.logging.async.model.MdcRunnable;
import lombok.RequiredArgsConstructor;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -49,6 +50,6 @@ public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable task, long de
}

private Runnable wrap(Runnable runnable) {
return new MdcScheduledRunnable(runnable);
return new MdcRunnable(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.raynigon.ecs.logging.async

import static com.raynigon.ecs.logging.LoggingConstants.TRANSACTION_ID_PROPERTY

import com.raynigon.ecs.logging.async.service.AsyncService
import org.slf4j.MDC
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

@EnableAutoConfiguration
@ContextConfiguration(classes = MdcExecutorConfiguration)
Expand All @@ -18,6 +22,9 @@ class MdcExecutorConfigurationITSpec extends Specification {
@Autowired
AsyncService asyncService

@Autowired
ThreadPoolTaskExecutor taskExecutor

def 'setup application'() {
expect:
true
Expand Down Expand Up @@ -48,4 +55,34 @@ class MdcExecutorConfigurationITSpec extends Specification {
MDC.get("test") == "456"
holder.get() == 456
}

def 'mdc tag passthroughs works for submit'() {
given:
MDC.put("test", "456")
AtomicInteger holder = new AtomicInteger()

when:
asyncService.submit({ holder.set(Integer.parseInt(MDC.get("test"))) }).get()

then:
MDC.get("test") == "456"
holder.get() == 456
}

def 'task executor creates new context'() {
given:
MDC.clear()
AtomicReference<String> holder = new AtomicReference<String>()

expect:
MDC.get(TRANSACTION_ID_PROPERTY) == null

when:
def future = taskExecutor.submit({ holder.set(MDC.get(TRANSACTION_ID_PROPERTY)) })

then:
MDC.get(TRANSACTION_ID_PROPERTY) == null
future.get() == null
holder.get() != null
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.raynigon.ecs.logging.async.scheduler

import com.raynigon.ecs.logging.async.model.MdcRunnable

import static com.raynigon.ecs.logging.LoggingConstants.TRANSACTION_ID_PROPERTY

import nl.altindag.log.LogCaptor
Expand All @@ -8,13 +10,13 @@ import spock.lang.Specification

import java.util.concurrent.atomic.AtomicBoolean

class MdcScheduledRunnableSpec extends Specification {
class MdcRunnableSpec extends Specification {

def 'transaction id is set'() {
given:
AtomicBoolean result = new AtomicBoolean(false)
Runnable source = { result.set(MDC.get(TRANSACTION_ID_PROPERTY) != null) }
MdcScheduledRunnable target = new MdcScheduledRunnable(source)
MdcRunnable target = new MdcRunnable(source)

when:
target.run()
Expand All @@ -27,7 +29,7 @@ class MdcScheduledRunnableSpec extends Specification {
given:
AtomicBoolean result = new AtomicBoolean(false)
Runnable source = { result.set(MDC.get("test") == null) }
MdcScheduledRunnable target = new MdcScheduledRunnable(source)
MdcRunnable target = new MdcRunnable(source)

and:
MDC.clear()
Expand All @@ -45,10 +47,10 @@ class MdcScheduledRunnableSpec extends Specification {
given:
AtomicBoolean result = new AtomicBoolean(false)
Runnable source = { result.set(MDC.get("test") == null); throw new IllegalStateException("test") }
MdcScheduledRunnable target = new MdcScheduledRunnable(source)
MdcRunnable target = new MdcRunnable(source)

and:
LogCaptor logCaptor = LogCaptor.forClass(MdcScheduledRunnable)
LogCaptor logCaptor = LogCaptor.forClass(MdcRunnable)

and:
MDC.clear()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.raynigon.ecs.logging.async.scheduler

import com.raynigon.ecs.logging.async.model.MdcRunnable
import org.springframework.scheduling.TaskScheduler
import org.springframework.scheduling.Trigger
import spock.lang.Specification
Expand All @@ -23,7 +24,7 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.schedule(task, trigger)

then:
1 * delegate.schedule(_ as MdcScheduledRunnable, _ as Trigger)
1 * delegate.schedule(_ as MdcRunnable, _ as Trigger)
}

def 'schedule with date gets delegated'() {
Expand All @@ -35,7 +36,7 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.schedule(task, date)

then:
1 * delegate.schedule(_ as MdcScheduledRunnable, _ as Date)
1 * delegate.schedule(_ as MdcRunnable, _ as Date)
}

def 'scheduleAtFixedRate with date gets delegated and period'() {
Expand All @@ -47,7 +48,7 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.scheduleAtFixedRate(task, date, 0L)

then:
1 * delegate.scheduleAtFixedRate(_ as MdcScheduledRunnable, _ as Date, 0L)
1 * delegate.scheduleAtFixedRate(_ as MdcRunnable, _ as Date, 0L)
}

def 'scheduleAtFixedRate with period'() {
Expand All @@ -58,7 +59,7 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.scheduleAtFixedRate(task, 0L)

then:
1 * delegate.scheduleAtFixedRate(_ as MdcScheduledRunnable, 0L)
1 * delegate.scheduleAtFixedRate(_ as MdcRunnable, 0L)
}

def 'scheduleWithFixedDelay with date and delay'() {
Expand All @@ -70,7 +71,7 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.scheduleWithFixedDelay(task, date, 0L)

then:
1 * delegate.scheduleWithFixedDelay(_ as MdcScheduledRunnable, _ as Date, 0L)
1 * delegate.scheduleWithFixedDelay(_ as MdcRunnable, _ as Date, 0L)
}

def 'scheduleWithFixedDelay with delay'() {
Expand All @@ -81,6 +82,6 @@ class MdcScheduledExecutorServiceSpec extends Specification {
executorService.scheduleWithFixedDelay(task, 0L)

then:
1 * delegate.scheduleWithFixedDelay(_ as MdcScheduledRunnable, 0L)
1 * delegate.scheduleWithFixedDelay(_ as MdcRunnable, 0L)
}
}

0 comments on commit 8b44108

Please sign in to comment.