Skip to content

Commit

Permalink
Introduce micronaut-context subproject that micronaut-runtime depends…
Browse files Browse the repository at this point in the history
… on (#4914)
  • Loading branch information
graemerocher committed Feb 11, 2021
1 parent 1601976 commit f81ecea
Show file tree
Hide file tree
Showing 138 changed files with 614 additions and 426 deletions.
15 changes: 14 additions & 1 deletion aop/src/main/java/io/micronaut/aop/InterceptedMethod.java
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;

/**
* The intercept method supporting intercepting different reactive invocations.
Expand Down Expand Up @@ -96,6 +97,19 @@ default Publisher<?> interceptResultAsPublisher() {
return (Publisher<?>) interceptResult();
}

/**
* Proceeds with invocation of {@link InvocationContext#proceed()} and converts result to {@link Publisher}.
*
* @param executorService The executor service to subscribe on
* @return The intercepted result
*/
default Publisher<?> interceptResultAsPublisher(ExecutorService executorService) {
if (resultType() != ResultType.PUBLISHER) {
throw new ConfigurationException("Cannot return `Publisher` result from '" + resultType() + "' interceptor");
}
return interceptResultAsPublisher();
}

/**
* Proceeds with invocation of {@link InvocationContext#proceed(Interceptor)} and converts result to {@link CompletionStage}.
*
Expand All @@ -122,7 +136,6 @@ default Publisher<?> interceptResultAsPublisher(Interceptor<?, ?> from) {
return (Publisher<?>) interceptResult(from);
}


/**
* Handle the value that should be the result of the invocation.
*
Expand Down
Expand Up @@ -27,6 +27,9 @@
import io.micronaut.core.type.ReturnType;
import org.reactivestreams.Publisher;

import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
* The {@link Publisher} method intercept.
*
Expand Down Expand Up @@ -67,6 +70,13 @@ public Publisher<?> interceptResultAsPublisher(Interceptor<?, ?> from) {
return convertToPublisher(context.proceed(from));
}

@Override
public Publisher<?> interceptResultAsPublisher(ExecutorService executorService) {
Objects.requireNonNull(executorService);
final Publisher<?> actual = interceptResultAsPublisher();
return (Publishers.MicronautPublisher<Object>) s -> executorService.submit(() -> actual.subscribe(s));
}

@Override
public Publisher<?> interceptResult() {
return interceptResultAsPublisher();
Expand Down
19 changes: 19 additions & 0 deletions context/build.gradle
@@ -0,0 +1,19 @@
dependencies {
annotationProcessor project(":inject-java")
annotationProcessor project(":graal")
api project(':inject')
api project(':aop')
api dependencyVersion("validation")
compileOnly dependencyVersion("reactive.streams")
testCompileOnly project(":inject-groovy")
testAnnotationProcessor project(":inject-java")
testImplementation dependencyVersion("rxjava2")
testImplementation project(":core-reactive")
testImplementation project(":inject-java-test")
}

spotless {
java {
targetExclude '**/io/micronaut/scheduling/cron/CronExpression.java'
}
}
Expand Up @@ -19,7 +19,6 @@
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Primary;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.discovery.ServiceInstance;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -172,13 +171,13 @@ public void setGroup(String group) {
}

/**
* @return The instance availability zone. For example it's possible to configure Nexflix Ribbon to load balance between servers only in a particular zone
* @return The instance availability zone. For example it's possible to configure Netflix Ribbon to load balance between servers only in a particular zone
*/
public Optional<String> getZone() {
if (zone != null) {
return Optional.of(zone);
}
return Optional.ofNullable(getMetadata().get(ServiceInstance.ZONE));
return Optional.ofNullable(getMetadata().get("zone"));
}

