Skip to content

Commit

Permalink
refactor: get rid of NoopTransferProcessClient (eclipse-edc#3440)
Browse files Browse the repository at this point in the history
* refactor: get rid of NoopTransferProcessClient

* simplify test
  • Loading branch information
ndr-brt authored and ndkrimbacher committed Oct 4, 2023
1 parent ff93d51 commit 5ce067a
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
Expand All @@ -31,12 +29,6 @@ public String name() {
return NAME;
}


@Provider(isDefault = true)
public TransferProcessApiClient transferProcessApiClient() {
return new NoopTransferProcessClient();
}

@Provider(isDefault = true)
public TransferServiceSelectionStrategy transferServiceSelectionStrategy() {
return TransferServiceSelectionStrategy.selectFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,30 @@

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.e2e.EndToEndTest;
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.ObjectFactory;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneFrameworkExtensionTest {

TransferService transferService1 = mock(TransferService.class);
TransferService transferService2 = mock(TransferService.class);
DataFlowRequest request = EndToEndTest.createRequest("1").build();

@BeforeEach
public void setUp(ServiceExtensionContext context) {
when(transferService1.canHandle(request)).thenReturn(true);
when(transferService2.canHandle(request)).thenReturn(true);
context.registerService(Telemetry.class, mock(Telemetry.class));
context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop());
context.registerService(TransferProcessApiClient.class, new NoopTransferProcessClient());
}

@Test
void initialize_registers_PipelineService(ServiceExtensionContext context, ObjectFactory factory) {
var extension = factory.constructInstance(DataPlaneFrameworkExtension.class);
void initialize_registers_PipelineService(ServiceExtensionContext context, DataPlaneFrameworkExtension extension) {
extension.initialize(context);
assertThat(context.getService(PipelineService.class)).isInstanceOf(PipelineServiceImpl.class);
}

@Test
void initialize_registers_DataPlaneManager_withInjectedStrategy(ServiceExtensionContext context, ObjectFactory factory) {
// Arrange
// Inject a custom TransferServiceSelectionStrategy that will select the second service
context.registerService(TransferServiceSelectionStrategy.class,
(request, services) -> services.skip(1).findFirst().orElse(null));

// Act
validateRequest(context, factory);

// Assert
verify(transferService2).validate(request);
verify(transferService1, never()).validate(request);
assertThat(context.getService(PipelineService.class)).isInstanceOf(PipelineServiceImpl.class);
}

private void validateRequest(ServiceExtensionContext context, ObjectFactory factory) {
var extension = factory.constructInstance(DataPlaneFrameworkExtension.class);
extension.initialize(context);
var service = context.getService(TransferServiceRegistry.class);
service.registerTransferService(transferService1);
service.registerTransferService(transferService2);
var m = context.getService(DataPlaneManager.class);
m.validate(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.eclipse.edc.connector.dataplane.framework.e2e;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl;
import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
Expand Down Expand Up @@ -58,7 +57,7 @@ void testEndToEnd() throws Exception {
.monitor(monitor)
.pipelineService(pipelineService)
.executorInstrumentation(ExecutorInstrumentation.noop())
.transferProcessClient(new NoopTransferProcessClient())
.transferProcessClient(mock())
.build();
manager.start();
manager.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("bytes".getBytes())), createRequest("1").build()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

package org.eclipse.edc.connector.dataplane.framework.manager;

import org.eclipse.edc.connector.api.client.spi.transferprocess.NoopTransferProcessClient;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.pipeline.TransferService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand All @@ -39,10 +37,10 @@


class DataPlaneManagerImplTest {
TransferService transferService = mock(TransferService.class);
TransferService transferService = mock();
DataPlaneStore store = new InMemoryDataPlaneStore(10);
DataFlowRequest request = createRequest();
TransferServiceRegistry registry = mock(TransferServiceRegistry.class);
TransferServiceRegistry registry = mock();

@BeforeEach
public void setUp() {
Expand All @@ -54,7 +52,7 @@ public void setUp() {
* Verifies a request is enqueued, dequeued, and dispatched to the pipeline service.
*/
@Test
void verifyWorkDispatch() throws InterruptedException {
void verifyWorkDispatch() {
var dataPlaneManager = createDataPlaneManager();

when(registry.resolveTransferService(request))
Expand All @@ -63,9 +61,8 @@ void verifyWorkDispatch() throws InterruptedException {
.thenReturn(true);


when(transferService.transfer(isA(DataFlowRequest.class))).thenAnswer(i -> {
return completedFuture(Result.success("ok"));
});
when(transferService.transfer(isA(DataFlowRequest.class)))
.thenAnswer(i -> completedFuture(Result.success("ok")));

dataPlaneManager.start();
dataPlaneManager.initiateTransfer(request);
Expand All @@ -80,7 +77,7 @@ void verifyWorkDispatch() throws InterruptedException {
* Verifies that the dispatch thread survives an error thrown by a worker.
*/
@Test
void verifyWorkDispatchError() throws InterruptedException {
void verifyWorkDispatchError() {
var dataPlaneManager = createDataPlaneManager();

when(transferService.canHandle(request))
Expand All @@ -89,9 +86,8 @@ void verifyWorkDispatchError() throws InterruptedException {
when(transferService.transfer(request))
.thenAnswer(i -> {
throw new RuntimeException("Test exception");
}).thenAnswer((i -> {
return completedFuture(Result.success("ok"));
}));
})
.thenAnswer((i -> completedFuture(Result.success("ok"))));


dataPlaneManager.start();
Expand All @@ -105,7 +101,7 @@ void verifyWorkDispatchError() throws InterruptedException {
}

@Test
void verifyWorkDispatch_onUnavailableTransferService_completesTransfer() throws InterruptedException {
void verifyWorkDispatch_onUnavailableTransferService_completesTransfer() {
// Modify store used in createDataPlaneManager()
store = mock(DataPlaneStore.class);

Expand Down Expand Up @@ -139,8 +135,8 @@ private DataPlaneManagerImpl createDataPlaneManager() {
.waitTimeout(10)
.transferServiceRegistry(registry)
.store(store)
.transferProcessClient(new NoopTransferProcessClient())
.monitor(mock(Monitor.class))
.transferProcessClient(mock())
.monitor(mock())
.build();
}

Expand Down
1 change: 0 additions & 1 deletion extensions/common/http/jetty-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
api(project(":spi:common:core-spi"))
api(project(":spi:common:web-spi"))

testImplementation(libs.bundles.jersey.core)
testImplementation(libs.restAssured)
testImplementation(project(":core:common:junit"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,19 @@

package org.eclipse.edc.web.jetty;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -44,14 +39,7 @@
class JettyServiceTest {

private JettyService jettyService;
private Monitor monitor;
private TestController testController;

@BeforeEach
void setUp() {
monitor = mock(Monitor.class);
testController = new TestController();
}
private final Monitor monitor = mock();

@Test
void verifyDefaultPortMapping() {
Expand All @@ -60,8 +48,7 @@ void verifyDefaultPortMapping() {

jettyService.start();

var servletContainer = new ServletContainer(createTestResource());
jettyService.registerServlet("default", servletContainer);
jettyService.registerServlet("default", new TestServlet());

given()
.get("http://localhost:7171/api/test/resource")
Expand All @@ -76,11 +63,10 @@ void verifyCustomPortMapping() {
"web.http.another.port", "9191",
"web.http.another.path", "/another")); //default port mapping
jettyService = new JettyService(JettyConfiguration.createFromConfig(null, null, config), monitor);
ResourceConfig rc = createTestResource();

jettyService.start();

jettyService.registerServlet("another", new ServletContainer(rc));
jettyService.registerServlet("another", new TestServlet());

given()
.get("http://localhost:9191/another/test/resource")
Expand All @@ -103,8 +89,8 @@ void verifyDefaultAndCustomPortMapping() {

jettyService.start();

jettyService.registerServlet("another", new ServletContainer(createTestResource()));
jettyService.registerServlet("default", new ServletContainer(createTestResource()));
jettyService.registerServlet("another", new TestServlet());
jettyService.registerServlet("default", new TestServlet());

given()
.get("http://localhost:9191/another/test/resource")
Expand All @@ -127,7 +113,7 @@ void verifyConnectorConfigurationCallback() {

jettyService.start();

jettyService.registerServlet("default", new ServletContainer(createTestResource()));
jettyService.registerServlet("default", new TestServlet());

assertThat(listener.getConnectionsOpened()).isEqualTo(0);
given()
Expand All @@ -146,7 +132,7 @@ void verifyCustomPathRoot() {

jettyService.start();

jettyService.registerServlet("default", new ServletContainer(createTestResource()));
jettyService.registerServlet("default", new TestServlet());

given()
.get("http://localhost:7171/test/resource")
Expand Down Expand Up @@ -188,25 +174,6 @@ void teardown() {
jettyService.shutdown();
}

@NotNull
private ResourceConfig createTestResource() {
var rc = new ResourceConfig();
rc.registerClasses(TestController.class);
rc.registerInstances(new TestBinder());
return rc;
}

@Produces(MediaType.TEXT_PLAIN)
@Path("/test")
public static class TestController { //needs to be public, otherwise it won't get picked up

@GET
@Path("/resource")
public String foo() {
return "exists";
}
}

private static class JettyListener extends AbstractLifeCycle implements Connection.Listener {

private final AtomicInteger connectionsOpened = new AtomicInteger();
Expand All @@ -225,14 +192,10 @@ public int getConnectionsOpened() {
}
}

/**
* Maps (JAX-RS resource) instances to types.
*/
private class TestBinder extends AbstractBinder {

private static class TestServlet extends HttpServlet {
@Override
protected void configure() {
bind(testController).to(TestController.class);
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
resp.getWriter().write("{}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.restassured.specification.RequestSpecification;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.types.command.TerminateTransferCommand;
import org.eclipse.edc.junit.annotations.ApiTest;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
import org.junit.jupiter.api.Test;
Expand All @@ -31,6 +32,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ApiTest
public class TransferProcessControlApiControllerTest extends RestControllerTestBase {

private final TransferProcessService transferProcessService = mock();
Expand Down
1 change: 1 addition & 0 deletions extensions/data-plane/data-plane-http/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {

testImplementation(project(":core:common:junit"))
testImplementation(project(":core:data-plane:data-plane-core"))
testImplementation(project(":extensions:control-plane:api:control-plane-api-client"))
testImplementation(libs.restAssured)
testImplementation(libs.mockserver.netty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ dependencies {
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client)

testImplementation(project(":spi:data-plane:data-plane-spi"))
testImplementation(project(":core:common:junit"))
testImplementation(project(":extensions:control-plane:api:control-plane-api-client"))
testImplementation(testFixtures(project(":extensions:data-plane:data-plane-http")))
testImplementation(project(":spi:data-plane:data-plane-spi"))

testRuntimeOnly(project(":launchers:data-plane-server"))
}
1 change: 0 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka

[bundles]
jersey-core = ["jersey.server", "jersey.common", "jersey.jackson", "jersey.multipart", "jersey.inject", "jersey.servlet", "jersey.servletcore"]
jersey-api = ["jersey.common", "jersey.server"]
jackson = ["jackson.core", "jackson.annotations", "jackson.databind", "jackson-datatypeJsr310"]
jupiter = ["junit-jupiter-api", "junit-jupiter-params"]

Expand Down

0 comments on commit 5ce067a

Please sign in to comment.