/**
Expand Down
Expand Up @@ -26,8 +26,6 @@
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.Async;
import io.micronaut.scheduling.exceptions.TaskExecutionException;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,7 +86,7 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
switch (interceptedMethod.resultType()) {
case PUBLISHER:
return interceptedMethod.handleResult(
Flowable.fromPublisher(interceptedMethod.interceptResultAsPublisher()).subscribeOn(Schedulers.from(executorService))
interceptedMethod.interceptResultAsPublisher(executorService)
);
case COMPLETION_STAGE:
return interceptedMethod.handleResult(
Expand Down
Expand Up @@ -19,11 +19,9 @@
import io.micronaut.context.exceptions.NoSuchBeanException;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.annotation.NonBlocking;
import io.micronaut.core.async.SupplierUtil;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.http.HttpResponse;
import io.micronaut.core.type.TypeInformation;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.inject.MethodReference;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.scheduling.TaskExecutors;
Expand All @@ -33,7 +31,6 @@
import javax.inject.Provider;
import javax.inject.Singleton;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

Expand Down Expand Up @@ -80,15 +77,14 @@ public Optional<ExecutorService> select(MethodReference method, ThreadSelection
} else if (method.hasStereotype(Blocking.class)) {
return Optional.of(ioExecutor.get());
} else {
ReturnType returnType = method.getReturnType();
Class argumentType = returnType.getType();
if (HttpResponse.class.isAssignableFrom(argumentType)) {
TypeInformation<?> returnType = method.getReturnType();
if (returnType.isWrapperType()) {
Optional<Argument<?>> generic = method.getReturnType().getFirstTypeVariable();
if (generic.isPresent()) {
argumentType = generic.get().getType();
returnType = generic.get();
}
}
if (isNonBlocking(argumentType)) {
if (returnType.isAsyncOrReactive()) {
return Optional.empty();
} else {
return Optional.of(ioExecutor.get());
Expand All @@ -99,8 +95,4 @@ public Optional<ExecutorService> select(MethodReference method, ThreadSelection
}
return Optional.empty();
}

private boolean isNonBlocking(Class type) {
return Publishers.isConvertibleToPublisher(type) || CompletionStage.class.isAssignableFrom(type);
}
}
Expand Up @@ -19,4 +19,7 @@
* @author Graeme Rocher
* @since 1.0
*/
@Experimental
package io.micronaut.scheduling.instrument;

import io.micronaut.core.annotation.Experimental;
@@ -0,0 +1,44 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.runtime.event.annotation;

import io.micronaut.context.event.StartupEvent;
import io.micronaut.scheduling.annotation.Async;
import io.reactivex.Completable;

import javax.inject.Singleton;
import java.util.concurrent.CompletableFuture;

@Singleton
public class AsyncListener {

boolean invoked = false;

@EventListener
@Async
CompletableFuture<Boolean> onStartup(StartupEvent event) {
try {
Thread.currentThread().sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
invoked = true;
return CompletableFuture.completedFuture(invoked);
}
public boolean isInvoked() {
return invoked;
}
}
@@ -0,0 +1,46 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.runtime.event.annotation

import io.micronaut.context.ApplicationContext
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

class EventListenerSpec extends Specification {

void "test listener is invoked"() {
given:
ApplicationContext ctx = ApplicationContext.run()

when:
TestListener t = ctx.getBean(TestListener)
GroovyListener g = ctx.getBean(GroovyListener)
AsyncListener a = ctx.getBean(AsyncListener)
PollingConditions conditions = new PollingConditions(timeout: 1)

then:
t.invoked
t.invocationCount == 1
g.invoked

conditions.eventually {
a.invoked
}

cleanup:
ctx.close()
}
}
Expand Up @@ -17,7 +17,6 @@ package io.micronaut.scheduling

import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.retry.annotation.Retryable
import io.micronaut.scheduling.annotation.Scheduled
import spock.lang.Specification
import spock.util.concurrent.PollingConditions
Expand Down Expand Up @@ -62,24 +61,6 @@ class ScheduledFixedRateSpec extends Specification {
}
}

void 'test scheduled annotation with retry'() {
given:
ApplicationContext beanContext = ApplicationContext.run(
'scheduled-test.task2.enabled':true
)

PollingConditions conditions = new PollingConditions(timeout: 10)

when:
MyTask2 myTask = beanContext.getBean(MyTask2)

then:
conditions.eventually {
myTask.initialDelayWasRun
myTask.attempts.get() == 2
}
}

@Singleton
@Requires(property = 'scheduled-test.task.enabled', value = 'true')
static class MyTask {
Expand Down Expand Up @@ -120,23 +101,5 @@ class ScheduledFixedRateSpec extends Specification {
}
}

@Singleton
@Requires(property = 'scheduled-test.task2.enabled', value = 'true')
static class MyTask2 {

boolean initialDelayWasRun = false
AtomicInteger attempts = new AtomicInteger()

@Retryable(delay = "10ms")
@Scheduled(initialDelay = '10ms')
void runInitialDelay() {
if (!Thread.currentThread().getName().startsWith("scheduled-executor-thread")) {
throw new RuntimeException("Incorrect thread name")
}
if (attempts.addAndGet(1) < 2) {
throw new RuntimeException()
}
initialDelayWasRun = true
}
}

}

0 comments on commit f81ecea

Please sign in to comment.